diff --git a/service/history/api/reapplyevents/api.go b/service/history/api/reapplyevents/api.go new file mode 100644 index 00000000000..50671b8bfa3 --- /dev/null +++ b/service/history/api/reapplyevents/api.go @@ -0,0 +1,241 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package reapplyevents + +import ( + "context" + + "github.com/google/uuid" + enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/serviceerror" + + historyspb "go.temporal.io/server/api/history/v1" + "go.temporal.io/server/common" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence/versionhistory" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/ndc" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" +) + +func Invoke( + ctx context.Context, + namespaceUUID namespace.ID, + workflowID string, + runID string, + reapplyEvents []*historypb.HistoryEvent, + shard shard.Context, + workflowConsistencyChecker api.WorkflowConsistencyChecker, + workflowResetter ndc.WorkflowResetter, + eventsReapplier ndc.EventsReapplier, +) error { + if shard.GetConfig().SkipReapplicationByNamespaceID(namespaceUUID.String()) { + return nil + } + + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(namespaceUUID.String())) + if err != nil { + return err + } + namespaceID := namespaceEntry.ID() + + return api.GetAndUpdateWorkflowWithNew( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + definition.NewWorkflowKey( + namespaceID.String(), + workflowID, + "", + ), + func(workflowContext api.WorkflowContext) (action *api.UpdateWorkflowAction, retErr error) { + context := workflowContext.GetContext() + mutableState := workflowContext.GetMutableState() + // Filter out reapply event from the same cluster + toReapplyEvents := make([]*historypb.HistoryEvent, 0, len(reapplyEvents)) + lastWriteVersion, err := mutableState.GetLastWriteVersion() + if err != nil { + return nil, err + } + sourceMutableState := mutableState + if sourceMutableState.GetWorkflowKey().RunID != runID { + originCtx, err := workflowConsistencyChecker.GetWorkflowContext( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + definition.NewWorkflowKey(namespaceID.String(), workflowID, runID), + ) + if err != nil { + return nil, err + } + defer func() { originCtx.GetReleaseFn()(retErr) }() + sourceMutableState = originCtx.GetMutableState() + } + + for _, event := range reapplyEvents { + if event.GetVersion() == lastWriteVersion { + // The reapply is from the same cluster. Ignoring. + continue + } + dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion()) + if mutableState.IsResourceDuplicated(dedupResource) { + // already apply the signal + continue + } + versionHistories := sourceMutableState.GetExecutionInfo().GetVersionHistories() + if containsHistoryEvent(versionHistories, event.GetEventId(), event.GetVersion()) { + continue + } + + toReapplyEvents = append(toReapplyEvents, event) + } + if len(toReapplyEvents) == 0 { + return &api.UpdateWorkflowAction{ + Noop: true, + CreateWorkflowTask: false, + }, nil + } + + if !mutableState.IsWorkflowExecutionRunning() { + // need to reset target workflow (which is also the current workflow) + // to accept events to be reapplied + baseRunID := mutableState.GetExecutionState().GetRunId() + resetRunID := uuid.New() + baseRebuildLastEventID := mutableState.GetPreviousStartedEventID() + + // TODO when https://github.com/uber/cadence/issues/2420 is finished, remove this block, + // since cannot reapply event to a finished workflow which had no workflow tasks started + if baseRebuildLastEventID == common.EmptyEventID { + shard.GetLogger().Warn("cannot reapply event to a finished workflow with no workflow task", + tag.WorkflowNamespaceID(namespaceID.String()), + tag.WorkflowID(workflowID), + ) + shard.GetMetricsClient().IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount) + return &api.UpdateWorkflowAction{ + Noop: true, + CreateWorkflowTask: false, + }, nil + } + + baseVersionHistories := mutableState.GetExecutionInfo().GetVersionHistories() + baseCurrentVersionHistory, err := versionhistory.GetCurrentVersionHistory(baseVersionHistories) + if err != nil { + return nil, err + } + baseRebuildLastEventVersion, err := versionhistory.GetVersionHistoryEventVersion(baseCurrentVersionHistory, baseRebuildLastEventID) + if err != nil { + return nil, err + } + baseCurrentBranchToken := baseCurrentVersionHistory.GetBranchToken() + baseNextEventID := mutableState.GetNextEventID() + + err = workflowResetter.ResetWorkflow( + ctx, + namespaceID, + workflowID, + baseRunID, + baseCurrentBranchToken, + baseRebuildLastEventID, + baseRebuildLastEventVersion, + baseNextEventID, + resetRunID.String(), + uuid.New().String(), + ndc.NewWorkflow( + ctx, + shard.GetNamespaceRegistry(), + shard.GetClusterMetadata(), + context, + mutableState, + workflow.NoopReleaseFn, + ), + ndc.EventsReapplicationResetWorkflowReason, + toReapplyEvents, + enumspb.RESET_REAPPLY_TYPE_SIGNAL, + ) + switch err.(type) { + case *serviceerror.InvalidArgument: + // no-op. Usually this is due to reset workflow with pending child workflows + shard.GetLogger().Warn("Cannot reset workflow. Ignoring reapply events.", tag.Error(err)) + case nil: + // no-op + default: + return nil, err + } + return &api.UpdateWorkflowAction{ + Noop: true, + CreateWorkflowTask: false, + }, nil + } + + postActions := &api.UpdateWorkflowAction{ + Noop: false, + CreateWorkflowTask: true, + } + if mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() { + // Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet + postActions.CreateWorkflowTask = false + } + reappliedEvents, err := eventsReapplier.ReapplyEvents( + ctx, + mutableState, + toReapplyEvents, + runID, + ) + if err != nil { + shard.GetLogger().Error("failed to re-apply stale events", tag.Error(err)) + return nil, err + } + if len(reappliedEvents) == 0 { + return &api.UpdateWorkflowAction{ + Noop: true, + CreateWorkflowTask: false, + }, nil + } + return postActions, nil + }, + nil, + shard, + workflowConsistencyChecker, + ) +} + +func containsHistoryEvent( + versionHistories *historyspb.VersionHistories, + reappliedEventID int64, + reappliedEventVersion int64, +) bool { + // Check if the source workflow contains the reapply event. + // If it does, it means the event is received in this cluster, no need to reapply. + _, err := versionhistory.FindFirstVersionHistoryIndexByVersionHistoryItem( + versionHistories, + versionhistory.NewVersionHistoryItem(reappliedEventID, reappliedEventVersion), + ) + return err == nil +} diff --git a/service/history/api/replication_util.go b/service/history/api/replication_util.go new file mode 100644 index 00000000000..ccca50628e0 --- /dev/null +++ b/service/history/api/replication_util.go @@ -0,0 +1,40 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package api + +import ( + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/common/cluster" +) + +func ValidateReplicationConfig( + clusterMetadata cluster.Metadata, +) error { + if !clusterMetadata.IsGlobalNamespaceEnabled() { + return serviceerror.NewUnavailable("The cluster has global namespace disabled. The operation is not supported.") + } + return nil +} diff --git a/service/history/handler.go b/service/history/handler.go index 36195298c27..2779a23f727 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -63,6 +63,7 @@ import ( "go.temporal.io/server/common/resource" "go.temporal.io/server/common/searchattribute" serviceerrors "go.temporal.io/server/common/serviceerror" + "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/replication" @@ -1295,7 +1296,7 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice return nil, errShuttingDown } - if err := h.validateReplicationConfig(); err != nil { + if err := api.ValidateReplicationConfig(h.clusterMetadata); err != nil { return nil, err } @@ -1370,7 +1371,7 @@ func (h *Handler) SyncActivity(ctx context.Context, request *historyservice.Sync return nil, errShuttingDown } - if err := h.validateReplicationConfig(); err != nil { + if err := api.ValidateReplicationConfig(h.clusterMetadata); err != nil { return nil, err } @@ -1413,7 +1414,7 @@ func (h *Handler) GetReplicationMessages(ctx context.Context, request *historyse if h.isStopped() { return nil, errShuttingDown } - if err := h.validateReplicationConfig(); err != nil { + if err := api.ValidateReplicationConfig(h.clusterMetadata); err != nil { return nil, err } @@ -1474,7 +1475,7 @@ func (h *Handler) GetDLQReplicationMessages(ctx context.Context, request *histor if h.isStopped() { return nil, errShuttingDown } - if err := h.validateReplicationConfig(); err != nil { + if err := api.ValidateReplicationConfig(h.clusterMetadata); err != nil { return nil, err } @@ -1740,7 +1741,7 @@ func (h *Handler) GetReplicationStatus( if h.isStopped() { return nil, errShuttingDown } - if err := h.validateReplicationConfig(); err != nil { + if err := api.ValidateReplicationConfig(h.clusterMetadata); err != nil { return nil, err } @@ -1862,13 +1863,6 @@ func (h *Handler) convertError(err error) error { return err } -func (h *Handler) validateReplicationConfig() error { - if !h.clusterMetadata.IsGlobalNamespaceEnabled() { - return serviceerror.NewUnavailable("The cluster has global namespace disabled. The operation is not supported.") - } - return nil -} - func validateTaskToken(taskToken *tokenspb.Task) error { if taskToken.GetWorkflowId() == "" { return errWorkflowIDNotSet diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 0a4fe6d72bc..0fd5dde0637 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -29,7 +29,6 @@ import ( "sync/atomic" "time" - "github.com/pborman/uuid" "go.opentelemetry.io/otel/trace" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -38,7 +37,6 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" - historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" replicationspb "go.temporal.io/server/api/replication/v1" @@ -53,12 +51,12 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" - "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/api/describeworkflow" + "go.temporal.io/server/service/history/api/reapplyevents" "go.temporal.io/server/service/history/api/recordactivitytaskheartbeat" "go.temporal.io/server/service/history/api/recordactivitytaskstarted" "go.temporal.io/server/service/history/api/recordchildworkflowcompleted" @@ -1114,175 +1112,7 @@ func (e *historyEngineImpl) ReapplyEvents( runID string, reapplyEvents []*historypb.HistoryEvent, ) error { - - if e.config.SkipReapplicationByNamespaceID(namespaceUUID.String()) { - return nil - } - - namespaceEntry, err := e.getActiveNamespaceEntry(namespaceUUID) - if err != nil { - return err - } - namespaceID := namespaceEntry.ID() - - return api.GetAndUpdateWorkflowWithNew( - ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey( - namespaceID.String(), - workflowID, - "", - ), - func(workflowContext api.WorkflowContext) (action *api.UpdateWorkflowAction, retErr error) { - context := workflowContext.GetContext() - mutableState := workflowContext.GetMutableState() - // Filter out reapply event from the same cluster - toReapplyEvents := make([]*historypb.HistoryEvent, 0, len(reapplyEvents)) - lastWriteVersion, err := mutableState.GetLastWriteVersion() - if err != nil { - return nil, err - } - sourceMutableState := mutableState - if sourceMutableState.GetWorkflowKey().RunID != runID { - originCtx, err := e.workflowConsistencyChecker.GetWorkflowContext( - ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey(namespaceID.String(), workflowID, runID), - ) - if err != nil { - return nil, err - } - defer func() { originCtx.GetReleaseFn()(retErr) }() - sourceMutableState = originCtx.GetMutableState() - } - - for _, event := range reapplyEvents { - if event.GetVersion() == lastWriteVersion { - // The reapply is from the same cluster. Ignoring. - continue - } - dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion()) - if mutableState.IsResourceDuplicated(dedupResource) { - // already apply the signal - continue - } - versionHistories := sourceMutableState.GetExecutionInfo().GetVersionHistories() - if e.containsHistoryEvent(versionHistories, event.GetEventId(), event.GetVersion()) { - continue - } - - toReapplyEvents = append(toReapplyEvents, event) - } - if len(toReapplyEvents) == 0 { - return &api.UpdateWorkflowAction{ - Noop: true, - CreateWorkflowTask: false, - }, nil - } - - if !mutableState.IsWorkflowExecutionRunning() { - // need to reset target workflow (which is also the current workflow) - // to accept events to be reapplied - baseRunID := mutableState.GetExecutionState().GetRunId() - resetRunID := uuid.New() - baseRebuildLastEventID := mutableState.GetPreviousStartedEventID() - - // TODO when https://github.com/uber/cadence/issues/2420 is finished, remove this block, - // since cannot reapply event to a finished workflow which had no workflow tasks started - if baseRebuildLastEventID == common.EmptyEventID { - e.logger.Warn("cannot reapply event to a finished workflow with no workflow task", - tag.WorkflowNamespaceID(namespaceID.String()), - tag.WorkflowID(workflowID), - ) - e.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount) - return &api.UpdateWorkflowAction{ - Noop: true, - CreateWorkflowTask: false, - }, nil - } - - baseVersionHistories := mutableState.GetExecutionInfo().GetVersionHistories() - baseCurrentVersionHistory, err := versionhistory.GetCurrentVersionHistory(baseVersionHistories) - if err != nil { - return nil, err - } - baseRebuildLastEventVersion, err := versionhistory.GetVersionHistoryEventVersion(baseCurrentVersionHistory, baseRebuildLastEventID) - if err != nil { - return nil, err - } - baseCurrentBranchToken := baseCurrentVersionHistory.GetBranchToken() - baseNextEventID := mutableState.GetNextEventID() - - err = e.workflowResetter.ResetWorkflow( - ctx, - namespaceID, - workflowID, - baseRunID, - baseCurrentBranchToken, - baseRebuildLastEventID, - baseRebuildLastEventVersion, - baseNextEventID, - resetRunID, - uuid.New(), - ndc.NewWorkflow( - ctx, - e.shard.GetNamespaceRegistry(), - e.shard.GetClusterMetadata(), - context, - mutableState, - workflow.NoopReleaseFn, - ), - ndc.EventsReapplicationResetWorkflowReason, - toReapplyEvents, - enumspb.RESET_REAPPLY_TYPE_SIGNAL, - ) - switch err.(type) { - case *serviceerror.InvalidArgument: - // no-op. Usually this is due to reset workflow with pending child workflows - e.logger.Warn("Cannot reset workflow. Ignoring reapply events.", tag.Error(err)) - case nil: - // no-op - default: - return nil, err - } - return &api.UpdateWorkflowAction{ - Noop: true, - CreateWorkflowTask: false, - }, nil - } - - postActions := &api.UpdateWorkflowAction{ - Noop: false, - CreateWorkflowTask: true, - } - if mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() { - // Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet - postActions.CreateWorkflowTask = false - } - reappliedEvents, err := e.eventsReapplier.ReapplyEvents( - ctx, - mutableState, - toReapplyEvents, - runID, - ) - if err != nil { - e.logger.Error("failed to re-apply stale events", tag.Error(err)) - return nil, err - } - if len(reappliedEvents) == 0 { - return &api.UpdateWorkflowAction{ - Noop: true, - CreateWorkflowTask: false, - }, nil - } - return postActions, nil - }, - nil, - e.shard, - e.workflowConsistencyChecker, - ) + return reapplyevents.Invoke(ctx, namespaceUUID, workflowID, runID, reapplyEvents, e.shard, e.workflowConsistencyChecker, e.workflowResetter, e.eventsReapplier) } func (e *historyEngineImpl) GetDLQMessages( @@ -1386,17 +1216,3 @@ func (e *historyEngineImpl) GetReplicationStatus( ) (_ *historyservice.ShardReplicationStatus, retError error) { return replicationapi.GetStatus(ctx, request, e.shard, e.replicationAckMgr) } - -func (e *historyEngineImpl) containsHistoryEvent( - versionHistories *historyspb.VersionHistories, - reappliedEventID int64, - reappliedEventVersion int64, -) bool { - // Check if the source workflow contains the reapply event. - // If it does, it means the event is received in this cluster, no need to reapply. - _, err := versionhistory.FindFirstVersionHistoryIndexByVersionHistoryItem( - versionHistories, - versionhistory.NewVersionHistoryItem(reappliedEventID, reappliedEventVersion), - ) - return err == nil -}