From 62ba7b0cb1a6a8bb53f538117c4ae0e0c2b823c4 Mon Sep 17 00:00:00 2001 From: The Major <spartan364@gmail.com> Date: Tue, 25 Feb 2025 02:38:55 +0000 Subject: [PATCH] Have all processes yield self to lifecycle hooks --- README.md | 18 ++++- lib/solid_queue.rb | 6 +- lib/solid_queue/dispatcher.rb | 4 +- lib/solid_queue/lifecycle_hooks.rb | 2 +- lib/solid_queue/scheduler.rb | 2 +- lib/solid_queue/worker.rb | 6 +- test/integration/lifecycle_hooks_test.rb | 91 +++++++++++++++++++----- 7 files changed, 100 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index c77ed953..98aa5c0d 100644 --- a/README.md +++ b/README.md @@ -379,6 +379,8 @@ And into two different points in the worker's, dispatcher's and scheduler's life - `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule. - `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`). +Each of these hooks has an instance of the supervisor/worker/dispatcher/scheduler yielded to the block so that you may read its configuration for logging or metrics reporting purposes. + You can use the following methods with a block to do this: ```ruby SolidQueue.on_start @@ -396,8 +398,20 @@ SolidQueue.on_scheduler_stop For example: ```ruby -SolidQueue.on_start { start_metrics_server } -SolidQueue.on_stop { stop_metrics_server } +SolidQueue.on_start do |supervisor| + MyMetricsReporter.process_name = supervisor.name + + start_metrics_server +end + +SolidQueue.on_stop do |_supervisor| + stop_metrics_server +end + +SolidQueue.on_worker_start do |worker| + MyMetricsReporter.process_name = worker.name + MyMetricsReporter.queues = worker.queues.join(',') +end ``` These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this. diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 02b88d05..e0d51c8c 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -45,15 +45,15 @@ module SolidQueue [ Dispatcher, Scheduler, Worker ].each do |process| define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| - process.on_start { block.call } + process.on_start(&block) end define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block| - process.on_stop { block.call } + process.on_stop(&block) end define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block| - process.on_exit { block.call } + process.on_exit(&block) end end diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index 6f7ec245..1583e1dd 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -3,7 +3,7 @@ module SolidQueue class Dispatcher < Processes::Poller include LifecycleHooks - attr_accessor :batch_size, :concurrency_maintenance + attr_reader :batch_size after_boot :run_start_hooks after_boot :start_concurrency_maintenance @@ -26,6 +26,8 @@ def metadata end private + attr_reader :concurrency_maintenance + def poll batch = dispatch_next_batch diff --git a/lib/solid_queue/lifecycle_hooks.rb b/lib/solid_queue/lifecycle_hooks.rb index 0403459a..ec43b7a7 100644 --- a/lib/solid_queue/lifecycle_hooks.rb +++ b/lib/solid_queue/lifecycle_hooks.rb @@ -43,7 +43,7 @@ def run_exit_hooks def run_hooks_for(event) self.class.lifecycle_hooks.fetch(event, []).each do |block| - block.call + block.call(self) rescue Exception => exception handle_thread_error(exception) end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index d3164ed5..3cec90fa 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -5,7 +5,7 @@ class Scheduler < Processes::Base include Processes::Runnable include LifecycleHooks - attr_accessor :recurring_schedule + attr_reader :recurring_schedule after_boot :run_start_hooks after_boot :schedule_recurring_tasks diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 54d4d870..e036a5fd 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -8,12 +8,14 @@ class Worker < Processes::Poller before_shutdown :run_stop_hooks after_shutdown :run_exit_hooks - attr_accessor :queues, :pool + attr_reader :queues, :pool def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) - @queues = Array(options[:queues]) + # Ensure that the queues array is deep frozen to prevent accidental modification + @queues = Array(options[:queues]).map(&:freeze).freeze + @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) super(**options) diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index 7da73228..b2fd50da 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -6,24 +6,76 @@ class LifecycleHooksTest < ActiveSupport::TestCase self.use_transactional_tests = false test "run lifecycle hooks" do - SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) } - SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) } - SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) } + SolidQueue.on_start do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_start") + end - SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) } - SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) } - SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) } + SolidQueue.on_stop do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_stop") + end - SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) } - SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) } - SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) } + SolidQueue.on_exit do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_exit") + end - SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) } - SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) } - SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) } + SolidQueue.on_worker_start do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_start") + end - pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false) - wait_for_registered_processes(4) + SolidQueue.on_worker_stop do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_stop") + end + + SolidQueue.on_worker_exit do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_exit") + end + + SolidQueue.on_dispatcher_start do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_start") + end + + SolidQueue.on_dispatcher_stop do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_stop") + end + + SolidQueue.on_dispatcher_exit do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_exit") + end + + SolidQueue.on_scheduler_start do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_start") + end + + SolidQueue.on_scheduler_stop do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_stop") + end + + SolidQueue.on_scheduler_exit do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_exit") + end + + pid = run_supervisor_as_fork( + workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ], + dispatchers: [ { batch_size: 100 } ], + skip_recurring: false + ) + + wait_for_registered_processes(5) terminate_process(pid) wait_for_registered_processes(0) @@ -31,15 +83,16 @@ class LifecycleHooksTest < ActiveSupport::TestCase results = skip_active_record_query_cache do job_results = JobResult.where(status: :hook_called) - assert_equal 12, job_results.count + assert_equal 15, job_results.count job_results end - assert_equal({ "hook_called" => 12 }, results.map(&:status).tally) + assert_equal({ "hook_called" => 15 }, results.map(&:status).tally) assert_equal %w[ - start stop exit - worker_start worker_stop worker_exit - dispatcher_start dispatcher_stop dispatcher_exit + supervisor_start supervisor_stop supervisor_exit + worker_first_queue_start worker_first_queue_stop worker_first_queue_exit + worker_second_queue_start worker_second_queue_stop worker_second_queue_exit + dispatcher_100_start dispatcher_100_stop dispatcher_100_exit scheduler_start scheduler_stop scheduler_exit ].sort, results.map(&:value).sort ensure