diff --git a/service/history/api/refreshworkflow/api.go b/service/history/api/refreshworkflow/api.go new file mode 100644 index 00000000000..dff94c8ccb9 --- /dev/null +++ b/service/history/api/refreshworkflow/api.go @@ -0,0 +1,83 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package refreshworkflow + +import ( + "context" + + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/workflow" +) + +func Invoke( + ctx context.Context, + workflowKey definition.WorkflowKey, + shard shard.Context, + workflowConsistencyChecker api.WorkflowConsistencyChecker, +) (retError error) { + err := api.ValidateNamespaceUUID(namespace.ID(workflowKey.NamespaceID)) + if err != nil { + return err + } + + wfContext, err := workflowConsistencyChecker.GetWorkflowContext( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + workflowKey, + ) + if err != nil { + return err + } + defer func() { wfContext.GetReleaseFn()(retError) }() + + mutableState := wfContext.GetMutableState() + mutableStateTaskRefresher := workflow.NewTaskRefresher( + shard, + shard.GetConfig(), + shard.GetNamespaceRegistry(), + shard.GetEventsCache(), + shard.GetLogger(), + ) + now := shard.GetTimeSource().Now() + + err = mutableStateTaskRefresher.RefreshTasks(ctx, now, mutableState) + if err != nil { + return err + } + + return shard.AddTasks(ctx, &persistence.AddHistoryTasksRequest{ + ShardID: shard.GetShardID(), + // RangeID is set by shard + NamespaceID: workflowKey.NamespaceID, + WorkflowID: workflowKey.WorkflowID, + RunID: workflowKey.RunID, + Tasks: mutableState.PopTasks(), + }) +} diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index fe0611ef072..852a38cba81 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -61,6 +61,7 @@ import ( "go.temporal.io/server/service/history/api/recordactivitytaskheartbeat" "go.temporal.io/server/service/history/api/recordactivitytaskstarted" "go.temporal.io/server/service/history/api/recordchildworkflowcompleted" + "go.temporal.io/server/service/history/api/refreshworkflow" "go.temporal.io/server/service/history/api/removesignalmutablestate" replicationapi "go.temporal.io/server/service/history/api/replication" "go.temporal.io/server/service/history/api/replicationadmin" @@ -985,12 +986,6 @@ func (e *historyEngineImpl) NotifyNewTasks( } } -func (e *historyEngineImpl) getActiveNamespaceEntry( - namespaceUUID namespace.ID, -) (*namespace.Namespace, error) { - return api.GetActiveNamespace(e.shard, namespaceUUID) -} - func (e *historyEngineImpl) GetReplicationMessages( ctx context.Context, pollingCluster string, @@ -1059,51 +1054,12 @@ func (e *historyEngineImpl) RefreshWorkflowTasks( namespaceUUID namespace.ID, execution commonpb.WorkflowExecution, ) (retError error) { - - err := api.ValidateNamespaceUUID(namespaceUUID) - if err != nil { - return err - } - - wfContext, err := e.workflowConsistencyChecker.GetWorkflowContext( + return refreshworkflow.Invoke( ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey( - namespaceUUID.String(), - execution.WorkflowId, - execution.RunId, - ), - ) - if err != nil { - return err - } - defer func() { wfContext.GetReleaseFn()(retError) }() - - mutableState := wfContext.GetMutableState() - mutableStateTaskRefresher := workflow.NewTaskRefresher( + definition.NewWorkflowKey(namespaceUUID.String(), execution.WorkflowId, execution.RunId), e.shard, - e.shard.GetConfig(), - e.shard.GetNamespaceRegistry(), - e.shard.GetEventsCache(), - e.shard.GetLogger(), + e.workflowConsistencyChecker, ) - - now := e.shard.GetTimeSource().Now() - - err = mutableStateTaskRefresher.RefreshTasks(ctx, now, mutableState) - if err != nil { - return err - } - - return e.shard.AddTasks(ctx, &persistence.AddHistoryTasksRequest{ - ShardID: e.shard.GetShardID(), - // RangeID is set by shard - NamespaceID: namespaceUUID.String(), - WorkflowID: execution.WorkflowId, - RunID: execution.RunId, - Tasks: mutableState.PopTasks(), - }) } func (e *historyEngineImpl) GenerateLastHistoryReplicationTasks(