diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index 3622fd1c77b..525bcdd943f 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -590,9 +590,17 @@ func (a *activities) verifyReplicationTasks( details *replicationTasksHeartbeatDetails, remoteClient adminservice.AdminServiceClient, ns *namespace.Namespace, + heartbeat func(details replicationTasksHeartbeatDetails), ) (bool, []SkippedWorkflowExecution, error) { start := time.Now() + progress := false defer func() { + if progress { + // Update CheckPoint where there is a progress + details.CheckPoint = time.Now() + } + + heartbeat(*details) a.forceReplicationMetricsHandler.Timer(metrics.VerifyReplicationTasksLatency.GetMetricName()).Record(time.Since(start)) }() @@ -636,6 +644,9 @@ func (a *activities) verifyReplicationTasks( return false, skippedList, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed") } + + heartbeat(*details) + progress = true } return true, skippedList, nil @@ -682,24 +693,19 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify // - more than checkSkipThreshold, it checks if outstanding workflow execution can be skipped locally (#2 and #3) // - more than NonRetryableTimeout, it means potentially #4. The activity returns // non-retryable error and force-replication will fail. - for { // Since replication has a lag, sleep first. time.Sleep(request.VerifyInterval) - lastIndex := details.NextIndex - verified, skippedList, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry) + verified, skippedList, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry, + func(d replicationTasksHeartbeatDetails) { + activity.RecordHeartbeat(ctx, d) + }) + if err != nil { return response, err } - if lastIndex < details.NextIndex { - // Update CheckPoint where there is a progress - details.CheckPoint = time.Now() - } - - activity.RecordHeartbeat(ctx, details) - if len(skippedList) > 0 { response.SkippedWorkflowExecutions = append(response.SkippedWorkflowExecutions, skippedList...) response.SkippedWorkflowCount = len(response.SkippedWorkflowExecutions) diff --git a/service/worker/migration/activities_test.go b/service/worker/migration/activities_test.go index 918041c785e..15bc8929c67 100644 --- a/service/worker/migration/activities_test.go +++ b/service/worker/migration/activities_test.go @@ -340,9 +340,7 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_AlreadyVerified() { _, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request) s.NoError(err) - s.Greater(len(iceptor.replicationRecordedHeartbeats), 0) - lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1] - s.Equal(len(request.Executions), lastHeartBeat.NextIndex) + s.Equal(len(iceptor.replicationRecordedHeartbeats), 1) } type executionState int @@ -383,6 +381,14 @@ func createExecutions(mockClient *adminservicemock.MockAdminServiceClient, state return executions } +type mockHeartBeatRecorder struct { + lastHeartBeat replicationTasksHeartbeatDetails +} + +func (m *mockHeartBeatRecorder) hearbeat(details replicationTasksHeartbeatDetails) { + m.lastHeartBeat = details +} + func (s *activitiesSuite) Test_verifyReplicationTasks() { request := verifyReplicationTasksRequest{ Namespace: mockedNamespace, @@ -393,36 +399,36 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() { ctx := context.TODO() var tests = []struct { - executionStates []executionState - nextIndex int - expectedVerified bool - expectedErr error - expectedIndex int + remoteExecutionStates []executionState + nextIndex int + expectedVerified bool + expectedErr error + expectedIndex int }{ { expectedVerified: true, expectedErr: nil, }, { - executionStates: []executionState{executionFound, executionFound, executionFound, executionFound}, - nextIndex: 0, - expectedVerified: true, - expectedErr: nil, - expectedIndex: 4, + remoteExecutionStates: []executionState{executionFound, executionFound, executionFound, executionFound}, + nextIndex: 0, + expectedVerified: true, + expectedErr: nil, + expectedIndex: 4, }, { - executionStates: []executionState{executionFound, executionFound, executionFound, executionFound}, - nextIndex: 2, - expectedVerified: true, - expectedErr: nil, - expectedIndex: 4, + remoteExecutionStates: []executionState{executionFound, executionFound, executionFound, executionFound}, + nextIndex: 2, + expectedVerified: true, + expectedErr: nil, + expectedIndex: 4, }, { - executionStates: []executionState{executionFound, executionFound, executionNotfound}, - nextIndex: 0, - expectedVerified: false, - expectedErr: nil, - expectedIndex: 2, + remoteExecutionStates: []executionState{executionFound, executionFound, executionNotfound}, + nextIndex: 0, + expectedVerified: false, + expectedErr: nil, + expectedIndex: 2, }, } @@ -432,20 +438,22 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() { }).Return(&completeState, nil).AnyTimes() for _, tc := range tests { + var recorder mockHeartBeatRecorder mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller) - request.Executions = createExecutions(mockRemoteAdminClient, tc.executionStates, tc.nextIndex) + request.Executions = createExecutions(mockRemoteAdminClient, tc.remoteExecutionStates, tc.nextIndex) details := replicationTasksHeartbeatDetails{ NextIndex: tc.nextIndex, } - verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace) + verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, recorder.hearbeat) if tc.expectedErr == nil { s.NoError(err) } s.Equal(tc.expectedVerified, verified) s.Equal(tc.expectedIndex, details.NextIndex) - s.GreaterOrEqual(len(tc.executionStates), details.NextIndex) - if details.NextIndex < len(tc.executionStates) && tc.executionStates[details.NextIndex] == executionNotfound { + s.GreaterOrEqual(len(tc.remoteExecutionStates), details.NextIndex) + s.Equal(recorder.lastHeartBeat, details) + if details.NextIndex < len(tc.remoteExecutionStates) && tc.remoteExecutionStates[details.NextIndex] == executionNotfound { s.Equal(execution1, details.LastNotFoundWorkflowExecution) } } @@ -476,6 +484,7 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() { } for _, tc := range tests { + var recorder mockHeartBeatRecorder deleteTime := time.Now().Add(tc.deleteDiff) retention := time.Hour closeTime := deleteTime.Add(-retention) @@ -512,9 +521,10 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() { details := replicationTasksHeartbeatDetails{} ctx := context.TODO() - verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns) + verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns, recorder.hearbeat) s.NoError(err) s.Equal(tc.verified, verified) + s.Equal(recorder.lastHeartBeat, details) } }