Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

refactor(pause): stop run execution #3046

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

roggervalf
Copy link
Collaborator

Why

  1. Why is this change necessary? As part of trying to fix the processing of jobs when closing a worker. When pausing a job we don't need to wait for a promise, instead just set it as paused and stop run execution

How

  1. How did you implement this? Remove promise from pause method and refactor resume method to re-execute run method

Additional Notes (Optional)

Notice that we wait for pending jobs in https://github.com/taskforcesh/bullmq/blob/master/src/classes/worker.ts#L954 when pausing a worker by default, but we can possibly add paused promise into async queue https://github.com/taskforcesh/bullmq/blob/master/src/classes/worker.ts#L586, that means that we can get pause method hanging

@roggervalf roggervalf force-pushed the refactor-pause-worker branch from d2dedbd to a7c8d1c Compare February 2, 2025 20:46
Copy link
Contributor

@manast manast left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is correct. When the worker is paused it must be awaiting something otherwise it will loop at maximum speed doing nothing consuming all the CPU.

@@ -476,6 +475,9 @@ export class Worker<
numTotal < this._concurrency &&
(!this.limitUntil || numTotal == 0)
) {
if (this.paused) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be moved to the while loop condition expression instead.

@@ -582,11 +584,7 @@ export class Worker<
{ block = true }: GetNextJobOptions = {},
): Promise<Job<DataType, ResultType, NameType> | undefined> {
if (this.paused) {
if (block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be a reason why we block here, it could be that the loop will run super fast doing nothing consuming all the CPU, this needs to be properly tested.

@roggervalf
Copy link
Collaborator Author

I am not sure this is correct. When the worker is paused it must be awaiting something otherwise it will loop at maximum speed doing nothing consuming all the CPU.

I modified the test case to verify that runExecution is resolved after pausing

@roggervalf roggervalf force-pushed the refactor-pause-worker branch from ec03b53 to c36a209 Compare February 3, 2025 16:34
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants