diff --git a/service/history/api/replication/generate_task.go b/service/history/api/replication/generate_task.go new file mode 100644 index 00000000000..cecb2432174 --- /dev/null +++ b/service/history/api/replication/generate_task.go @@ -0,0 +1,86 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replication + +import ( + "context" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" +) + +func GenerateTask( + ctx context.Context, + request *historyservice.GenerateLastHistoryReplicationTasksRequest, + shard shard.Context, + workflowConsistencyChecker api.WorkflowConsistencyChecker, +) (_ *historyservice.GenerateLastHistoryReplicationTasksResponse, retError error) { + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(request.GetNamespaceId())) + if err != nil { + return nil, err + } + namespaceID := namespaceEntry.ID() + + wfContext, err := workflowConsistencyChecker.GetWorkflowContext( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + definition.NewWorkflowKey( + namespaceID.String(), + request.Execution.WorkflowId, + request.Execution.RunId, + ), + ) + if err != nil { + return nil, err + } + defer func() { wfContext.GetReleaseFn()(retError) }() + + now := shard.GetTimeSource().Now() + task, err := wfContext.GetMutableState().GenerateMigrationTasks(now) + if err != nil { + return nil, err + } + + err = shard.AddTasks(ctx, &persistence.AddHistoryTasksRequest{ + ShardID: shard.GetShardID(), + // RangeID is set by shard + NamespaceID: string(namespaceID), + WorkflowID: request.Execution.WorkflowId, + RunID: request.Execution.RunId, + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryReplication: {task}, + }, + }) + if err != nil { + return nil, err + } + return &historyservice.GenerateLastHistoryReplicationTasksResponse{}, nil +} diff --git a/service/history/api/replication/get_dlq_tasks.go b/service/history/api/replication/get_dlq_tasks.go new file mode 100644 index 00000000000..4873a3deb1a --- /dev/null +++ b/service/history/api/replication/get_dlq_tasks.go @@ -0,0 +1,52 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replication + +import ( + "context" + + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/service/history/replication" + "go.temporal.io/server/service/history/shard" +) + +func GetDLQTasks( + ctx context.Context, + shard shard.Context, + replicationAckMgr replication.AckManager, + taskInfos []*replicationspb.ReplicationTaskInfo, +) ([]*replicationspb.ReplicationTask, error) { + tasks := make([]*replicationspb.ReplicationTask, 0, len(taskInfos)) + for _, taskInfo := range taskInfos { + task, err := replicationAckMgr.GetTask(ctx, taskInfo) + if err != nil { + shard.GetLogger().Error("Failed to fetch DLQ replication messages.", tag.Error(err)) + return nil, err + } + tasks = append(tasks, task) + } + return tasks, nil +} diff --git a/service/history/api/replication/get_tasks.go b/service/history/api/replication/get_tasks.go new file mode 100644 index 00000000000..dc467d2116b --- /dev/null +++ b/service/history/api/replication/get_tasks.go @@ -0,0 +1,72 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replication + +import ( + "context" + "time" + + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/service/history/replication" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" +) + +func GetTasks( + ctx context.Context, + shard shard.Context, + replicationAckMgr replication.AckManager, + pollingCluster string, + ackMessageID int64, + ackTimestamp time.Time, + queryMessageID int64, +) (*replicationspb.ReplicationMessages, error) { + + if ackMessageID != persistence.EmptyQueueMessageID { + if err := shard.UpdateQueueClusterAckLevel( + tasks.CategoryReplication, + pollingCluster, + tasks.NewImmediateKey(ackMessageID), + ); err != nil { + shard.GetLogger().Error("error updating replication level for shard", tag.Error(err), tag.OperationFailed) + } + shard.UpdateRemoteClusterInfo(pollingCluster, ackMessageID, ackTimestamp) + } + + replicationMessages, err := replicationAckMgr.GetTasks( + ctx, + pollingCluster, + queryMessageID, + ) + if err != nil { + shard.GetLogger().Error("Failed to retrieve replication messages.", tag.Error(err)) + return nil, err + } + + shard.GetLogger().Debug("Successfully fetched replication messages.", tag.Counter(len(replicationMessages.ReplicationTasks))) + return replicationMessages, nil +} diff --git a/service/history/api/replication/status.go b/service/history/api/replication/status.go new file mode 100644 index 00000000000..f41b27f7bee --- /dev/null +++ b/service/history/api/replication/status.go @@ -0,0 +1,58 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replication + +import ( + "context" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/service/history/replication" + "go.temporal.io/server/service/history/shard" +) + +func GetStatus( + ctx context.Context, + request *historyservice.GetReplicationStatusRequest, + shard shard.Context, + replicationAckMgr replication.AckManager, +) (_ *historyservice.ShardReplicationStatus, retError error) { + resp := &historyservice.ShardReplicationStatus{ + ShardId: shard.GetShardID(), + ShardLocalTime: timestamp.TimePtr(shard.GetTimeSource().Now()), + } + + maxReplicationTaskId, maxTaskVisibilityTimeStamp := replicationAckMgr.GetMaxTaskInfo() + resp.MaxReplicationTaskId = maxReplicationTaskId + resp.MaxReplicationTaskVisibilityTime = timestamp.TimePtr(maxTaskVisibilityTimeStamp) + + remoteClusters, handoverNamespaces, err := shard.GetReplicationStatus(request.RemoteClusters) + if err != nil { + return nil, err + } + resp.RemoteClusters = remoteClusters + resp.HandoverNamespaces = handoverNamespaces + return resp, nil +} diff --git a/service/history/api/replicationadmin/get_dlq.go b/service/history/api/replicationadmin/get_dlq.go new file mode 100644 index 00000000000..809360e20f9 --- /dev/null +++ b/service/history/api/replicationadmin/get_dlq.go @@ -0,0 +1,62 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replicationadmin + +import ( + "context" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/replication" + "go.temporal.io/server/service/history/shard" +) + +func GetDLQ( + ctx context.Context, + request *historyservice.GetDLQMessagesRequest, + shard shard.Context, + replicationDLQHandler replication.DLQHandler, +) (*historyservice.GetDLQMessagesResponse, error) { + _, ok := shard.GetClusterMetadata().GetAllClusterInfo()[request.GetSourceCluster()] + if !ok { + return nil, consts.ErrUnknownCluster + } + + tasks, token, err := replicationDLQHandler.GetMessages( + ctx, + request.GetSourceCluster(), + request.GetInclusiveEndMessageId(), + int(request.GetMaximumPageSize()), + request.GetNextPageToken(), + ) + if err != nil { + return nil, err + } + return &historyservice.GetDLQMessagesResponse{ + Type: request.GetType(), + ReplicationTasks: tasks, + NextPageToken: token, + }, nil +} diff --git a/service/history/api/replicationadmin/merge_dlq.go b/service/history/api/replicationadmin/merge_dlq.go new file mode 100644 index 00000000000..4356075e58a --- /dev/null +++ b/service/history/api/replicationadmin/merge_dlq.go @@ -0,0 +1,60 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replicationadmin + +import ( + "context" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/replication" + "go.temporal.io/server/service/history/shard" +) + +func MergeDLQ( + ctx context.Context, + request *historyservice.MergeDLQMessagesRequest, + shard shard.Context, + replicationDLQHandler replication.DLQHandler, +) (*historyservice.MergeDLQMessagesResponse, error) { + _, ok := shard.GetClusterMetadata().GetAllClusterInfo()[request.GetSourceCluster()] + if !ok { + return nil, consts.ErrUnknownCluster + } + + token, err := replicationDLQHandler.MergeMessages( + ctx, + request.GetSourceCluster(), + request.GetInclusiveEndMessageId(), + int(request.GetMaximumPageSize()), + request.GetNextPageToken(), + ) + if err != nil { + return nil, err + } + return &historyservice.MergeDLQMessagesResponse{ + NextPageToken: token, + }, nil +} diff --git a/service/history/api/replicationadmin/purge_dlq.go b/service/history/api/replicationadmin/purge_dlq.go new file mode 100644 index 00000000000..7f904263256 --- /dev/null +++ b/service/history/api/replicationadmin/purge_dlq.go @@ -0,0 +1,55 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replicationadmin + +import ( + "context" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/replication" + "go.temporal.io/server/service/history/shard" +) + +func PurgeDLQ( + ctx context.Context, + request *historyservice.PurgeDLQMessagesRequest, + shard shard.Context, + replicationDLQHandler replication.DLQHandler, +) (*historyservice.PurgeDLQMessagesResponse, error) { + _, ok := shard.GetClusterMetadata().GetAllClusterInfo()[request.GetSourceCluster()] + if !ok { + return nil, consts.ErrUnknownCluster + } + + if err := replicationDLQHandler.PurgeMessages( + ctx, + request.GetSourceCluster(), + request.GetInclusiveEndMessageId(), + ); err != nil { + return nil, err + } + return &historyservice.PurgeDLQMessagesResponse{}, nil +} diff --git a/service/history/handler.go b/service/history/handler.go index a22b977f99f..36195298c27 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -1628,12 +1628,11 @@ func (h *Handler) PurgeDLQMessages(ctx context.Context, request *historyservice. return nil, h.convertError(err) } - err = engine.PurgeDLQMessages(ctx, request) + resp, err := engine.PurgeDLQMessages(ctx, request) if err != nil { - err = h.convertError(err) - return nil, err + return nil, h.convertError(err) } - return &historyservice.PurgeDLQMessagesResponse{}, nil + return resp, nil } func (h *Handler) MergeDLQMessages(ctx context.Context, request *historyservice.MergeDLQMessagesRequest) (_ *historyservice.MergeDLQMessagesResponse, retError error) { diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 173a9391d27..3dab1b2e22b 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -62,6 +62,8 @@ import ( "go.temporal.io/server/service/history/api/recordactivitytaskheartbeat" "go.temporal.io/server/service/history/api/recordactivitytaskstarted" "go.temporal.io/server/service/history/api/recordchildworkflowcompleted" + replicationapi "go.temporal.io/server/service/history/api/replication" + "go.temporal.io/server/service/history/api/replicationadmin" "go.temporal.io/server/service/history/api/requestcancelworkflow" "go.temporal.io/server/service/history/api/resetstickytaskqueue" respondactivitytaskcandeled "go.temporal.io/server/service/history/api/respondactivitytaskcanceled" @@ -1205,48 +1207,14 @@ func (e *historyEngineImpl) GetReplicationMessages( ackTimestamp time.Time, queryMessageID int64, ) (*replicationspb.ReplicationMessages, error) { - - if ackMessageID != persistence.EmptyQueueMessageID { - if err := e.shard.UpdateQueueClusterAckLevel( - tasks.CategoryReplication, - pollingCluster, - tasks.NewImmediateKey(ackMessageID), - ); err != nil { - e.logger.Error("error updating replication level for shard", tag.Error(err), tag.OperationFailed) - } - e.shard.UpdateRemoteClusterInfo(pollingCluster, ackMessageID, ackTimestamp) - } - - replicationMessages, err := e.replicationAckMgr.GetTasks( - ctx, - pollingCluster, - queryMessageID, - ) - if err != nil { - e.logger.Error("Failed to retrieve replication messages.", tag.Error(err)) - return nil, err - } - e.logger.Debug("Successfully fetched replication messages.", tag.Counter(len(replicationMessages.ReplicationTasks))) - - return replicationMessages, nil + return replicationapi.GetTasks(ctx, e.shard, e.replicationAckMgr, pollingCluster, ackMessageID, ackTimestamp, queryMessageID) } func (e *historyEngineImpl) GetDLQReplicationMessages( ctx context.Context, taskInfos []*replicationspb.ReplicationTaskInfo, ) ([]*replicationspb.ReplicationTask, error) { - - tasks := make([]*replicationspb.ReplicationTask, 0, len(taskInfos)) - for _, taskInfo := range taskInfos { - task, err := e.replicationAckMgr.GetTask(ctx, taskInfo) - if err != nil { - e.logger.Error("Failed to fetch DLQ replication messages.", tag.Error(err)) - return nil, err - } - tasks = append(tasks, task) - } - - return tasks, nil + return replicationapi.GetDLQTasks(ctx, e.shard, e.replicationAckMgr, taskInfos) } func (e *historyEngineImpl) ReapplyEvents( @@ -1431,69 +1399,21 @@ func (e *historyEngineImpl) GetDLQMessages( ctx context.Context, request *historyservice.GetDLQMessagesRequest, ) (*historyservice.GetDLQMessagesResponse, error) { - - _, ok := e.clusterMetadata.GetAllClusterInfo()[request.GetSourceCluster()] - if !ok { - return nil, consts.ErrUnknownCluster - } - - tasks, token, err := e.replicationDLQHandler.GetMessages( - ctx, - request.GetSourceCluster(), - request.GetInclusiveEndMessageId(), - int(request.GetMaximumPageSize()), - request.GetNextPageToken(), - ) - if err != nil { - return nil, err - } - return &historyservice.GetDLQMessagesResponse{ - Type: request.GetType(), - ReplicationTasks: tasks, - NextPageToken: token, - }, nil + return replicationadmin.GetDLQ(ctx, request, e.shard, e.replicationDLQHandler) } func (e *historyEngineImpl) PurgeDLQMessages( ctx context.Context, request *historyservice.PurgeDLQMessagesRequest, -) error { - - _, ok := e.clusterMetadata.GetAllClusterInfo()[request.GetSourceCluster()] - if !ok { - return consts.ErrUnknownCluster - } - - return e.replicationDLQHandler.PurgeMessages( - ctx, - request.GetSourceCluster(), - request.GetInclusiveEndMessageId(), - ) +) (*historyservice.PurgeDLQMessagesResponse, error) { + return replicationadmin.PurgeDLQ(ctx, request, e.shard, e.replicationDLQHandler) } func (e *historyEngineImpl) MergeDLQMessages( ctx context.Context, request *historyservice.MergeDLQMessagesRequest, ) (*historyservice.MergeDLQMessagesResponse, error) { - - _, ok := e.clusterMetadata.GetAllClusterInfo()[request.GetSourceCluster()] - if !ok { - return nil, consts.ErrUnknownCluster - } - - token, err := e.replicationDLQHandler.MergeMessages( - ctx, - request.GetSourceCluster(), - request.GetInclusiveEndMessageId(), - int(request.GetMaximumPageSize()), - request.GetNextPageToken(), - ) - if err != nil { - return nil, err - } - return &historyservice.MergeDLQMessagesResponse{ - NextPageToken: token, - }, nil + return replicationadmin.MergeDLQ(ctx, request, e.shard, e.replicationDLQHandler) } func (e *historyEngineImpl) RebuildMutableState( @@ -1567,70 +1487,14 @@ func (e *historyEngineImpl) GenerateLastHistoryReplicationTasks( ctx context.Context, request *historyservice.GenerateLastHistoryReplicationTasksRequest, ) (_ *historyservice.GenerateLastHistoryReplicationTasksResponse, retError error) { - namespaceEntry, err := e.getActiveNamespaceEntry(namespace.ID(request.GetNamespaceId())) - if err != nil { - return nil, err - } - namespaceID := namespaceEntry.ID() - - wfContext, err := e.workflowConsistencyChecker.GetWorkflowContext( - ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey( - namespaceID.String(), - request.Execution.WorkflowId, - request.Execution.RunId, - ), - ) - if err != nil { - return nil, err - } - defer func() { wfContext.GetReleaseFn()(retError) }() - - now := e.shard.GetTimeSource().Now() - task, err := wfContext.GetMutableState().GenerateMigrationTasks(now) - if err != nil { - return nil, err - } - - err = e.shard.AddTasks(ctx, &persistence.AddHistoryTasksRequest{ - ShardID: e.shard.GetShardID(), - // RangeID is set by shard - NamespaceID: string(namespaceID), - WorkflowID: request.Execution.WorkflowId, - RunID: request.Execution.RunId, - Tasks: map[tasks.Category][]tasks.Task{ - tasks.CategoryReplication: {task}, - }, - }) - if err != nil { - return nil, err - } - return &historyservice.GenerateLastHistoryReplicationTasksResponse{}, nil + return replicationapi.GenerateTask(ctx, request, e.shard, e.workflowConsistencyChecker) } func (e *historyEngineImpl) GetReplicationStatus( ctx context.Context, request *historyservice.GetReplicationStatusRequest, ) (_ *historyservice.ShardReplicationStatus, retError error) { - - resp := &historyservice.ShardReplicationStatus{ - ShardId: e.shard.GetShardID(), - ShardLocalTime: timestamp.TimePtr(e.shard.GetTimeSource().Now()), - } - - maxReplicationTaskId, maxTaskVisibilityTimeStamp := e.replicationAckMgr.GetMaxTaskInfo() - resp.MaxReplicationTaskId = maxReplicationTaskId - resp.MaxReplicationTaskVisibilityTime = timestamp.TimePtr(maxTaskVisibilityTimeStamp) - - remoteClusters, handoverNamespaces, err := e.shard.GetReplicationStatus(request.RemoteClusters) - if err != nil { - return nil, err - } - resp.RemoteClusters = remoteClusters - resp.HandoverNamespaces = handoverNamespaces - return resp, nil + return replicationapi.GetStatus(ctx, request, e.shard, e.replicationAckMgr) } func (e *historyEngineImpl) containsHistoryEvent( diff --git a/service/history/shard/engine.go b/service/history/shard/engine.go index cab4671dbeb..54fe8bd20ed 100644 --- a/service/history/shard/engine.go +++ b/service/history/shard/engine.go @@ -80,7 +80,7 @@ type ( QueryWorkflow(ctx context.Context, request *historyservice.QueryWorkflowRequest) (*historyservice.QueryWorkflowResponse, error) ReapplyEvents(ctx context.Context, namespaceUUID namespace.ID, workflowID string, runID string, events []*historypb.HistoryEvent) error GetDLQMessages(ctx context.Context, messagesRequest *historyservice.GetDLQMessagesRequest) (*historyservice.GetDLQMessagesResponse, error) - PurgeDLQMessages(ctx context.Context, messagesRequest *historyservice.PurgeDLQMessagesRequest) error + PurgeDLQMessages(ctx context.Context, messagesRequest *historyservice.PurgeDLQMessagesRequest) (*historyservice.PurgeDLQMessagesResponse, error) MergeDLQMessages(ctx context.Context, messagesRequest *historyservice.MergeDLQMessagesRequest) (*historyservice.MergeDLQMessagesResponse, error) RebuildMutableState(ctx context.Context, namespaceUUID namespace.ID, execution commonpb.WorkflowExecution) error RefreshWorkflowTasks(ctx context.Context, namespaceUUID namespace.ID, execution commonpb.WorkflowExecution) error diff --git a/service/history/shard/engine_mock.go b/service/history/shard/engine_mock.go index 47551c3c66f..6d9be178faa 100644 --- a/service/history/shard/engine_mock.go +++ b/service/history/shard/engine_mock.go @@ -255,11 +255,12 @@ func (mr *MockEngineMockRecorder) PollMutableState(ctx, request interface{}) *go } // PurgeDLQMessages mocks base method. -func (m *MockEngine) PurgeDLQMessages(ctx context.Context, messagesRequest *historyservice.PurgeDLQMessagesRequest) error { +func (m *MockEngine) PurgeDLQMessages(ctx context.Context, messagesRequest *historyservice.PurgeDLQMessagesRequest) (*historyservice.PurgeDLQMessagesResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PurgeDLQMessages", ctx, messagesRequest) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*historyservice.PurgeDLQMessagesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 } // PurgeDLQMessages indicates an expected call of PurgeDLQMessages.