From f961df0d2a8cd90f41c04e9acd02a9a1aa906fbc Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 3 Sep 2024 21:40:32 +0200 Subject: [PATCH] Add lifecycle hooks as well (start, stop) As these would be needed for job-iteration to know when the worker is going to stop. --- README.md | 22 ++++++-- lib/solid_queue.rb | 8 +++ lib/solid_queue/lifecycle_hooks.rb | 43 +++++++++++++++ lib/solid_queue/processes/runnable.rb | 18 ++++--- lib/solid_queue/supervisor.rb | 3 +- lib/solid_queue/supervisor/lifecycle_hooks.rb | 45 ---------------- lib/solid_queue/worker.rb | 5 ++ test/integration/lifecycle_hooks_test.rb | 52 +++++++++++++++++++ test/unit/supervisor_test.rb | 37 ------------- 9 files changed, 139 insertions(+), 94 deletions(-) create mode 100644 lib/solid_queue/lifecycle_hooks.rb delete mode 100644 lib/solid_queue/supervisor/lifecycle_hooks.rb create mode 100644 test/integration/lifecycle_hooks_test.rb diff --git a/README.md b/README.md index 01e605fd..9139597d 100644 --- a/README.md +++ b/README.md @@ -208,17 +208,33 @@ Finally, run the migrations: $ bin/rails db:migrate ``` -## Supervisor's lifecycle hooks -You can hook into two different points in the supervisor's life in Solid Queue: +## Lifecycle hooks + +In Solid queue, you can hook into two different points in the supervisor's life: - `start`: after the supervisor has finished booting and right before it forks workers and dispatchers. - `stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown. -To do that, you just need to call `SolidQueue.on_start` and `SolidQueue.on_stop` with a block, like this: +And into two different points in a worker's life: +- `worker_start`: after the worker has finished booting and right before it starts the polling loop. +- `worker_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`). + +You can use the following methods with a block to do this: +```ruby +SolidQueue.on_start +SolidQueue.on_stop + +SolidQueue.on_worker_start +SolidQueue.on_worker_stop +``` + +For example: ```ruby SolidQueue.on_start { start_metrics_server } SolidQueue.on_stop { stop_metrics_server } ``` +These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this. + ### Other configuration settings _Note_: The settings in this section should be set in your `config/application.rb` or your environment config like this: `config.solid_queue.silence_polling = true` diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 2807b83e..739fe143 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -45,6 +45,14 @@ module SolidQueue delegate :on_start, :on_stop, to: Supervisor + def on_worker_start(...) + Worker.on_start(...) + end + + def on_worker_stop(...) + Worker.on_stop(...) + end + def supervisor? supervisor end diff --git a/lib/solid_queue/lifecycle_hooks.rb b/lib/solid_queue/lifecycle_hooks.rb new file mode 100644 index 00000000..fabddac4 --- /dev/null +++ b/lib/solid_queue/lifecycle_hooks.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +module SolidQueue + module LifecycleHooks + extend ActiveSupport::Concern + + included do + mattr_reader :lifecycle_hooks, default: { start: [], stop: [] } + end + + class_methods do + def on_start(&block) + self.lifecycle_hooks[:start] << block + end + + def on_stop(&block) + self.lifecycle_hooks[:stop] << block + end + + def clear_hooks + self.lifecycle_hooks[:start] = [] + self.lifecycle_hooks[:stop] = [] + end + end + + private + def run_start_hooks + run_hooks_for :start + end + + def run_stop_hooks + run_hooks_for :stop + end + + def run_hooks_for(event) + self.class.lifecycle_hooks.fetch(event, []).each do |block| + block.call + rescue Exception => exception + handle_thread_error(exception) + end + end + end +end diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 34782c7e..c2ab045b 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -7,11 +7,7 @@ module Runnable attr_writer :mode def start - @stopped = false - - SolidQueue.instrument(:start_process, process: self) do - run_callbacks(:boot) { boot } - end + boot if running_async? @thread = create_thread { run } @@ -33,9 +29,15 @@ def mode end def boot - if running_as_fork? - register_signal_handlers - set_procline + SolidQueue.instrument(:start_process, process: self) do + run_callbacks(:boot) do + @stopped = false + + if running_as_fork? + register_signal_handlers + set_procline + end + end end end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 48a55d7c..99cb0cc0 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -2,7 +2,8 @@ module SolidQueue class Supervisor < Processes::Base - include Maintenance, LifecycleHooks, Signals, Pidfiled + include LifecycleHooks + include Maintenance, Signals, Pidfiled class << self def start(load_configuration_from: nil) diff --git a/lib/solid_queue/supervisor/lifecycle_hooks.rb b/lib/solid_queue/supervisor/lifecycle_hooks.rb deleted file mode 100644 index 191f3780..00000000 --- a/lib/solid_queue/supervisor/lifecycle_hooks.rb +++ /dev/null @@ -1,45 +0,0 @@ -# frozen_string_literal: true - -module SolidQueue - class Supervisor - module LifecycleHooks - extend ActiveSupport::Concern - - included do - mattr_reader :lifecycle_hooks, default: { start: [], stop: [] } - end - - class_methods do - def on_start(&block) - self.lifecycle_hooks[:start] << block - end - - def on_stop(&block) - self.lifecycle_hooks[:stop] << block - end - - def clear_hooks - self.lifecycle_hooks[:start] = [] - self.lifecycle_hooks[:stop] = [] - end - end - - private - def run_start_hooks - run_hooks_for :start - end - - def run_stop_hooks - run_hooks_for :stop - end - - def run_hooks_for(event) - self.class.lifecycle_hooks.fetch(event, []).each do |block| - block.call - rescue Exception => exception - handle_thread_error(exception) - end - end - end - end -end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index eec644f0..fc203774 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -2,6 +2,11 @@ module SolidQueue class Worker < Processes::Poller + include LifecycleHooks + + after_boot :run_start_hooks + before_shutdown :run_stop_hooks + attr_accessor :queues, :pool def initialize(**options) diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb new file mode 100644 index 00000000..4b2218f7 --- /dev/null +++ b/test/integration/lifecycle_hooks_test.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require "test_helper" + +class LifecycleHooksTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + test "run lifecycle hooks" do + SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) } + SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) } + + SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) } + SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) } + + pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: "*" } ] }) + wait_for_registered_processes(4) + + terminate_process(pid) + wait_for_registered_processes(0) + + results = skip_active_record_query_cache do + assert_equal 4, JobResult.count + JobResult.last(4) + end + + assert_equal "hook_called", results.map(&:status).first + assert_equal [ "start", "stop", "worker_start", "worker_stop" ], results.map(&:value).sort + ensure + SolidQueue::Supervisor.clear_hooks + SolidQueue::Worker.clear_hooks + end + + test "handle errors on lifecycle hooks" do + previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) } + SolidQueue.on_start { raise RuntimeError, "everything is broken" } + + pid = run_supervisor_as_fork + wait_for_registered_processes(4) + + terminate_process(pid) + wait_for_registered_processes(0) + + result = skip_active_record_query_cache { JobResult.last } + + assert_equal "error", result.status + assert_equal "everything is broken", result.value + ensure + SolidQueue.on_thread_error = previous_on_thread_error + SolidQueue::Supervisor.clear_hooks + SolidQueue::Worker.clear_hooks + end +end diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 901d4922..69d8b0a9 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -128,43 +128,6 @@ class SupervisorTest < ActiveSupport::TestCase end end - test "run lifecycle hooks" do - SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) } - SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) } - - pid = run_supervisor_as_fork - wait_for_registered_processes(4) - - terminate_process(pid) - wait_for_registered_processes(0) - - results = skip_active_record_query_cache { JobResult.last(2) } - - assert_equal "hook_called", results.map(&:status).first - assert_equal [ "start", "stop" ], results.map(&:value) - ensure - SolidQueue::Supervisor.clear_hooks - end - - test "handle errors on lifecycle hooks" do - previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) } - SolidQueue.on_start { raise RuntimeError, "everything is broken" } - - pid = run_supervisor_as_fork - wait_for_registered_processes(4) - - terminate_process(pid) - wait_for_registered_processes(0) - - result = skip_active_record_query_cache { JobResult.last } - - assert_equal "error", result.status - assert_equal "everything is broken", result.value - ensure - SolidQueue.on_thread_error = previous_on_thread_error - SolidQueue::Supervisor.clear_hooks - end - private def assert_registered_workers(supervisor_pid: nil, count: 1) assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)