Update backfill history in sync workflow state #3042

Merged
merged 4 commits on Jul 1, 2022
4 changes: 2 additions & 2 deletions common/persistence/dataInterfaces.go
@@ -1049,13 +1049,13 @@ type (

// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// AppendRawHistoryNodes add a node of raw histories to history ndoe table
// AppendRawHistoryNodes add a node of raw histories to history node table
AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
// ReadHistoryBranchByBatch returns history node data for a branch ByBatch
ReadHistoryBranchByBatch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchByBatchResponse, error)
// ReadHistoryBranch returns history node data for a branch
// ReadHistoryBranchReverse returns history node data for a branch
ReadHistoryBranchReverse(ctx context.Context, request *ReadHistoryBranchReverseRequest) (*ReadHistoryBranchReverseResponse, error)
// ReadRawHistoryBranch returns history node raw data for a branch ByBatch
// NOTE: this API should only be used by 3+DC
104 changes: 104 additions & 0 deletions host/xdc/integration_failover_test.go
@@ -2419,6 +2419,110 @@ func (s *integrationClustersTestSuite) TestForceMigration_ClosedWorkflow() {
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, descResp.GetWorkflowExecutionInfo().Status)
}

func (s *integrationClustersTestSuite) TestForceMigration_ResetWorkflow() {
testCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

namespace := "force-replication" + common.GenerateRandomString(5)
s.registerNamespace(namespace, true)

taskqueue := "integration-force-replication-reset-task-queue"
client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1")

testWorkflowFn := func(ctx workflow.Context) error {
return nil
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()

// Start wf1
workflowID := "force-replication-test-reset-wf-1"
run1, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn)

s.NoError(err)
s.NotEmpty(run1.GetRunID())
s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID()))
// wait until wf1 complete
err = run1.Get(testCtx, nil)
s.NoError(err)

resp, err := client1.ResetWorkflowExecution(testCtx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: run1.GetRunID(),
},
Reason: "test",
WorkflowTaskFinishEventId: 3,
RequestId: uuid.New(),
})
s.NoError(err)
resetRun := client1.GetWorkflow(testCtx, workflowID, resp.GetRunId())
err = resetRun.Get(testCtx, nil)
s.NoError(err)

frontendClient1 := s.cluster1.GetFrontendClient()
// Update ns to have 2 clusters
_, err = frontendClient1.UpdateNamespace(testCtx, &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: clusterReplicationConfig,
},
})
s.NoError(err)

// Wait for ns cache to pick up the change
time.Sleep(cacheRefreshInterval)

nsResp, err := frontendClient1.DescribeNamespace(testCtx, &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(2, len(nsResp.ReplicationConfig.Clusters))

// Start force-replicate wf
sysClient, err := sdkclient.Dial(sdkclient.Options{
HostPort: s.cluster1.GetHost().FrontendGRPCAddress(),
Namespace: "temporal-system",
})
s.NoError(err)
forceReplicationWorkflowID := "force-replication-wf"
sysWfRun, err := sysClient.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: forceReplicationWorkflowID,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
}, "force-replication", migration.ForceReplicationParams{
Namespace: namespace,
OverallRps: 10,
})
s.NoError(err)
err = sysWfRun.Get(testCtx, nil)
s.NoError(err)

// Verify all workflows in the namespace are now available in cluster2
client2, _ := s.newClientAndWorker(s.cluster2.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker2")
verifyHistory := func(wfID string, runID string) {
iter1 := client1.GetWorkflowHistory(testCtx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
iter2 := client2.GetWorkflowHistory(testCtx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter1.HasNext() && iter2.HasNext() {
event1, err := iter1.Next()
s.NoError(err)
event2, err := iter2.Next()
s.NoError(err)
s.Equal(event1, event2)
}
s.False(iter1.HasNext())
s.False(iter2.HasNext())
}
verifyHistory(workflowID, run1.GetRunID())
verifyHistory(workflowID, resp.GetRunId())
}

func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
historyResponse, err := client.GetWorkflowExecutionHistory(host.NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
129 changes: 122 additions & 7 deletions service/history/nDCHistoryReplicator.go
@@ -26,16 +26,20 @@ package history

import (
"context"
"fmt"
"sort"
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/adminservice/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
@@ -831,7 +835,7 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
lastEventVersion int64,
branchToken []byte,
) (*time.Time, error) {
historyIterator := collection.NewPagingIterator(r.getHistoryPaginationFn(
remotehistoryIterator := collection.NewPagingIterator(r.getHistoryFromRemotePaginationFn(
ctx,
remoteClusterName,
namespaceName,
@@ -840,33 +844,103 @@ func (
runID,
lastEventID,
lastEventVersion))
localHistoryIterator := collection.NewPagingIterator(r.getHistoryFromLocalPaginationFn(
ctx,
branchToken,
lastEventID,
))

var lastHistoryBatch *commonpb.DataBlob
prevTxnID := common.EmptyVersion
for historyIterator.HasNext() {
historyBlob, err := historyIterator.Next()
historyBranch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
return nil, err
}
latestBranchID := historyBranch.GetBranchId()
var prevBranchID string

sortedAncestors := copyAncestors(historyBranch.GetAncestors())
Member

Why do you copy and then substitute the original historyBranch instead of just sorting the ancestors in place, creating a new filteredHistoryBranch, and filling it in the loop below?

Contributor Author

This is easier to achieve. For example, there are 3 branches: B1 has no ancestors, B2 has ancestor B1, and B3 has ancestors B1 and B2.

B1
└─ B2
   └─ B3

If I create a new filteredHistoryBranch in the loop, then I need to fill in the previous ancestors.
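
For illustration only, here is a minimal standalone sketch of that ancestor bookkeeping. `branchRange` is a hypothetical stand-in for the generated `persistencespb.HistoryBranchRange` proto, and the node IDs are made up:

```go
package main

import (
	"fmt"
	"sort"
)

// branchRange is a hypothetical stand-in for persistencespb.HistoryBranchRange,
// used only to illustrate the ancestor bookkeeping done by copyAncestors.
type branchRange struct {
	BranchId    string
	BeginNodeId int64
	EndNodeId   int64
}

// sortAncestors mirrors the copyAncestors logic in this diff: copy the slice,
// sort by EndNodeId, then derive each BeginNodeId from the previous EndNodeId.
func sortAncestors(input []*branchRange) []*branchRange {
	ans := make([]*branchRange, len(input))
	copy(ans, input)
	if len(ans) > 0 {
		sort.Slice(ans, func(i, j int) bool { return ans[i].EndNodeId < ans[j].EndNodeId })
		ans[0].BeginNodeId = 1
		for i := 1; i < len(ans); i++ {
			ans[i].BeginNodeId = ans[i-1].EndNodeId
		}
	}
	return ans
}

func main() {
	// B3's ancestors in arbitrary order, with made-up node IDs:
	// B1 ends at node 5, B2 ends at node 9.
	ancestors := []*branchRange{
		{BranchId: "B2", EndNodeId: 9},
		{BranchId: "B1", EndNodeId: 5},
	}
	for _, r := range sortAncestors(ancestors) {
		fmt.Printf("%s: nodes [%d, %d)\n", r.BranchId, r.BeginNodeId, r.EndNodeId)
	}
	// Output:
	// B1: nodes [1, 5)
	// B2: nodes [5, 9)
}
```

With the ranges ordered this way, the backfill loop below can walk the remote history batches in node-ID order and switch to the next ancestor's branch ID once a batch's node ID reaches the current ancestor's EndNodeId.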

sortedAncestorsIdx := 0
historyBranch.Ancestors = nil

BackfillLoop:
for remotehistoryIterator.HasNext() {
historyBlob, err := remotehistoryIterator.Next()
if err != nil {
return nil, err
}

HistoryExistenceCheckLoop:
for localHistoryIterator.HasNext() {
localHistoryBatch, err := localHistoryIterator.Next()
switch err.(type) {
case nil:
if events := localHistoryBatch.GetEvents(); events[0].GetEventId() == historyBlob.nodeID {
// Check if the history batch already in DB.
continue BackfillLoop
}
case *serviceerror.NotFound:
break HistoryExistenceCheckLoop
default:
return nil, err
}
}

if sortedAncestorsIdx < len(sortedAncestors) {
currentAncestor := sortedAncestors[sortedAncestorsIdx]
if historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
// update ancestor
historyBranch.Ancestors = append(historyBranch.Ancestors, currentAncestor)
sortedAncestorsIdx++
}
if sortedAncestorsIdx < len(sortedAncestors) {
// use ancestor branch id
currentAncestor = sortedAncestors[sortedAncestorsIdx]
historyBranch.BranchId = currentAncestor.GetBranchId()
if historyBlob.nodeID < currentAncestor.GetBeginNodeId() || historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
return nil, serviceerror.NewInternal(
fmt.Sprintf("The backfill history blob node id %d is not in acestoer range [%d, %d]",
historyBlob.nodeID,
currentAncestor.GetBeginNodeId(),
currentAncestor.GetEndNodeId()),
)
}
} else {
// no more ancestor, use the latest branch ID
historyBranch.BranchId = latestBranchID
}
}

filteredHistoryBranch, err := serialization.HistoryBranchToBlob(historyBranch)
if err != nil {
return nil, err
}
lastHistoryBatch = historyBlob.rawHistory
txnID, err := r.shard.GenerateTaskID()
if err != nil {
return nil, err
}
_, err = r.shard.GetExecutionManager().AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shard.GetShardID(),
IsNewBranch: prevTxnID == common.EmptyVersion,
BranchToken: branchToken,
IsNewBranch: prevBranchID != historyBranch.BranchId,
BranchToken: filteredHistoryBranch.GetData(),
History: historyBlob.rawHistory,
PrevTransactionID: prevTxnID,
TransactionID: txnID,
NodeID: historyBlob.nodeID,
Info: persistence.BuildHistoryGarbageCleanupInfo(
namespaceID.String(),
workflowID,
runID,
),
})
if err != nil {
return nil, err
}
prevTxnID = txnID
prevBranchID = historyBranch.BranchId
lastHistoryBatch = historyBlob.rawHistory
}

var lastEventTime *time.Time
events, _ := r.historySerializer.DeserializeEvents(lastHistoryBatch)
if len(events) > 0 {
Expand All @@ -875,7 +949,21 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
return lastEventTime, nil
}

func (r *nDCHistoryReplicatorImpl) getHistoryPaginationFn(
func copyAncestors(input []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange {
ans := make([]*persistencespb.HistoryBranchRange, len(input))
copy(ans, input)
if len(ans) > 0 {
// sort ans based on EndNodeId so that we can set BeginNodeId
sort.Slice(ans, func(i, j int) bool { return (ans)[i].GetEndNodeId() < (ans)[j].GetEndNodeId() })
(ans)[0].BeginNodeId = int64(1)
for i := 1; i < len(ans); i++ {
(ans)[i].BeginNodeId = (ans)[i-1].GetEndNodeId()
}
}
return ans
}

func (r *nDCHistoryReplicatorImpl) getHistoryFromRemotePaginationFn(
ctx context.Context,
remoteClusterName string,
namespaceName namespace.Name,
@@ -915,3 +1003,30 @@ func (r *nDCHistoryReplicatorImpl) getHistoryPaginationFn(
return batches, response.NextPageToken, nil
}
}

func (r *nDCHistoryReplicatorImpl) getHistoryFromLocalPaginationFn(
ctx context.Context,
branchToken []byte,
lastEventID int64,
) collection.PaginationFn[*historypb.History] {

return func(paginationToken []byte) ([]*historypb.History, []byte, error) {
response, err := r.shard.GetExecutionManager().ReadHistoryBranchByBatch(ctx, &persistence.ReadHistoryBranchRequest{
ShardID: r.shard.GetShardID(),
BranchToken: branchToken,
MinEventID: common.FirstEventID,
MaxEventID: lastEventID + 1,
PageSize: 100,
NextPageToken: paginationToken,
})
if err != nil {
return nil, nil, err
}

histories := make([]*historypb.History, 0, len(response.History))
for _, history := range response.History {
histories = append(histories, history)
}
return histories, response.NextPageToken, nil
}
}