Skip to content

Commit

Permalink
Add lifecycle hooks for Worker as well (start, stop)
Browse files Browse the repository at this point in the history
As these would be needed for job-iteration to know when the worker is
going to stop.
  • Loading branch information
rosa committed Sep 3, 2024
1 parent 7f84cb0 commit 18f89e5
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 94 deletions.
22 changes: 19 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,33 @@ Finally, run the migrations:
$ bin/rails db:migrate
```

## Supervisor's lifecycle hooks
You can hook into two different points in the supervisor's life in Solid Queue:
## Lifecycle hooks

In Solid queue, you can hook into two different points in the supervisor's life:
- `start`: after the supervisor has finished booting and right before it forks workers and dispatchers.
- `stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown.

To do that, you just need to call `SolidQueue.on_start` and `SolidQueue.on_stop` with a block, like this:
And into two different points in a worker's life:
- `worker_start`: after the worker has finished booting and right before it starts the polling loop.
- `worker_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).

You can use the following methods with a block to do this:
```ruby
SolidQueue.on_start
SolidQueue.on_stop
SolidQueue.on_worker_start
SolidQueue.on_worker_stop
```

For example:
```ruby
SolidQueue.on_start { start_metrics_server }
SolidQueue.on_stop { stop_metrics_server }
```

These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.


### Other configuration settings
_Note_: The settings in this section should be set in your `config/application.rb` or your environment config like this: `config.solid_queue.silence_polling = true`
Expand Down
8 changes: 8 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ module SolidQueue

delegate :on_start, :on_stop, to: Supervisor

def on_worker_start(...)
Worker.on_start(...)
end

def on_worker_stop(...)
Worker.on_stop(...)
end

def supervisor?
supervisor
end
Expand Down
43 changes: 43 additions & 0 deletions lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

module SolidQueue
module LifecycleHooks
extend ActiveSupport::Concern

included do
mattr_reader :lifecycle_hooks, default: { start: [], stop: [] }
end

class_methods do
def on_start(&block)
self.lifecycle_hooks[:start] << block
end

def on_stop(&block)
self.lifecycle_hooks[:stop] << block
end

def clear_hooks
self.lifecycle_hooks[:start] = []
self.lifecycle_hooks[:stop] = []
end
end

private
def run_start_hooks
run_hooks_for :start
end

def run_stop_hooks
run_hooks_for :stop
end

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
rescue Exception => exception
handle_thread_error(exception)
end
end
end
end
18 changes: 10 additions & 8 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ module Runnable
attr_writer :mode

def start
@stopped = false

SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) { boot }
end
boot

if running_async?
@thread = create_thread { run }
Expand All @@ -33,9 +29,15 @@ def mode
end

def boot
if running_as_fork?
register_signal_handlers
set_procline
SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) do
@stopped = false

if running_as_fork?
register_signal_handlers
set_procline
end
end
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

module SolidQueue
class Supervisor < Processes::Base
include Maintenance, LifecycleHooks, Signals, Pidfiled
include LifecycleHooks
include Maintenance, Signals, Pidfiled

class << self
def start(load_configuration_from: nil)
Expand Down
45 changes: 0 additions & 45 deletions lib/solid_queue/supervisor/lifecycle_hooks.rb

This file was deleted.

5 changes: 5 additions & 0 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

module SolidQueue
class Worker < Processes::Poller
include LifecycleHooks

after_boot :run_start_hooks
before_shutdown :run_stop_hooks

attr_accessor :queues, :pool

def initialize(**options)
Expand Down
52 changes: 52 additions & 0 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require "test_helper"

class LifecycleHooksTest < ActiveSupport::TestCase
self.use_transactional_tests = false

test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }

pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: "*" } ] })
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache do
assert_equal 4, JobResult.count
JobResult.last(4)
end

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop", "worker_start", "worker_stop" ], results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end

test "handle errors on lifecycle hooks" do
previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) }
SolidQueue.on_start { raise RuntimeError, "everything is broken" }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

result = skip_active_record_query_cache { JobResult.last }

assert_equal "error", result.status
assert_equal "everything is broken", result.value
ensure
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end
end
37 changes: 0 additions & 37 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,43 +128,6 @@ class SupervisorTest < ActiveSupport::TestCase
end
end

test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache { JobResult.last(2) }

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop" ], results.map(&:value)
ensure
SolidQueue::Supervisor.clear_hooks
end

test "handle errors on lifecycle hooks" do
previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) }
SolidQueue.on_start { raise RuntimeError, "everything is broken" }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

result = skip_active_record_query_cache { JobResult.last }

assert_equal "error", result.status
assert_equal "everything is broken", result.value
ensure
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
end

private
def assert_registered_workers(supervisor_pid: nil, count: 1)
assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)
Expand Down

0 comments on commit 18f89e5

Please # to comment.