Skip to content

Commit

Permalink
Fix GenerateMigrationTasks behavior (#4987)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
* GenerateMigrationTasks API should also propagate pending activity info

<!-- Tell your future self why have you made these changes -->
**Why?**
For namespace migration, history events as well as (updated) activity
info both should be replicated

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
N/A

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
N/A

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
N/A
  • Loading branch information
wxing1292 authored and rodrigozhou committed Oct 30, 2023
1 parent 1a30833 commit 8dc833c
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 21 deletions.
4 changes: 2 additions & 2 deletions service/history/api/replication/generate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func GenerateTask(
defer func() { wfContext.GetReleaseFn()(retError) }()

mutableState := wfContext.GetMutableState()
task, stateTransitionCount, err := mutableState.GenerateMigrationTasks()
replicationTasks, stateTransitionCount, err := mutableState.GenerateMigrationTasks()
if err != nil {
return nil, err
}
Expand All @@ -79,7 +79,7 @@ func GenerateTask(
WorkflowID: request.Execution.WorkflowId,
RunID: request.Execution.RunId,
Tasks: map[tasks.Category][]tasks.Task{
tasks.CategoryReplication: {task},
tasks.CategoryReplication: replicationTasks,
},
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ type (
StartTransaction(entry *namespace.Namespace) (bool, error)
CloseTransactionAsMutation(transactionPolicy TransactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error)
CloseTransactionAsSnapshot(transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error)
GenerateMigrationTasks() (tasks.Task, int64, error)
GenerateMigrationTasks() ([]tasks.Task, int64, error)

// ContinueAsNewMinBackoff calculate minimal backoff for next ContinueAsNew run.
// Input backoffDuration is current backoff for next run.
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4606,7 +4606,7 @@ func (ms *MutableStateImpl) UpdateDuplicatedResource(
ms.appliedEvents[id] = struct{}{}
}

func (ms *MutableStateImpl) GenerateMigrationTasks() (tasks.Task, int64, error) {
func (ms *MutableStateImpl) GenerateMigrationTasks() ([]tasks.Task, int64, error) {
return ms.taskGenerator.GenerateMigrationTasks()
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 29 additions & 13 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type (
GenerateHistoryReplicationTasks(
events []*historypb.HistoryEvent,
) error
GenerateMigrationTasks() (tasks.Task, int64, error)
GenerateMigrationTasks() ([]tasks.Task, int64, error)
}

TaskGeneratorImpl struct {
Expand Down Expand Up @@ -614,7 +614,7 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
return nil
}

func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, int64, error) {
func (r *TaskGeneratorImpl) GenerateMigrationTasks() ([]tasks.Task, int64, error) {
executionInfo := r.mutableState.GetExecutionInfo()
versionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.GetVersionHistories())
if err != nil {
Expand All @@ -625,21 +625,37 @@ func (r *TaskGeneratorImpl) GenerateMigrationTasks() (tasks.Task, int64, error)
return nil, 0, err
}

workflowKey := r.mutableState.GetWorkflowKey()

if r.mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
return &tasks.SyncWorkflowStateTask{
return []tasks.Task{&tasks.SyncWorkflowStateTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
WorkflowKey: workflowKey,
Version: lastItem.GetVersion(),
}, 1, nil
} else {
return &tasks.HistoryReplicationTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
FirstEventID: executionInfo.LastFirstEventId,
NextEventID: lastItem.GetEventId() + 1,
Version: lastItem.GetVersion(),
}, executionInfo.StateTransitionCount, nil
}}, 1, nil
}

now := time.Now().UTC()
replicationTasks := make([]tasks.Task, 0, len(r.mutableState.GetPendingActivityInfos())+1)
replicationTasks = append(replicationTasks, &tasks.HistoryReplicationTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: workflowKey,
FirstEventID: executionInfo.LastFirstEventId,
NextEventID: lastItem.GetEventId() + 1,
Version: lastItem.GetVersion(),
})
activityIDs := make(map[int64]struct{}, len(r.mutableState.GetPendingActivityInfos()))
for activityID := range r.mutableState.GetPendingActivityInfos() {
activityIDs[activityID] = struct{}{}
}
replicationTasks = append(replicationTasks, convertSyncActivityInfos(
now,
workflowKey,
r.mutableState.GetPendingActivityInfos(),
activityIDs,
)...)
return replicationTasks, executionInfo.StateTransitionCount, nil

}

func (r *TaskGeneratorImpl) getTimerSequence() TimerSequence {
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8dc833c

Please # to comment.