Skip to content

Commit

Permalink
Update backfill history in sync workflow state (#3042)
Browse files Browse the repository at this point in the history
* Update backfill history in sync workflow state

* append node with cleanup info

* get last batch node id
  • Loading branch information
yux0 authored Jul 1, 2022
1 parent d968a39 commit 2b31d0d
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 13 deletions.
4 changes: 2 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,13 +1049,13 @@ type (

// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// AppendRawHistoryNodes add a node of raw histories to history ndoe table
// AppendRawHistoryNodes add a node of raw histories to history node table
AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
// ReadHistoryBranchByBatch returns history node data for a branch ByBatch
ReadHistoryBranchByBatch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchByBatchResponse, error)
// ReadHistoryBranch returns history node data for a branch
// ReadHistoryBranchReverse returns history node data for a branch
ReadHistoryBranchReverse(ctx context.Context, request *ReadHistoryBranchReverseRequest) (*ReadHistoryBranchReverseResponse, error)
// ReadRawHistoryBranch returns history node raw data for a branch ByBatch
// NOTE: this API should only be used by 3+DC
Expand Down
104 changes: 104 additions & 0 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,110 @@ func (s *integrationClustersTestSuite) TestForceMigration_ClosedWorkflow() {
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, descResp.GetWorkflowExecutionInfo().Status)
}

func (s *integrationClustersTestSuite) TestForceMigration_ResetWorkflow() {
testCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

namespace := "force-replication" + common.GenerateRandomString(5)
s.registerNamespace(namespace, true)

taskqueue := "integration-force-replication-reset-task-queue"
client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1")

testWorkflowFn := func(ctx workflow.Context) error {
return nil
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()

// Start wf1
workflowID := "force-replication-test-reset-wf-1"
run1, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn)

s.NoError(err)
s.NotEmpty(run1.GetRunID())
s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID()))
// wait until wf1 complete
err = run1.Get(testCtx, nil)
s.NoError(err)

resp, err := client1.ResetWorkflowExecution(testCtx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: run1.GetRunID(),
},
Reason: "test",
WorkflowTaskFinishEventId: 3,
RequestId: uuid.New(),
})
s.NoError(err)
resetRun := client1.GetWorkflow(testCtx, workflowID, resp.GetRunId())
err = resetRun.Get(testCtx, nil)
s.NoError(err)

frontendClient1 := s.cluster1.GetFrontendClient()
// Update ns to have 2 clusters
_, err = frontendClient1.UpdateNamespace(testCtx, &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: clusterReplicationConfig,
},
})
s.NoError(err)

// Wait for ns cache to pick up the change
time.Sleep(cacheRefreshInterval)

nsResp, err := frontendClient1.DescribeNamespace(testCtx, &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(2, len(nsResp.ReplicationConfig.Clusters))

// Start force-replicate wf
sysClient, err := sdkclient.Dial(sdkclient.Options{
HostPort: s.cluster1.GetHost().FrontendGRPCAddress(),
Namespace: "temporal-system",
})
forceReplicationWorkflowID := "force-replication-wf"
sysWfRun, err := sysClient.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: forceReplicationWorkflowID,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
}, "force-replication", migration.ForceReplicationParams{
Namespace: namespace,
OverallRps: 10,
})
s.NoError(err)
err = sysWfRun.Get(testCtx, nil)
s.NoError(err)

