diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index dbc520b8b98..3630cdd1505 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -218,19 +218,19 @@ processorPumpLoop: } func (p *queueProcessorBase) processBatch() { - - if !p.verifyReschedulerSize() { - return - } - ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay) if err := p.rateLimiter.Wait(ctx); err != nil { + deadline, _ := ctx.Deadline() + p.throttle(deadline.Sub(p.timeSource.Now())) cancel() - p.notifyNewTask() // re-enqueue the event return } cancel() + if !p.verifyReschedulerSize() { + return + } + p.lastPollTime = p.timeSource.Now() tasks, more, err := p.ackMgr.readQueueTasks() @@ -274,14 +274,20 @@ func (p *queueProcessorBase) verifyReschedulerSize() bool { p.backoffTimer = nil } if !passed && p.backoffTimer == nil { - p.backoffTimer = time.AfterFunc(p.options.PollBackoffInterval(), func() { - p.notifyNewTask() // re-enqueue the event - }) + p.throttle(p.options.PollBackoffInterval()) } return passed } +func (p *queueProcessorBase) throttle(duration time.Duration) { + if p.backoffTimer == nil { + p.backoffTimer = time.AfterFunc(duration, func() { + p.notifyNewTask() // re-enqueue the event + }) + } +} + func (p *queueProcessorBase) submitTask( executable queues.Executable, ) { diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index fc1ad2872dc..6a84c5804a0 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -297,18 +297,19 @@ func (t *timerQueueProcessorBase) internalProcessor() error { } func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*time.Time, error) { - if !t.verifyReschedulerSize() { - return nil, nil - } - ctx, cancel := context.WithTimeout(context.Background(), loadTimerTaskThrottleRetryDelay) if err := t.rateLimiter.Wait(ctx); err != nil { + deadline, _ := ctx.Deadline() + t.notifyNewTimer(deadline) // re-enqueue the event cancel() - t.notifyNewTimer(time.Time{}) // re-enqueue the event return nil, nil } cancel() + if !t.verifyReschedulerSize() { + return nil, nil + } + t.lastPollTime = t.timeSource.Now() timerTasks, nextFireTime, moreTasks, err := t.timerQueueAckMgr.readTimerTasks() if err != nil {