Skip to content

Commit 1df5a2d

Browse files
committed
Have all processes yield self to lifecycle hooks
1 parent 9cd6bc3 commit 1df5a2d

File tree

9 files changed

+99
-36
lines changed

9 files changed

+99
-36
lines changed

README.md

+21-2
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,8 @@ And into two different points in the worker's, dispatcher's and scheduler's life
379379
- `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule.
380380
- `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).
381381

382+
Each of these hooks has an instance of the supervisor/worker/dispatcher/scheduler yielded to the block so that you may read its configuration for logging or metrics reporting purposes.
383+
382384
You can use the following methods with a block to do this:
383385
```ruby
384386
SolidQueue.on_start
@@ -396,8 +398,25 @@ SolidQueue.on_scheduler_stop
396398

397399
For example:
398400
```ruby
399-
SolidQueue.on_start { start_metrics_server }
400-
SolidQueue.on_stop { stop_metrics_server }
401+
SolidQueue.on_start do |supervisor|
402+
Rails.logger.info "Supervisor #{supervisor.name} started"
403+
404+
start_metrics_server
405+
end
406+
407+
SolidQueue.on_stop do |supervisor|
408+
Rails.logger.info "Supervisor #{supervisor.name} stopped"
409+
410+
stop_metrics_server
411+
end
412+
413+
SolidQueue.on_worker_start do |worker|
414+
Rails.logger.info "Worker #{worker.name} started with queues: #{worker.queues.join(',')}"
415+
end
416+
417+
SolidQueue.on_worker_stop do |worker|
418+
Rails.logger.info "Worker #{worker.name} stopped with queues: #{worker.queues.join(',')}"
419+
end
401420
```
402421

403422
These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.

lib/solid_queue.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ module SolidQueue
4545

4646
[ Dispatcher, Scheduler, Worker ].each do |process|
4747
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
48-
process.on_start { block.call }
48+
process.on_start(&block)
4949
end
5050

5151
define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
52-
process.on_stop { block.call }
52+
process.on_stop(&block)
5353
end
5454

5555
define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
56-
process.on_exit { block.call }
56+
process.on_exit(&block)
5757
end
5858
end
5959

lib/solid_queue/dispatcher.rb

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
module SolidQueue
44
class Dispatcher < Processes::Poller
55
include LifecycleHooks
6-
attr_accessor :batch_size, :concurrency_maintenance
6+
attr_reader :batch_size
77

88
after_boot :run_start_hooks
99
after_boot :start_concurrency_maintenance
@@ -26,6 +26,8 @@ def metadata
2626
end
2727

2828
private
29+
attr_reader :concurrency_maintenance
30+
2931
def poll
3032
batch = dispatch_next_batch
3133

lib/solid_queue/lifecycle_hooks.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def run_exit_hooks
4343

4444
def run_hooks_for(event)
4545
self.class.lifecycle_hooks.fetch(event, []).each do |block|
46-
block.call
46+
block.call(self)
4747
rescue Exception => exception
4848
handle_thread_error(exception)
4949
end

lib/solid_queue/scheduler.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ class Scheduler < Processes::Base
55
include Processes::Runnable
66
include LifecycleHooks
77

8-
attr_accessor :recurring_schedule
9-
108
after_boot :run_start_hooks
119
after_boot :schedule_recurring_tasks
1210
before_shutdown :unschedule_recurring_tasks
@@ -24,6 +22,8 @@ def metadata
2422
end
2523

2624
private
25+
attr_reader :recurring_schedule
26+
2727
SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks
2828

2929
def run

lib/solid_queue/worker.rb

+6-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ class Worker < Processes::Poller
88
before_shutdown :run_stop_hooks
99
after_shutdown :run_exit_hooks
1010

11-
attr_accessor :queues, :pool
11+
attr_reader :queues
1212

1313
def initialize(**options)
1414
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
1515

16-
@queues = Array(options[:queues])
16+
# Ensure that the queues array is deep frozen to prevent accidental modification
17+
@queues = Array(options[:queues]).map(&:freeze).freeze
18+
1719
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
1820

1921
super(**options)
@@ -24,6 +26,8 @@ def metadata
2426
end
2527

2628
private
29+
attr_reader :pool
30+
2731
def poll
2832
claim_executions.then do |executions|
2933
executions.each do |execution|

test/integration/lifecycle_hooks_test.rb

+58-20
Original file line numberDiff line numberDiff line change
@@ -6,41 +6,79 @@ class LifecycleHooksTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
test "run lifecycle hooks" do
9-
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
10-
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
11-
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }
9+
SolidQueue.on_start do |s|
10+
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_start")
11+
end
1212

13-
SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
14-
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
15-
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }
13+
SolidQueue.on_stop do |s|
14+
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_stop")
15+
end
1616

17-
SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
18-
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
19-
SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) }
17+
SolidQueue.on_exit do |s|
18+
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_exit")
19+
end
2020

21-
SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) }
22-
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
23-
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }
21+
SolidQueue.on_worker_start do |w|
22+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_start")
23+
end
2424

25-
pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
26-
wait_for_registered_processes(4)
25+
SolidQueue.on_worker_stop do |w|
26+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_stop")
27+
end
28+
29+
SolidQueue.on_worker_exit do |w|
30+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_exit")
31+
end
32+
33+
SolidQueue.on_dispatcher_start do |d|
34+
JobResult.create!(status: :hook_called, value: "dispatcher_#{d.batch_size}_start")
35+
end
36+
37+
SolidQueue.on_dispatcher_stop do |d|
38+
JobResult.create!(status: :hook_called, value: "dispatcher_#{d.batch_size}_stop")
39+
end
40+
41+
SolidQueue.on_dispatcher_exit do |d|
42+
JobResult.create!(status: :hook_called, value: "dispatcher_#{d.batch_size}_exit")
43+
end
44+
45+
SolidQueue.on_scheduler_start do |s|
46+
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_start")
47+
end
48+
49+
SolidQueue.on_scheduler_stop do |s|
50+
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_stop")
51+
end
52+
53+
SolidQueue.on_scheduler_exit do |s|
54+
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_exit")
55+
end
56+
57+
pid = run_supervisor_as_fork(
58+
workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ],
59+
dispatchers: [ { batch_size: 100 } ],
60+
skip_recurring: false
61+
)
62+
63+
wait_for_registered_processes(5)
2764

2865
terminate_process(pid)
2966
wait_for_registered_processes(0)
3067

3168

3269
results = skip_active_record_query_cache do
3370
job_results = JobResult.where(status: :hook_called)
34-
assert_equal 12, job_results.count
71+
assert_equal 15, job_results.count
3572
job_results
3673
end
3774

38-
assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
75+
assert_equal({ "hook_called" => 15 }, results.map(&:status).tally)
3976
assert_equal %w[
40-
start stop exit
41-
worker_start worker_stop worker_exit
42-
dispatcher_start dispatcher_stop dispatcher_exit
43-
scheduler_start scheduler_stop scheduler_exit
77+
Supervisor_start Supervisor_stop Supervisor_exit
78+
worker_first_queue_start worker_first_queue_stop worker_first_queue_exit
79+
worker_second_queue_start worker_second_queue_stop worker_second_queue_exit
80+
dispatcher_100_start dispatcher_100_stop dispatcher_100_exit
81+
Scheduler_start Scheduler_stop Scheduler_exit
4482
].sort, results.map(&:value).sort
4583
ensure
4684
SolidQueue::Supervisor.clear_hooks

test/unit/configuration_test.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ def assert_processes(configuration, kind, count, **attributes)
135135
end
136136

137137
def assert_has_recurring_task(scheduler, key:, **attributes)
138-
assert_equal 1, scheduler.recurring_schedule.configured_tasks.count
139-
task = scheduler.recurring_schedule.configured_tasks.detect { |t| t.key == key }
138+
assert_equal 1, scheduler.instance_variable_get(:@recurring_schedule).configured_tasks.count
139+
task = scheduler.instance_variable_get(:@recurring_schedule).configured_tasks.detect { |t| t.key == key }
140140

141141
attributes.each do |attr, value|
142142
assert_equal_value value, task.public_send(attr)

test/unit/worker_test.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class WorkerTest < ActiveSupport::TestCase
156156
@worker.start
157157
wait_for_registered_processes(1, timeout: 1.second)
158158

159-
assert_not @worker.pool.shutdown?
159+
assert_not @worker.instance_variable_get(:@pool).shutdown?
160160

161161
process = SolidQueue::Process.first
162162
assert_equal "Worker", process.kind
@@ -165,8 +165,8 @@ class WorkerTest < ActiveSupport::TestCase
165165

166166
# And now just wait until the worker tries to heartbeat and realises
167167
# it needs to stop
168-
wait_while_with_timeout(2) { !@worker.pool.shutdown? }
169-
assert @worker.pool.shutdown?
168+
wait_while_with_timeout(2) { !@worker.instance_variable_get(:@pool).shutdown? }
169+
assert @worker.instance_variable_get(:@pool).shutdown?
170170
ensure
171171
SolidQueue.process_heartbeat_interval = old_heartbeat_interval
172172
end

0 commit comments

Comments
 (0)