Skip to content

Commit

Permalink
fix(bullmq): change hook to handle correct DI initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Romakita committed Nov 25, 2024
1 parent 89edb5f commit 3a64326
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 168 deletions.
63 changes: 26 additions & 37 deletions packages/third-parties/bullmq/src/BullMQModule.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {catchAsyncError} from "@tsed/core";
import {PlatformTest} from "@tsed/platform-http/testing";
import {Queue, Worker} from "bullmq";
import {anything, instance, mock, verify, when} from "ts-mockito";
import {beforeEach} from "vitest";

import {BullMQModule} from "./BullMQModule.js";
import {type BullMQConfig} from "./config/config.js";
Expand Down Expand Up @@ -62,18 +63,17 @@ describe("BullMQModule", () => {
dispatcher = mock(JobDispatcher);
when(dispatcher.dispatch(CustomCronJob)).thenResolve();
});
beforeEach(() => {
queueConstructorSpy.mockClear();
workerConstructorSpy.mockClear();
});

afterEach(PlatformTest.reset);

describe("configuration", () => {
beforeEach(() => {
queueConstructorSpy.mockClear();
workerConstructorSpy.mockClear();
});

describe("merges config correctly", () => {
beforeEach(async () => {
await PlatformTest.create({
beforeEach(() =>
PlatformTest.create({
bullmq: {
queues: ["default", "special"],
connection: {
Expand Down Expand Up @@ -114,8 +114,8 @@ describe("BullMQModule", () => {
use: instance(dispatcher)
}
]
});
});
})
);

it("queue", () => {
expect(queueConstructorSpy).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -161,10 +161,9 @@ describe("BullMQModule", () => {
});
});
});

describe("discover queues from decorators", () => {
beforeEach(async () => {
await PlatformTest.create({
beforeEach(() =>
PlatformTest.create({
bullmq: {
queues: ["special"],
connection: {
Expand Down Expand Up @@ -205,8 +204,8 @@ describe("BullMQModule", () => {
use: instance(dispatcher)
}
]
});
});
})
);

it("queue", () => {
expect(queueConstructorSpy).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -252,42 +251,41 @@ describe("BullMQModule", () => {
});
});
});

describe("disableWorker", () => {
const config = {
queues: ["default", "foo", "bar"],
connection: {},
disableWorker: true
} as BullMQConfig;

beforeEach(async () => {
await PlatformTest.create({
beforeEach(() =>
PlatformTest.create({
bullmq: config,
imports: [
{
token: JobDispatcher,
use: instance(dispatcher)
}
]
});
});
})
);

it("should not create any workers", () => {
expect(workerConstructorSpy).toHaveBeenCalledTimes(0);
});
});

describe("without", () => {
it("skips initialization", async () => {
await PlatformTest.create({
beforeEach(() =>
PlatformTest.create({
imports: [
{
token: JobDispatcher,
use: instance(dispatcher)
}
]
});

})
);
it("skips initialization", async () => {
expect(queueConstructorSpy).not.toHaveBeenCalled();
verify(dispatcher.dispatch(anything())).never();
});
Expand All @@ -301,17 +299,17 @@ describe("BullMQModule", () => {
workerQueues: ["default", "foo"]
} as BullMQConfig;

beforeEach(async () => {
await PlatformTest.create({
beforeEach(() =>
PlatformTest.create({
bullmq: config,
imports: [
{
token: JobDispatcher,
use: instance(dispatcher)
}
]
});
});
})
);

describe("cronjobs", () => {
it("should dispatch cron jobs automatically", () => {
Expand All @@ -331,10 +329,6 @@ describe("BullMQModule", () => {

expect(instance).toBeInstanceOf(Queue);
});

it("should not allow direct injection of the queue", () => {
expect(PlatformTest.get(Queue)).not.toBeInstanceOf(Queue);
});
});

describe("workers", () => {
Expand All @@ -354,10 +348,6 @@ describe("BullMQModule", () => {
expect(PlatformTest.get("bullmq.worker.bar")).toBeUndefined();
});

it("should not allow direct injection of the worker", () => {
expect(PlatformTest.get(Worker)).not.toBeInstanceOf(Worker);
});

it("should run worker and execute processor", async () => {
const bullMQModule = PlatformTest.get<BullMQModule>(BullMQModule);
const worker = PlatformTest.get<JobMethods>("bullmq.job.default.regular");
Expand Down Expand Up @@ -426,7 +416,6 @@ describe("BullMQModule", () => {
});
});
});

describe("with fallback controller", () => {
beforeEach(async () => {
@FallbackJobController("foo")
Expand Down
53 changes: 29 additions & 24 deletions packages/third-parties/bullmq/src/BullMQModule.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
import {DIContext, InjectorService, Module, OnDestroy, runInContext} from "@tsed/di";
import type {BeforeInit} from "@tsed/platform-http";
import {
constant,
DIContext,
inject,
injectable,
injectMany,
injector,
logger,
OnDestroy,
type OnInit,
ProviderType,
runInContext
} from "@tsed/di";
import {getComputedType} from "@tsed/schema";
import {Job, Queue, Worker} from "bullmq";
import {v4} from "uuid";
Expand All @@ -15,12 +26,10 @@ import {getFallbackJobToken, getJobToken} from "./utils/getJobToken.js";
import {mapQueueOptions} from "./utils/mapQueueOptions.js";
import {mapWorkerOptions} from "./utils/mapWorkerOptions.js";

@Module()
export class BullMQModule implements BeforeInit, OnDestroy {
constructor(
private readonly injector: InjectorService,
private readonly dispatcher: JobDispatcher
) {
export class BullMQModule implements OnInit, OnDestroy {
private readonly dispatcher = inject(JobDispatcher);

constructor() {
// build providers allow @Inject(queue) usage in JobController instance
if (this.isEnabled()) {
const queues = [...this.getUniqQueueNames()];
Expand All @@ -36,12 +45,12 @@ export class BullMQModule implements BeforeInit, OnDestroy {
}

get config() {
return this.injector.settings.get<BullMQConfig>("bullmq");
return constant<BullMQConfig>("bullmq")!;
}

$beforeInit() {
$onInit() {
if (this.isEnabled()) {
this.injector.getMany<JobMethods>(BullMQTypes.CRON).map((job) => this.dispatcher.dispatch(getComputedType(job)));
injectMany<JobMethods>(BullMQTypes.CRON).map((job) => this.dispatcher.dispatch(getComputedType(job)));
}
}

Expand All @@ -50,8 +59,8 @@ export class BullMQModule implements BeforeInit, OnDestroy {
return;
}

await Promise.all(this.injector.getMany<Queue>(BullMQTypes.QUEUE).map((queue) => queue.close()));
await Promise.all(this.injector.getMany<Worker>(BullMQTypes.WORKER).map((worker) => worker.close()));
await Promise.all(injectMany<Queue>(BullMQTypes.QUEUE).map((queue) => queue.close()));
await Promise.all(injectMany<Worker>(BullMQTypes.WORKER).map((worker) => worker.close()));
}

isEnabled() {
Expand All @@ -65,14 +74,14 @@ export class BullMQModule implements BeforeInit, OnDestroy {
private buildQueues(queues: string[]) {
queues.forEach((queue) => {
const opts = mapQueueOptions(queue, this.config);
createQueueProvider(this.injector, queue, opts);
createQueueProvider(queue, opts);
});
}

private buildWorkers(workers: string[]) {
workers.forEach((worker) => {
const opts = mapWorkerOptions(worker, this.config);
createWorkerProvider(this.injector, worker, this.onProcess, opts);
createWorkerProvider(worker, this.onProcess, opts);
});
}

Expand All @@ -82,7 +91,7 @@ export class BullMQModule implements BeforeInit, OnDestroy {
*/
private getUniqQueueNames() {
return new Set(
this.injector
injector()
.getProviders([BullMQTypes.JOB, BullMQTypes.CRON, BullMQTypes.FALLBACK_JOB])
.map((provider) => provider.store.get<JobStore>(BULLMQ)?.queue)
.concat(this.config.queues!)
Expand All @@ -91,27 +100,21 @@ export class BullMQModule implements BeforeInit, OnDestroy {
}

private getJob(name: string, queueName: string) {
return (
this.injector.get<JobMethods>(getJobToken(queueName, name)) ||
this.injector.get(getFallbackJobToken(queueName)) ||
this.injector.get(getFallbackJobToken())
);
return inject<JobMethods>(getJobToken(queueName, name)) || inject(getFallbackJobToken(queueName)) || inject(getFallbackJobToken());
}

private onProcess = async (job: Job) => {
const jobService = this.getJob(job.name, job.queueName);

if (!jobService) {
this.injector.logger.warn({
logger().warn({
event: "BULLMQ_JOB_NOT_FOUND",
message: `Job ${job.name} ${job.queueName} not found`
});
return;
}

const $ctx = new DIContext({
injector: this.injector,
logger: this.injector.logger,
id: job.id || v4().split("-").join(""),
additionalProps: {
logType: "bullmq",
Expand Down Expand Up @@ -144,3 +147,5 @@ export class BullMQModule implements BeforeInit, OnDestroy {
}
};
}

injectable(BullMQModule).type(ProviderType.MODULE);
Loading

0 comments on commit 3a64326

Please # to comment.