From f290af14ff466595a3071bd9a2141fa21be0bc8f Mon Sep 17 00:00:00 2001 From: Joel Warrington <joelw@hey.com> Date: Tue, 25 Feb 2025 01:40:07 -0700 Subject: [PATCH 1/6] Add capability to discard duplicate jobs with concurrency configuration --- .../solid_queue/job/concurrency_controls.rb | 10 +++- app/models/solid_queue/job/executable.rb | 3 +- app/models/solid_queue/semaphore.rb | 12 +++++ lib/active_job/concurrency_controls.rb | 4 +- test/models/solid_queue/job_test.rb | 50 ++++++++++++++++++- 5 files changed, 74 insertions(+), 5 deletions(-) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 6ae12e28..87b723a0 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_on_duplicate, :concurrency_duration, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -34,6 +34,14 @@ def blocked? end private + def duplicate? + Semaphore.at_limit?(self) + end + + def discard_on_duplicate? + concurrency_on_duplicate == :discard && duplicate? + end + def acquire_concurrency_lock return true unless concurrency_limited? diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e2146a67..ba01132f 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -65,7 +65,8 @@ def prepare_for_execution end def dispatch - if acquire_concurrency_lock then ready + if discard_on_duplicate? then discard + elsif acquire_concurrency_lock then ready else block end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 62eeb035..96cce74a 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -10,6 +10,10 @@ def wait(job) Proxy.new(job).wait end + def at_limit?(job) + Proxy.new(job).at_limit? + end + def signal(job) Proxy.new(job).signal end @@ -39,6 +43,14 @@ def initialize(job) @job = job end + def at_limit? + if semaphore = Semaphore.find_by(key: key) + semaphore.value.zero? + else + false + end + end + def wait if semaphore = Semaphore.find_by(key: key) semaphore.value > 0 && attempt_decrement diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 0ea290f6..6b0f08e8 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -11,15 +11,17 @@ module ConcurrencyControls class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false class_attribute :concurrency_limit + class_attribute :concurrency_on_duplicate class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_duplicate: :block) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration + self.concurrency_on_duplicate = on_duplicate end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 17a658d7..eb1154b6 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -10,6 +10,14 @@ def perform(job_result) end end + class DiscardedNonOverlappingJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_duplicate: :discard + end + + class DiscardedOverlappingJob < NonOverlappingJob + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_duplicate: :discard + end + class NonOverlappingGroupedJob1 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end @@ -98,6 +106,40 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert_equal active_job.concurrency_key, job.concurrency_key end + test "enqueue jobs with discarding concurrency controls" do + assert_ready do + active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "SolidQueue::JobTest::DiscardedNonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + end + + assert_discarded do + active_job = DiscardedNonOverlappingJob.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "SolidQueue::JobTest::DiscardedNonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + end + end + + test "enqueue jobs with discarding concurrency controls when below limit" do + assert_ready do + active_job = DiscardedOverlappingJob.perform_later(@result, name: "A") + assert_equal 2, active_job.concurrency_limit + assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + end + + assert_ready do + active_job = DiscardedOverlappingJob.perform_later(@result, name: "A") + assert_equal 2, active_job.concurrency_limit + assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + end + + assert_discarded do + active_job = DiscardedOverlappingJob.perform_later(@result, name: "A") + assert_equal 2, active_job.concurrency_limit + assert_equal "SolidQueue::JobTest::DiscardedOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + end + end + test "enqueue jobs with concurrency controls in the same concurrency group" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A") @@ -289,8 +331,12 @@ def assert_blocked(&block) assert SolidQueue::Job.last.blocked? end - def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block) - assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do + def assert_discarded(&block) + assert_job_counts(discarded: 1, &block) + end + + def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block) + assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block From e561356edcf8f0684b9c9fd5da4f5296d0e203f5 Mon Sep 17 00:00:00 2001 From: Joel Warrington <joelw@hey.com> Date: Tue, 25 Feb 2025 02:02:21 -0700 Subject: [PATCH 2/6] Remove 'duplicate' verbiage and use concurrency limits instead, simplify control flow --- app/models/solid_queue/job/concurrency_controls.rb | 11 ++++------- app/models/solid_queue/job/executable.rb | 5 +++-- lib/active_job/concurrency_controls.rb | 6 +++--- test/models/solid_queue/job_test.rb | 4 ++-- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 87b723a0..230f8811 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -8,7 +8,7 @@ module ConcurrencyControls included do has_one :blocked_execution - delegate :concurrency_limit, :concurrency_on_duplicate, :concurrency_duration, to: :job_class + delegate :concurrency_limit, :concurrency_at_limit, :concurrency_duration, to: :job_class before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? } end @@ -34,16 +34,13 @@ def blocked? end private - def duplicate? - Semaphore.at_limit?(self) - end - - def discard_on_duplicate? - concurrency_on_duplicate == :discard && duplicate? + def discard_concurrent? + concurrency_at_limit == :discard end def acquire_concurrency_lock return true unless concurrency_limited? + return false if Semaphore.at_limit?(self) && discard_concurrent? Semaphore.wait(self) end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index ba01132f..b5e30499 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -65,8 +65,9 @@ def prepare_for_execution end def dispatch - if discard_on_duplicate? then discard - elsif acquire_concurrency_lock then ready + if acquire_concurrency_lock then ready + elsif discard_concurrent? + discard else block end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 6b0f08e8..76d75dfa 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -11,17 +11,17 @@ module ConcurrencyControls class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false class_attribute :concurrency_limit - class_attribute :concurrency_on_duplicate + class_attribute :concurrency_at_limit class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_duplicate: :block) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, at_limit: :block) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration - self.concurrency_on_duplicate = on_duplicate + self.concurrency_at_limit = at_limit end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index eb1154b6..f6fddcb0 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -11,11 +11,11 @@ def perform(job_result) end class DiscardedNonOverlappingJob < NonOverlappingJob - limits_concurrency key: ->(job_result, **) { job_result }, on_duplicate: :discard + limits_concurrency key: ->(job_result, **) { job_result }, at_limit: :discard end class DiscardedOverlappingJob < NonOverlappingJob - limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_duplicate: :discard + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, at_limit: :discard end class NonOverlappingGroupedJob1 < NonOverlappingJob From 90793ad7bb2127a167276a83d066d565eff3959d Mon Sep 17 00:00:00 2001 From: Joel Warrington <joelw@hey.com> Date: Fri, 7 Mar 2025 13:33:12 -0700 Subject: [PATCH 3/6] Fix race condition vulnerability by changing logic to enqueue --- app/models/solid_queue/job/concurrency_controls.rb | 1 - app/models/solid_queue/job/executable.rb | 3 +-- app/models/solid_queue/semaphore.rb | 12 ------------ 3 files changed, 1 insertion(+), 15 deletions(-) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 230f8811..8cb7b8e5 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -40,7 +40,6 @@ def discard_concurrent? def acquire_concurrency_lock return true unless concurrency_limited? - return false if Semaphore.at_limit?(self) && discard_concurrent? Semaphore.wait(self) end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index b5e30499..33849247 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -66,8 +66,7 @@ def prepare_for_execution def dispatch if acquire_concurrency_lock then ready - elsif discard_concurrent? - discard + elsif discard_concurrent? then discard else block end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 96cce74a..62eeb035 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -10,10 +10,6 @@ def wait(job) Proxy.new(job).wait end - def at_limit?(job) - Proxy.new(job).at_limit? - end - def signal(job) Proxy.new(job).signal end @@ -43,14 +39,6 @@ def initialize(job) @job = job end - def at_limit? - if semaphore = Semaphore.find_by(key: key) - semaphore.value.zero? - else - false - end - end - def wait if semaphore = Semaphore.find_by(key: key) semaphore.value > 0 && attempt_decrement From b8dae8e5d510864214c285f10887580119e983d9 Mon Sep 17 00:00:00 2001 From: Joel Warrington <joelw@hey.com> Date: Fri, 7 Mar 2025 13:40:02 -0700 Subject: [PATCH 4/6] Add assertions when bulk enqueuing jobs with concurrency controls --- test/models/solid_queue/job_test.rb | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index f6fddcb0..5e2f8920 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -120,6 +120,16 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob end end + test "enqueuing multiple jobs with enqueue_all and concurrency controls" do + jobs = [ + DiscardedNonOverlappingJob.new(@result, name: "A"), + DiscardedNonOverlappingJob.new(@result, name: "A") + ] + + enqueued_jobs_count = SolidQueue::Job.enqueue_all(jobs) + assert_equal enqueued_jobs_count, 1 + end + test "enqueue jobs with discarding concurrency controls when below limit" do assert_ready do active_job = DiscardedOverlappingJob.perform_later(@result, name: "A") From ddb513f1b3b3ecc70034c3c00242cd1b2a8338ca Mon Sep 17 00:00:00 2001 From: Joel Warrington <joelw@hey.com> Date: Fri, 7 Mar 2025 17:27:43 -0700 Subject: [PATCH 5/6] Dispatch jobs in the order they were enqueued --- app/models/solid_queue/job.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8574c1ec..137b2d7c 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -49,7 +49,7 @@ def create_from_active_job(active_job) def create_all_from_active_jobs(active_jobs) job_rows = active_jobs.map { |job| attributes_from_active_job(job) } insert_all(job_rows) - where(active_job_id: active_jobs.map(&:job_id)) + where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc) end def attributes_from_active_job(active_job) From a3a70493e7696dc9f022e304ccd30a93bed63d6e Mon Sep 17 00:00:00 2001 From: Joel Warrington <joelw@hey.com> Date: Sat, 8 Mar 2025 13:22:59 -0700 Subject: [PATCH 6/6] Set ActiveJob successfully_enqueued for both enqueued/blocked and discarded jobs --- app/models/solid_queue/job.rb | 17 ++++++++++------- test/models/solid_queue/job_test.rb | 7 +++++-- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 137b2d7c..e83fdeeb 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -10,19 +10,22 @@ class EnqueueError < StandardError; end class << self def enqueue_all(active_jobs) - active_jobs_by_job_id = active_jobs.index_by(&:job_id) + enqueued_jobs_count = 0 transaction do jobs = create_all_from_active_jobs(active_jobs) - prepare_all_for_execution(jobs).tap do |enqueued_jobs| - enqueued_jobs.each do |enqueued_job| - active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id - active_jobs_by_job_id[enqueued_job.active_job_id].successfully_enqueued = true - end + enqueued_jobs_by_active_job_id = prepare_all_for_execution(jobs).index_by(&:active_job_id) + + active_jobs.each do |active_job| + job = enqueued_jobs_by_active_job_id[active_job.job_id] + active_job.provider_job_id = job&.id + active_job.successfully_enqueued = job.present? end + + enqueued_jobs_count = enqueued_jobs_by_active_job_id.count end - active_jobs.count(&:successfully_enqueued?) + enqueued_jobs_count end def enqueue(active_job, scheduled_at: Time.current) diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 5e2f8920..d5d3942c 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -122,12 +122,15 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "enqueuing multiple jobs with enqueue_all and concurrency controls" do jobs = [ - DiscardedNonOverlappingJob.new(@result, name: "A"), - DiscardedNonOverlappingJob.new(@result, name: "A") + job_1 = DiscardedNonOverlappingJob.new(@result, name: "A"), + job_2 = DiscardedNonOverlappingJob.new(@result, name: "B") ] enqueued_jobs_count = SolidQueue::Job.enqueue_all(jobs) assert_equal enqueued_jobs_count, 1 + + assert job_1.successfully_enqueued? + assert_not job_2.successfully_enqueued? end test "enqueue jobs with discarding concurrency controls when below limit" do