Commit d9c1763

Merge pull request #269 from rails/proper-async-mode
Implement an `async` execution mode to run Solid Queue workers and dispatchers in the same process
2 parents f0ca6b4 + cdd87ef commit d9c1763

46 files changed

+1086
-517
lines changed

README.md

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ $ bundle exec rake solid_queue:start
6666

6767
This will start processing jobs in all queues using the default configuration. See [below](#configuration) to learn more about configuring Solid Queue.
6868

69-
For small projects, you can run Solid Queue on the same machine as your webserver. When you're ready to scale, Solid Queue supports horizontal scaling out-of-the-box. You can run Solid Queue on a separate server from your webserver, or even run `bundle exec rake solid_queue:start` on multiple machines at the same time. If you'd like to designate some machines to be only dispatchers or only workers, use `bundle exec rake solid_queue:dispatch` or `bundle exec rake solid_queue:work`, respectively.
69+
For small projects, you can run Solid Queue on the same machine as your webserver. When you're ready to scale, Solid Queue supports horizontal scaling out-of-the-box. You can run Solid Queue on a separate server from your webserver, or even run `bundle exec rake solid_queue:start` on multiple machines at the same time. Depending on the configuration, you can designate some machines to run only dispatchers or only workers. See the [configuration](#configuration) section for more details on this.
7070

7171
## Requirements
7272
Besides Rails 7.1, Solid Queue works best with MySQL 8+ or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue.
@@ -75,10 +75,12 @@ Besides Rails 7.1, Solid Queue works best with MySQL 8+ or PostgreSQL 9.5+, as t
7575

7676
### Workers and dispatchers
7777

78-
We have three types of processes in Solid Queue:
78+
We have three types of actors in Solid Queue:
7979
- _Workers_ are in charge of picking jobs ready to run from queues and processing them. They work off the `solid_queue_ready_executions` table.
8080
- _Dispatchers_ are in charge of selecting jobs scheduled to run in the future that are due and _dispatching_ them, which is simply moving them from the `solid_queue_scheduled_executions` table over to the `solid_queue_ready_executions` table so that workers can pick them up. They're also in charge of managing [recurring tasks](#recurring-tasks), dispatching jobs to process them according to their schedule. On top of that, they do some maintenance work related to [concurrency controls](#concurrency-controls).
81-
- The _supervisor_ forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed.
81+
- The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed.
82+
83+
By default, Solid Queue runs in `fork` mode. This means the supervisor will fork a separate process for each supervised worker/dispatcher. There's also an `async` mode where each worker and dispatcher will be run as a thread of the supervisor process. This can be used with [the provided Puma plugin](#puma-plugin).
8284

8385
By default, Solid Queue will try to find your configuration under `config/solid_queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG`. This is what this configuration looks like:
8486

@@ -98,7 +100,18 @@ production:
98100
processes: 3
99101
```
100102
101-
Everything is optional. If no configuration is provided, Solid Queue will run with one dispatcher and one worker with default settings.
103+
Everything is optional. If no configuration at all is provided, Solid Queue will run with one dispatcher and one worker with default settings. If you want to run only dispatchers or workers, you just need to include that section alone in the configuration. For example, with the following configuration:
104+
105+
```yml
106+
production:
107+
dispatchers:
108+
- polling_interval: 1
109+
batch_size: 500
110+
concurrency_maintenance_interval: 300
111+
```
112+
the supervisor will run 1 dispatcher and no workers.
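
The rule described above (defaults apply only when no configuration is given at all; otherwise only the sections present are started) can be sketched in plain Ruby. This is a minimal illustration modelled on the `options_from_raw_config` helper added in this commit. The `planned_processes` method and the abbreviated default hashes are assumptions made for the example, not part of Solid Queue.

```ruby
# Abbreviated, illustrative defaults (assumed for this sketch; the real
# WORKER_DEFAULTS and DISPATCHER_DEFAULTS live in SolidQueue::Configuration).
WORKER_DEFAULTS = { threads: 3, processes: 1, polling_interval: 0.1 }.freeze
DISPATCHER_DEFAULTS = { batch_size: 500, polling_interval: 1 }.freeze

# Mirrors the helper from the diff: an empty configuration falls back to a
# single default entry; otherwise only what is listed under the key is used.
def options_from_raw_config(raw_config, key, defaults)
  raw_config.empty? ? [defaults] : Array(raw_config[key])
end

# Hypothetical helper: how many dispatcher and worker entries a given
# configuration hash would produce.
def planned_processes(raw_config)
  {
    dispatchers: options_from_raw_config(raw_config, :dispatchers, DISPATCHER_DEFAULTS).size,
    workers: options_from_raw_config(raw_config, :workers, WORKER_DEFAULTS).size
  }
end

no_config = {}
dispatchers_only = { dispatchers: [{ polling_interval: 1, batch_size: 500 }] }

planned_processes(no_config)        # one default dispatcher and one default worker
planned_processes(dispatchers_only) # one dispatcher, zero workers
```

`Array(nil)` returns an empty array, which is why a missing section yields no processes of that kind.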
113+
114+
Here's an overview of the different options:
102115
103116
- `polling_interval`: the time interval in seconds that workers and dispatchers will wait before checking for more jobs. This time defaults to `1` second for dispatchers and `0.1` seconds for workers.
104117
- `batch_size`: the dispatcher will dispatch jobs in batches of this size. The default is 500.
@@ -118,7 +131,7 @@ Everything is optional. If no configuration is provided, Solid Queue will run wi
118131

119132
Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names.
120133
- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch at most this number of jobs from its queue(s) and post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
121-
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
134+
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#running-as-a-fork-or-asynchronously).
122135
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
123136
- `recurring_tasks`: a list of recurring tasks the dispatcher will manage. Read more details about this one in the [Recurring tasks](#recurring-tasks) section.
124137

@@ -292,6 +305,18 @@ plugin :solid_queue
292305
```
293306
to your `puma.rb` configuration.
294307

308+
### Running as a fork or asynchronously
309+
310+
By default, the Puma plugin will fork additional processes for each worker and dispatcher so that they run in different processes. This provides the best isolation and performance, at the cost of additional memory usage.
311+
312+
Alternatively, workers and dispatchers can be run within the same Puma process(es). To do so, just configure the plugin as:
313+
314+
```ruby
315+
plugin :solid_queue
316+
solid_queue_mode :async
317+
```
318+
319+
Note that in this case, the `processes` configuration option will be ignored.
295320

296321
## Jobs and transactional integrity
297322
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you.

UPGRADING.md

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
1+
# Upgrading to version 0.4.x
2+
This version introduced an _async_ mode to run the supervisor and have all workers and dispatchers run as part of the same process as the supervisor, instead of as separate, forked processes. Together with this, we introduced some changes in how the supervisor is started. Prior to this change, you could choose whether you wanted to run workers, dispatchers or both, by starting Solid Queue as `solid_queue:work` or `solid_queue:dispatch`. From version 0.4.0, the only option available is:
3+
4+
```
5+
$ bundle exec rake solid_queue:start
6+
```
7+
Whether the supervisor starts workers, dispatchers or both will depend on your configuration. For example, if you don't configure any dispatchers, only workers will be started. That is, with this configuration:
8+
9+
```yml
10+
production:
11+
workers:
12+
- queues: [ real_time, background ]
13+
threads: 5
14+
polling_interval: 0.1
15+
processes: 3
16+
```
17+
the supervisor will run 3 workers, each one with 5 threads, and no dispatchers. With this configuration:
18+
```yml
19+
production:
20+
dispatchers:
21+
- polling_interval: 1
22+
batch_size: 500
23+
concurrency_maintenance_interval: 300
24+
```
25+
the supervisor will run 1 dispatcher and no workers.
26+
27+
28+
# Upgrading to version 0.3.x
29+
30+
This version introduced support for [recurring (cron-style) jobs](https://github.com/rails/solid_queue/blob/main/README.md#recurring-tasks), and it needs a new DB migration for it. To install it, just run:
31+
32+
```bash
33+
$ bin/rails solid_queue:install:migrations
34+
```
35+
36+
Or, if you're using a different database for Solid Queue:
37+
38+
```bash
39+
$ bin/rails solid_queue:install:migrations DATABASE=<the_name_of_your_solid_queue_db>
40+
```
41+
42+
And then run the migrations.

app/models/solid_queue/claimed_execution.rb

Lines changed: 8 additions & 3 deletions
@@ -15,9 +15,14 @@ class << self
1515
def claiming(job_ids, process_id, &block)
1616
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
1717

18-
insert_all!(job_data)
19-
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
20-
block.call(claimed)
18+
SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload|
19+
insert_all!(job_data)
20+
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
21+
block.call(claimed)
22+
23+
payload[:size] = claimed.size
24+
payload[:claimed_job_ids] = claimed.map(&:job_id)
25+
end
2126
end
2227
end
2328

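
The change above wraps the claim in an instrumentation block and fills in the payload (`:size`, `:claimed_job_ids`) from inside the block, once the result is known. The following is a minimal plain-Ruby sketch of that pattern. `TinyInstrumenter` is a hypothetical stand-in written for this example; it is not how `SolidQueue.instrument` is implemented.

```ruby
# Hypothetical stand-in for an instrumentation helper (an assumption for
# illustration only): subscribers receive the payload after the block runs.
module TinyInstrumenter
  @subscribers = Hash.new { |hash, name| hash[name] = [] }

  class << self
    def subscribe(name, &listener)
      @subscribers[name] << listener
    end

    # Yields a mutable payload hash so the block can enrich it with values
    # that only exist once the work is done, then notifies subscribers.
    def instrument(name, **payload)
      result = yield(payload) if block_given?
      result
    ensure
      @subscribers[name].each { |listener| listener.call(name, payload) }
    end
  end
end

events = []
TinyInstrumenter.subscribe(:claim) { |name, payload| events << [name, payload] }

claimed = TinyInstrumenter.instrument(:claim, process_id: 7, job_ids: [1, 2, 3]) do |payload|
  claimed_ids = [1, 3] # pretend job 2 was claimed by another process
  payload[:size] = claimed_ids.size
  payload[:claimed_job_ids] = claimed_ids
  claimed_ids
end
```

Subscribers see both the values passed up front (`process_id`, `job_ids`) and those added inside the block, while the caller still receives the block's return value.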
app/models/solid_queue/process.rb

Lines changed: 10 additions & 9 deletions
@@ -1,19 +1,18 @@
11
# frozen_string_literal: true
22

33
class SolidQueue::Process < SolidQueue::Record
4-
include Prunable
4+
include Executor, Prunable
55

6-
belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :forks
7-
has_many :forks, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy
8-
has_many :claimed_executions
6+
belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees
7+
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy
98

109
store :metadata, coder: JSON
1110

12-
after_destroy -> { claimed_executions.release_all }
13-
1411
def self.register(**attributes)
15-
SolidQueue.instrument :register_process, **attributes do
16-
create!(attributes.merge(last_heartbeat_at: Time.current))
12+
SolidQueue.instrument :register_process, **attributes do |payload|
13+
create!(attributes.merge(last_heartbeat_at: Time.current)).tap do |process|
14+
payload[:process_id] = process.id
15+
end
1716
end
1817
rescue Exception => error
1918
SolidQueue.instrument :register_process, **attributes.merge(error: error)
@@ -25,7 +24,9 @@ def heartbeat
2524
end
2625

2726
def deregister(pruned: false)
28-
SolidQueue.instrument :deregister_process, process: self, pruned: pruned, claimed_size: claimed_executions.size do |payload|
27+
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
28+
payload[:claimed_size] = claimed_executions.size if claims_executions?
29+
2930
destroy!
3031
rescue Exception => error
3132
payload[:error] = error
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Process
5+
module Executor
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
has_many :claimed_executions
10+
11+
after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
12+
end
13+
14+
private
15+
def claims_executions?
16+
kind == "Worker"
17+
end
18+
end
19+
end
20+
end

app/models/solid_queue/process/prunable.rb

Lines changed: 15 additions & 11 deletions
@@ -1,19 +1,23 @@
11
# frozen_string_literal: true
22

3-
module SolidQueue::Process::Prunable
4-
extend ActiveSupport::Concern
3+
module SolidQueue
4+
class Process
5+
module Prunable
6+
extend ActiveSupport::Concern
57

6-
included do
7-
scope :prunable, -> { where(last_heartbeat_at: ..SolidQueue.process_alive_threshold.ago) }
8-
end
8+
included do
9+
scope :prunable, -> { where(last_heartbeat_at: ..SolidQueue.process_alive_threshold.ago) }
10+
end
911

10-
class_methods do
11-
def prune
12-
SolidQueue.instrument :prune_processes, size: 0 do |payload|
13-
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
14-
payload[:size] += batch.size
12+
class_methods do
13+
def prune
14+
SolidQueue.instrument :prune_processes, size: 0 do |payload|
15+
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
16+
payload[:size] += batch.size
1517

16-
batch.each { |process| process.deregister(pruned: true) }
18+
batch.each { |process| process.deregister(pruned: true) }
19+
end
20+
end
1721
end
1822
end
1923
end

lib/puma/plugin/solid_queue.rb

Lines changed: 33 additions & 11 deletions
@@ -1,28 +1,50 @@
11
require "puma/plugin"
22

3+
module Puma
4+
class DSL
5+
def solid_queue_mode(mode = :fork)
6+
@options[:solid_queue_mode] = mode.to_sym
7+
end
8+
end
9+
end
10+
311
Puma::Plugin.create do
4-
attr_reader :puma_pid, :solid_queue_pid, :log_writer
12+
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor
513

614
def start(launcher)
715
@log_writer = launcher.log_writer
816
@puma_pid = $$
917

10-
in_background do
11-
monitor_solid_queue
18+
if launcher.options[:solid_queue_mode] == :async
19+
start_async(launcher)
20+
else
21+
start_forked(launcher)
1222
end
23+
end
1324

14-
launcher.events.on_booted do
15-
@solid_queue_pid = fork do
16-
Thread.new { monitor_puma }
17-
SolidQueue::Supervisor.start(mode: :all)
25+
private
26+
def start_forked(launcher)
27+
in_background do
28+
monitor_solid_queue
1829
end
30+
31+
launcher.events.on_booted do
32+
@solid_queue_pid = fork do
33+
Thread.new { monitor_puma }
34+
SolidQueue::Supervisor.start(mode: :fork)
35+
end
36+
end
37+
38+
launcher.events.on_stopped { stop_solid_queue }
39+
launcher.events.on_restart { stop_solid_queue }
1940
end
2041

21-
launcher.events.on_stopped { stop_solid_queue }
22-
launcher.events.on_restart { stop_solid_queue }
23-
end
42+
def start_async(launcher)
43+
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
44+
launcher.events.on_stopped { solid_queue_supervisor.stop }
45+
launcher.events.on_restart { solid_queue_supervisor.stop; solid_queue_supervisor.start }
46+
end
2447

25-
private
2648
def stop_solid_queue
2749
Process.waitpid(solid_queue_pid, Process::WNOHANG)
2850
log "Stopping Solid Queue..."

lib/solid_queue/configuration.rb

Lines changed: 18 additions & 22 deletions
@@ -17,38 +17,30 @@ class Configuration
1717
recurring_tasks: []
1818
}
1919

20-
def initialize(mode: :work, load_from: nil)
21-
@mode = mode
20+
def initialize(mode: :fork, load_from: nil)
21+
@mode = mode.to_s.inquiry
2222
@raw_config = config_from(load_from)
2323
end
2424

2525
def processes
26-
case mode
27-
when :dispatch then dispatchers
28-
when :work then workers
29-
when :all then dispatchers + workers
30-
else raise "Invalid mode #{mode}"
31-
end
26+
dispatchers + workers
3227
end
3328

3429
def workers
35-
if mode.in? %i[ work all]
36-
workers_options.flat_map do |worker_options|
37-
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
38-
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
30+
workers_options.flat_map do |worker_options|
31+
processes = if mode.fork?
32+
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
33+
else
34+
WORKER_DEFAULTS[:processes]
3935
end
40-
else
41-
[]
36+
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
4237
end
4338
end
4439

4540
def dispatchers
46-
if mode.in? %i[ dispatch all]
47-
dispatchers_options.map do |dispatcher_options|
48-
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
49-
50-
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
51-
end
41+
dispatchers_options.map do |dispatcher_options|
42+
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
43+
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
5244
end
5345
end
5446

@@ -68,15 +60,19 @@ def config_from(file_or_hash, env: Rails.env)
6860
end
6961

7062
def workers_options
71-
@workers_options ||= (raw_config[:workers] || [ WORKER_DEFAULTS ])
63+
@workers_options ||= options_from_raw_config(:workers, WORKER_DEFAULTS)
7264
.map { |options| options.dup.symbolize_keys }
7365
end
7466

7567
def dispatchers_options
76-
@dispatchers_options ||= (raw_config[:dispatchers] || [ DISPATCHER_DEFAULTS ])
68+
@dispatchers_options ||= options_from_raw_config(:dispatchers, DISPATCHER_DEFAULTS)
7769
.map { |options| options.dup.symbolize_keys }
7870
end
7971

72+
def options_from_raw_config(key, defaults)
73+
raw_config.empty? ? [ defaults ] : Array(raw_config[key])
74+
end
75+
8076
def parse_recurring_tasks(tasks)
8177
Array(tasks).map do |id, options|
8278
Dispatcher::RecurringTask.from_configuration(id, **options)

lib/solid_queue/dispatcher.rb

Lines changed: 3 additions & 4 deletions
@@ -1,9 +1,7 @@
11
# frozen_string_literal: true
22

33
module SolidQueue
4-
class Dispatcher < Processes::Base
5-
include Processes::Poller
6-
4+
class Dispatcher < Processes::Poller
75
attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule
86

97
after_boot :start_concurrency_maintenance, :load_recurring_schedule
@@ -13,10 +11,11 @@ def initialize(**options)
1311
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
1412

1513
@batch_size = options[:batch_size]
16-
@polling_interval = options[:polling_interval]
1714

1815
@concurrency_maintenance = ConcurrencyMaintenance.new(options[:concurrency_maintenance_interval], options[:batch_size]) if options[:concurrency_maintenance]
1916
@recurring_schedule = RecurringSchedule.new(options[:recurring_tasks])
17+
18+
super(**options)
2019
end
2120

2221
def metadata

lib/solid_queue/dispatcher/concurrency_maintenance.rb

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ def start
2525
end
2626

2727
def stop
28-
@concurrency_maintenance_task.shutdown
28+
@concurrency_maintenance_task&.shutdown
2929
end
3030

3131
private
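
The switch to `&.` in `stop` makes the call safe when the task was never created. The commit does not say why this was needed; one plausible reading is that `stop` can now be reached before `start` has run. The sketch below shows the behaviour with hypothetical stand-in classes (`FakeTask` and `MaintenanceSketch` are not part of Solid Queue).

```ruby
# Hypothetical stand-in for the timer task held by the maintenance object.
class FakeTask
  attr_reader :shut_down

  def shutdown
    @shut_down = true
  end
end

# Hypothetical stand-in mirroring only the start/stop shape from the diff.
class MaintenanceSketch
  attr_reader :task

  def start
    @task = FakeTask.new
  end

  # With safe navigation, stopping before starting is a no-op that returns
  # nil instead of raising NoMethodError on nil.
  def stop
    @task&.shutdown
  end
end

never_started = MaintenanceSketch.new
never_started.stop # no error, nothing to shut down

started = MaintenanceSketch.new
started.start
started.stop # shuts the task down
```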
