From ca22f60ef55dda3f05e71e6c74768636619ed3c9 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 1 Jul 2022 17:11:06 -0700 Subject: [PATCH] Improve error handling for standby transfer task verification (#3050) --- .../transferQueueStandbyTaskExecutor.go | 33 ++++++++++-- .../transferQueueStandbyTaskExecutor_test.go | 52 +++++++++++-------- 2 files changed, 58 insertions(+), 27 deletions(-) diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go index 1cf526a8282..952bdd5c933 100644 --- a/service/history/transferQueueStandbyTaskExecutor.go +++ b/service/history/transferQueueStandbyTaskExecutor.go @@ -26,6 +26,7 @@ package history import ( "context" + "errors" "time" commonpb "go.temporal.io/api/common/v1" @@ -59,6 +60,10 @@ type ( } ) +var ( + errVerificationFailed = errors.New("failed to verify target workflow state") +) + func newTransferQueueStandbyTaskExecutor( shard shard.Context, workflowCache workflow.Cache, @@ -289,12 +294,21 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution( Clock: executionInfo.ParentClock, }) switch err.(type) { - case nil, *serviceerror.NotFound, *serviceerror.NamespaceNotFound: + case nil, *serviceerror.NotFound, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented: return nil, nil case *serviceerror.WorkflowNotReady: return verifyChildCompletionRecordedInfo, nil default: - return nil, err + t.logger.Error("Failed to verify child execution completion recoreded", + tag.WorkflowNamespaceID(transferTask.GetNamespaceID()), + tag.WorkflowID(transferTask.GetWorkflowID()), + tag.WorkflowRunID(transferTask.GetRunID()), + tag.Error(err), + ) + + // NOTE: we do not return the error here which will cause the mutable state to be cleared and reloaded upon retry + // it's unnecessary as the error is in the target workflow, not this workflow. + return nil, errVerificationFailed } } return nil, nil @@ -434,12 +448,21 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution( Clock: childWorkflowInfo.Clock, }) switch err.(type) { - case nil, *serviceerror.NamespaceNotFound: + case nil, *serviceerror.NamespaceNotFound, *serviceerror.Unimplemented: return nil, nil case *serviceerror.WorkflowNotReady: return &startChildExecutionPostActionInfo{}, nil default: - return nil, err + t.logger.Error("Failed to verify first workflow task scheduled", + tag.WorkflowNamespaceID(transferTask.GetNamespaceID()), + tag.WorkflowID(transferTask.GetWorkflowID()), + tag.WorkflowRunID(transferTask.GetRunID()), + tag.Error(err), + ) + + // NOTE: we do not return the error here which will cause the mutable state to be cleared and reloaded upon retry + // it's unnecessary as the error is in the target workflow, not this workflow. + return nil, errVerificationFailed } } @@ -474,7 +497,7 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer( return err } defer func() { - if retError == consts.ErrTaskRetry { + if retError == consts.ErrTaskRetry || retError == errVerificationFailed { release(nil) } else { release(retError) diff --git a/service/history/transferQueueStandbyTaskExecutor_test.go b/service/history/transferQueueStandbyTaskExecutor_test.go index 28f25d4d623..90289a9cfc2 100644 --- a/service/history/transferQueueStandbyTaskExecutor_test.go +++ b/service/history/transferQueueStandbyTaskExecutor_test.go @@ -26,6 +26,7 @@ package history import ( "context" + "errors" "math/rand" "testing" "time" @@ -610,45 +611,39 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution() { TaskID: taskID, } - persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) - s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig()).Times(3) - s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), &historyservice.VerifyChildExecutionCompletionRecordedRequest{ + expectedVerificationRequest := &historyservice.VerifyChildExecutionCompletionRecordedRequest{ NamespaceId: parentNamespaceID, ParentExecution: parentExecution, ChildExecution: &execution, ParentInitiatedId: parentInitiatedID, ParentInitiatedVersion: parentInitiatedVersion, Clock: parentClock, - }).Return(nil, nil) + } + + persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig()).Times(5) s.mockShard.SetCurrentTime(s.clusterName, now) + s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, nil) _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Nil(err) - s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), &historyservice.VerifyChildExecutionCompletionRecordedRequest{ - NamespaceId: parentNamespaceID, - ParentExecution: parentExecution, - ChildExecution: &execution, - ParentInitiatedId: parentInitiatedID, - ParentInitiatedVersion: parentInitiatedVersion, - Clock: parentClock, - }).Return(nil, &serviceerror.WorkflowNotReady{}) + s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, serviceerror.NewUnimplemented("not implemented")) + _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.Nil(err) + + s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, errors.New("some random error")) + _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.Equal(errVerificationFailed, err) s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration)) + s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, &serviceerror.WorkflowNotReady{}) _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskRetry, err) - s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), &historyservice.VerifyChildExecutionCompletionRecordedRequest{ - NamespaceId: parentNamespaceID, - ParentExecution: parentExecution, - ChildExecution: &execution, - ParentInitiatedId: parentInitiatedID, - ParentInitiatedVersion: parentInitiatedVersion, - Clock: parentClock, - }).Return(nil, &serviceerror.WorkflowNotReady{}) - s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.discardDuration)) + s.mockHistoryClient.EXPECT().VerifyChildExecutionCompletionRecorded(gomock.Any(), expectedVerificationRequest).Return(nil, &serviceerror.WorkflowNotReady{}) _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskDiscarded, err) } @@ -1054,10 +1049,23 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P persistenceMutableState = s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration)) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + + s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, nil) + _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.Nil(err) + s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.WorkflowNotReady{}) _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskRetry, err) + s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.Unimplemented{}) + _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.Nil(err) + + s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error")) + _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) + s.Equal(errVerificationFailed, err) + s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.discardDuration)) s.mockHistoryClient.EXPECT().VerifyFirstWorkflowTaskScheduled(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.WorkflowNotReady{}) _, err = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))