Skip to content

Commit

Permalink
Add lifecycle hooks as well (start, stop)
Browse files Browse the repository at this point in the history
As these would be needed for job-iteration to know when the worker is
going to stop.
  • Loading branch information
rosa committed Sep 3, 2024
1 parent 7f84cb0 commit f961df0
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 94 deletions.
22 changes: 19 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,33 @@ Finally, run the migrations:
$ bin/rails db:migrate
```

## Supervisor's lifecycle hooks
You can hook into two different points in the supervisor's life in Solid Queue:
## Lifecycle hooks

In Solid queue, you can hook into two different points in the supervisor's life:
- `start`: after the supervisor has finished booting and right before it forks workers and dispatchers.
- `stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown.

To do that, you just need to call `SolidQueue.on_start` and `SolidQueue.on_stop` with a block, like this:
And into two different points in a worker's life:
- `worker_start`: after the worker has finished booting and right before it starts the polling loop.
- `worker_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).

You can use the following methods with a block to do this:
```ruby
SolidQueue.on_start
SolidQueue.on_stop
SolidQueue.on_worker_start
SolidQueue.on_worker_stop
```

For example:
```ruby
SolidQueue.on_start { start_metrics_server }
SolidQueue.on_stop { stop_metrics_server }
```

These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.


### Other configuration settings
_Note_: The settings in this section should be set in your `config/application.rb` or your environment config like this: `config.solid_queue.silence_polling = true`
Expand Down
8 changes: 8 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ module SolidQueue

delegate :on_start, :on_stop, to: Supervisor

def on_worker_start(...)
Worker.on_start(...)
end

def on_worker_stop(...)
Worker.on_stop(...)
end

def supervisor?
supervisor
end
Expand Down
43 changes: 43 additions & 0 deletions lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

module SolidQueue
module LifecycleHooks
extend ActiveSupport::Concern

included do
mattr_reader :lifecycle_hooks, default: { start: [], stop: [] }
end

class_methods do
def on_start(&block)
self.lifecycle_hooks[:start] << block
end

def on_stop(&block)
self.lifecycle_hooks[:stop] << block
end

def clear_hooks
self.lifecycle_hooks[:start] = []
self.lifecycle_hooks[:stop] = []
end
end

private
def run_start_hooks
run_hooks_for :start
end

def run_stop_hooks
run_hooks_for :stop
end

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
rescue Exception => exception
handle_thread_error(exception)
end
end
end
end
18 changes: 10 additions & 8 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ module Runnable
attr_writer :mode

def start
@stopped = false

SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) { boot }
end
boot

if running_async?
@thread = create_thread { run }
Expand All @@ -33,9 +29,15 @@ def mode
end

def boot
if running_as_fork?
register_signal_handlers
set_procline
SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) do
@stopped = false

if running_as_fork?
register_signal_handlers
set_procline
end
end
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

module SolidQueue
class Supervisor < Processes::Base
include Maintenance, LifecycleHooks, Signals, Pidfiled
include LifecycleHooks
include Maintenance, Signals, Pidfiled

class << self
def start(load_configuration_from: nil)
Expand Down
45 changes: 0 additions & 45 deletions lib/solid_queue/supervisor/lifecycle_hooks.rb

This file was deleted.

5 changes: 5 additions & 0 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

module SolidQueue
class Worker < Processes::Poller
include LifecycleHooks

after_boot :run_start_hooks
before_shutdown :run_stop_hooks

attr_accessor :queues, :pool

def initialize(**options)
Expand Down
52 changes: 52 additions & 0 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require "test_helper"

class LifecycleHooksTest < ActiveSupport::TestCase
self.use_transactional_tests = false

test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }

pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: "*" } ] })
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache do
assert_equal 4, JobResult.count
JobResult.last(4)
end

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop", "worker_start", "worker_stop" ], results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end

test "handle errors on lifecycle hooks" do
previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) }
SolidQueue.on_start { raise RuntimeError, "everything is broken" }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

result = skip_active_record_query_cache { JobResult.last }

assert_equal "error", result.status
assert_equal "everything is broken", result.value
ensure
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end
end
37 changes: 0 additions & 37 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,43 +128,6 @@ class SupervisorTest < ActiveSupport::TestCase
end
end

test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache { JobResult.last(2) }

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop" ], results.map(&:value)
ensure
SolidQueue::Supervisor.clear_hooks
end

test "handle errors on lifecycle hooks" do
previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) }
SolidQueue.on_start { raise RuntimeError, "everything is broken" }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

result = skip_active_record_query_cache { JobResult.last }

assert_equal "error", result.status
assert_equal "everything is broken", result.value
ensure
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
end

private
def assert_registered_workers(supervisor_pid: nil, count: 1)
assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)
Expand Down

0 comments on commit f961df0

Please # to comment.