Skip to content

Commit

Permalink
Minor tweak to migration workflow wait replication check (#2839)
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 committed Jun 1, 2022
1 parent aad0f85 commit bd87aa3
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions service/worker/migration/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,15 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR
for _, shard := range resp.Shards {
clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster]
if hasClusterInfo {
if shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks ||
(clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] &&
// WE are all caught up
if shard.MaxReplicationTaskId == clusterInfo.AckedTaskId {
readyShardCount++
continue
}

// Caught up to the last checked IDs, and within allowed lagging range
if clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] &&
(shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks ||
shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) <= waitRequest.AllowedLagging) {
readyShardCount++
continue
Expand Down

0 comments on commit bd87aa3

Please # to comment.