Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Update replication use branch token #3447

Merged
merged 5 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
465 changes: 262 additions & 203 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ func (s *TaskSerializer) replicationHistoryTaskToProto(
NextEventId: historyTask.NextEventID,
BranchToken: historyTask.BranchToken,
NewRunBranchToken: historyTask.NewRunBranchToken,
NewRunId: historyTask.NewRunID,
VisibilityTime: &historyTask.VisibilityTimestamp,
}
}
Expand All @@ -1036,6 +1037,7 @@ func (s *TaskSerializer) replicationHistoryTaskFromProto(
Version: historyTask.Version,
BranchToken: historyTask.BranchToken,
NewRunBranchToken: historyTask.NewRunBranchToken,
NewRunID: historyTask.NewRunId,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ message ReplicationTaskInfo {
reserved 14;
int64 task_id = 15;
google.protobuf.Timestamp visibility_time = 16 [(gogoproto.stdtime) = true];
string new_run_id = 17;
}

// visibility_task_data column
Expand Down
112 changes: 85 additions & 27 deletions service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
currentClusterName string
shard shard.Context
config *configs.Config
historyCache workflow.Cache
workflowCache workflow.Cache
executionMgr persistence.ExecutionManager
metricsClient metrics.Client
logger log.Logger
Expand All @@ -88,7 +88,7 @@ var (

func NewAckManager(
shard shard.Context,
historyCache workflow.Cache,
workflowCache workflow.Cache,
executionMgr persistence.ExecutionManager,
logger log.Logger,
) AckManager {
Expand All @@ -104,7 +104,7 @@ func NewAckManager(
currentClusterName: currentClusterName,
shard: shard,
config: shard.GetConfig(),
historyCache: historyCache,
workflowCache: workflowCache,
executionMgr: executionMgr,
metricsClient: shard.GetMetricsClient(),
logger: log.With(logger, tag.ComponentReplicatorQueue),
Expand Down Expand Up @@ -423,7 +423,7 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
workflowID := taskInfo.WorkflowID
runID := taskInfo.RunID
taskID := taskInfo.TaskID
return p.processReplication(
replicationTask, err := p.processReplication(
ctx,
true, // still necessary to send out history replication message if workflow closed
namespaceID,
Expand All @@ -439,35 +439,16 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
return nil, err
}

// BranchToken will not set in get dlq replication message request
if len(taskInfo.BranchToken) == 0 {
taskInfo.BranchToken = branchToken
}

eventsBlob, err := p.getEventsBlob(
ctx,
taskInfo.BranchToken,
branchToken,
taskInfo.FirstEventID,
taskInfo.NextEventID,
)
if err != nil {
return nil, err
}

var newRunEventsBlob *commonpb.DataBlob
if len(taskInfo.NewRunBranchToken) != 0 {
// only get the first batch
newRunEventsBlob, err = p.getEventsBlob(
ctx,
taskInfo.NewRunBranchToken,
common.FirstEventID,
common.FirstEventID+1,
)
if err != nil {
return nil, err
}
}

replicationTask := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
SourceTaskId: taskID,
Expand All @@ -478,14 +459,26 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
RunId: runID,
VersionHistoryItems: versionHistoryItems,
Events: eventsBlob,
NewRunEvents: newRunEventsBlob,
// NewRunEvents will be set in processNewRunReplication
},
},
VisibilityTime: &taskInfo.VisibilityTimestamp,
}
return replicationTask, nil
},
)
if err != nil {
return replicationTask, err
}
return p.processNewRunReplication(
ctx,
namespaceID,
workflowID,
taskInfo.NewRunID,
taskInfo.NewRunBranchToken,
taskInfo.Version,
replicationTask,
)
}

