Skip to content

Commit a9ee00a

Browse files
authored
Have all processes yield self to lifecycle hooks (#516)
1 parent 2fe1b3e commit a9ee00a

File tree

7 files changed

+100
-29
lines changed

7 files changed

+100
-29
lines changed

README.md

+16-2
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,8 @@ And into two different points in the worker's, dispatcher's and scheduler's life
379379
- `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule.
380380
- `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).
381381

382+
Each of these hooks has an instance of the supervisor/worker/dispatcher/scheduler yielded to the block so that you may read its configuration for logging or metrics reporting purposes.
383+
382384
You can use the following methods with a block to do this:
383385
```ruby
384386
SolidQueue.on_start
@@ -396,8 +398,20 @@ SolidQueue.on_scheduler_stop
396398

397399
For example:
398400
```ruby
399-
SolidQueue.on_start { start_metrics_server }
400-
SolidQueue.on_stop { stop_metrics_server }
401+
SolidQueue.on_start do |supervisor|
402+
MyMetricsReporter.process_name = supervisor.name
403+
404+
start_metrics_server
405+
end
406+
407+
SolidQueue.on_stop do |_supervisor|
408+
stop_metrics_server
409+
end
410+
411+
SolidQueue.on_worker_start do |worker|
412+
MyMetricsReporter.process_name = worker.name
413+
MyMetricsReporter.queues = worker.queues.join(',')
414+
end
401415
```
402416

403417
These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.

lib/solid_queue.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ module SolidQueue
4545

4646
[ Dispatcher, Scheduler, Worker ].each do |process|
4747
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
48-
process.on_start { block.call }
48+
process.on_start(&block)
4949
end
5050

5151
define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
52-
process.on_stop { block.call }
52+
process.on_stop(&block)
5353
end
5454

5555
define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
56-
process.on_exit { block.call }
56+
process.on_exit(&block)
5757
end
5858
end
5959

lib/solid_queue/dispatcher.rb

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
module SolidQueue
44
class Dispatcher < Processes::Poller
55
include LifecycleHooks
6-
attr_accessor :batch_size, :concurrency_maintenance
6+
attr_reader :batch_size
77

88
after_boot :run_start_hooks
99
after_boot :start_concurrency_maintenance
@@ -26,6 +26,8 @@ def metadata
2626
end
2727

2828
private
29+
attr_reader :concurrency_maintenance
30+
2931
def poll
3032
batch = dispatch_next_batch
3133

lib/solid_queue/lifecycle_hooks.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def run_exit_hooks
4343

4444
def run_hooks_for(event)
4545
self.class.lifecycle_hooks.fetch(event, []).each do |block|
46-
block.call
46+
block.call(self)
4747
rescue Exception => exception
4848
handle_thread_error(exception)
4949
end

lib/solid_queue/scheduler.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ class Scheduler < Processes::Base
55
include Processes::Runnable
66
include LifecycleHooks
77

8-
attr_accessor :recurring_schedule
8+
attr_reader :recurring_schedule
99

1010
after_boot :run_start_hooks
1111
after_boot :schedule_recurring_tasks

lib/solid_queue/worker.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ class Worker < Processes::Poller
88
before_shutdown :run_stop_hooks
99
after_shutdown :run_exit_hooks
1010

11-
attr_accessor :queues, :pool
11+
attr_reader :queues, :pool
1212

1313
def initialize(**options)
1414
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
1515

16-
@queues = Array(options[:queues])
16+
# Ensure that the queues array is deep frozen to prevent accidental modification
17+
@queues = Array(options[:queues]).map(&:freeze).freeze
18+
1719
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
1820

1921
super(**options)

test/integration/lifecycle_hooks_test.rb

+72-19
Original file line numberDiff line numberDiff line change
@@ -6,40 +6,93 @@ class LifecycleHooksTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
test "run lifecycle hooks" do
9-
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
10-
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
11-
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }
9+
SolidQueue.on_start do |s|
10+
name = s.class.name.demodulize.downcase
11+
JobResult.create!(status: :hook_called, value: "#{name}_start")
12+
end
1213

13-
SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
14-
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
15-
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }
14+
SolidQueue.on_stop do |s|
15+
name = s.class.name.demodulize.downcase
16+
JobResult.create!(status: :hook_called, value: "#{name}_stop")
17+
end
1618

17-
SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
18-
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
19-
SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) }
19+
SolidQueue.on_exit do |s|
20+
name = s.class.name.demodulize.downcase
21+
JobResult.create!(status: :hook_called, value: "#{name}_exit")
22+
end
2023

21-
SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) }
22-
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
23-
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }
24+
SolidQueue.on_worker_start do |w|
25+
name = w.class.name.demodulize.downcase
26+
queues = w.queues.join("_")
27+
JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_start")
28+
end
2429

25-
pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
26-
wait_for_registered_processes(4)
30+
SolidQueue.on_worker_stop do |w|
31+
name = w.class.name.demodulize.downcase
32+
queues = w.queues.join("_")
33+
JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_stop")
34+
end
35+
36+
SolidQueue.on_worker_exit do |w|
37+
name = w.class.name.demodulize.downcase
38+
queues = w.queues.join("_")
39+
JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_exit")
40+
end
41+
42+
SolidQueue.on_dispatcher_start do |d|
43+
name = d.class.name.demodulize.downcase
44+
JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_start")
45+
end
46+
47+
SolidQueue.on_dispatcher_stop do |d|
48+
name = d.class.name.demodulize.downcase
49+
JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_stop")
50+
end
51+
52+
SolidQueue.on_dispatcher_exit do |d|
53+
name = d.class.name.demodulize.downcase
54+
JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_exit")
55+
end
56+
57+
SolidQueue.on_scheduler_start do |s|
58+
name = s.class.name.demodulize.downcase
59+
JobResult.create!(status: :hook_called, value: "#{name}_start")
60+
end
61+
62+
SolidQueue.on_scheduler_stop do |s|
63+
name = s.class.name.demodulize.downcase
64+
JobResult.create!(status: :hook_called, value: "#{name}_stop")
65+
end
66+
67+
SolidQueue.on_scheduler_exit do |s|
68+
name = s.class.name.demodulize.downcase
69+
JobResult.create!(status: :hook_called, value: "#{name}_exit")
70+
end
71+
72+
pid = run_supervisor_as_fork(
73+
workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ],
74+
dispatchers: [ { batch_size: 100 } ],
75+
skip_recurring: false
76+
)
77+
78+
wait_for_registered_processes(5)
2779

2880
terminate_process(pid)
2981
wait_for_registered_processes(0)
3082

3183

3284
results = skip_active_record_query_cache do
3385
job_results = JobResult.where(status: :hook_called)
34-
assert_equal 12, job_results.count
86+
assert_equal 15, job_results.count
3587
job_results
3688
end
3789

38-
assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
90+
assert_equal({ "hook_called" => 15 }, results.map(&:status).tally)
3991
assert_equal %w[
40-
start stop exit
41-
worker_start worker_stop worker_exit
42-
dispatcher_start dispatcher_stop dispatcher_exit
92+
supervisor_start supervisor_stop supervisor_exit
93+
worker_first_queue_start worker_first_queue_stop worker_first_queue_exit
94+
worker_second_queue_start worker_second_queue_stop worker_second_queue_exit
95+
dispatcher_100_start dispatcher_100_stop dispatcher_100_exit
4396
scheduler_start scheduler_stop scheduler_exit
4497
].sort, results.map(&:value).sort
4598
ensure

0 commit comments

Comments
 (0)