diff --git a/src/classes/worker.ts b/src/classes/worker.ts index fbdbc76cf9..d2c151f698 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -472,6 +472,7 @@ export class Worker< * to arrive at the queue we should not try to fetch more jobs (as it would be pointless) */ while ( + !this.closing && !this.waiting && numTotal < this._concurrency && (!this.limitUntil || numTotal == 0) @@ -819,10 +820,6 @@ will never work with more accuracy than 1ms. */ fetchNextCallback = () => true, jobsInProgress: Set<{ job: Job; ts: number }>, ): Promise> { - if (!job || this.closing || this.paused) { - return; - } - const srcPropagationMedatada = job.opts?.telemetry?.metadata; return this.trace>( diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index f615fe20f6..d12fd4c660 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -88,7 +88,7 @@ else end end -local deduplicationJobId = deduplicateJob(args[1], opts['de'], +local deduplicationJobId = deduplicateJob(opts['de'], jobId, deduplicationKey, eventsKey, maxEvents) if deduplicationJobId then return deduplicationJobId diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index bde421fca8..f4b324d328 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -81,7 +81,7 @@ else end end -local deduplicationJobId = deduplicateJob(args[1], opts['de'], +local deduplicationJobId = deduplicateJob(opts['de'], jobId, deduplicationKey, eventsKey, maxEvents) if deduplicationJobId then return deduplicationJobId diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index 7005e91af7..134ea9f45e 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -94,7 +94,7 @@ else end end -local deduplicationJobId = deduplicateJob(args[1], opts['de'], +local deduplicationJobId = deduplicateJob(opts['de'], jobId, deduplicationKey, eventsKey, maxEvents) if deduplicationJobId then return deduplicationJobId diff --git a/src/commands/includes/deduplicateJob.lua b/src/commands/includes/deduplicateJob.lua index ff873e599c..ef1245bdbc 100644 --- a/src/commands/includes/deduplicateJob.lua +++ b/src/commands/includes/deduplicateJob.lua @@ -1,24 +1,24 @@ --[[ - Function to debounce a job. + Function to deduplicate a job. ]] -local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents) +local function deduplicateJob(deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents) local deduplicationId = deduplicationOpts and deduplicationOpts['id'] if deduplicationId then local ttl = deduplicationOpts['ttl'] local deduplicationKeyExists if ttl then - deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX') + deduplicationKeyExists = rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX') else - deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX') + deduplicationKeyExists = rcall('SET', deduplicationKey, jobId, 'NX') end - if deduplicationKeyExists then - local currentDebounceJobId = rcall('GET', deduplicationKey) + if deduplicationKeyExists == false then + local currentDeduplicatedJobId = rcall('GET', deduplicationKey) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "debounced", "jobId", currentDebounceJobId, "debounceId", deduplicationId) + "debounced", "jobId", currentDeduplicatedJobId, "debounceId", deduplicationId) rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "deduplicated", "jobId", currentDebounceJobId, "deduplicationId", deduplicationId) - return currentDebounceJobId + "deduplicated", "jobId", currentDeduplicatedJobId, "deduplicationId", deduplicationId) + return currentDeduplicatedJobId end end end diff --git a/tests/test_bulk.ts b/tests/test_bulk.ts index cdaa97ea76..d53d8f093d 100644 --- a/tests/test_bulk.ts +++ b/tests/test_bulk.ts @@ -162,7 +162,7 @@ describe('bulk jobs', () => { await worker.close(); await worker2.close(); await queueEvents.close(); - }); + }).timeout(5000); it('should process jobs with custom ids', async () => { const name = 'test'; diff --git a/tests/test_events.ts b/tests/test_events.ts index 3ec45cc9ce..9c5c01d5b0 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -587,9 +587,15 @@ describe('events', function () { ); let debouncedCounter = 0; - queueEvents.on('debounced', ({ jobId }) => { - debouncedCounter++; + const debounced = new Promise(resolve => { + queueEvents.on('debounced', () => { + debouncedCounter++; + if (debouncedCounter == 2) { + resolve(); + } + }); }); + await job.remove(); await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); @@ -602,6 +608,11 @@ describe('events', function () { { debounce: { id: 'a1' } }, ); await secondJob.remove(); + await debounced; + + const getDeboundedJobId = await queue.getDebounceJobId('a1'); + + expect(getDeboundedJobId).to.be.null; expect(debouncedCounter).to.be.equal(2); }); @@ -822,6 +833,9 @@ describe('events', function () { await secondJob.remove(); await deduplication; + const getDeduplicationJobId = await queue.getDeduplicationJobId('a1'); + + expect(getDeduplicationJobId).to.be.null; expect(deduplicatedCounter).to.be.equal(2); }); });