func (p *ackMgrImpl) generateSyncWorkflowStateTask(
Expand Down Expand Up @@ -570,11 +563,11 @@ func (p *ackMgrImpl) processReplication(
RunId: runID,
}

context, release, err := p.historyCache.GetOrCreateWorkflowExecution(
context, release, err := p.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
execution,
workflow.CallerTypeAPI,
workflow.CallerTypeTask,
)
if err != nil {
return nil, err
Expand All @@ -596,6 +589,71 @@ func (p *ackMgrImpl) processReplication(
}
}

func (p *ackMgrImpl) processNewRunReplication(
ctx context.Context,
namespaceID namespace.ID,
workflowID string,
newRunID string,
branchToken []byte,
taskVersion int64,
task *replicationspb.ReplicationTask,
) (retReplicationTask *replicationspb.ReplicationTask, retError error) {

attr, ok := task.Attributes.(*replicationspb.ReplicationTask_HistoryTaskAttributes)
if !ok {
return nil, serviceerror.NewInternal("Wrong replication task to process new run replication.")
}

var newRunBranchToken []byte
if len(newRunID) > 0 {
newRunContext, releaseFn, err := p.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: newRunID,
},
workflow.CallerTypeTask,
)
if err != nil {
return nil, err
}
defer func() { releaseFn(retError) }()

newRunMutableState, err := newRunContext.LoadMutableState(ctx)
if err != nil {
return nil, err
}
_, newRunBranchToken, err = getVersionHistoryItems(
newRunMutableState,
common.FirstEventID,
taskVersion,
)
if err != nil {
return nil, err
}
} else if len(branchToken) != 0 {
newRunBranchToken = branchToken
}

var newRunEventsBlob *commonpb.DataBlob
if len(newRunBranchToken) > 0 {
// only get the first batch
var err error
newRunEventsBlob, err = p.getEventsBlob(
ctx,
newRunBranchToken,
common.FirstEventID,
common.FirstEventID+1,
)
if err != nil {
return nil, err
}
}
attr.HistoryTaskAttributes.NewRunEvents = newRunEventsBlob
return task, nil
}

func getVersionHistoryItems(
mutableState workflow.MutableState,
eventID int64,
Expand Down
8 changes: 4 additions & 4 deletions service/history/replication/ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (s *ackManagerSuite) TestSyncActivity_WorkflowCompleted() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityCompleted() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down Expand Up @@ -378,7 +378,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRetry() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down Expand Up @@ -491,7 +491,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRunning() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down
7 changes: 5 additions & 2 deletions service/history/tasks/history_replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ type (
FirstEventID int64
NextEventID int64
Version int64
BranchToken []byte
NewRunBranchToken []byte
// deprecated
BranchToken []byte
// deprecated
NewRunBranchToken []byte
NewRunID string
}
)

Expand Down
14 changes: 1 addition & 13 deletions service/history/timerQueueTaskExecutorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package history

import (
"bytes"
"context"

commonpb "go.temporal.io/api/common/v1"
Expand All @@ -35,7 +34,6 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
Expand Down Expand Up @@ -133,17 +131,7 @@ func (t *timerQueueTaskExecutorBase) executeDeleteHistoryEventTask(
return err
}
if ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), lastWriteVersion, task.Version, task); !ok {
currentBranchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}
// the mutable state has a newer version and the branch token is updated
// use task branch token to delete the original branch
if !bytes.Equal(task.BranchToken, currentBranchToken) {
return t.deleteHistoryBranch(ctx, task.BranchToken)
}
t.logger.Error("Different mutable state versions have the same branch token", tag.TaskVersion(task.Version), tag.LastEventVersion(lastWriteVersion))
return serviceerror.NewInternal("Mutable state has different version but same branch token")
return nil
}

return t.deleteManager.DeleteWorkflowExecutionByRetention(
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,11 +725,13 @@ func (c *ContextImpl) mergeContinueAsNewReplicationTasks(
delete(newWorkflowSnapshot.Tasks, tasks.CategoryReplication)

newRunBranchToken := newRunTask.BranchToken
newRunID := newRunTask.RunID
taskUpdated := false
for _, replicationTask := range currentWorkflowMutation.Tasks[tasks.CategoryReplication] {
if task, ok := replicationTask.(*tasks.HistoryReplicationTask); ok {
taskUpdated = true
task.NewRunBranchToken = newRunBranchToken
task.NewRunID = newRunID
}
}
if !taskUpdated {
Expand Down
4 changes: 0 additions & 4 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,6 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
FirstEventID: firstEvent.GetEventId(),
NextEventID: lastEvent.GetEventId() + 1,
Version: version,
BranchToken: branchToken,
NewRunBranchToken: nil,
})
return nil
}
Expand Down Expand Up @@ -589,8 +587,6 @@ func (r *TaskGeneratorImpl) GenerateMigrationTasks(
FirstEventID: executionInfo.LastFirstEventId,
NextEventID: lastItem.GetEventId() + 1,
Version: lastItem.GetVersion(),
BranchToken: versionHistory.BranchToken,
NewRunBranchToken: nil,
}, nil
}
}
Expand Down