Skip to content

Commit

Permalink
Skip verifying workflow which has already passed retention time (#4770)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**

Previous logic skip workflow of which retention is within a range of
current time. But we actually just need to skip workflow already passed
retention time.

<!-- Tell your future self why have you made these changes -->
**Why?**


<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**


<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
hehaifengcn authored Aug 11, 2023
1 parent 0ce1234 commit 815c8a3
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 39 deletions.
18 changes: 9 additions & 9 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1633,15 +1633,15 @@ var (
)

// Force replication
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
EncounterNotFoundWorkflowCount = NewCounterDef("encounter_not_found_workflow_count")
EncounterCloseToRetentionWorkflowCount = NewCounterDef("encounter_close_to_retention_workflow_count")
GenerateReplicationTasksLatency = NewTimerDef("generate_replication_tasks_latency")
VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success")
VerifyReplicationTaskNotFound = NewCounterDef("verify_replication_task_not_found")
VerifyReplicationTaskFailed = NewCounterDef("verify_replication_task_failed")
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
EncounterNotFoundWorkflowCount = NewCounterDef("encounter_not_found_workflow_count")
EncounterPassRetentionWorkflowCount = NewCounterDef("encounter_pass_retention_workflow_count")
GenerateReplicationTasksLatency = NewTimerDef("generate_replication_tasks_latency")
VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success")
VerifyReplicationTaskNotFound = NewCounterDef("verify_replication_task_not_found")
VerifyReplicationTaskFailed = NewCounterDef("verify_replication_task_failed")
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
Expand Down
10 changes: 4 additions & 6 deletions service/worker/migration/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,11 @@ func (a *activities) canSkipWorkflowExecution(
return true, reasonZombieWorkflow, nil
}

// source and target cluster handles workflow retention separately. For a workflow which is abort to be deleted,
// it may be already deleted on target cluster but still exist on source, which cause verification delay.
// Here, we skip workflow of which retention time is close to current time to continue the verification.
// Skip verifying workflow which has already passed retention time.
if closeTime := resp.GetDatabaseMutableState().GetExecutionInfo().GetCloseTime(); closeTime != nil && ns != nil && ns.Retention() > 0 {
deleteTime := closeTime.Add(ns.Retention())
if isCloseToCurrentTime(deleteTime, request.RetentionBiasDuration) {
a.forceReplicationMetricsHandler.Counter(metrics.EncounterCloseToRetentionWorkflowCount.GetMetricName()).Record(1)
if deleteTime.Before(time.Now()) {
a.forceReplicationMetricsHandler.Counter(metrics.EncounterPassRetentionWorkflowCount.GetMetricName()).Record(1)
return true, reasonWorkflowCloseToRetention, nil
}
}
Expand Down Expand Up @@ -644,7 +642,7 @@ func (a *activities) verifyReplicationTasks(
}

const (
defaultNoProgressNotRetryableTimeout = 15 * time.Minute
defaultNoProgressNotRetryableTimeout = 30 * time.Minute
)

func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verifyReplicationTasksRequest) (verifyReplicationTasksResponse, error) {
Expand Down
24 changes: 0 additions & 24 deletions service/worker/migration/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,6 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() {
}
}

// Now
// │ │ bias │
// ───────────────────┼────────▼─────────┼──────
// closeTime │ Skip Range │
// │ deleteTime
// └───────────────────┘
// retention

func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() {
bias := time.Minute
request := verifyReplicationTasksRequest{
Expand All @@ -479,14 +471,6 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() {
},
{
30 * time.Second,
true,
},
{
-(bias + time.Minute),
false,
},
{
bias + time.Minute,
false,
},
}
Expand Down Expand Up @@ -618,11 +602,3 @@ func (s *activitiesSuite) TestGenerateReplicationTasks_Failed() {
// Only the generation of 1st execution suceeded.
s.Equal(0, lastHeartBeat)
}

func (s *activitiesSuite) Test_isCloseToCurrentTime() {
d := time.Minute
now := time.Now()
s.True(isCloseToCurrentTime(now, d))
s.False(isCloseToCurrentTime(now.Add(2*time.Minute), d))
s.False(isCloseToCurrentTime(now.Add(-2*time.Minute), d))
}

0 comments on commit 815c8a3

Please # to comment.