diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index d22b7ce0017..7edbe8f89c0 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -1091,9 +1091,9 @@ type ( // its mandatory to specify it. On success this method returns the number of rows // actually deleted. If the underlying storage doesn't support "limit", all rows // less than or equal to taskID will be deleted. - // On success, this method returns: - // - number of rows actually deleted, if limit is honored - // - UnknownNumRowsDeleted, when all rows below value are deleted + // On success, this method returns either: + // - UnknownNumRowsAffected (this means all rows below value are deleted) + // - number of rows deleted, which may be equal to limit CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (int, error) } diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index b230ae3bcd8..917c8a7e227 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -345,7 +345,7 @@ func (s *matchingEngineSuite) TestPollWorkflowTaskQueues() { Execution: execution, ScheduledEventId: scheduledEventID, TaskQueue: stickyTaskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } _, err := s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest) @@ -510,7 +510,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType enumspb.TaskQueueType, isFor Execution: execution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } if isForwarded { addRequest.ForwardedSource = forwardedFrom @@ -522,7 +522,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType enumspb.TaskQueueType, isFor Execution: execution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } if isForwarded { addRequest.ForwardedSource = forwardedFrom @@ -571,7 +571,7 @@ func (s *matchingEngineSuite) TestTaskWriterShutdown() { NamespaceId: namespaceID.String(), Execution: execution, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } // stop the task writer explicitly @@ -616,7 +616,7 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() { Execution: workflowExecution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } _, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) @@ -819,7 +819,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { Execution: workflowExecution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } _, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) wg.Wait() @@ -985,7 +985,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities( Execution: workflowExecution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } _, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) @@ -1132,7 +1132,7 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeWorkflowTasks() { Execution: workflowExecution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } _, err := s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest) @@ -1572,7 +1572,7 @@ func (s *matchingEngineSuite) TestAddTaskAfterStartFailure() { Execution: workflowExecution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } _, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) @@ -1623,7 +1623,7 @@ func (s *matchingEngineSuite) TestTaskQueueManagerGetTaskBatch() { Execution: workflowExecution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(1), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } _, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) @@ -1767,7 +1767,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() { Kind: enumspb.TASK_QUEUE_KIND_NORMAL, } - const taskCount = 20 + const taskCount = 20 // must be multiple of 4 const rangeSize = 10 s.matchingEngine.config.RangeSize = rangeSize s.matchingEngine.config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskQueueInfo(2) @@ -1788,11 +1788,15 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() { Execution: workflowExecution, ScheduledEventId: scheduledEventID, TaskQueue: taskQueue, - ScheduleToStartTimeout: timestamp.DurationFromSeconds(5), + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), } - if i%2 == 0 { + switch i % 4 { + case 0: // simulates creating a task whose scheduledToStartTimeout is already expired addRequest.ScheduleToStartTimeout = timestamp.DurationFromSeconds(-5) + case 2: + // simulates creating a task which will time out in the buffer + addRequest.ScheduleToStartTimeout = timestamp.DurationPtr(250 * time.Millisecond) } _, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) s.NoError(err) @@ -1804,7 +1808,11 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() { // wait until all tasks are loaded by into in-memory buffers by task queue manager // the buffer size should be one less than expected because dispatcher will dequeue the head - s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) >= (taskCount/2 - 1) }, time.Second)) + // 1/4 should be thrown out because they are expired before they hit the buffer + s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) >= (3*taskCount/4 - 1) }, time.Second)) + + // ensure the 1/4 of tasks with small ScheduleToStartTimeout will be expired when they come out of the buffer + time.Sleep(300 * time.Millisecond) maxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes @@ -1826,9 +1834,16 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() { } remaining -= taskCount / 2 // since every other task is expired, we expect half the tasks to be deleted - // after poll consumed 1/4th of what is available - s.EqualValues(remaining, s.taskManager.getTaskCount(tlID)) + // after poll consumed 1/4th of what is available. + // however, the gc is best-effort and might not run exactly when we want it to. + // various thread interleavings between the two task reader threads and this one + // might leave the gc behind by up to 3 tasks, or ahead by up to 1. + delta := remaining - s.taskManager.getTaskCount(tlID) + s.Truef(-3 <= delta && delta <= 1, "remaining %d, getTaskCount %d", remaining, s.taskManager.getTaskCount(tlID)) } + // ensure full gc for the next case (twice in case one doesn't get the gc lock) + tlMgr.taskGC.RunNow(context.Background(), tlMgr.taskAckManager.getAckLevel()) + tlMgr.taskGC.RunNow(context.Background(), tlMgr.taskAckManager.getAckLevel()) } } diff --git a/service/matching/taskGC.go b/service/matching/taskGC.go index b4e7dccd9a4..02eac03e10a 100644 --- a/service/matching/taskGC.go +++ b/service/matching/taskGC.go @@ -28,6 +28,8 @@ import ( "context" "sync/atomic" "time" + + "go.temporal.io/server/common/persistence" ) type taskGC struct { @@ -78,10 +80,15 @@ func (tgc *taskGC) tryDeleteNextBatch(ctx context.Context, ackLevel int64, ignor } tgc.lastDeleteTime = time.Now().UTC() n, err := tgc.db.CompleteTasksLessThan(ctx, ackLevel+1, batchSize) - switch { - case err != nil: + if err != nil { return - case n < batchSize: + } + // implementation behavior for CompleteTasksLessThan: + // - unit test, cassandra: always return UnknownNumRowsAffected (in this case means "all") + // - sql: return number of rows affected (should be <= batchSize) + // if we get UnknownNumRowsAffected or a smaller number than our limit, we know we got + // everything <= ackLevel, so we can reset ours. if not, we may have to try again. + if n == persistence.UnknownNumRowsAffected || n < batchSize { tgc.ackLevel = ackLevel } } diff --git a/service/matching/taskReader.go b/service/matching/taskReader.go index 5f9e09c4c7b..eb0159ea178 100644 --- a/service/matching/taskReader.go +++ b/service/matching/taskReader.go @@ -114,6 +114,14 @@ dispatchLoop: } task := newInternalTask(taskInfo, tr.tlMgr.completeTask, enumsspb.TASK_SOURCE_DB_BACKLOG, "", false) for { + // We checked if the task was expired before putting it in the buffer, but it + // might have expired while it sat in the buffer, so we should check again. + if taskqueue.IsTaskExpired(taskInfo) { + task.finish(nil) + tr.scope().IncCounter(metrics.ExpiredTasksPerTaskQueueCounter) + // Don't try to set read level here because it may have been advanced already. + break + } err := tr.tlMgr.DispatchTask(ctx, task) if err == nil { break diff --git a/service/worker/scanner/taskqueue/handler.go b/service/worker/scanner/taskqueue/handler.go index be14416dd5d..a1e4075488b 100644 --- a/service/worker/scanner/taskqueue/handler.go +++ b/service/worker/scanner/taskqueue/handler.go @@ -148,8 +148,6 @@ func (s *Scavenger) deleteHandlerLog(key *p.TaskQueueKey, state *taskQueueState, // 1. if task has valid TTL -> TTL reached -> delete // 2. if task has 0 TTL / no TTL -> logic need to additionally check if corresponding workflow still exists func IsTaskExpired(t *persistencespb.AllocatedTaskInfo) bool { - tExpiry := timestamp.TimeValue(t.Data.ExpiryTime) - tEpoch := time.Unix(0, 0).UTC() - tNow := time.Now().UTC() - return tExpiry.After(tEpoch) && tNow.After(tExpiry) + expiry := timestamp.TimeValue(t.GetData().GetExpiryTime()) + return expiry.Unix() > 0 && expiry.Before(time.Now()) }