From dae8a5152e4c245d215bd923b747c9959138fec0 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 8 Aug 2022 09:58:06 -0700 Subject: [PATCH] Fix queue processor throttling logic (#3195) --- service/history/queueProcessor.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index 7ae66ebda78..dc83f393d97 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -69,9 +69,10 @@ type ( scheduler queues.Scheduler rescheduler queues.Rescheduler - lastPollTime time.Time - backoffTimer *time.Timer - readTaskRetrier backoff.Retrier + lastPollTime time.Time + backoffTimerLock sync.Mutex + backoffTimer *time.Timer + readTaskRetrier backoff.Retrier notifyCh chan struct{} status int32 @@ -261,21 +262,23 @@ func (p *queueProcessorBase) processBatch() { func (p *queueProcessorBase) verifyReschedulerSize() bool { passed := p.rescheduler.Len() < p.options.MaxReschdulerSize() - if passed && p.backoffTimer != nil { - p.backoffTimer.Stop() - p.backoffTimer = nil - } - if !passed && p.backoffTimer == nil { + if !passed { p.throttle(p.options.PollBackoffInterval()) } - return passed } func (p *queueProcessorBase) throttle(duration time.Duration) { + p.backoffTimerLock.Lock() + defer p.backoffTimerLock.Unlock() + if p.backoffTimer == nil { p.backoffTimer = time.AfterFunc(duration, func() { + p.backoffTimerLock.Lock() + defer p.backoffTimerLock.Unlock() + p.notifyNewTask() // re-enqueue the event + p.backoffTimer = nil }) } }