diff --git a/service/history/api/replication/generate_task.go b/service/history/api/replication/generate_task.go index ca8f92059b0..54557f0f166 100644 --- a/service/history/api/replication/generate_task.go +++ b/service/history/api/replication/generate_task.go @@ -67,7 +67,7 @@ func GenerateTask( defer func() { wfContext.GetReleaseFn()(retError) }() mutableState := wfContext.GetMutableState() - task, stateTransitionCount, err := mutableState.GenerateMigrationTasks() + replicationTasks, stateTransitionCount, err := mutableState.GenerateMigrationTasks() if err != nil { return nil, err } @@ -79,7 +79,7 @@ func GenerateTask( WorkflowID: request.Execution.WorkflowId, RunID: request.Execution.RunId, Tasks: map[tasks.Category][]tasks.Task{ - tasks.CategoryReplication: {task}, + tasks.CategoryReplication: replicationTasks, }, }) if err != nil { diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index efe380a1106..deed7830d80 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -307,7 +307,7 @@ type ( StartTransaction(entry *namespace.Namespace) (bool, error) CloseTransactionAsMutation(transactionPolicy TransactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) CloseTransactionAsSnapshot(transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error) - GenerateMigrationTasks() (tasks.Task, int64, error) + GenerateMigrationTasks() ([]tasks.Task, int64, error) // ContinueAsNewMinBackoff calculate minimal backoff for next ContinueAsNew run. // Input backoffDuration is current backoff for next run. diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index b51e2673c0b..5bc972a0939 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4606,7 +4606,7 @@ func (ms *MutableStateImpl) UpdateDuplicatedResource( ms.appliedEvents[id] = struct{}{} } -func (ms *MutableStateImpl) GenerateMigrationTasks() (tasks.Task, int64, error) { +func (ms *MutableStateImpl) GenerateMigrationTasks() ([]tasks.Task, int64, error) { return ms.taskGenerator.GenerateMigrationTasks() } diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 7631301d626..8c76322bd2c 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -985,10 +985,10 @@ func (mr *MockMutableStateMockRecorder) FlushBufferedEvents() *gomock.Call { } // GenerateMigrationTasks mocks base method. -func (m *MockMutableState) GenerateMigrationTasks() (tasks.Task, int64, error) { +func (m *MockMutableState) GenerateMigrationTasks() ([]tasks.Task, int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GenerateMigrationTasks") - ret0, _ := ret[0].(tasks.Task) + ret0, _ := ret[0].([]tasks.Task) ret1, _ := ret[1].(int64) ret2, _ := ret[2].(error) return ret0, ret1, ret2 diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index e160fdc54bb..1e317529685 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -102,7 +102,7 @@ type ( GenerateHistoryReplicationTasks( events []*historypb.HistoryEvent, ) error - GenerateMigrationTasks() (tasks.Task, int64, error) + GenerateMigrationTasks() ([]tasks.Task, int64, error) } TaskGeneratorImpl struct { @@ -614,7 +614,7 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks( return nil } -func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, int64, error) { +func (r *TaskGeneratorImpl) GenerateMigrationTasks() ([]tasks.Task, int64, error) { executionInfo := r.mutableState.GetExecutionInfo() versionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.GetVersionHistories()) if err != nil { @@ -625,21 +625,37 @@ func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, int64, error) return nil, 0, err } + workflowKey := r.mutableState.GetWorkflowKey() + if r.mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED { - return &tasks.SyncWorkflowStateTask{ + return []tasks.Task{&tasks.SyncWorkflowStateTask{ // TaskID, VisibilityTimestamp is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), + WorkflowKey: workflowKey, Version: lastItem.GetVersion(), - }, 1, nil - } else { - return &tasks.HistoryReplicationTask{ - // TaskID, VisibilityTimestamp is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - FirstEventID: executionInfo.LastFirstEventId, - NextEventID: lastItem.GetEventId() + 1, - Version: lastItem.GetVersion(), - }, executionInfo.StateTransitionCount, nil + }}, 1, nil } + + now := time.Now().UTC() + replicationTasks := make([]tasks.Task, 0, len(r.mutableState.GetPendingActivityInfos())+1) + replicationTasks = append(replicationTasks, &tasks.HistoryReplicationTask{ + // TaskID, VisibilityTimestamp is set by shard + WorkflowKey: workflowKey, + FirstEventID: executionInfo.LastFirstEventId, + NextEventID: lastItem.GetEventId() + 1, + Version: lastItem.GetVersion(), + }) + activityIDs := make(map[int64]struct{}, len(r.mutableState.GetPendingActivityInfos())) + for activityID := range r.mutableState.GetPendingActivityInfos() { + activityIDs[activityID] = struct{}{} + } + replicationTasks = append(replicationTasks, convertSyncActivityInfos( + now, + workflowKey, + r.mutableState.GetPendingActivityInfos(), + activityIDs, + )...) + return replicationTasks, executionInfo.StateTransitionCount, nil + } func (r *TaskGeneratorImpl) getTimerSequence() TimerSequence { diff --git a/service/history/workflow/task_generator_mock.go b/service/history/workflow/task_generator_mock.go index bf31a7e292f..6586f848f5e 100644 --- a/service/history/workflow/task_generator_mock.go +++ b/service/history/workflow/task_generator_mock.go @@ -174,10 +174,10 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateHistoryReplicationTasks(events } // GenerateMigrationTasks mocks base method. -func (m *MockTaskGenerator) GenerateMigrationTasks() (tasks.Task, int64, error) { +func (m *MockTaskGenerator) GenerateMigrationTasks() ([]tasks.Task, int64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GenerateMigrationTasks") - ret0, _ := ret[0].(tasks.Task) + ret0, _ := ret[0].([]tasks.Task) ret1, _ := ret[1].(int64) ret2, _ := ret[2].(error) return ret0, ret1, ret2