diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index 0499463797f..64f014e2941 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -111,8 +111,15 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR for _, shard := range resp.Shards { clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster] if hasClusterInfo { - if shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks || - (clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] && + // WE are all caught up + if shard.MaxReplicationTaskId == clusterInfo.AckedTaskId { + readyShardCount++ + continue + } + + // Caught up to the last checked IDs, and within allowed lagging range + if clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] && + (shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks || shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) <= waitRequest.AllowedLagging) { readyShardCount++ continue