Skip to content

Commit

Permalink
Don't dispatch expired tasks from taskReader buffer (#3161)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored Jul 28, 2022
1 parent b43f516 commit 2342964
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 26 deletions.
6 changes: 3 additions & 3 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,9 +1091,9 @@ type (
// its mandatory to specify it. On success this method returns the number of rows
// actually deleted. If the underlying storage doesn't support "limit", all rows
// less than or equal to taskID will be deleted.
// On success, this method returns:
// - number of rows actually deleted, if limit is honored
// - UnknownNumRowsDeleted, when all rows below value are deleted
// On success, this method returns either:
// - UnknownNumRowsAffected (this means all rows below value are deleted)
// - number of rows deleted, which may be equal to limit
CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (int, error)
}

Expand Down
47 changes: 31 additions & 16 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (s *matchingEngineSuite) TestPollWorkflowTaskQueues() {
Execution: execution,
ScheduledEventId: scheduledEventID,
TaskQueue: stickyTaskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -510,7 +510,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType enumspb.TaskQueueType, isFor
Execution: execution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
if isForwarded {
addRequest.ForwardedSource = forwardedFrom
Expand All @@ -522,7 +522,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType enumspb.TaskQueueType, isFor
Execution: execution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
if isForwarded {
addRequest.ForwardedSource = forwardedFrom
Expand Down Expand Up @@ -571,7 +571,7 @@ func (s *matchingEngineSuite) TestTaskWriterShutdown() {
NamespaceId: namespaceID.String(),
Execution: execution,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

// stop the task writer explicitly
Expand Down Expand Up @@ -616,7 +616,7 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -819,7 +819,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
wg.Wait()
Expand Down Expand Up @@ -985,7 +985,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1132,7 +1132,7 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeWorkflowTasks() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1572,7 +1572,7 @@ func (s *matchingEngineSuite) TestAddTaskAfterStartFailure() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1623,7 +1623,7 @@ func (s *matchingEngineSuite) TestTaskQueueManagerGetTaskBatch() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1767,7 +1767,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}

const taskCount = 20
const taskCount = 20 // must be multiple of 4
const rangeSize = 10
s.matchingEngine.config.RangeSize = rangeSize
s.matchingEngine.config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskQueueInfo(2)
Expand All @@ -1788,11 +1788,15 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(5),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
if i%2 == 0 {
switch i % 4 {
case 0:
// simulates creating a task whose scheduledToStartTimeout is already expired
addRequest.ScheduleToStartTimeout = timestamp.DurationFromSeconds(-5)
case 2:
// simulates creating a task which will time out in the buffer
addRequest.ScheduleToStartTimeout = timestamp.DurationPtr(250 * time.Millisecond)
}
_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
s.NoError(err)
Expand All @@ -1804,7 +1808,11 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {

// wait until all tasks are loaded by into in-memory buffers by task queue manager
// the buffer size should be one less than expected because dispatcher will dequeue the head
s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) >= (taskCount/2 - 1) }, time.Second))
// 1/4 should be thrown out because they are expired before they hit the buffer
s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) >= (3*taskCount/4 - 1) }, time.Second))

// ensure the 1/4 of tasks with small ScheduleToStartTimeout will be expired when they come out of the buffer
time.Sleep(300 * time.Millisecond)

maxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes

Expand All @@ -1826,9 +1834,16 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
}
remaining -= taskCount / 2
// since every other task is expired, we expect half the tasks to be deleted
// after poll consumed 1/4th of what is available
s.EqualValues(remaining, s.taskManager.getTaskCount(tlID))
// after poll consumed 1/4th of what is available.
// however, the gc is best-effort and might not run exactly when we want it to.
// various thread interleavings between the two task reader threads and this one
// might leave the gc behind by up to 3 tasks, or ahead by up to 1.
delta := remaining - s.taskManager.getTaskCount(tlID)
s.Truef(-3 <= delta && delta <= 1, "remaining %d, getTaskCount %d", remaining, s.taskManager.getTaskCount(tlID))
}
// ensure full gc for the next case (twice in case one doesn't get the gc lock)
tlMgr.taskGC.RunNow(context.Background(), tlMgr.taskAckManager.getAckLevel())
tlMgr.taskGC.RunNow(context.Background(), tlMgr.taskAckManager.getAckLevel())
}
}

Expand Down
13 changes: 10 additions & 3 deletions service/matching/taskGC.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"context"
"sync/atomic"
"time"

"go.temporal.io/server/common/persistence"
)

type taskGC struct {
Expand Down Expand Up @@ -78,10 +80,15 @@ func (tgc *taskGC) tryDeleteNextBatch(ctx context.Context, ackLevel int64, ignor
}
tgc.lastDeleteTime = time.Now().UTC()
n, err := tgc.db.CompleteTasksLessThan(ctx, ackLevel+1, batchSize)
switch {
case err != nil:
if err != nil {
return
case n < batchSize:
}
// implementation behavior for CompleteTasksLessThan:
// - unit test, cassandra: always return UnknownNumRowsAffected (in this case means "all")
// - sql: return number of rows affected (should be <= batchSize)
// if we get UnknownNumRowsAffected or a smaller number than our limit, we know we got
// everything <= ackLevel, so we can reset ours. if not, we may have to try again.
if n == persistence.UnknownNumRowsAffected || n < batchSize {
tgc.ackLevel = ackLevel
}
}
Expand Down
8 changes: 8 additions & 0 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ dispatchLoop:
}
task := newInternalTask(taskInfo, tr.tlMgr.completeTask, enumsspb.TASK_SOURCE_DB_BACKLOG, "", false)
for {
// We checked if the task was expired before putting it in the buffer, but it
// might have expired while it sat in the buffer, so we should check again.
if taskqueue.IsTaskExpired(taskInfo) {
task.finish(nil)
tr.scope().IncCounter(metrics.ExpiredTasksPerTaskQueueCounter)
// Don't try to set read level here because it may have been advanced already.
break
}
err := tr.tlMgr.DispatchTask(ctx, task)
if err == nil {
break
Expand Down
6 changes: 2 additions & 4 deletions service/worker/scanner/taskqueue/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ func (s *Scavenger) deleteHandlerLog(key *p.TaskQueueKey, state *taskQueueState,
// 1. if task has valid TTL -> TTL reached -> delete
// 2. if task has 0 TTL / no TTL -> logic need to additionally check if corresponding workflow still exists
func IsTaskExpired(t *persistencespb.AllocatedTaskInfo) bool {
tExpiry := timestamp.TimeValue(t.Data.ExpiryTime)
tEpoch := time.Unix(0, 0).UTC()
tNow := time.Now().UTC()
return tExpiry.After(tEpoch) && tNow.After(tExpiry)
expiry := timestamp.TimeValue(t.GetData().GetExpiryTime())
return expiry.Unix() > 0 && expiry.Before(time.Now())
}

0 comments on commit 2342964

Please # to comment.