Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Don't dispatch expired tasks from taskReader buffer #3161

Merged
merged 1 commit into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,9 +1091,9 @@ type (
// its mandatory to specify it. On success this method returns the number of rows
// actually deleted. If the underlying storage doesn't support "limit", all rows
// less than or equal to taskID will be deleted.
// On success, this method returns:
// - number of rows actually deleted, if limit is honored
// - UnknownNumRowsDeleted, when all rows below value are deleted
// On success, this method returns either:
// - UnknownNumRowsAffected (this means all rows below value are deleted)
// - number of rows deleted, which may be equal to limit
CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (int, error)
}

Expand Down
47 changes: 31 additions & 16 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (s *matchingEngineSuite) TestPollWorkflowTaskQueues() {
Execution: execution,
ScheduledEventId: scheduledEventID,
TaskQueue: stickyTaskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -510,7 +510,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType enumspb.TaskQueueType, isFor
Execution: execution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
if isForwarded {
addRequest.ForwardedSource = forwardedFrom
Expand All @@ -522,7 +522,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType enumspb.TaskQueueType, isFor
Execution: execution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
if isForwarded {
addRequest.ForwardedSource = forwardedFrom
Expand Down Expand Up @@ -571,7 +571,7 @@ func (s *matchingEngineSuite) TestTaskWriterShutdown() {
NamespaceId: namespaceID.String(),
Execution: execution,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

// stop the task writer explicitly
Expand Down Expand Up @@ -616,7 +616,7 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -819,7 +819,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
wg.Wait()
Expand Down Expand Up @@ -985,7 +985,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1132,7 +1132,7 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeWorkflowTasks() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1572,7 +1572,7 @@ func (s *matchingEngineSuite) TestAddTaskAfterStartFailure() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1623,7 +1623,7 @@ func (s *matchingEngineSuite) TestTaskQueueManagerGetTaskBatch() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(1),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}

_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
Expand Down Expand Up @@ -1767,7 +1767,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}

const taskCount = 20
const taskCount = 20 // must be multiple of 4
const rangeSize = 10
s.matchingEngine.config.RangeSize = rangeSize
s.matchingEngine.config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskQueueInfo(2)
Expand All @@ -1788,11 +1788,15 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
Execution: workflowExecution,
ScheduledEventId: scheduledEventID,
TaskQueue: taskQueue,
ScheduleToStartTimeout: timestamp.DurationFromSeconds(5),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(100),
}
if i%2 == 0 {
switch i % 4 {
case 0:
// simulates creating a task whose scheduledToStartTimeout is already expired
addRequest.ScheduleToStartTimeout = timestamp.DurationFromSeconds(-5)
case 2:
// simulates creating a task which will time out in the buffer
addRequest.ScheduleToStartTimeout = timestamp.DurationPtr(250 * time.Millisecond)
}
_, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest)
s.NoError(err)
Expand All @@ -1804,7 +1808,11 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {

// wait until all tasks are loaded by into in-memory buffers by task queue manager
// the buffer size should be one less than expected because dispatcher will dequeue the head
s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) >= (taskCount/2 - 1) }, time.Second))
// 1/4 should be thrown out because they are expired before they hit the buffer
s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) >= (3*taskCount/4 - 1) }, time.Second))

// ensure the 1/4 of tasks with small ScheduleToStartTimeout will be expired when they come out of the buffer
time.Sleep(300 * time.Millisecond)

maxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes

Expand All @@ -1826,9 +1834,16 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
}
remaining -= taskCount / 2
// since every other task is expired, we expect half the tasks to be deleted
// after poll consumed 1/4th of what is available
s.EqualValues(remaining, s.taskManager.getTaskCount(tlID))
// after poll consumed 1/4th of what is available.
// however, the gc is best-effort and might not run exactly when we want it to.
// various thread interleavings between the two task reader threads and this one
// might leave the gc behind by up to 3 tasks, or ahead by up to 1.
delta := remaining - s.taskManager.getTaskCount(tlID)
s.Truef(-3 <= delta && delta <= 1, "remaining %d, getTaskCount %d", remaining, s.taskManager.getTaskCount(tlID))
}
// ensure full gc for the next case (twice in case one doesn't get the gc lock)
tlMgr.taskGC.RunNow(context.Background(), tlMgr.taskAckManager.getAckLevel())
tlMgr.taskGC.RunNow(context.Background(), tlMgr.taskAckManager.getAckLevel())
}
}

Expand Down
13 changes: 10 additions & 3 deletions service/matching/taskGC.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"context"
"sync/atomic"
"time"

"go.temporal.io/server/common/persistence"
)

type taskGC struct {
Expand Down Expand Up @@ -78,10 +80,15 @@ func (tgc *taskGC) tryDeleteNextBatch(ctx context.Context, ackLevel int64, ignor
}
tgc.lastDeleteTime = time.Now().UTC()
n, err := tgc.db.CompleteTasksLessThan(ctx, ackLevel+1, batchSize)
switch {
case err != nil:
if err != nil {
return
case n < batchSize:
}
// implementation behavior for CompleteTasksLessThan:
// - unit test, cassandra: always return UnknownNumRowsAffected (in this case means "all")
// - sql: return number of rows affected (should be <= batchSize)
// if we get UnknownNumRowsAffected or a smaller number than our limit, we know we got
// everything <= ackLevel, so we can reset ours. if not, we may have to try again.
if n == persistence.UnknownNumRowsAffected || n < batchSize {
tgc.ackLevel = ackLevel
}
}
Expand Down
8 changes: 8 additions & 0 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ dispatchLoop:
}
task := newInternalTask(taskInfo, tr.tlMgr.completeTask, enumsspb.TASK_SOURCE_DB_BACKLOG, "", false)
for {
// We checked if the task was expired before putting it in the buffer, but it
// might have expired while it sat in the buffer, so we should check again.
if taskqueue.IsTaskExpired(taskInfo) {
task.finish(nil)
tr.scope().IncCounter(metrics.ExpiredTasksPerTaskQueueCounter)
// Don't try to set read level here because it may have been advanced already.
break
}
err := tr.tlMgr.DispatchTask(ctx, task)
if err == nil {
break
Expand Down
6 changes: 2 additions & 4 deletions service/worker/scanner/taskqueue/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ func (s *Scavenger) deleteHandlerLog(key *p.TaskQueueKey, state *taskQueueState,
// 1. if task has valid TTL -> TTL reached -> delete
// 2. if task has 0 TTL / no TTL -> logic need to additionally check if corresponding workflow still exists
func IsTaskExpired(t *persistencespb.AllocatedTaskInfo) bool {
tExpiry := timestamp.TimeValue(t.Data.ExpiryTime)
tEpoch := time.Unix(0, 0).UTC()
tNow := time.Now().UTC()
return tExpiry.After(tEpoch) && tNow.After(tExpiry)
expiry := timestamp.TimeValue(t.GetData().GetExpiryTime())
return expiry.Unix() > 0 && expiry.Before(time.Now())
}