diff --git a/packages/third-parties/bullmq/src/BullMQModule.spec.ts b/packages/third-parties/bullmq/src/BullMQModule.spec.ts index a2d409c73d3..73d3aba9cd3 100644 --- a/packages/third-parties/bullmq/src/BullMQModule.spec.ts +++ b/packages/third-parties/bullmq/src/BullMQModule.spec.ts @@ -4,6 +4,7 @@ import {catchAsyncError} from "@tsed/core"; import {PlatformTest} from "@tsed/platform-http/testing"; import {Queue, Worker} from "bullmq"; import {anything, instance, mock, verify, when} from "ts-mockito"; +import {beforeEach} from "vitest"; import {BullMQModule} from "./BullMQModule.js"; import {type BullMQConfig} from "./config/config.js"; @@ -62,18 +63,17 @@ describe("BullMQModule", () => { dispatcher = mock(JobDispatcher); when(dispatcher.dispatch(CustomCronJob)).thenResolve(); }); + beforeEach(() => { + queueConstructorSpy.mockClear(); + workerConstructorSpy.mockClear(); + }); afterEach(PlatformTest.reset); describe("configuration", () => { - beforeEach(() => { - queueConstructorSpy.mockClear(); - workerConstructorSpy.mockClear(); - }); - describe("merges config correctly", () => { - beforeEach(async () => { - await PlatformTest.create({ + beforeEach(() => + PlatformTest.create({ bullmq: { queues: ["default", "special"], connection: { @@ -114,8 +114,8 @@ describe("BullMQModule", () => { use: instance(dispatcher) } ] - }); - }); + }) + ); it("queue", () => { expect(queueConstructorSpy).toHaveBeenCalledTimes(2); @@ -161,10 +161,9 @@ describe("BullMQModule", () => { }); }); }); - describe("discover queues from decorators", () => { - beforeEach(async () => { - await PlatformTest.create({ + beforeEach(() => + PlatformTest.create({ bullmq: { queues: ["special"], connection: { @@ -205,8 +204,8 @@ describe("BullMQModule", () => { use: instance(dispatcher) } ] - }); - }); + }) + ); it("queue", () => { expect(queueConstructorSpy).toHaveBeenCalledTimes(2); @@ -252,7 +251,6 @@ describe("BullMQModule", () => { }); }); }); - describe("disableWorker", () => { const config = { queues: ["default", "foo", "bar"], @@ -260,8 +258,8 @@ describe("BullMQModule", () => { disableWorker: true } as BullMQConfig; - beforeEach(async () => { - await PlatformTest.create({ + beforeEach(() => + PlatformTest.create({ bullmq: config, imports: [ { @@ -269,25 +267,25 @@ describe("BullMQModule", () => { use: instance(dispatcher) } ] - }); - }); + }) + ); it("should not create any workers", () => { expect(workerConstructorSpy).toHaveBeenCalledTimes(0); }); }); - describe("without", () => { - it("skips initialization", async () => { - await PlatformTest.create({ + beforeEach(() => + PlatformTest.create({ imports: [ { token: JobDispatcher, use: instance(dispatcher) } ] - }); - + }) + ); + it("skips initialization", async () => { expect(queueConstructorSpy).not.toHaveBeenCalled(); verify(dispatcher.dispatch(anything())).never(); }); @@ -301,8 +299,8 @@ describe("BullMQModule", () => { workerQueues: ["default", "foo"] } as BullMQConfig; - beforeEach(async () => { - await PlatformTest.create({ + beforeEach(() => + PlatformTest.create({ bullmq: config, imports: [ { @@ -310,8 +308,8 @@ describe("BullMQModule", () => { use: instance(dispatcher) } ] - }); - }); + }) + ); describe("cronjobs", () => { it("should dispatch cron jobs automatically", () => { @@ -331,10 +329,6 @@ describe("BullMQModule", () => { expect(instance).toBeInstanceOf(Queue); }); - - it("should not allow direct injection of the queue", () => { - expect(PlatformTest.get(Queue)).not.toBeInstanceOf(Queue); - }); }); describe("workers", () => { @@ -354,10 +348,6 @@ describe("BullMQModule", () => { expect(PlatformTest.get("bullmq.worker.bar")).toBeUndefined(); }); - it("should not allow direct injection of the worker", () => { - expect(PlatformTest.get(Worker)).not.toBeInstanceOf(Worker); - }); - it("should run worker and execute processor", async () => { const bullMQModule = PlatformTest.get(BullMQModule); const worker = PlatformTest.get("bullmq.job.default.regular"); @@ -426,7 +416,6 @@ describe("BullMQModule", () => { }); }); }); - describe("with fallback controller", () => { beforeEach(async () => { @FallbackJobController("foo") diff --git a/packages/third-parties/bullmq/src/BullMQModule.ts b/packages/third-parties/bullmq/src/BullMQModule.ts index d9581c39118..c3ff4e8f13a 100644 --- a/packages/third-parties/bullmq/src/BullMQModule.ts +++ b/packages/third-parties/bullmq/src/BullMQModule.ts @@ -1,5 +1,16 @@ -import {DIContext, InjectorService, Module, OnDestroy, runInContext} from "@tsed/di"; -import type {BeforeInit} from "@tsed/platform-http"; +import { + constant, + DIContext, + inject, + injectable, + injectMany, + injector, + logger, + OnDestroy, + type OnInit, + ProviderType, + runInContext +} from "@tsed/di"; import {getComputedType} from "@tsed/schema"; import {Job, Queue, Worker} from "bullmq"; import {v4} from "uuid"; @@ -15,12 +26,10 @@ import {getFallbackJobToken, getJobToken} from "./utils/getJobToken.js"; import {mapQueueOptions} from "./utils/mapQueueOptions.js"; import {mapWorkerOptions} from "./utils/mapWorkerOptions.js"; -@Module() -export class BullMQModule implements BeforeInit, OnDestroy { - constructor( - private readonly injector: InjectorService, - private readonly dispatcher: JobDispatcher - ) { +export class BullMQModule implements OnInit, OnDestroy { + private readonly dispatcher = inject(JobDispatcher); + + constructor() { // build providers allow @Inject(queue) usage in JobController instance if (this.isEnabled()) { const queues = [...this.getUniqQueueNames()]; @@ -36,12 +45,12 @@ export class BullMQModule implements BeforeInit, OnDestroy { } get config() { - return this.injector.settings.get("bullmq"); + return constant("bullmq")!; } - $beforeInit() { + $onInit() { if (this.isEnabled()) { - this.injector.getMany(BullMQTypes.CRON).map((job) => this.dispatcher.dispatch(getComputedType(job))); + injectMany(BullMQTypes.CRON).map((job) => this.dispatcher.dispatch(getComputedType(job))); } } @@ -50,8 +59,8 @@ export class BullMQModule implements BeforeInit, OnDestroy { return; } - await Promise.all(this.injector.getMany(BullMQTypes.QUEUE).map((queue) => queue.close())); - await Promise.all(this.injector.getMany(BullMQTypes.WORKER).map((worker) => worker.close())); + await Promise.all(injectMany(BullMQTypes.QUEUE).map((queue) => queue.close())); + await Promise.all(injectMany(BullMQTypes.WORKER).map((worker) => worker.close())); } isEnabled() { @@ -65,14 +74,14 @@ export class BullMQModule implements BeforeInit, OnDestroy { private buildQueues(queues: string[]) { queues.forEach((queue) => { const opts = mapQueueOptions(queue, this.config); - createQueueProvider(this.injector, queue, opts); + createQueueProvider(queue, opts); }); } private buildWorkers(workers: string[]) { workers.forEach((worker) => { const opts = mapWorkerOptions(worker, this.config); - createWorkerProvider(this.injector, worker, this.onProcess, opts); + createWorkerProvider(worker, this.onProcess, opts); }); } @@ -82,7 +91,7 @@ export class BullMQModule implements BeforeInit, OnDestroy { */ private getUniqQueueNames() { return new Set( - this.injector + injector() .getProviders([BullMQTypes.JOB, BullMQTypes.CRON, BullMQTypes.FALLBACK_JOB]) .map((provider) => provider.store.get(BULLMQ)?.queue) .concat(this.config.queues!) @@ -91,18 +100,14 @@ export class BullMQModule implements BeforeInit, OnDestroy { } private getJob(name: string, queueName: string) { - return ( - this.injector.get(getJobToken(queueName, name)) || - this.injector.get(getFallbackJobToken(queueName)) || - this.injector.get(getFallbackJobToken()) - ); + return inject(getJobToken(queueName, name)) || inject(getFallbackJobToken(queueName)) || inject(getFallbackJobToken()); } private onProcess = async (job: Job) => { const jobService = this.getJob(job.name, job.queueName); if (!jobService) { - this.injector.logger.warn({ + logger().warn({ event: "BULLMQ_JOB_NOT_FOUND", message: `Job ${job.name} ${job.queueName} not found` }); @@ -110,8 +115,6 @@ export class BullMQModule implements BeforeInit, OnDestroy { } const $ctx = new DIContext({ - injector: this.injector, - logger: this.injector.logger, id: job.id || v4().split("-").join(""), additionalProps: { logType: "bullmq", @@ -144,3 +147,5 @@ export class BullMQModule implements BeforeInit, OnDestroy { } }; } + +injectable(BullMQModule).type(ProviderType.MODULE); diff --git a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts index 679f69d273b..c9cc0caf71b 100644 --- a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts +++ b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts @@ -1,6 +1,6 @@ -import {InjectorService} from "@tsed/di"; -import {Queue} from "bullmq"; -import {anything, capture, instance, mock, objectContaining, spy, verify, when} from "ts-mockito"; +import {catchAsyncError} from "@tsed/core"; +import {DITest, inject, injectable, injector} from "@tsed/di"; +import {beforeEach} from "vitest"; import {JobMethods} from "../contracts/index.js"; import {JobController} from "../decorators/index.js"; @@ -27,45 +27,60 @@ class NotConfiguredQueueTestJob implements JobMethods { handle() {} } +function getFixture() { + const dispatcher = inject(JobDispatcher); + const queue = { + name: "default", + add: vi.fn() + }; + + const specialQueue = { + name: "special", + add: vi.fn() + }; + + injectable("bullmq.queue.default").value(queue); + injectable("bullmq.queue.special").value(specialQueue); + injectable("bullmq.job.default.example-job").value(new ExampleTestJob()); + injectable("bullmq.job.default.example-job-with-custom-id-from-job-methods").value(new ExampleJobWithCustomJobIdFromJobMethods()); + + vi.spyOn(injector(), "resolve"); + + return { + dispatcher, + queue, + specialQueue, + job: inject("bullmq.job.default.example-job-with-custom-id-from-job-methods") + }; +} + describe("JobDispatcher", () => { - let injector: InjectorService; - let queue: Queue; - let dispatcher: JobDispatcher; - beforeEach(() => { - injector = mock(InjectorService); - queue = mock(Queue); - when(queue.name).thenReturn("default"); - when(injector.get("bullmq.queue.default")).thenReturn(instance(queue)); - when(injector.get("bullmq.job.default.example-job")).thenReturn(new ExampleTestJob()); - - dispatcher = new JobDispatcher(instance(injector)); - }); + beforeEach(() => DITest.create()); + afterEach(() => DITest.reset()); it("should throw an exception when a queue is not configured", async () => { - when(injector.get("bullmq.queue.not-configured")).thenReturn(undefined); + const {dispatcher} = getFixture(); - await expect(dispatcher.dispatch(NotConfiguredQueueTestJob)).rejects.toThrow(new Error("Queue(not-configured) not defined")); - verify(injector.get("bullmq.queue.not-configured")).once(); - }); + const error = await catchAsyncError(() => dispatcher.dispatch(NotConfiguredQueueTestJob)); + await expect(error).toEqual(new Error("Queue(not-configured) not defined")); + + expect(injector().resolve).toHaveBeenCalledWith("bullmq.queue.not-configured", expect.anything()); + }); it("should dispatch job as type", async () => { + const {dispatcher, queue} = getFixture(); + await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}); - verify( - queue.add( - "example-job", - objectContaining({msg: "hello test"}), - objectContaining({ - backoff: 69 - }) - ) - ).once(); + expect(queue.add).toHaveBeenCalledOnce(); + expect(queue.add).toHaveBeenCalledWith( + "example-job", + expect.objectContaining({msg: "hello test"}), + expect.objectContaining({backoff: 69}) + ); }); - it("should dispatch job as options", async () => { - const specialQueue = mock(Queue); - when(specialQueue.name).thenReturn("special"); - when(injector.get("bullmq.queue.special")).thenReturn(instance(specialQueue)); + const {dispatcher, specialQueue} = getFixture(); await dispatcher.dispatch( { @@ -75,76 +90,77 @@ describe("JobDispatcher", () => { {msg: "hello test"} ); - verify(specialQueue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once(); + expect(specialQueue.add).toHaveBeenCalledOnce(); + expect(specialQueue.add).toHaveBeenCalledWith("some-name", expect.objectContaining({msg: "hello test"}), expect.anything()); }); - it("should dispatch job as string", async () => { + const {dispatcher, queue} = getFixture(); + await dispatcher.dispatch("some-name", {msg: "hello test"}); - verify(queue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once(); + expect(queue.add).toHaveBeenCalledOnce(); + expect(queue.add).toHaveBeenCalledWith("some-name", expect.objectContaining({msg: "hello test"}), expect.anything()); }); - it("should overwrite job options defined by the job", async () => { + const {dispatcher, queue} = getFixture(); + await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}, {backoff: 42, jobId: "ffeeaa"}); - verify( - queue.add( - "example-job", - objectContaining({msg: "hello test"}), - objectContaining({ - backoff: 42, - jobId: "ffeeaa" - }) - ) - ).once(); + expect(queue.add).toHaveBeenCalledOnce(); + expect(queue.add).toHaveBeenCalledWith( + "example-job", + expect.objectContaining({msg: "hello test"}), + expect.objectContaining({backoff: 42, jobId: "ffeeaa"}) + ); }); - it("should keep existing options and add new ones", async () => { + const {dispatcher, queue} = getFixture(); + await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}, {jobId: "ffeeaa"}); - verify( - queue.add( - "example-job", - objectContaining({msg: "hello test"}), - objectContaining({ - backoff: 69, - jobId: "ffeeaa" - }) - ) - ).once(); + expect(queue.add).toHaveBeenCalledOnce(); + expect(queue.add).toHaveBeenCalledWith( + "example-job", + expect.objectContaining({msg: "hello test"}), + expect.objectContaining({backoff: 69, jobId: "ffeeaa"}) + ); }); - describe("custom jobId", () => { - let job: ExampleJobWithCustomJobIdFromJobMethods; - beforeEach(() => { - job = new ExampleJobWithCustomJobIdFromJobMethods(); - when(injector.get("bullmq.job.default.example-job-with-custom-id-from-job-methods")).thenReturn(job); - }); - it("should allow setting the job id from within the job", async () => { + const {dispatcher, queue} = getFixture(); + await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world"); - verify(queue.add("example-job-with-custom-id-from-job-methods", "hello world", anything())).once(); + expect(queue.add).toHaveBeenCalledOnce(); + expect(queue.add).toHaveBeenCalledWith("example-job-with-custom-id-from-job-methods", "hello world", expect.anything()); + + const [, , opts] = queue.add.mock.calls.at(-1)!; - const [, , opts] = capture(queue.add).last(); expect(opts).toMatchObject({ jobId: "HELLO WORLD" }); }); it("should pass the payload to the jobId method", async () => { - const spyJob = spy(job); + const {dispatcher, job} = getFixture(); + + vi.spyOn(job, "jobId"); + await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world"); - verify(spyJob.jobId("hello world")).once(); + expect(job.jobId).toHaveBeenCalledOnce(); + expect(job.jobId).toHaveBeenCalledWith("hello world"); }); it("should choose the jobId provided to the dispatcher even when the method is implemented", async () => { + const {dispatcher, queue} = getFixture(); + await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world", { jobId: "I don't think so" }); - const [, , opts] = capture(queue.add).last(); + const [, , opts] = queue.add.mock.calls.at(-1)!; + expect(opts).toMatchObject({ jobId: "I don't think so" }); diff --git a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts index 355158ccf76..e663bec7e7f 100644 --- a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts +++ b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts @@ -1,5 +1,5 @@ -import {Store, Type} from "@tsed/core"; -import {Injectable, InjectorService} from "@tsed/di"; +import {isClass, Store, Type} from "@tsed/core"; +import {inject, injectable} from "@tsed/di"; import {Job as BullMQJob, JobsOptions, Queue} from "bullmq"; import {BULLMQ} from "../constants/constants.js"; @@ -8,10 +8,7 @@ import {getJobToken} from "../utils/getJobToken.js"; import {getQueueToken} from "../utils/getQueueToken.js"; import type {JobDispatcherOptions} from "./JobDispatcherOptions.js"; -@Injectable() export class JobDispatcher { - constructor(private readonly injector: InjectorService) {} - public async dispatch( job: Type, payload?: Parameters[0], @@ -22,7 +19,7 @@ export class JobDispatcher { public async dispatch(job: Type | JobDispatcherOptions | string, payload: unknown, options: JobsOptions = {}): Promise { const {queueName, jobName, defaultJobOptions} = await this.resolveDispatchArgs(job, payload); - const queue = this.injector.get(getQueueToken(queueName)); + const queue = inject(getQueueToken(queueName)); if (!queue) { throw new Error(`Queue(${queueName}) not defined`); @@ -44,6 +41,7 @@ export class JobDispatcher { const store = Store.from(job).get(BULLMQ); queueName = store.queue; jobName = store.name; + defaultJobOptions = await this.retrieveJobOptionsFromClassBasedJob(store, payload); } else if (typeof job === "object") { // job is passed as JobDispatcherOptions @@ -63,13 +61,9 @@ export class JobDispatcher { } private async retrieveJobOptionsFromClassBasedJob(store: JobStore, payload: unknown): Promise { - const job = this.injector.get(getJobToken(store.queue, store.name)); - - if (!job) { - return store.opts; - } - + const job = inject(getJobToken(store.queue, store.name)); const jobId = await job.jobId?.(payload); + if (jobId === undefined) { return store.opts; } @@ -80,3 +74,5 @@ export class JobDispatcher { }; } } + +injectable(JobDispatcher); diff --git a/packages/third-parties/bullmq/src/utils/createQueueProvider.ts b/packages/third-parties/bullmq/src/utils/createQueueProvider.ts index 29aa7ae655c..faae9c91a96 100644 --- a/packages/third-parties/bullmq/src/utils/createQueueProvider.ts +++ b/packages/third-parties/bullmq/src/utils/createQueueProvider.ts @@ -1,19 +1,16 @@ -import {InjectorService} from "@tsed/di"; +import {inject, injectable} from "@tsed/di"; import {Queue, QueueOptions} from "bullmq"; -import {BullMQTypes} from "../constants/BullMQTypes.js"; import {getQueueToken} from "./getQueueToken.js"; -export function createQueueProvider(injector: InjectorService, queue: string, opts: QueueOptions) { +export function createQueueProvider(queue: string, opts: QueueOptions) { const token = getQueueToken(queue); - return injector - .add(token, { - type: BullMQTypes.QUEUE, - useValue: new Queue(queue, opts), - hooks: { - $onDestroy: (queue) => queue.close() - } - }) - .invoke(token); + injectable(token) + .factory(() => new Queue(queue, opts)) + .hooks({ + $onDestroy: (queue: Queue) => queue.close() + }); + + return inject(token); } diff --git a/packages/third-parties/bullmq/src/utils/createWorkerProvider.ts b/packages/third-parties/bullmq/src/utils/createWorkerProvider.ts index 4b9fa3aac51..6705363740d 100644 --- a/packages/third-parties/bullmq/src/utils/createWorkerProvider.ts +++ b/packages/third-parties/bullmq/src/utils/createWorkerProvider.ts @@ -1,19 +1,18 @@ -import {InjectorService} from "@tsed/di"; +import {inject, injectable} from "@tsed/di"; import {Job, Worker, WorkerOptions} from "bullmq"; import {BullMQTypes} from "../constants/BullMQTypes.js"; import {getWorkerToken} from "./getWorkerToken.js"; -export function createWorkerProvider(injector: InjectorService, worker: string, process: (job: Job) => any, opts: WorkerOptions) { +export function createWorkerProvider(worker: string, process: (job: Job) => any, opts: WorkerOptions) { const token = getWorkerToken(worker); - return injector - .add(token, { - type: BullMQTypes.WORKER, - useValue: new Worker(worker, process, opts), - hooks: { - $onDestroy: (worker) => worker.close() - } - }) - .invoke(token); + injectable(token) + .type(BullMQTypes.WORKER) + .value(new Worker(worker, process, opts)) + .hooks({ + $onDestroy: (worker: Worker) => worker.close() + }); + + return inject(token); } diff --git a/packages/third-parties/bullmq/vitest.config.mts b/packages/third-parties/bullmq/vitest.config.mts index 452b24fe8da..439f03ca5e3 100644 --- a/packages/third-parties/bullmq/vitest.config.mts +++ b/packages/third-parties/bullmq/vitest.config.mts @@ -9,11 +9,16 @@ export default defineConfig( ...presets.test, coverage: { ...presets.test.coverage, + exclude: [ + ...presets.test.coverage.exclude, + "**/contracts/**/*", + "**/config/**/*" + ], thresholds: { - statements: 98.92, - branches: 98.55, + statements: 100, + branches: 98.38, functions: 100, - lines: 98.92 + lines: 100 } } }