Skip to content

Commit

Permalink
Improve error handling for standby transfer task verification (#3050)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 2, 2022
1 parent de11646 commit ca22f60
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 27 deletions.
33 changes: 28 additions & 5 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package history

import (
"context"
"errors"
"time"

commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -59,6 +60,10 @@ type (
}
)

var (
errVerificationFailed = errors.New("failed to verify target workflow state")
)

func newTransferQueueStandbyTaskExecutor(
shard shard.Context,
workflowCache workflow.Cache,
Expand Down Expand Up @@ -289,12 +294,21 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution(
Clock: executionInfo.ParentClock,
})
switch err.(type) {
case nil, *serviceerror.NotFound, *serviceerror.NamespaceNotFound:
case nil, *serviceerror.NotFound, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented:
return nil, nil
case *serviceerror.WorkflowNotReady:
return verifyChildCompletionRecordedInfo, nil
default:
return nil, err
t.logger.Error("Failed to verify child execution completion recoreded",
tag.WorkflowNamespaceID(transferTask.GetNamespaceID()),
tag.WorkflowID(transferTask.GetWorkflowID()),
tag.WorkflowRunID(transferTask.GetRunID()),
tag.Error(err),
)

// NOTE: we do not return the error here which will cause the mutable state to be cleared and reloaded upon retry
// it's unnecessary as the error is in the target workflow, not this workflow.
return nil, errVerificationFailed
}
}
return nil, nil
Expand Down Expand Up @@ -434,12 +448,21 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution(
Clock: childWorkflowInfo.Clock,
})
switch err.(type) {
case nil, *serviceerror.NamespaceNotFound:
case nil, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented:
return nil, nil
case *serviceerror.WorkflowNotReady:
return &startChildExecutionPostActionInfo{}, nil
default:
return nil, err
t.logger.Error("Failed to verify first workflow task scheduled",
tag.WorkflowNamespaceID(transferTask.GetNamespaceID()),
tag.WorkflowID(transferTask.GetWorkflowID()),
tag.WorkflowRunID(transferTask.GetRunID()),
tag.Error(err),
)

// NOTE: we do not return the error here which will cause the mutable state to be cleared and reloaded upon retry
// it's unnecessary as the error is in the target workflow, not this workflow.
return nil, errVerificationFailed
}
}

Expand Down Expand Up @@ -474,7 +497,7 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer(
return err
}
defer func() {
if retError == consts.ErrTaskRetry {
if retError == consts.ErrTaskRetry || retError == errVerificationFailed {
release(nil)
} else {
release(retError)
Expand Down
52 changes: 30 additions & 22 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package history

import (
"context"
"errors"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -610,45 +611,39 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution() {
TaskID: taskID,
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig()).Times(3)
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), &historyservice.VerifyChildExecutionCompletionRecordedRequest{
expectedVerificationRequest := &historyservice.VerifyChildExecutionCompletionRecordedRequest{
NamespaceId: parentNamespaceID,
ParentExecution: parentExecution,
ChildExecution: &execution,
ParentInitiatedId: parentInitiatedID,
ParentInitiatedVersion: parentInitiatedVersion,
Clock: parentClock,
}).Return(nil, nil)
}

persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig()).Times(5)

s.mockShard.SetCurrentTime(s.clusterName, now)
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, nil)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)

s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), &historyservice.VerifyChildExecutionCompletionRecordedRequest{
NamespaceId: parentNamespaceID,
ParentExecution: parentExecution,
ChildExecution: &execution,
ParentInitiatedId: parentInitiatedID,
ParentInitiatedVersion: parentInitiatedVersion,
Clock: parentClock,
}).Return(nil, &serviceerror.WorkflowNotReady{})
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, serviceerror.NewUnimplemented("not implemented"))
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)

s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, errors.New("some random error"))
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(errVerificationFailed, err)

s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration))
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, &serviceerror.WorkflowNotReady{})
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskRetry, err)

s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), &historyservice.VerifyChildExecutionCompletionRecordedRequest{
NamespaceId: parentNamespaceID,
ParentExecution: parentExecution,
ChildExecution: &execution,
ParentInitiatedId: parentInitiatedID,
ParentInitiatedVersion: parentInitiatedVersion,
Clock: parentClock,
}).Return(nil, &serviceerror.WorkflowNotReady{})

s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.discardDuration))
s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, &serviceerror.WorkflowNotReady{})
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskDiscarded, err)
}
Expand Down Expand Up @@ -1054,10 +1049,23 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P
persistenceMutableState = s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration))
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)

s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, nil)
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)

s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.WorkflowNotReady{})
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(consts.ErrTaskRetry, err)

s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.Unimplemented{})
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)

s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error"))
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Equal(errVerificationFailed, err)

s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.discardDuration))
s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.WorkflowNotReady{})
_, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
Expand Down

0 comments on commit ca22f60

Please # to comment.