Skip to content

Commit

Permalink
Handle data corruption in history resend (#5398)
Browse files Browse the repository at this point in the history
Handle data corruption in history resend.

1. When the source cluster has a data corruption issue, it can ship
partial data during a history resend. Combined with the two layers of
resending (used to handle resends that span different workflow runs),
this can cause a chain reaction of failed resend attempts.

Unit tests

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
yux0 authored and yycptt committed Apr 26, 2024
1 parent 9b1981c commit dd4323a
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 21 deletions.
40 changes: 27 additions & 13 deletions service/history/replication/executable_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ import (

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/service/history/shard"

historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
Expand All @@ -44,8 +42,10 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/service/history/shard"
)

//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination executable_task_mock.go
Expand Down Expand Up @@ -252,7 +252,7 @@ func (e *ExecutableTaskImpl) Reschedule() {

func (e *ExecutableTaskImpl) IsRetryableError(err error) bool {
switch err.(type) {
case *serviceerror.InvalidArgument:
case *serviceerror.InvalidArgument, *serviceerror.DataLoss:
return false
default:
return true
Expand Down Expand Up @@ -373,25 +373,39 @@ func (e *ExecutableTaskImpl) Resend(
// c. attempt failed due to old workflow does not exist
// d. return error to resend new workflow before the branching point

if resendErr.NamespaceId == retryErr.NamespaceId &&
resendErr.WorkflowId == retryErr.WorkflowId &&
resendErr.RunId == retryErr.RunId {
e.Logger.Error("error resend history on the same workflow run",
tag.WorkflowNamespaceID(retryErr.NamespaceId),
tag.WorkflowID(retryErr.WorkflowId),
tag.WorkflowRunID(retryErr.RunId),
tag.NewStringTag("first-resend-error", retryErr.Error()),
tag.NewStringTag("second-resend-error", resendErr.Error()),
)
return false, serviceerror.NewDataLoss("failed to get requested data while resending history")
}
// handle 2nd resend error, then 1st resend error
if _, err := e.Resend(ctx, remoteCluster, resendErr, remainingAttempt); err == nil {
_, err := e.Resend(ctx, remoteCluster, resendErr, remainingAttempt)
if err == nil {
return e.Resend(ctx, remoteCluster, retryErr, remainingAttempt)
}
e.Logger.Error("error resend history for history event",
tag.WorkflowNamespaceID(retryErr.NamespaceId),
tag.WorkflowID(retryErr.WorkflowId),
tag.WorkflowRunID(retryErr.RunId),
tag.Value(retryErr),
tag.Error(resendErr),
e.Logger.Error("error resend 2nd workflow history for history event",
tag.WorkflowNamespaceID(resendErr.NamespaceId),
tag.WorkflowID(resendErr.WorkflowId),
tag.WorkflowRunID(resendErr.RunId),
tag.NewStringTag("first-resend-error", retryErr.Error()),
tag.NewStringTag("second-resend-error", resendErr.Error()),
tag.Error(err),
)
return false, resendErr
default:
e.Logger.Error("error resend history for history event",
tag.WorkflowNamespaceID(retryErr.NamespaceId),
tag.WorkflowID(retryErr.WorkflowId),
tag.WorkflowRunID(retryErr.RunId),
tag.Value(retryErr),
tag.Error(resendErr),
tag.NewStringTag("first-resend-error", retryErr.Error()),
tag.NewStringTag("second-resend-error", resendErr.Error()),
)
return false, resendErr
}
Expand Down
56 changes: 48 additions & 8 deletions service/history/replication/executable_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,26 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/tests"

historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
)

type (
Expand Down Expand Up @@ -356,7 +356,7 @@ func (s *executableTaskSuite) TestResend_ResendError_Success() {
anotherResendErr := &serviceerrors.RetryReplication{
NamespaceId: resendErr.NamespaceId,
WorkflowId: resendErr.WorkflowId,
RunId: resendErr.RunId,
RunId: uuid.NewString(),
StartEventId: rand.Int63(),
StartEventVersion: rand.Int63(),
EndEventId: rand.Int63(),
Expand Down Expand Up @@ -419,7 +419,7 @@ func (s *executableTaskSuite) TestResend_ResendError_Error() {
anotherResendErr := &serviceerrors.RetryReplication{
NamespaceId: resendErr.NamespaceId,
WorkflowId: resendErr.WorkflowId,
RunId: resendErr.RunId,
RunId: uuid.NewString(),
StartEventId: rand.Int63(),
StartEventVersion: rand.Int63(),
EndEventId: rand.Int63(),
Expand Down Expand Up @@ -456,6 +456,46 @@ func (s *executableTaskSuite) TestResend_ResendError_Error() {
s.False(doContinue)
}

func (s *executableTaskSuite) TestResend_SecondResendError_SameWorkflowRun() {
	// Verifies that when resending history produces a second RetryReplication
	// error pointing at the exact same workflow run, Resend stops retrying and
	// surfaces a DataLoss error instead of looping forever.
	remoteCluster := cluster.TestAlternativeClusterName
	firstRetryErr := &serviceerrors.RetryReplication{
		NamespaceId:       uuid.NewString(),
		WorkflowId:        uuid.NewString(),
		RunId:             uuid.NewString(),
		StartEventId:      rand.Int63(),
		StartEventVersion: rand.Int63(),
		EndEventId:        rand.Int63(),
		EndEventVersion:   rand.Int63(),
	}

	// The follow-up error targets the same namespace/workflow/run as the
	// first one — the signature of corrupted data on the source cluster.
	secondRetryErr := &serviceerrors.RetryReplication{
		NamespaceId:       firstRetryErr.NamespaceId,
		WorkflowId:        firstRetryErr.WorkflowId,
		RunId:             firstRetryErr.RunId,
		StartEventId:      rand.Int63(),
		StartEventVersion: rand.Int63(),
		EndEventId:        rand.Int63(),
		EndEventVersion:   rand.Int63(),
	}

	// The resender is expected to be invoked once and to report the second
	// retry error for the identical run.
	s.ndcHistoryResender.EXPECT().SendSingleWorkflowHistory(
		gomock.Any(),
		remoteCluster,
		namespace.ID(firstRetryErr.NamespaceId),
		firstRetryErr.WorkflowId,
		firstRetryErr.RunId,
		firstRetryErr.StartEventId,
		firstRetryErr.StartEventVersion,
		firstRetryErr.EndEventId,
		firstRetryErr.EndEventVersion,
	).Return(secondRetryErr)

	doContinue, err := s.task.Resend(context.Background(), remoteCluster, firstRetryErr, ResendAttempt)
	s.False(doContinue)
	var dataLossErr *serviceerror.DataLoss
	s.ErrorAs(err, &dataLossErr)
}

func (s *executableTaskSuite) TestResend_Error() {
remoteCluster := cluster.TestAlternativeClusterName
resendErr := &serviceerrors.RetryReplication{
Expand Down

0 comments on commit dd4323a

Please sign in to comment.