From 9c8390d2f62fe13e9fb47864ed389a42da6baaaf Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun <alexandre.chakroun@doctolib.com> Date: Tue, 28 Jan 2025 15:47:20 +0100 Subject: [PATCH 1/5] Configure jobs to be enqueued on different shards --- .../queue_adapters/solid_queue_adapter.rb | 22 ++- test/dummy/app/jobs/shard_two_job.rb | 8 + test/dummy/config/database.yml | 8 + test/dummy/config/environments/test.rb | 7 +- test/dummy/db/queue_shard_two_schema.rb | 141 ++++++++++++++++++ test/unit/multisharding_test.rb | 51 +++++++ 6 files changed, 233 insertions(+), 4 deletions(-) create mode 100644 test/dummy/app/jobs/shard_two_job.rb create mode 100644 test/dummy/db/queue_shard_two_schema.rb create mode 100644 test/unit/multisharding_test.rb diff --git a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb index d3042194..cef94039 100644 --- a/lib/active_job/queue_adapters/solid_queue_adapter.rb +++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb @@ -8,20 +8,36 @@ module QueueAdapters # # Rails.application.config.active_job.queue_adapter = :solid_queue class SolidQueueAdapter + def initialize(db_shard: nil) + @db_shard = db_shard + end + def enqueue_after_transaction_commit? 
true end def enqueue(active_job) # :nodoc: - SolidQueue::Job.enqueue(active_job) + select_shard { SolidQueue::Job.enqueue(active_job) } end def enqueue_at(active_job, timestamp) # :nodoc: - SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp)) + select_shard do + SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp)) + end end def enqueue_all(active_jobs) # :nodoc: - SolidQueue::Job.enqueue_all(active_jobs) + select_shard { SolidQueue::Job.enqueue_all(active_jobs) } + end + + private + + def select_shard + if @db_shard + ActiveRecord::Base.connected_to(shard: @db_shard) { yield } + else + yield + end end end end diff --git a/test/dummy/app/jobs/shard_two_job.rb b/test/dummy/app/jobs/shard_two_job.rb new file mode 100644 index 00000000..674f1df2 --- /dev/null +++ b/test/dummy/app/jobs/shard_two_job.rb @@ -0,0 +1,8 @@ +class ShardTwoJob < ApplicationJob + self.queue_adapter = ActiveJob::QueueAdapters::SolidQueueAdapter.new(db_shard: :queue_shard_two) + queue_as :background + + def perform(arg) + JobBuffer.add(arg) + end +end diff --git a/test/dummy/config/database.yml b/test/dummy/config/database.yml index fdb186a5..7302c422 100644 --- a/test/dummy/config/database.yml +++ b/test/dummy/config/database.yml @@ -48,6 +48,10 @@ development: <<: *default database: <%= database_name_from("development_queue") %> migrations_paths: db/queue_migrate + queue_shard_two: + <<: *default + database: <%= database_name_from("development_queue_shard_two") %> + migrations_paths: db/queue_migrate test: primary: @@ -65,3 +69,7 @@ test: <<: *default database: <%= database_name_from("test_queue") %> migrations_paths: db/queue_migrate + queue_shard_two: + <<: *default + database: <%= database_name_from("test_queue_shard_two") %> + migrations_paths: db/queue_migrate diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb index a5a99232..57fd3057 100644 --- a/test/dummy/config/environments/test.rb +++ 
b/test/dummy/config/environments/test.rb @@ -49,7 +49,12 @@ # Replace the default in-process and non-durable queuing backend for Active Job. config.active_job.queue_adapter = :solid_queue - config.solid_queue.connects_to = { database: { writing: :queue } } + config.solid_queue.connects_to = { + shards: { + queue: { writing: :queue }, + queue_shard_two: { writing: :queue_shard_two } + } + } # Annotate rendered view with file names. # config.action_view.annotate_rendered_view_with_filenames = true diff --git a/test/dummy/db/queue_shard_two_schema.rb b/test/dummy/db/queue_shard_two_schema.rb new file mode 100644 index 00000000..697c2e92 --- /dev/null +++ b/test/dummy/db/queue_shard_two_schema.rb @@ -0,0 +1,141 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. 
+ +ActiveRecord::Schema[7.1].define(version: 1) do + create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.string "concurrency_key", null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + + create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.bigint "process_id" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true + t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + end + + create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.text "error" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true + end + + create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "queue_name", null: false + t.string "class_name", null: false + t.text "arguments" + t.integer "priority", default: 0, null: false + t.string "active_job_id" + t.datetime "scheduled_at" + t.datetime "finished_at" + t.string "concurrency_key" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index 
["class_name"], name: "index_solid_queue_jobs_on_class_name" + t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" + t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" + t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" + end + + create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "queue_name", null: false + t.datetime "created_at", null: false + t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + + create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "kind", null: false + t.datetime "last_heartbeat_at", null: false + t.bigint "supervisor_id" + t.integer "pid", null: false + t.string "hostname" + t.text "metadata" + t.datetime "created_at", null: false + t.string "name", null: false + t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" + end + + create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true + t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" + t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" + end + + create_table "solid_queue_recurring_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "task_key", null: false + t.datetime 
"run_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + + create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true, null: false + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + + create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "scheduled_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true + t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" + end + + create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "key", null: false + t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" + t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true + end + + 
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade +end diff --git a/test/unit/multisharding_test.rb b/test/unit/multisharding_test.rb new file mode 100644 index 00000000..d5053ad8 --- /dev/null +++ b/test/unit/multisharding_test.rb @@ -0,0 +1,51 @@ +require "test_helper" + +class MultishardingTest < ActiveSupport::TestCase + test "jobs are enqueued in the right shard" do + assert_difference -> { SolidQueue::Job.count }, 1 do + assert_difference -> do + ActiveRecord::Base.connected_to( + shard: :queue_shard_two + ) { SolidQueue::Job.count } + end, + 1 do + AddToBufferJob.perform_later "hey!" + ShardTwoJob.perform_later "coucou!" + end + end + end + + test "jobs are enqueued for later in the right shard" do + assert_difference -> { SolidQueue::ScheduledExecution.count }, 1 do + assert_difference -> do + ActiveRecord::Base.connected_to( + shard: :queue_shard_two + ) { SolidQueue::ScheduledExecution.count } + end, + 1 do + AddToBufferJob.set(wait: 1).perform_later "hey!" + ShardTwoJob.set(wait: 1).perform_later "coucou!" 
+ end + end + end + + test "jobs are enqueued in bulk in the right shard" do + active_jobs = [ + AddToBufferJob.new(2), + ShardTwoJob.new(6), + AddToBufferJob.new(3), + ShardTwoJob.new(7) + ] + + assert_difference -> { SolidQueue::Job.count }, 2 do + assert_difference -> do + ActiveRecord::Base.connected_to( + shard: :queue_shard_two + ) { SolidQueue::Job.count } + end, + 2 do + ActiveJob.perform_all_later(active_jobs) + end + end + end +end From 0bc399adaca1736437ddfc5f0c3d7e918f77d49e Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun <alexandre.chakroun@doctolib.com> Date: Wed, 29 Jan 2025 15:57:00 +0100 Subject: [PATCH 2/5] Enable shard selection for processes --- .../queue_adapters/solid_queue_adapter.rb | 6 ++++-- lib/solid_queue.rb | 2 ++ lib/solid_queue/processes/base.rb | 17 +++++++++++++++++ test/integration/jobs_lifecycle_test.rb | 16 ++++++++++++++++ test/test_helpers/jobs_test_helper.rb | 9 +++++++++ 5 files changed, 48 insertions(+), 2 deletions(-) diff --git a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb index cef94039..3241b07e 100644 --- a/lib/active_job/queue_adapters/solid_queue_adapter.rb +++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb @@ -33,8 +33,10 @@ def enqueue_all(active_jobs) # :nodoc: private def select_shard - if @db_shard - ActiveRecord::Base.connected_to(shard: @db_shard) { yield } + shard = @db_shard || SolidQueue.primary_shard + + if shard + ActiveRecord::Base.connected_to(shard: shard) { yield } else yield end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 1e1961e6..5050d76f 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -41,6 +41,8 @@ module SolidQueue mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes + mattr_accessor :primary_shard, :active_shard + delegate :on_start, :on_stop, to: Supervisor def on_worker_start(...) 
diff --git a/lib/solid_queue/processes/base.rb b/lib/solid_queue/processes/base.rb index 59ec9f1a..cc5ef8e3 100644 --- a/lib/solid_queue/processes/base.rb +++ b/lib/solid_queue/processes/base.rb @@ -6,6 +6,23 @@ class Base include Callbacks # Defines callbacks needed by other concerns include AppExecutor, Registrable, Procline + after_boot -> do + if SolidQueue.connects_to.key?(:shards) + # Record the name of the primary shard, which should be used for + # adapter less jobs + if SolidQueue.primary_shard.nil? + SolidQueue.primary_shard = SolidQueue.connects_to[:shards].keys.first + end + + # Move active_shard to first position in connects_to[:shards] Hash to + # make it the default + if SolidQueue.active_shard.present? && + SolidQueue.connects_to[:shards].key?(SolidQueue.active_shard) + SolidQueue::Record.default_shard = SolidQueue.active_shard + end + end + end + attr_reader :name def initialize(*) diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb index decab5b0..098a9e90 100644 --- a/test/integration/jobs_lifecycle_test.rb +++ b/test/integration/jobs_lifecycle_test.rb @@ -31,6 +31,22 @@ class JobsLifecycleTest < ActiveSupport::TestCase assert_equal 2, SolidQueue::Job.finished.count end + test "enqueue and run jobs from different shards" do + AddToBufferJob.perform_later "hey" + ShardTwoJob.perform_later "ho" + + change_active_shard_to(:queue_shard_two) do + @dispatcher.start + @worker.start + + wait_for_jobs_to_finish_for(2.seconds) + end + + assert_equal [ "ho" ], JobBuffer.values.sort + assert_equal 1, SolidQueue::ReadyExecution.count + assert_equal 1, ActiveRecord::Base.connected_to(shard: :queue_shard_two) { SolidQueue::Job.finished.count } + end + test "enqueue and run jobs that fail without retries" do RaisingJob.perform_later(ExpectedTestError, "A") RaisingJob.perform_later(ExpectedTestError, "B") diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index d0833fcf..2dc9b9d9 
100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -34,4 +34,13 @@ def assert_claimed_jobs(count = 1) assert_equal count, SolidQueue::ClaimedExecution.count end end + + def change_active_shard_to(new_shard_name, &block) + old_shard_name = SolidQueue.active_shard + SolidQueue.active_shard = new_shard_name + block.call + ensure + SolidQueue.active_shard = old_shard_name + SolidQueue::Record.connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to + end end From 032a2c13051ebd5734e049da55a8f1e7d39fc0ad Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun <alexandre.chakroun@doctolib.com> Date: Wed, 29 Jan 2025 17:54:04 +0100 Subject: [PATCH 3/5] Clean up --- app/models/solid_queue/record.rb | 11 +++++++- .../queue_adapters/solid_queue_adapter.rb | 6 ++--- lib/solid_queue/engine.rb | 10 +++++++ lib/solid_queue/processes/base.rb | 17 ------------ test/test_helper.rb | 2 +- test/test_helpers/jobs_test_helper.rb | 9 ------- .../test_helpers/multisharding_test_helper.rb | 17 ++++++++++++ test/unit/multisharding_test.rb | 27 +++++++------------ 8 files changed, 50 insertions(+), 49 deletions(-) create mode 100644 test/test_helpers/multisharding_test_helper.rb diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index d73e41b2..53e5b47c 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -4,7 +4,16 @@ module SolidQueue class Record < ActiveRecord::Base self.abstract_class = true - connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to + def self.connects_to_and_set_active_shard + connects_to(**SolidQueue.connects_to) + + if SolidQueue.connects_to.key?(:shards) && + SolidQueue.connects_to[:shards].key?(SolidQueue.active_shard) + self.default_shard = SolidQueue.active_shard + end + end + + connects_to_and_set_active_shard if SolidQueue.connects_to def self.non_blocking_lock if SolidQueue.use_skip_locked diff --git 
a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb index 3241b07e..e3c4b2f2 100644 --- a/lib/active_job/queue_adapters/solid_queue_adapter.rb +++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb @@ -32,13 +32,13 @@ def enqueue_all(active_jobs) # :nodoc: private - def select_shard + def select_shard(&block) shard = @db_shard || SolidQueue.primary_shard if shard - ActiveRecord::Base.connected_to(shard: shard) { yield } + ActiveRecord::Base.connected_to(shard: shard) { block.call } else - yield + block.call end end end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index 99e14150..8fc86061 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -45,5 +45,15 @@ class Engine < ::Rails::Engine SolidQueue::Processes::Base.include SolidQueue::Processes::OgInterruptible end end + + initializer "solid_queue.shard_configuration" do + ActiveSupport.on_load(:solid_queue) do + # Record the name of the primary shard, which should be used for + # adapter less jobs + if SolidQueue.connects_to.key?(:shards) && SolidQueue.primary_shard.nil? + SolidQueue.primary_shard = SolidQueue.connects_to[:shards].keys.first + end + end + end end end diff --git a/lib/solid_queue/processes/base.rb b/lib/solid_queue/processes/base.rb index cc5ef8e3..59ec9f1a 100644 --- a/lib/solid_queue/processes/base.rb +++ b/lib/solid_queue/processes/base.rb @@ -6,23 +6,6 @@ class Base include Callbacks # Defines callbacks needed by other concerns include AppExecutor, Registrable, Procline - after_boot -> do - if SolidQueue.connects_to.key?(:shards) - # Record the name of the primary shard, which should be used for - # adapter less jobs - if SolidQueue.primary_shard.nil? - SolidQueue.primary_shard = SolidQueue.connects_to[:shards].keys.first - end - - # Move active_shard to first position in connects_to[:shards] Hash to - # make it the default - if SolidQueue.active_shard.present? 
&& - SolidQueue.connects_to[:shards].key?(SolidQueue.active_shard) - SolidQueue::Record.default_shard = SolidQueue.active_shard - end - end - end - attr_reader :name def initialize(*) diff --git a/test/test_helper.rb b/test/test_helper.rb index f54b73f2..4a12a728 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -28,7 +28,7 @@ class ExpectedTestError < RuntimeError; end class ActiveSupport::TestCase - include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper + include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper, MultishardingTestHelper setup do @_on_thread_error = SolidQueue.on_thread_error diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index 2dc9b9d9..d0833fcf 100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -34,13 +34,4 @@ def assert_claimed_jobs(count = 1) assert_equal count, SolidQueue::ClaimedExecution.count end end - - def change_active_shard_to(new_shard_name, &block) - old_shard_name = SolidQueue.active_shard - SolidQueue.active_shard = new_shard_name - block.call - ensure - SolidQueue.active_shard = old_shard_name - SolidQueue::Record.connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to - end end diff --git a/test/test_helpers/multisharding_test_helper.rb b/test/test_helpers/multisharding_test_helper.rb new file mode 100644 index 00000000..98ba3f13 --- /dev/null +++ b/test/test_helpers/multisharding_test_helper.rb @@ -0,0 +1,17 @@ +module MultishardingTestHelper + private + + def connected_to_shard_two(&block) + ActiveRecord::Base.connected_to(shard: :queue_shard_two) { block.call } + end + + def change_active_shard_to(new_shard_name, &block) + old_shard_name = SolidQueue.active_shard + SolidQueue.active_shard = new_shard_name + SolidQueue::Record.connects_to_and_set_active_shard + block.call + ensure + SolidQueue.active_shard = old_shard_name + SolidQueue::Record.connects_to_and_set_active_shard + end +end diff 
--git a/test/unit/multisharding_test.rb b/test/unit/multisharding_test.rb index d5053ad8..106795c5 100644 --- a/test/unit/multisharding_test.rb +++ b/test/unit/multisharding_test.rb @@ -3,26 +3,22 @@ class MultishardingTest < ActiveSupport::TestCase test "jobs are enqueued in the right shard" do assert_difference -> { SolidQueue::Job.count }, 1 do - assert_difference -> do - ActiveRecord::Base.connected_to( - shard: :queue_shard_two - ) { SolidQueue::Job.count } - end, - 1 do + assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 1 do AddToBufferJob.perform_later "hey!" ShardTwoJob.perform_later "coucou!" end end end + test "jobs are enqueued in the right shard no matter the primary shard" do + assert_difference -> { SolidQueue::Job.count }, 1 do + change_active_shard_to(:queue_shard_two) { AddToBufferJob.perform_later "hey!" } + end + end + test "jobs are enqueued for later in the right shard" do assert_difference -> { SolidQueue::ScheduledExecution.count }, 1 do - assert_difference -> do - ActiveRecord::Base.connected_to( - shard: :queue_shard_two - ) { SolidQueue::ScheduledExecution.count } - end, - 1 do + assert_difference -> { connected_to_shard_two { SolidQueue::ScheduledExecution.count } }, 1 do AddToBufferJob.set(wait: 1).perform_later "hey!" ShardTwoJob.set(wait: 1).perform_later "coucou!" 
end @@ -38,12 +34,7 @@ class MultishardingTest < ActiveSupport::TestCase ] assert_difference -> { SolidQueue::Job.count }, 2 do - assert_difference -> do - ActiveRecord::Base.connected_to( - shard: :queue_shard_two - ) { SolidQueue::Job.count } - end, - 2 do + assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 2 do ActiveJob.perform_all_later(active_jobs) end end From 5d6e5b9639c39e8320887196ef9b194d6ff0c2a2 Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun <alexandre.chakroun@doctolib.com> Date: Thu, 30 Jan 2025 15:47:57 +0100 Subject: [PATCH 4/5] Add shard_selection_lambda --- .../queue_adapters/solid_queue_adapter.rb | 13 ++++--- lib/solid_queue.rb | 2 +- test/unit/multisharding_test.rb | 37 +++++++++++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb index e3c4b2f2..9ae293fd 100644 --- a/lib/active_job/queue_adapters/solid_queue_adapter.rb +++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb @@ -17,23 +17,26 @@ def enqueue_after_transaction_commit? 
end def enqueue(active_job) # :nodoc: - select_shard { SolidQueue::Job.enqueue(active_job) } + select_shard(active_job:) { SolidQueue::Job.enqueue(active_job) } end def enqueue_at(active_job, timestamp) # :nodoc: - select_shard do + select_shard(active_job:) do SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp)) end end def enqueue_all(active_jobs) # :nodoc: - select_shard { SolidQueue::Job.enqueue_all(active_jobs) } + select_shard(active_jobs:) { SolidQueue::Job.enqueue_all(active_jobs) } end private - def select_shard(&block) - shard = @db_shard || SolidQueue.primary_shard + def select_shard(active_job: nil, active_jobs: nil, &block) + shard = + SolidQueue.shard_selection_lambda&.call(active_job:, active_jobs:) || + @db_shard || + SolidQueue.primary_shard if shard ActiveRecord::Base.connected_to(shard: shard) { block.call } diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 5050d76f..68bfd079 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -41,7 +41,7 @@ module SolidQueue mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes - mattr_accessor :primary_shard, :active_shard + mattr_accessor :primary_shard, :active_shard, :shard_selection_lambda delegate :on_start, :on_stop, to: Supervisor diff --git a/test/unit/multisharding_test.rb b/test/unit/multisharding_test.rb index 106795c5..5ab086ea 100644 --- a/test/unit/multisharding_test.rb +++ b/test/unit/multisharding_test.rb @@ -16,6 +16,16 @@ class MultishardingTest < ActiveSupport::TestCase end end + test "shard_selection_lambda can override which shard is used to enqueue individual jobs" do + shard_selection_lambda = ->(active_job:, active_jobs:) { :queue_shard_two if active_job.arguments.first == "hey!" } + + with_shard_selection_lambda(shard_selection_lambda) do + assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 1 do + AddToBufferJob.perform_later "hey!" 
+ end + end + end + test "jobs are enqueued for later in the right shard" do assert_difference -> { SolidQueue::ScheduledExecution.count }, 1 do assert_difference -> { connected_to_shard_two { SolidQueue::ScheduledExecution.count } }, 1 do @@ -39,4 +49,31 @@ class MultishardingTest < ActiveSupport::TestCase end end end + + test "shard_selection_lambda can override which shard is used to enqueue jobs in bulk" do + active_jobs = [ + AddToBufferJob.new(2), + ShardTwoJob.new(6), + AddToBufferJob.new(3), + ShardTwoJob.new(7) + ] + shard_selection_lambda = ->(active_job:, active_jobs:) { :queue_shard_two if active_jobs.size == 2 } + + with_shard_selection_lambda(shard_selection_lambda) do + assert_difference -> { SolidQueue::Job.count }, 0 do + assert_difference -> { connected_to_shard_two { SolidQueue::Job.count } }, 4 do + ActiveJob.perform_all_later(active_jobs) + end + end + end + end + + private + + def with_shard_selection_lambda(lambda, &block) + SolidQueue.shard_selection_lambda = lambda + block.call + ensure + SolidQueue.shard_selection_lambda = nil + end end From 9d0aa96297837b3c52127dbc0bdef2d557eb91a6 Mon Sep 17 00:00:00 2001 From: Alexandre Chakroun <alexandre.chakroun@doctolib.com> Date: Thu, 30 Jan 2025 15:48:04 +0100 Subject: [PATCH 5/5] Add documentation --- README.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/README.md b/README.md index c77ed953..24cb2b42 100644 --- a/README.md +++ b/README.md @@ -618,6 +618,73 @@ my_periodic_resque_job: and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time. +## Multisharding + +If your application reaches a point where the pressure on the database used for jobs is such that you need to spread the load over multiple databases, then this section is for you. 
+ +You can extend the Solid Queue database configuration to use different shards: + + ```ruby + config.solid_queue.connects_to = { + shards: { + queue_shard_one: { writing: :queue_shard_one }, + queue_shard_two: { writing: :queue_shard_two } + } + } + ``` + +Queue database shards must be defined in `config/database.yml` as shown in the installation section. All shards need to share the same schema, and therefore the same migration configuration: + + ```yaml + production: + primary: + <<: *default + database: storage/production.sqlite3 + queue_shard_one: + <<: *default + database: storage/production_queue_shard_one.sqlite3 + migrations_paths: db/queue_migrate + queue_shard_two: + <<: *default + database: storage/production_queue_shard_two.sqlite3 + migrations_paths: db/queue_migrate + ``` + +Simply converting a simpler database configuration such as `config.solid_queue.connects_to = { database: { writing: :queue } }` to `config.solid_queue.connects_to = { shards: { queue: { writing: :queue } } }` will not have any effect on the behavior of Solid Queue. + +### Configuration + +In `config/environments/production.rb` or for the environment you want to enable Solid Queue in, you can define the following options: + + ```ruby + config.solid_queue.primary_shard = :queue_shard_one # optional + config.solid_queue.active_shard = ENV["SOLID_QUEUE_ACTIVE_SHARD"]&.to_sym + config.solid_queue.shard_selection_lambda = ->(active_job:, active_jobs:) { nil } + ``` + +- `config.solid_queue.primary_shard` is the shard that will be used to enqueue or schedule jobs without any specific adapter configuration. It defaults to the first shard found in `config.solid_queue.connects_to` (the Active Record default). +- `config.solid_queue.active_shard` is the shard that will be used by workers, dispatchers and schedulers to manage and process jobs. It defaults to the `primary_shard`.
+ With a basic Solid Queue configuration and the option described above you can start a worker and dispatcher working on the `queue_shard_two` shard by running `SOLID_QUEUE_ACTIVE_SHARD=queue_shard_two bin/jobs start`. +- `config.solid_queue.shard_selection_lambda` helps you define a custom strategy to determine in which shard a job should be enqueued. It accepts keyword parameters `active_job` when a single job is enqueued or scheduled and `active_jobs` when jobs are bulk enqueued. If the lambda is defined but returns `nil`, Solid Queue falls back to the shard configured on the job's adapter (`db_shard`), and then to the `primary_shard`. + +### Enqueueing jobs in different shards + +Individual jobs can be assigned to shards by leveraging their `queue_adapter` property: + + ```ruby + class SomeJob < ApplicationJob + self.queue_adapter = ActiveJob::QueueAdapters::SolidQueueAdapter.new(db_shard: :queue_shard_two) + ``` + +This job will be enqueued in the shard named `queue_shard_two`. + +Alternatively, you can define a lambda to implement a custom strategy for defining which shard a job will be enqueued to: + + ```ruby + config.solid_queue.shard_selection_lambda = ->(active_job:, active_jobs:) { SolidQueue.connects_to[:shards].keys.sample } # pick a shard randomly + ``` + + ## Inspiration Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.