Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Update backfill history in sync workflow state #3042

Merged
merged 4 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,13 +1049,13 @@ type (

// AppendHistoryNodes add a node to history node table
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// AppendRawHistoryNodes add a node of raw histories to history ndoe table
// AppendRawHistoryNodes add a node of raw histories to history node table
AppendRawHistoryNodes(ctx context.Context, request *AppendRawHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
// ReadHistoryBranchByBatch returns history node data for a branch ByBatch
ReadHistoryBranchByBatch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchByBatchResponse, error)
// ReadHistoryBranch returns history node data for a branch
// ReadHistoryBranchReverse returns history node data for a branch
ReadHistoryBranchReverse(ctx context.Context, request *ReadHistoryBranchReverseRequest) (*ReadHistoryBranchReverseResponse, error)
// ReadRawHistoryBranch returns history node raw data for a branch ByBatch
// NOTE: this API should only be used by 3+DC
Expand Down
104 changes: 104 additions & 0 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,110 @@ func (s *integrationClustersTestSuite) TestForceMigration_ClosedWorkflow() {
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, descResp.GetWorkflowExecutionInfo().Status)
}

func (s *integrationClustersTestSuite) TestForceMigration_ResetWorkflow() {
testCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

namespace := "force-replication" + common.GenerateRandomString(5)
s.registerNamespace(namespace, true)

taskqueue := "integration-force-replication-reset-task-queue"
client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1")

testWorkflowFn := func(ctx workflow.Context) error {
return nil
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()

// Start wf1
workflowID := "force-replication-test-reset-wf-1"
run1, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn)

s.NoError(err)
s.NotEmpty(run1.GetRunID())
s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID()))
// wait until wf1 complete
err = run1.Get(testCtx, nil)
s.NoError(err)

resp, err := client1.ResetWorkflowExecution(testCtx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: run1.GetRunID(),
},
Reason: "test",
WorkflowTaskFinishEventId: 3,
RequestId: uuid.New(),
})
s.NoError(err)
resetRun := client1.GetWorkflow(testCtx, workflowID, resp.GetRunId())
err = resetRun.Get(testCtx, nil)
s.NoError(err)

frontendClient1 := s.cluster1.GetFrontendClient()
// Update ns to have 2 clusters
_, err = frontendClient1.UpdateNamespace(testCtx, &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: clusterReplicationConfig,
},
})
s.NoError(err)

// Wait for ns cache to pick up the change
time.Sleep(cacheRefreshInterval)

nsResp, err := frontendClient1.DescribeNamespace(testCtx, &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(2, len(nsResp.ReplicationConfig.Clusters))

// Start force-replicate wf
sysClient, err := sdkclient.Dial(sdkclient.Options{
HostPort: s.cluster1.GetHost().FrontendGRPCAddress(),
Namespace: "temporal-system",
})
forceReplicationWorkflowID := "force-replication-wf"
sysWfRun, err := sysClient.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: forceReplicationWorkflowID,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
}, "force-replication", migration.ForceReplicationParams{
Namespace: namespace,
OverallRps: 10,
})
s.NoError(err)
err = sysWfRun.Get(testCtx, nil)
s.NoError(err)

