From dd4323a0283fe943bd7f9cfd04ae513ce8745706 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Mon, 12 Feb 2024 15:54:45 -0800 Subject: [PATCH] Handle data corruption in history resend (#5398) Handle data corruption in history resend. 1. When source cluster has a data corruption issue, it can ship partial data in resend history. With the two layers of resending (this is to handle resend across different workflow runs), this will cause a chain reaction. Unit tests --- .../history/replication/executable_task.go | 40 ++++++++----- .../replication/executable_task_test.go | 56 ++++++++++++++++--- 2 files changed, 75 insertions(+), 21 deletions(-) diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go index 0c230d4ba18..6d28cc6581f 100644 --- a/service/history/replication/executable_task.go +++ b/service/history/replication/executable_task.go @@ -32,10 +32,8 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" - historyspb "go.temporal.io/server/api/history/v1" - "go.temporal.io/server/common/persistence/versionhistory" - "go.temporal.io/server/service/history/shard" + historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" @@ -44,8 +42,10 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence/versionhistory" serviceerrors "go.temporal.io/server/common/serviceerror" ctasks "go.temporal.io/server/common/tasks" + "go.temporal.io/server/service/history/shard" ) //go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination executable_task_mock.go @@ -252,7 +252,7 @@ func (e *ExecutableTaskImpl) Reschedule() { func (e *ExecutableTaskImpl) IsRetryableError(err error) bool { switch err.(type) { - case *serviceerror.InvalidArgument: + case *serviceerror.InvalidArgument, *serviceerror.DataLoss: return false default: return true @@ -373,16 +373,30 @@ func (e *ExecutableTaskImpl) Resend( // c. attempt failed due to old workflow does not exist // d. return error to resend new workflow before the branching point + if resendErr.NamespaceId == retryErr.NamespaceId && + resendErr.WorkflowId == retryErr.WorkflowId && + resendErr.RunId == retryErr.RunId { + e.Logger.Error("error resend history on the same workflow run", + tag.WorkflowNamespaceID(retryErr.NamespaceId), + tag.WorkflowID(retryErr.WorkflowId), + tag.WorkflowRunID(retryErr.RunId), + tag.NewStringTag("first-resend-error", retryErr.Error()), + tag.NewStringTag("second-resend-error", resendErr.Error()), + ) + return false, serviceerror.NewDataLoss("failed to get requested data while resending history") + } // handle 2nd resend error, then 1st resend error - if _, err := e.Resend(ctx, remoteCluster, resendErr, remainingAttempt); err == nil { + _, err := e.Resend(ctx, remoteCluster, resendErr, remainingAttempt) + if err == nil { return e.Resend(ctx, remoteCluster, retryErr, remainingAttempt) } - e.Logger.Error("error resend history for history event", - tag.WorkflowNamespaceID(retryErr.NamespaceId), - tag.WorkflowID(retryErr.WorkflowId), - tag.WorkflowRunID(retryErr.RunId), - tag.Value(retryErr), - tag.Error(resendErr), + e.Logger.Error("error resend 2nd workflow history for history event", + tag.WorkflowNamespaceID(resendErr.NamespaceId), + tag.WorkflowID(resendErr.WorkflowId), + tag.WorkflowRunID(resendErr.RunId), + tag.NewStringTag("first-resend-error", retryErr.Error()), + tag.NewStringTag("second-resend-error", resendErr.Error()), + tag.Error(err), ) return false, resendErr default: @@ -390,8 +404,8 @@ func (e *ExecutableTaskImpl) Resend( tag.WorkflowNamespaceID(retryErr.NamespaceId), tag.WorkflowID(retryErr.WorkflowId), tag.WorkflowRunID(retryErr.RunId), - tag.Value(retryErr), - tag.Error(resendErr), + tag.NewStringTag("first-resend-error", retryErr.Error()), + tag.NewStringTag("second-resend-error", resendErr.Error()), ) return false, resendErr } diff --git a/service/history/replication/executable_task_test.go b/service/history/replication/executable_task_test.go index 6f481bf33a8..91c39322ff1 100644 --- a/service/history/replication/executable_task_test.go +++ b/service/history/replication/executable_task_test.go @@ -38,18 +38,16 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - historyspb "go.temporal.io/server/api/history/v1" - "go.temporal.io/server/common/collection" - "go.temporal.io/server/common/definition" - "go.temporal.io/server/common/dynamicconfig" - "go.temporal.io/server/service/history/configs" - "go.temporal.io/server/service/history/tests" + historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/collection" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -57,7 +55,9 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" ctasks "go.temporal.io/server/common/tasks" "go.temporal.io/server/common/xdc" + "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tests" ) type ( @@ -356,7 +356,7 @@ func (s *executableTaskSuite) TestResend_ResendError_Success() { anotherResendErr := &serviceerrors.RetryReplication{ NamespaceId: resendErr.NamespaceId, WorkflowId: resendErr.WorkflowId, - RunId: resendErr.RunId, + RunId: uuid.NewString(), StartEventId: rand.Int63(), StartEventVersion: rand.Int63(), EndEventId: rand.Int63(), @@ -419,7 +419,7 @@ func (s *executableTaskSuite) TestResend_ResendError_Error() { anotherResendErr := &serviceerrors.RetryReplication{ NamespaceId: resendErr.NamespaceId, WorkflowId: resendErr.WorkflowId, - RunId: resendErr.RunId, + RunId: uuid.NewString(), StartEventId: rand.Int63(), StartEventVersion: rand.Int63(), EndEventId: rand.Int63(), @@ -456,6 +456,46 @@ func (s *executableTaskSuite) TestResend_ResendError_Error() { s.False(doContinue) } +func (s *executableTaskSuite) TestResend_SecondResendError_SameWorkflowRun() { + remoteCluster := cluster.TestAlternativeClusterName + resendErr := &serviceerrors.RetryReplication{ + NamespaceId: uuid.NewString(), + WorkflowId: uuid.NewString(), + RunId: uuid.NewString(), + StartEventId: rand.Int63(), + StartEventVersion: rand.Int63(), + EndEventId: rand.Int63(), + EndEventVersion: rand.Int63(), + } + + anotherResendErr := &serviceerrors.RetryReplication{ + NamespaceId: resendErr.NamespaceId, + WorkflowId: resendErr.WorkflowId, + RunId: resendErr.RunId, + StartEventId: rand.Int63(), + StartEventVersion: rand.Int63(), + EndEventId: rand.Int63(), + EndEventVersion: rand.Int63(), + } + + s.ndcHistoryResender.EXPECT().SendSingleWorkflowHistory( + gomock.Any(), + remoteCluster, + namespace.ID(resendErr.NamespaceId), + resendErr.WorkflowId, + resendErr.RunId, + resendErr.StartEventId, + resendErr.StartEventVersion, + resendErr.EndEventId, + resendErr.EndEventVersion, + ).Return(anotherResendErr) + + doContinue, err := s.task.Resend(context.Background(), remoteCluster, resendErr, ResendAttempt) + var dataLossErr *serviceerror.DataLoss + s.ErrorAs(err, &dataLossErr) + s.False(doContinue) +} + func (s *executableTaskSuite) TestResend_Error() { remoteCluster := cluster.TestAlternativeClusterName resendErr := &serviceerrors.RetryReplication{