From d98fa4840552ab5e45427dd0eb5b75eed01f61e8 Mon Sep 17 00:00:00 2001 From: yux0 Date: Wed, 7 Feb 2024 10:38:30 -0800 Subject: [PATCH 1/2] Handle data corruption in history resend --- .../history/replication/executable_task.go | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go index 02dc6494728..ea0951f959b 100644 --- a/service/history/replication/executable_task.go +++ b/service/history/replication/executable_task.go @@ -32,6 +32,7 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" @@ -234,7 +235,7 @@ func (e *ExecutableTaskImpl) Reschedule() { func (e *ExecutableTaskImpl) IsRetryableError(err error) bool { switch err.(type) { - case *serviceerror.InvalidArgument: + case *serviceerror.InvalidArgument, *serviceerror.DataLoss: return false default: return true @@ -355,16 +356,30 @@ func (e *ExecutableTaskImpl) Resend( // c. attempt failed due to old workflow does not exist // d. return error to resend new workflow before the branching point + if resendErr.NamespaceId == retryErr.NamespaceId && + resendErr.WorkflowId == retryErr.WorkflowId && + resendErr.RunId == retryErr.RunId { + e.Logger.Error("error resend history on the same workflow run", + tag.WorkflowNamespaceID(retryErr.NamespaceId), + tag.WorkflowID(retryErr.WorkflowId), + tag.WorkflowRunID(retryErr.RunId), + tag.NewStringTag("first-resend-error", retryErr.Error()), + tag.NewStringTag("second-resend-error", resendErr.Error()), + ) + return false, serviceerror.NewDataLoss("failed to get requested data while resending history") + } // handle 2nd resend error, then 1st resend error - if _, err := e.Resend(ctx, remoteCluster, resendErr, remainingAttempt); err == nil { + _, err := e.Resend(ctx, remoteCluster, resendErr, remainingAttempt) + if err == nil { return e.Resend(ctx, remoteCluster, retryErr, remainingAttempt) } - e.Logger.Error("error resend history for history event", - tag.WorkflowNamespaceID(retryErr.NamespaceId), - tag.WorkflowID(retryErr.WorkflowId), - tag.WorkflowRunID(retryErr.RunId), - tag.Value(retryErr), - tag.Error(resendErr), + e.Logger.Error("error resend 2nd workflow history for history event", + tag.WorkflowNamespaceID(resendErr.NamespaceId), + tag.WorkflowID(resendErr.WorkflowId), + tag.WorkflowRunID(resendErr.RunId), + tag.NewStringTag("first-resend-error", retryErr.Error()), + tag.NewStringTag("second-resend-error", resendErr.Error()), + tag.Error(err), ) return false, resendErr default: @@ -372,8 +387,8 @@ func (e *ExecutableTaskImpl) Resend( tag.WorkflowNamespaceID(retryErr.NamespaceId), tag.WorkflowID(retryErr.WorkflowId), tag.WorkflowRunID(retryErr.RunId), - tag.Value(retryErr), - tag.Error(resendErr), + tag.NewStringTag("first-resend-error", retryErr.Error()), + tag.NewStringTag("second-resend-error", resendErr.Error()), ) return false, resendErr } From cb286cf0ac524ce51d631449190e6d67d4324a77 Mon Sep 17 00:00:00 2001 From: yux0 Date: Wed, 7 Feb 2024 10:48:12 -0800 Subject: [PATCH 2/2] add unit test --- .../replication/executable_task_test.go | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/service/history/replication/executable_task_test.go b/service/history/replication/executable_task_test.go index 2f7acb1cc5c..4a0c88104b9 100644 --- a/service/history/replication/executable_task_test.go +++ b/service/history/replication/executable_task_test.go @@ -37,6 +37,7 @@ import ( "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/replication/eventhandler" @@ -351,7 +352,7 @@ func (s *executableTaskSuite) TestResend_ResendError_Success() { anotherResendErr := &serviceerrors.RetryReplication{ NamespaceId: resendErr.NamespaceId, WorkflowId: resendErr.WorkflowId, - RunId: resendErr.RunId, + RunId: uuid.NewString(), StartEventId: rand.Int63(), StartEventVersion: rand.Int63(), EndEventId: rand.Int63(), @@ -414,7 +415,7 @@ func (s *executableTaskSuite) TestResend_ResendError_Error() { anotherResendErr := &serviceerrors.RetryReplication{ NamespaceId: resendErr.NamespaceId, WorkflowId: resendErr.WorkflowId, - RunId: resendErr.RunId, + RunId: uuid.NewString(), StartEventId: rand.Int63(), StartEventVersion: rand.Int63(), EndEventId: rand.Int63(), @@ -451,6 +452,46 @@ func (s *executableTaskSuite) TestResend_ResendError_Error() { s.False(doContinue) } +func (s *executableTaskSuite) TestResend_SecondResendError_SameWorkflowRun() { + remoteCluster := cluster.TestAlternativeClusterName + resendErr := &serviceerrors.RetryReplication{ + NamespaceId: uuid.NewString(), + WorkflowId: uuid.NewString(), + RunId: uuid.NewString(), + StartEventId: rand.Int63(), + StartEventVersion: rand.Int63(), + EndEventId: rand.Int63(), + EndEventVersion: rand.Int63(), + } + + anotherResendErr := &serviceerrors.RetryReplication{ + NamespaceId: resendErr.NamespaceId, + WorkflowId: resendErr.WorkflowId, + RunId: resendErr.RunId, + StartEventId: rand.Int63(), + StartEventVersion: rand.Int63(), + EndEventId: rand.Int63(), + EndEventVersion: rand.Int63(), + } + + s.ndcHistoryResender.EXPECT().SendSingleWorkflowHistory( + gomock.Any(), + remoteCluster, + namespace.ID(resendErr.NamespaceId), + resendErr.WorkflowId, + resendErr.RunId, + resendErr.StartEventId, + resendErr.StartEventVersion, + resendErr.EndEventId, + resendErr.EndEventVersion, + ).Return(anotherResendErr) + + doContinue, err := s.task.Resend(context.Background(), remoteCluster, resendErr, ResendAttempt) + var dataLossErr *serviceerror.DataLoss + s.ErrorAs(err, &dataLossErr) + s.False(doContinue) +} + func (s *executableTaskSuite) TestResend_Error() { remoteCluster := cluster.TestAlternativeClusterName resendErr := &serviceerrors.RetryReplication{