// Verify all wf in ns is now available in cluster2
client2, _ := s.newClientAndWorker(s.cluster2.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker2")
verifyHistory := func(wfID string, runID string) {
iter1 := client1.GetWorkflowHistory(testCtx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
iter2 := client2.GetWorkflowHistory(testCtx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter1.HasNext() && iter2.HasNext() {
event1, err := iter1.Next()
s.NoError(err)
event2, err := iter2.Next()
s.NoError(err)
s.Equal(event1, event2)
}
s.False(iter1.HasNext())
s.False(iter2.HasNext())
}
verifyHistory(workflowID, run1.GetRunID())
verifyHistory(workflowID, resp.GetRunId())
}

func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
historyResponse, err := client.GetWorkflowExecutionHistory(host.NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Expand Down
138 changes: 129 additions & 9 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ package history

import (
"context"
"fmt"
"sort"
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/adminservice/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
Expand Down Expand Up @@ -831,42 +835,117 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
lastEventVersion int64,
branchToken []byte,
) (*time.Time, error) {
historyIterator := collection.NewPagingIterator(r.getHistoryPaginationFn(

// Get the last batch node id to check if the history data is already in DB.
localHistoryIterator := collection.NewPagingIterator(r.getHistoryFromLocalPaginationFn(
ctx,
branchToken,
lastEventID,
))
var lastBatchNodeID int64
for localHistoryIterator.HasNext() {
localHistoryBatch, err := localHistoryIterator.Next()
switch err.(type) {
case nil:
if len(localHistoryBatch.GetEvents()) > 0 {
lastBatchNodeID = localHistoryBatch.GetEvents()[0].GetEventId()
}
case *serviceerror.NotFound:
default:
return nil, err
}
}

remoteHistoryIterator := collection.NewPagingIterator(r.getHistoryFromRemotePaginationFn(
ctx,
remoteClusterName,
namespaceName,
namespaceID,
workflowID,
runID,
lastEventID,
lastEventVersion))

lastEventVersion),
)
var lastHistoryBatch *commonpb.DataBlob
prevTxnID := common.EmptyVersion
for historyIterator.HasNext() {
historyBlob, err := historyIterator.Next()
historyBranch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
return nil, err
}
latestBranchID := historyBranch.GetBranchId()
var prevBranchID string

sortedAncestors := copyAndSortAncestors(historyBranch.GetAncestors())
sortedAncestorsIdx := 0
historyBranch.Ancestors = nil

BackfillLoop:
for remoteHistoryIterator.HasNext() {
historyBlob, err := remoteHistoryIterator.Next()
if err != nil {
return nil, err
}

if historyBlob.nodeID <= lastBatchNodeID {
// The history batch already in DB.
continue BackfillLoop
}

if sortedAncestorsIdx < len(sortedAncestors) {
currentAncestor := sortedAncestors[sortedAncestorsIdx]
if historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
// update ancestor
historyBranch.Ancestors = append(historyBranch.Ancestors, currentAncestor)
sortedAncestorsIdx++
}
if sortedAncestorsIdx < len(sortedAncestors) {
// use ancestor branch id
currentAncestor = sortedAncestors[sortedAncestorsIdx]
historyBranch.BranchId = currentAncestor.GetBranchId()
if historyBlob.nodeID < currentAncestor.GetBeginNodeId() || historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
return nil, serviceerror.NewInternal(
fmt.Sprintf("The backfill history blob node id %d is not in acestoer range [%d, %d]",
historyBlob.nodeID,
currentAncestor.GetBeginNodeId(),
currentAncestor.GetEndNodeId()),
)
}
} else {
// no more ancestor, use the latest branch ID
historyBranch.BranchId = latestBranchID
}
}

filteredHistoryBranch, err := serialization.HistoryBranchToBlob(historyBranch)
if err != nil {
return nil, err
}
lastHistoryBatch = historyBlob.rawHistory
txnID, err := r.shard.GenerateTaskID()
if err != nil {
return nil, err
}
_, err = r.shard.GetExecutionManager().AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shard.GetShardID(),
IsNewBranch: prevTxnID == common.EmptyVersion,
BranchToken: branchToken,
IsNewBranch: prevBranchID != historyBranch.BranchId,
BranchToken: filteredHistoryBranch.GetData(),
History: historyBlob.rawHistory,
PrevTransactionID: prevTxnID,
TransactionID: txnID,
NodeID: historyBlob.nodeID,
Info: persistence.BuildHistoryGarbageCleanupInfo(
namespaceID.String(),
workflowID,
runID,
),
})
if err != nil {
return nil, err
}
prevTxnID = txnID
prevBranchID = historyBranch.BranchId
lastHistoryBatch = historyBlob.rawHistory
}

var lastEventTime *time.Time
events, _ := r.historySerializer.DeserializeEvents(lastHistoryBatch)
if len(events) > 0 {
Expand All @@ -875,7 +954,21 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
return lastEventTime, nil
}

func (r *nDCHistoryReplicatorImpl) getHistoryPaginationFn(
func copyAndSortAncestors(input []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange {
ans := make([]*persistencespb.HistoryBranchRange, len(input))
copy(ans, input)
if len(ans) > 0 {
// sort ans based onf EndNodeID so that we can set BeginNodeID
sort.Slice(ans, func(i, j int) bool { return ans[i].GetEndNodeId() < ans[j].GetEndNodeId() })
ans[0].BeginNodeId = int64(1)
for i := 1; i < len(ans); i++ {
ans[i].BeginNodeId = ans[i-1].GetEndNodeId()
}
}
return ans
}

func (r *nDCHistoryReplicatorImpl) getHistoryFromRemotePaginationFn(
ctx context.Context,
remoteClusterName string,
namespaceName namespace.Name,
Expand Down Expand Up @@ -915,3 +1008,30 @@ func (r *nDCHistoryReplicatorImpl) getHistoryPaginationFn(
return batches, response.NextPageToken, nil
}
}

func (r *nDCHistoryReplicatorImpl) getHistoryFromLocalPaginationFn(
ctx context.Context,
branchToken []byte,
lastEventID int64,
) collection.PaginationFn[*historypb.History] {

return func(paginationToken []byte) ([]*historypb.History, []byte, error) {
response, err := r.shard.GetExecutionManager().ReadHistoryBranchByBatch(ctx, &persistence.ReadHistoryBranchRequest{
ShardID: r.shard.GetShardID(),
BranchToken: branchToken,
MinEventID: common.FirstEventID,
MaxEventID: lastEventID + 1,
PageSize: 100,
NextPageToken: paginationToken,
})
if err != nil {
return nil, nil, err
}

histories := make([]*historypb.History, 0, len(response.History))
for _, history := range response.History {
histories = append(histories, history)
}
return histories, response.NextPageToken, nil
}
}
Loading

0 comments on commit 2b31d0d

Please # to comment.