// Verify all wf in ns is now available in cluster2
client2, _ := s.newClientAndWorker(s.cluster2.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker2")
verifyHistory := func(wfID string, runID string) {
iter1 := client1.GetWorkflowHistory(testCtx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
iter2 := client2.GetWorkflowHistory(testCtx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter1.HasNext() && iter2.HasNext() {
event1, err := iter1.Next()
s.NoError(err)
event2, err := iter2.Next()
s.NoError(err)
s.Equal(event1, event2)
}
s.False(iter1.HasNext())
s.False(iter2.HasNext())
}
verifyHistory(workflowID, run1.GetRunID())
verifyHistory(workflowID, resp.GetRunId())
}

func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
historyResponse, err := client.GetWorkflowExecutionHistory(host.NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Expand Down
138 changes: 129 additions & 9 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ package history

import (
"context"
"fmt"
"sort"
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/adminservice/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/collection"
Expand Down Expand Up @@ -831,42 +835,117 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
lastEventVersion int64,
branchToken []byte,
) (*time.Time, error) {
historyIterator := collection.NewPagingIterator(r.getHistoryPaginationFn(

// Get the last batch node id to check if the history data is already in DB.
localHistoryIterator := collection.NewPagingIterator(r.getHistoryFromLocalPaginationFn(
ctx,
branchToken,
lastEventID,
))
var lastBatchNodeID int64
for localHistoryIterator.HasNext() {
localHistoryBatch, err := localHistoryIterator.Next()
switch err.(type) {
case nil:
if len(localHistoryBatch.GetEvents()) > 0 {
lastBatchNodeID = localHistoryBatch.GetEvents()[0].GetEventId()
}
case *serviceerror.NotFound:
default:
return nil, err
}
}

remoteHistoryIterator := collection.NewPagingIterator(r.getHistoryFromRemotePaginationFn(
ctx,
remoteClusterName,
namespaceName,
namespaceID,
workflowID,
runID,
lastEventID,
lastEventVersion))

lastEventVersion),
)
var lastHistoryBatch *commonpb.DataBlob
prevTxnID := common.EmptyVersion
for historyIterator.HasNext() {
historyBlob, err := historyIterator.Next()
historyBranch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
return nil, err
}
latestBranchID := historyBranch.GetBranchId()
var prevBranchID string

sortedAncestors := copyAndSortAncestors(historyBranch.GetAncestors())
sortedAncestorsIdx := 0
historyBranch.Ancestors = nil

BackfillLoop:
for remoteHistoryIterator.HasNext() {
historyBlob, err := remoteHistoryIterator.Next()
if err != nil {
return nil, err
}

if historyBlob.nodeID <= lastBatchNodeID {
// The history batch already in DB.
continue BackfillLoop
}

if sortedAncestorsIdx < len(sortedAncestors) {
currentAncestor := sortedAncestors[sortedAncestorsIdx]
if historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
// update ancestor
historyBranch.Ancestors = append(historyBranch.Ancestors, currentAncestor)
sortedAncestorsIdx++
}
if sortedAncestorsIdx < len(sortedAncestors) {
// use ancestor branch id
currentAncestor = sortedAncestors[sortedAncestorsIdx]
historyBranch.BranchId = currentAncestor.GetBranchId()
if historyBlob.nodeID < currentAncestor.GetBeginNodeId() || historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
return nil, serviceerror.NewInternal(
fmt.Sprintf("The backfill history blob node id %d is not in acestoer range [%d, %d]",
historyBlob.nodeID,
currentAncestor.GetBeginNodeId(),
currentAncestor.GetEndNodeId()),
)
}
} else {
// no more ancestor, use the latest branch ID
historyBranch.BranchId = latestBranchID
}
}

filteredHistoryBranch, err := serialization.HistoryBranchToBlob(historyBranch)
if err != nil {
return nil, err
}
lastHistoryBatch = historyBlob.rawHistory
txnID, err := r.shard.GenerateTaskID()
if err != nil {
return nil, err
}
_, err = r.shard.GetExecutionManager().AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shard.GetShardID(),
IsNewBranch: prevTxnID == common.EmptyVersion,
BranchToken: branchToken,
IsNewBranch: prevBranchID != historyBranch.BranchId,
BranchToken: filteredHistoryBranch.GetData(),
History: historyBlob.rawHistory,
PrevTransactionID: prevTxnID,
TransactionID: txnID,
NodeID: historyBlob.nodeID,
Info: persistence.BuildHistoryGarbageCleanupInfo(
namespaceID.String(),
workflowID,
runID,
),
})
if err != nil {
return nil, err
}
prevTxnID = txnID
prevBranchID = historyBranch.BranchId
lastHistoryBatch = historyBlob.rawHistory
}

var lastEventTime *time.Time
events, _ := r.historySerializer.DeserializeEvents(lastHistoryBatch)
if len(events) > 0 {
Expand All @@ -875,7 +954,21 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
return lastEventTime, nil
}

func (r *nDCHistoryReplicatorImpl) getHistoryPaginationFn(
func copyAndSortAncestors(input []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange {
ans := make([]*persistencespb.HistoryBranchRange, len(input))
copy(ans, input)
if len(ans) > 0 {
// sort ans based onf EndNodeID so that we can set BeginNodeID
sort.Slice(ans, func(i, j int) bool { return ans[i].GetEndNodeId() < ans[j].GetEndNodeId() })
ans[0].BeginNodeId = int64(1)
for i := 1; i < len(ans); i++ {
ans[i].BeginNodeId = ans[i-1].GetEndNodeId()
}
}
return ans
}

func (r *nDCHistoryReplicatorImpl) getHistoryFromRemotePaginationFn(
ctx context.Context,
remoteClusterName string,
namespaceName namespace.Name,
Expand Down Expand Up @@ -915,3 +1008,30 @@ func (r *nDCHistoryReplicatorImpl) getHistoryPaginationFn(
return batches, response.NextPageToken, nil
}
}

func (r *nDCHistoryReplicatorImpl) getHistoryFromLocalPaginationFn(
ctx context.Context,
branchToken []byte,
lastEventID int64,
) collection.PaginationFn[*historypb.History] {

return func(paginationToken []byte) ([]*historypb.History, []byte, error) {
response, err := r.shard.GetExecutionManager().ReadHistoryBranchByBatch(ctx, &persistence.ReadHistoryBranchRequest{
ShardID: r.shard.GetShardID(),
BranchToken: branchToken,
MinEventID: common.FirstEventID,
MaxEventID: lastEventID + 1,
PageSize: 100,
NextPageToken: paginationToken,
})
if err != nil {
return nil, nil, err
}

histories := make([]*historypb.History, 0, len(response.History))
for _, history := range response.History {
histories = append(histories, history)
}
return histories, response.NextPageToken, nil
}
}
Loading