Skip to content

Commit

Permalink
Merge pull request #317 from rails/start-stop-callbacks
Browse files Browse the repository at this point in the history
Implement `start` and `stop` lifecycle hooks for the supervisor and workers
  • Loading branch information
rosa authored Sep 4, 2024
2 parents 3ef85e1 + 18f89e5 commit 835ec4d
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 13 deletions.
32 changes: 31 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,47 @@ Finally, run the migrations:
$ bin/rails db:migrate
```

## Lifecycle hooks

In Solid queue, you can hook into two different points in the supervisor's life:
- `start`: after the supervisor has finished booting and right before it forks workers and dispatchers.
- `stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown.

And into two different points in a worker's life:
- `worker_start`: after the worker has finished booting and right before it starts the polling loop.
- `worker_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).

You can use the following methods with a block to do this:
```ruby
SolidQueue.on_start
SolidQueue.on_stop
SolidQueue.on_worker_start
SolidQueue.on_worker_stop
```

For example:
```ruby
SolidQueue.on_start { start_metrics_server }
SolidQueue.on_stop { stop_metrics_server }
```

These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.


### Other configuration settings
_Note_: The settings in this section should be set in your `config/application.rb` or your environment config like this: `config.solid_queue.silence_polling = true`

There are several settings that control how Solid Queue works that you can set as well:
- `logger`: the logger you want Solid Queue to use. Defaults to the app logger.
- `app_executor`: the [Rails executor](https://guides.rubyonrails.org/threading_and_code_execution.html#executor) used to wrap asynchronous operations, defaults to the app executor
- `on_thread_error`: custom lambda/Proc to call when there's an error within a thread that takes the exception raised as argument. Defaults to
- `on_thread_error`: custom lambda/Proc to call when there's an error within a Solid Queue thread that takes the exception raised as argument. Defaults to

```ruby
-> (exception) { Rails.error.report(exception, handled: false) }
```
**This is not used for errors raised within a job execution**. Errors happening in jobs are handled by Active Job's `retry_on` or `discard_on`, and ultimately will result in [failed jobs](#failed-jobs-and-retries). This is for errors happening within Solid Queue itself.

- `use_skip_locked`: whether to use `FOR UPDATE SKIP LOCKED` when performing locking reads. This will be automatically detected in the future, and for now, you'd only need to set this to `false` if your database doesn't support it. For MySQL, that'd be versions < 8, and for PostgreSQL, versions < 9.5. If you use SQLite, this has no effect, as writes are sequential.
- `process_heartbeat_interval`: the heartbeat interval that all processes will follow—defaults to 60 seconds.
- `process_alive_threshold`: how long to wait until a process is considered dead after its last heartbeat—defaults to 5 minutes.
Expand Down
10 changes: 10 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

delegate :on_start, :on_stop, to: Supervisor

def on_worker_start(...)
Worker.on_start(...)
end

def on_worker_stop(...)
Worker.on_stop(...)
end

def supervisor?
supervisor
end
Expand Down
43 changes: 43 additions & 0 deletions lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

module SolidQueue
module LifecycleHooks
extend ActiveSupport::Concern

included do
mattr_reader :lifecycle_hooks, default: { start: [], stop: [] }
end

class_methods do
def on_start(&block)
self.lifecycle_hooks[:start] << block
end

def on_stop(&block)
self.lifecycle_hooks[:stop] << block
end

def clear_hooks
self.lifecycle_hooks[:start] = []
self.lifecycle_hooks[:stop] = []
end
end

private
def run_start_hooks
run_hooks_for :start
end

def run_stop_hooks
run_hooks_for :stop
end

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
rescue Exception => exception
handle_thread_error(exception)
end
end
end
end
22 changes: 10 additions & 12 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ module Runnable
attr_writer :mode

def start
@stopped = false

SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) { boot }
end
boot

if running_async?
@thread = create_thread { run }
Expand All @@ -25,10 +21,6 @@ def stop
@thread&.join
end

def alive?
!running_async? || @thread.alive?
end

private
DEFAULT_MODE = :async

Expand All @@ -37,9 +29,15 @@ def mode
end

def boot
if running_as_fork?
register_signal_handlers
set_procline
SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) do
@stopped = false

if running_as_fork?
register_signal_handlers
set_procline
end
end
end
end

Expand Down
3 changes: 3 additions & 0 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

module SolidQueue
class Supervisor < Processes::Base
include LifecycleHooks
include Maintenance, Signals, Pidfiled

class << self
Expand All @@ -27,6 +28,7 @@ def initialize(configuration)

def start
boot
run_start_hooks

start_processes
launch_maintenance_task
Expand All @@ -36,6 +38,7 @@ def start

def stop
@stopped = true
run_stop_hooks
end

private
Expand Down
5 changes: 5 additions & 0 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

module SolidQueue
class Worker < Processes::Poller
include LifecycleHooks

after_boot :run_start_hooks
before_shutdown :run_stop_hooks

attr_accessor :queues, :pool

def initialize(**options)
Expand Down
52 changes: 52 additions & 0 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require "test_helper"

class LifecycleHooksTest < ActiveSupport::TestCase
self.use_transactional_tests = false

test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }

pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: "*" } ] })
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache do
assert_equal 4, JobResult.count
JobResult.last(4)
end

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop", "worker_start", "worker_stop" ], results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end

test "handle errors on lifecycle hooks" do
previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) }
SolidQueue.on_start { raise RuntimeError, "everything is broken" }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

result = skip_active_record_query_cache { JobResult.last }

assert_equal "error", result.status
assert_equal "everything is broken", result.value
ensure
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end
end
File renamed without changes.

0 comments on commit 835ec4d

Please # to comment.