Skip to content

Commit c2a0780

Browse files
committed
add worker_ids, and have workers yield self to hooks
1 parent 9cd6bc3 commit c2a0780

File tree

6 files changed

+71
-15
lines changed

6 files changed

+71
-15
lines changed

README.md

+11
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,9 @@ And into two different points in the worker's, dispatcher's and scheduler's life
379379
- `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule.
380380
- `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).
381381

382+
The hooks for workers will have the worker instance yielded to the block so that you may read its configuration
383+
for logging or other metrics reporting purposes.
384+
382385
You can use the following methods with a block to do this:
383386
```ruby
384387
SolidQueue.on_start
@@ -398,6 +401,14 @@ For example:
398401
```ruby
399402
SolidQueue.on_start { start_metrics_server }
400403
SolidQueue.on_stop { stop_metrics_server }
404+
405+
SolidQueue.on_worker_start do |worker|
406+
Rails.logger.info "Worker #{worker.worker_id} started with queues: #{worker.queues.join(',')}"
407+
end
408+
409+
SolidQueue.on_worker_stop do |worker|
410+
Rails.logger.info "Worker #{worker.worker_id} stopped with queues: #{worker.queues.join(',')}"
411+
end
401412
```
402413

403414
These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.

lib/solid_queue/configuration.rb

+16-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def instantiate
3333

3434
def initialize(**options)
3535
@options = options.with_defaults(default_options)
36+
@worker_id_counter = 0
3637
end
3738

3839
def configured_processes
@@ -109,12 +110,24 @@ def skip_recurring_tasks?
109110
end
110111

111112
def workers
112-
workers_options.flat_map do |worker_options|
113-
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
114-
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
113+
workers_options.flat_map { |worker_options| generate_workers(worker_options) }
114+
end
115+
116+
def generate_workers(worker_options)
117+
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
118+
processes.times.map do |idx|
119+
worker_id = next_worker_id
120+
options_with_defaults = worker_options.with_defaults(WORKER_DEFAULTS).merge(worker_id:)
121+
Process.new(:worker, options_with_defaults)
115122
end
116123
end
117124

125+
def next_worker_id
126+
id = @worker_id_counter
127+
@worker_id_counter += 1
128+
id
129+
end
130+
118131
def dispatchers
119132
dispatchers_options.map do |dispatcher_options|
120133
Process.new :dispatcher, dispatcher_options.with_defaults(DISPATCHER_DEFAULTS)

lib/solid_queue/lifecycle_hooks.rb

+9-1
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,18 @@ def run_exit_hooks
4343

4444
def run_hooks_for(event)
4545
self.class.lifecycle_hooks.fetch(event, []).each do |block|
46-
block.call
46+
if yield_self_to_hooks?
47+
block.call(self)
48+
else
49+
block.call
50+
end
4751
rescue Exception => exception
4852
handle_thread_error(exception)
4953
end
5054
end
55+
56+
def yield_self_to_hooks?
57+
false
58+
end
5159
end
5260
end

lib/solid_queue/worker.rb

+12-3
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,24 @@ class Worker < Processes::Poller
88
before_shutdown :run_stop_hooks
99
after_shutdown :run_exit_hooks
1010

11-
attr_accessor :queues, :pool
11+
attr_accessor :pool
12+
13+
attr_reader :worker_id, :queues
1214

1315
def initialize(**options)
1416
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
1517

16-
@queues = Array(options[:queues])
18+
# Ensure that the queues array is deep frozen to prevent accidental modification
19+
@queues = Array(options[:queues]).map(&:freeze).freeze
20+
1721
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
22+
@worker_id = options[:worker_id]
1823

1924
super(**options)
2025
end
2126

2227
def metadata
23-
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
28+
super.merge(queues: queues.join(","), thread_pool_size: pool.size, worker_id:)
2429
end
2530

2631
private
@@ -54,5 +59,9 @@ def all_work_completed?
5459
def set_procline
5560
procline "waiting for jobs in #{queues.join(",")}"
5661
end
62+
63+
def yield_self_to_hooks?
64+
true
65+
end
5766
end
5867
end

test/integration/jobs_lifecycle_test.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase
66
setup do
77
@_on_thread_error = SolidQueue.on_thread_error
88
SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ], @_on_thread_error)
9-
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
9+
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, worker_id: 1)
1010
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
1111
end
1212

test/integration/lifecycle_hooks_test.rb

+22-7
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,17 @@ class LifecycleHooksTest < ActiveSupport::TestCase
1010
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
1111
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }
1212

13-
SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
14-
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
15-
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }
13+
SolidQueue.on_worker_start do |w|
14+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_start")
15+
end
16+
17+
SolidQueue.on_worker_stop do |w|
18+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_stop")
19+
end
20+
21+
SolidQueue.on_worker_exit do |w|
22+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_exit")
23+
end
1624

1725
SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
1826
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
@@ -22,8 +30,13 @@ class LifecycleHooksTest < ActiveSupport::TestCase
2230
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
2331
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }
2432

25-
pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
26-
wait_for_registered_processes(4)
33+
pid = run_supervisor_as_fork(
34+
workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 2 } ],
35+
dispatchers: [ { batch_size: 100 } ],
36+
skip_recurring: false
37+
)
38+
39+
wait_for_registered_processes(6)
2740

2841
terminate_process(pid)
2942
wait_for_registered_processes(0)
@@ -35,10 +48,12 @@ class LifecycleHooksTest < ActiveSupport::TestCase
3548
job_results
3649
end
3750

38-
assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
51+
assert_equal({ "hook_called" => 18 }, results.map(&:status).tally)
3952
assert_equal %w[
4053
start stop exit
41-
worker_start worker_stop worker_exit
54+
worker_first_queue_0_start worker_first_queue_0_stop worker_first_queue_0_exit
55+
worker_second_queue_1_start worker_second_queue_1_stop worker_second_queue_1_exit
56+
worker_second_queue_2_start worker_second_queue_2_stop worker_second_queue_2_exit
4257
dispatcher_start dispatcher_stop dispatcher_exit
4358
scheduler_start scheduler_stop scheduler_exit
4459
].sort, results.map(&:value).sort

0 commit comments

Comments
 (0)