From 4c85529d9ee53a7986194c19145089cdc4d67ba4 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Wed, 9 Aug 2023 16:14:41 -0700 Subject: [PATCH] Improve task range completion prerequisite check --- service/history/queues/queue_base.go | 2 +- service/history/queues/queue_base_test.go | 72 ++++++++++++++++++++++- 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 9e85cd3ccdd..3afb2645a31 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -392,7 +392,7 @@ func (p *queueBase) checkpoint() { // for the queue p.metricsHandler.Counter(metrics.TaskBatchCompleteCounter.GetMetricName()).Record(1) if newExclusiveDeletionHighWatermark.CompareTo(p.exclusiveDeletionHighWatermark) > 0 || - p.updateShardRangeID() { + (p.updateShardRangeID() && newExclusiveDeletionHighWatermark.CompareTo(tasks.MinimumKey) > 0) { // when shard rangeID is updated, perform range completion again in case the underlying persistence implementation // serves traffic based on the persisted shardInfo err := p.rangeCompleteTasks(p.exclusiveDeletionHighWatermark, newExclusiveDeletionHighWatermark) diff --git a/service/history/queues/queue_base_test.go b/service/history/queues/queue_base_test.go index a48390f90f6..2cd3ce64cfe 100644 --- a/service/history/queues/queue_base_test.go +++ b/service/history/queues/queue_base_test.go @@ -27,6 +27,7 @@ package queues import ( "context" "errors" + "math/rand" "testing" "time" @@ -420,7 +421,7 @@ func (s *queueBaseSuite) TestProcessNewRange() { s.True(base.nonReadableScope.Range.Equals(NewRange(scopes[0].Range.ExclusiveMax, tasks.MaximumKey))) } -func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() { +func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks_PerformRangeCompletion() { scopeMinKey := tasks.MaximumKey readerScopes := map[int64][]Scope{} readerIDs := []int64{DefaultReaderId, 2, 3} @@ -504,6 +505,75 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() { s.True(scopeMinKey.CompareTo(base.exclusiveDeletionHighWatermark) == 0) } +func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks_SkipRangeCompletion() { + // task range completion should be skipped when there's no task to delete + scopeMinKey := tasks.MinimumKey + readerScopes := map[int64][]Scope{ + DefaultReaderId: { + { + Range: NewRange(scopeMinKey, tasks.NewKey(time.Now(), rand.Int63())), + Predicate: predicates.Universal[tasks.Task](), + }, + }, + } + queueState := &queueState{ + readerScopes: readerScopes, + exclusiveReaderHighWatermark: tasks.MaximumKey, + } + persistenceState := ToPersistenceQueueState(queueState) + + mockShard := shard.NewTestContext( + s.controller, + &persistencespb.ShardInfo{ + ShardId: 0, + RangeId: 10, + QueueStates: map[int32]*persistencespb.QueueState{ + tasks.CategoryIDTimer: persistenceState, + }, + }, + s.config, + ) + mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() + mockShard.Resource.ExecutionMgr.EXPECT().RegisterHistoryTaskReader(gomock.Any(), gomock.Any()).Return(nil).Times(len(readerScopes)) + + base := newQueueBase( + mockShard, + tasks.CategoryTimer, + nil, + s.mockScheduler, + s.mockRescheduler, + NewNoopPriorityAssigner(), + nil, + s.options, + s.rateLimiter, + NoopReaderCompletionFn, + s.logger, + s.metricsHandler, + ) + base.checkpointTimer = time.NewTimer(s.options.CheckpointInterval()) + + s.True(scopeMinKey.CompareTo(base.exclusiveDeletionHighWatermark) == 0) + + // set to a smaller value so that delete will be triggered + currentLowWatermark := tasks.MinimumKey + base.exclusiveDeletionHighWatermark = currentLowWatermark + + gomock.InOrder( + mockShard.Resource.ExecutionMgr.EXPECT().UpdateHistoryTaskReaderProgress(gomock.Any(), gomock.Any()).Times(len(readerScopes)), + mockShard.Resource.ShardMgr.EXPECT().UpdateShard(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, request *persistence.UpdateShardRequest) error { + s.QueueStateEqual(persistenceState, request.ShardInfo.QueueStates[tasks.CategoryIDTimer]) + return nil + }, + ).Times(1), + ) + + base.checkpoint() + + s.True(scopeMinKey.CompareTo(base.exclusiveDeletionHighWatermark) == 0) +} + func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() { exclusiveReaderHighWatermark := NewRandomKey() queueState := &queueState{