Skip to content

Commit

Permalink
Move refresh workflow to api package (#3477)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Oct 11, 2022
1 parent b292c22 commit 32ea0bf
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 48 deletions.
83 changes: 83 additions & 0 deletions service/history/api/refreshworkflow/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package refreshworkflow

import (
"context"

"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
ctx context.Context,
workflowKey definition.WorkflowKey,
shard shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (retError error) {
err := api.ValidateNamespaceUUID(namespace.ID(workflowKey.NamespaceID))
if err != nil {
return err
}

wfContext, err := workflowConsistencyChecker.GetWorkflowContext(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
workflowKey,
)
if err != nil {
return err
}
defer func() { wfContext.GetReleaseFn()(retError) }()

mutableState := wfContext.GetMutableState()
mutableStateTaskRefresher := workflow.NewTaskRefresher(
shard,
shard.GetConfig(),
shard.GetNamespaceRegistry(),
shard.GetEventsCache(),
shard.GetLogger(),
)
now := shard.GetTimeSource().Now()

err = mutableStateTaskRefresher.RefreshTasks(ctx, now, mutableState)
if err != nil {
return err
}

return shard.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
ShardID: shard.GetShardID(),
// RangeID is set by shard
NamespaceID: workflowKey.NamespaceID,
WorkflowID: workflowKey.WorkflowID,
RunID: workflowKey.RunID,
Tasks: mutableState.PopTasks(),
})
}
52 changes: 4 additions & 48 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"go.temporal.io/server/service/history/api/recordactivitytaskheartbeat"
"go.temporal.io/server/service/history/api/recordactivitytaskstarted"
"go.temporal.io/server/service/history/api/recordchildworkflowcompleted"
"go.temporal.io/server/service/history/api/refreshworkflow"
"go.temporal.io/server/service/history/api/removesignalmutablestate"
replicationapi "go.temporal.io/server/service/history/api/replication"
"go.temporal.io/server/service/history/api/replicationadmin"
Expand Down Expand Up @@ -985,12 +986,6 @@ func (e *historyEngineImpl) NotifyNewTasks(
}
}

func (e *historyEngineImpl) getActiveNamespaceEntry(
namespaceUUID namespace.ID,
) (*namespace.Namespace, error) {
return api.GetActiveNamespace(e.shard, namespaceUUID)
}

func (e *historyEngineImpl) GetReplicationMessages(
ctx context.Context,
pollingCluster string,
Expand Down Expand Up @@ -1059,51 +1054,12 @@ func (e *historyEngineImpl) RefreshWorkflowTasks(
namespaceUUID namespace.ID,
execution commonpb.WorkflowExecution,
) (retError error) {

err := api.ValidateNamespaceUUID(namespaceUUID)
if err != nil {
return err
}

wfContext, err := e.workflowConsistencyChecker.GetWorkflowContext(
return refreshworkflow.Invoke(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
namespaceUUID.String(),
execution.WorkflowId,
execution.RunId,
),
)
if err != nil {
return err
}
defer func() { wfContext.GetReleaseFn()(retError) }()

mutableState := wfContext.GetMutableState()
mutableStateTaskRefresher := workflow.NewTaskRefresher(
definition.NewWorkflowKey(namespaceUUID.String(), execution.WorkflowId, execution.RunId),
e.shard,
e.shard.GetConfig(),
e.shard.GetNamespaceRegistry(),
e.shard.GetEventsCache(),
e.shard.GetLogger(),
e.workflowConsistencyChecker,
)

now := e.shard.GetTimeSource().Now()

err = mutableStateTaskRefresher.RefreshTasks(ctx, now, mutableState)
if err != nil {
return err
}

return e.shard.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
ShardID: e.shard.GetShardID(),
// RangeID is set by shard
NamespaceID: namespaceUUID.String(),
WorkflowID: execution.WorkflowId,
RunID: execution.RunId,
Tasks: mutableState.PopTasks(),
})
}

func (e *historyEngineImpl) GenerateLastHistoryReplicationTasks(
Expand Down

0 comments on commit 32ea0bf

Please # to comment.