Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix record child workflow complete mutable state stale check #2673

Merged
merged 3 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,8 @@ func (e *historyEngineImpl) ResetStickyTaskQueue(
ctx,
namespaceID,
*resetRequest.Execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -1385,7 +1386,8 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -1535,7 +1537,8 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
workflowTypeName = mutableState.GetWorkflowType().GetName()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1614,7 +1617,8 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
var taskQueue string
var workflowTypeName string
err = e.updateWorkflowExecution(ctx, namespaceID, workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
workflowTypeName = mutableState.GetWorkflowType().GetName()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1716,7 +1720,8 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
workflowTypeName = mutableState.GetWorkflowType().GetName()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1810,7 +1815,8 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
e.logger.Debug("Heartbeat failed")
return nil, consts.ErrWorkflowCompleted
Expand Down Expand Up @@ -1883,7 +1889,8 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution(
}

return e.updateWorkflow(ctx, namespaceID, execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
// the request to cancel this workflow is a success even
// if the target workflow has already finished
Expand Down Expand Up @@ -1951,7 +1958,8 @@ func (e *historyEngineImpl) SignalWorkflowExecution(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if request.GetRequestId() != "" && mutableState.IsSignalRequested(request.GetRequestId()) {
return &updateWorkflowAction{
noop: true,
Expand Down Expand Up @@ -2259,7 +2267,8 @@ func (e *historyEngineImpl) RemoveSignalMutableState(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -2299,7 +2308,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -2375,7 +2385,8 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand All @@ -2387,7 +2398,22 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(
// Check mutable state to make sure child execution is in pending child executions
ci, isRunning := mutableState.GetChildExecutionInfo(initiatedID)
if !isRunning && initiatedID >= mutableState.GetNextEventID() {
return nil, consts.ErrStaleState
// possible stale mutable state, try reload mutable state
//
// TODO: use initiate event ID and version to verify if the child exists or not
//
// NOTE: do not return ErrStaleState here, as in xdc there's no guarantee that parent
// will have the child information and its next eventID will larger than the initiatedID
// in the request after forced failover.
// If ErrStaleState is returned, the logic for this handler and processing of CloseWorkflowExecution
// task will keep retrying infinitely.
workflowContext.getContext().Clear()
mutableState, err = workflowContext.reloadMutableState(ctx)
if err != nil {
return nil, err
}

ci, isRunning = mutableState.GetChildExecutionInfo(initiatedID)
}
if !isRunning || ci.StartedId == common.EmptyEventID {
return nil, serviceerror.NewNotFound("Pending child execution not found.")
Expand Down Expand Up @@ -2663,11 +2689,8 @@ func (e *historyEngineImpl) updateWorkflowWithNewHelper(

UpdateHistoryLoop:
for attempt := 1; attempt <= conditionalRetryCount; attempt++ {
weContext := workflowContext.getContext()
mutableState := workflowContext.getMutableState()

// conduct caller action
postActions, err := action(weContext, mutableState)
postActions, err := action(workflowContext)
if err != nil {
if err == consts.ErrStaleState {
// Handler detected that cached workflow mutable could potentially be stale
Expand All @@ -2689,6 +2712,7 @@ UpdateHistoryLoop:
return nil
}

mutableState := workflowContext.getMutableState()
if postActions.createWorkflowTask {
// Create a transfer task to schedule a workflow task
if !mutableState.HasPendingWorkflowTask() {
Expand Down Expand Up @@ -3064,7 +3088,8 @@ func (e *historyEngineImpl) applyWorkflowIDReusePolicyHelper(
case enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING:
if wfIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING {
return func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
return func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -3196,7 +3221,9 @@ func (e *historyEngineImpl) ReapplyEvents(
ctx,
namespaceID,
currentExecution,
func(context workflow.Context, mutableState workflow.MutableState) (action *updateWorkflowAction, retErr error) {
func(workflowContext workflowContext) (action *updateWorkflowAction, retErr error) {
context := workflowContext.getContext()
mutableState := workflowContext.getMutableState()
// Filter out reapply event from the same cluster
toReapplyEvents := make([]*historypb.HistoryEvent, 0, len(reapplyEvents))
lastWriteVersion, err := mutableState.GetLastWriteVersion()
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflowExecutionUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
}
)

type updateWorkflowActionFunc func(workflow.Context, workflow.MutableState) (*updateWorkflowAction, error)
type updateWorkflowActionFunc func(workflowContext) (*updateWorkflowAction, error)

func (w *workflowContextImpl) getContext() workflow.Context {
return w.context
Expand Down
9 changes: 6 additions & 3 deletions service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskScheduled(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -177,7 +178,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskStarted(
ctx,
namespaceID,
execution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down Expand Up @@ -281,7 +283,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed(
ctx,
namespaceID,
workflowExecution,
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
func(workflowContext workflowContext) (*updateWorkflowAction, error) {
mutableState := workflowContext.getMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
Expand Down