Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Split sync workflow state logic to dedicated file #4784

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions service/history/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ type (
executionManager persistence.ExecutionManager
queueProcessors map[tasks.Category]queues.Queue
replicationAckMgr replication.AckManager
nDCReplicator ndc.HistoryReplicator
nDCActivityReplicator ndc.ActivityReplicator
nDCHistoryReplicator ndc.HistoryReplicator
nDCActivityStateReplicator ndc.ActivityStateReplicator
nDCWorkflowStateReplicator ndc.WorkflowStateReplicator
replicationProcessorMgr replication.TaskProcessor
eventNotifier events.Notifier
tokenSerializer common.TaskTokenSerializer
Expand Down Expand Up @@ -207,18 +208,25 @@ func NewEngineWithShardContext(
executionManager,
logger,
)
historyEngImpl.nDCReplicator = ndc.NewHistoryReplicator(
historyEngImpl.nDCHistoryReplicator = ndc.NewHistoryReplicator(
shard,
workflowCache,
historyEngImpl.eventsReapplier,
logger,
eventSerializer,
logger,
)
historyEngImpl.nDCActivityReplicator = ndc.NewActivityReplicator(
historyEngImpl.nDCActivityStateReplicator = ndc.NewActivityStateReplicator(
shard,
workflowCache,
logger,
)
historyEngImpl.nDCWorkflowStateReplicator = ndc.NewWorkflowStateReplicator(
shard,
workflowCache,
historyEngImpl.eventsReapplier,
eventSerializer,
logger,
)
}
historyEngImpl.workflowRebuilder = NewWorkflowRebuilder(
shard,
Expand Down Expand Up @@ -613,17 +621,22 @@ func (e *historyEngineImpl) ReplicateEventsV2(
ctx context.Context,
replicateRequest *historyservice.ReplicateEventsV2Request,
) error {
return e.nDCHistoryReplicator.ApplyEvents(ctx, replicateRequest)
}

return e.nDCReplicator.ApplyEvents(ctx, replicateRequest)
func (e *historyEngineImpl) SyncActivity(
ctx context.Context,
request *historyservice.SyncActivityRequest,
) (retError error) {
return e.nDCActivityStateReplicator.SyncActivityState(ctx, request)
}

// ReplicateWorkflowState is an experimental method to replicate workflow state. This should not expose outside of history service role.
func (e *historyEngineImpl) ReplicateWorkflowState(
ctx context.Context,
request *historyservice.ReplicateWorkflowStateRequest,
) error {

return e.nDCReplicator.ApplyWorkflowState(ctx, request)
return e.nDCWorkflowStateReplicator.SyncWorkflowState(ctx, request)
}

func (e *historyEngineImpl) SyncShardStatus(
Expand All @@ -645,14 +658,6 @@ func (e *historyEngineImpl) SyncShardStatus(
return nil
}

func (e *historyEngineImpl) SyncActivity(
ctx context.Context,
request *historyservice.SyncActivityRequest,
) (retError error) {

return e.nDCActivityReplicator.SyncActivity(ctx, request)
}

// ResetWorkflowExecution terminates current workflow (if running) and replay & create new workflow
// Consistency guarantee: always write
func (e *historyEngineImpl) ResetWorkflowExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination activity_replicator_mock.go
//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination activity_state_replicator_mock.go

package ndc

Expand Down Expand Up @@ -59,34 +59,34 @@ const (
)

type (
ActivityReplicator interface {
SyncActivity(
ActivityStateReplicator interface {
SyncActivityState(
ctx context.Context,
request *historyservice.SyncActivityRequest,
) error
}

ActivityReplicatorImpl struct {
ActivityStateReplicatorImpl struct {
workflowCache wcache.Cache
clusterMetadata cluster.Metadata
logger log.Logger
}
)

func NewActivityReplicator(
func NewActivityStateReplicator(
shard shard.Context,
workflowCache wcache.Cache,
logger log.Logger,
) *ActivityReplicatorImpl {
) *ActivityStateReplicatorImpl {

return &ActivityReplicatorImpl{
return &ActivityStateReplicatorImpl{
workflowCache: workflowCache,
clusterMetadata: shard.GetClusterMetadata(),
logger: log.With(logger, tag.ComponentHistoryReplicator),
}
}

func (r *ActivityReplicatorImpl) SyncActivity(
func (r *ActivityStateReplicatorImpl) SyncActivityState(
ctx context.Context,
request *historyservice.SyncActivityRequest,
) (retError error) {
Expand Down Expand Up @@ -213,7 +213,7 @@ func (r *ActivityReplicatorImpl) SyncActivity(
)
}

func (r *ActivityReplicatorImpl) testRefreshActivityTimerTaskMask(
func (r *ActivityStateReplicatorImpl) testRefreshActivityTimerTaskMask(
version int64,
attempt int32,
activityInfo *persistencespb.ActivityInfo,
Expand All @@ -231,7 +231,7 @@ func (r *ActivityReplicatorImpl) testRefreshActivityTimerTaskMask(
return false
}

func (r *ActivityReplicatorImpl) testActivity(
func (r *ActivityStateReplicatorImpl) testActivity(
version int64,
attempt int32,
lastHeartbeatTime time.Time,
Expand Down Expand Up @@ -271,7 +271,7 @@ func (r *ActivityReplicatorImpl) testActivity(
return true
}

func (r *ActivityReplicatorImpl) testVersionHistory(
func (r *ActivityStateReplicatorImpl) testVersionHistory(
namespaceID namespace.ID,
workflowID string,
runID string,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading