Skip to content

Commit d82e39c

Browse files
committed
Fixes issue #482: Work with < Rubies 3.2
The current Thread::Queue based Interruptible can not work with Ruby version earlier than 3.2. Given that SQ sets it's minimum supported version of Ruby is derived from "full support of Rails 7.1", this in theory mandates support for Ruby 2.7.8. However, other dependencies force the minimum version of Ruby to: 3.1.6 This commit adds a boot time check of the Ruby version and selects either the current or original implementation of Interruptible.
1 parent b197a9a commit d82e39c

File tree

5 files changed

+61
-8
lines changed

5 files changed

+61
-8
lines changed

Diff for: lib/solid_queue/dispatcher.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def metadata
2525
def poll
2626
batch = dispatch_next_batch
2727

28-
batch.size.zero? ? polling_interval : 0.seconds
28+
batch.zero? ? polling_interval : 0.seconds
2929
end
3030

3131
def dispatch_next_batch

Diff for: lib/solid_queue/engine.rb

+8
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,13 @@ class Engine < ::Rails::Engine
3737
include ActiveJob::ConcurrencyControls
3838
end
3939
end
40+
41+
initializer "solid_queue.include_interruptible_concern" do
42+
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new("3.2")
43+
SolidQueue::Processes::Base.include SolidQueue::Processes::Interruptible
44+
else
45+
SolidQueue::Processes::Base.include SolidQueue::Processes::OgInterruptible
46+
end
47+
end
4048
end
4149
end

Diff for: lib/solid_queue/processes/base.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ module SolidQueue
44
module Processes
55
class Base
66
include Callbacks # Defines callbacks needed by other concerns
7-
include AppExecutor, Registrable, Interruptible, Procline
7+
include AppExecutor, Registrable, Procline
88

99
attr_reader :name
1010

Diff for: lib/solid_queue/processes/interruptible.rb

+10-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module SolidQueue::Processes
44
module Interruptible
5+
include SolidQueue::AppExecutor
6+
57
def wake_up
68
interrupt
79
end
@@ -13,17 +15,19 @@ def interrupt
1315
end
1416

1517
# Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up.
16-
# @param time [Numeric] the time to sleep. 0 returns immediately.
17-
# @return [true, nil]
18-
# * returns `true` if an interrupt was requested via #wake_up between the
19-
# last call to `interruptible_sleep` and now, resulting in an early return.
20-
# * returns `nil` if it slept the full `time` and was not interrupted.
18+
# @param time [Numeric, Duration] the time to sleep. 0 returns immediately.
2119
def interruptible_sleep(time)
2220
# Invoking this from the main thread may result in significant slowdown.
2321
# Utilizing asynchronous execution (Futures) addresses this performance issue.
2422
Concurrent::Promises.future(time) do |timeout|
25-
queue.pop(timeout:).tap { queue.clear }
23+
queue.clear unless queue.pop(timeout:).nil?
24+
end.on_rejection! do |e|
25+
wrapped_exception = RuntimeError.new("Interruptible#interruptible_sleep - #{e.class}: #{e.message}")
26+
wrapped_exception.set_backtrace(e.backtrace)
27+
handle_thread_error(wrapped_exception)
2628
end.value
29+
30+
nil
2731
end
2832

2933
def queue

Diff for: lib/solid_queue/processes/og_interruptible.rb

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# frozen_string_literal: true
2+
3+
# frozen_string_literal: true
4+
5+
module SolidQueue::Processes
6+
# The original implementation of Interruptible that works
7+
# with Ruby 3.1 and earlier
8+
module OgInterruptible
9+
def wake_up
10+
interrupt
11+
end
12+
13+
private
14+
SELF_PIPE_BLOCK_SIZE = 11
15+
16+
def interrupt
17+
self_pipe[:writer].write_nonblock(".")
18+
rescue Errno::EAGAIN, Errno::EINTR
19+
# Ignore writes that would block and retry
20+
# if another signal arrived while writing
21+
retry
22+
end
23+
24+
def interruptible_sleep(time)
25+
if time > 0 && self_pipe[:reader].wait_readable(time)
26+
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
27+
end
28+
rescue Errno::EAGAIN, Errno::EINTR
29+
end
30+
31+
# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
32+
def self_pipe
33+
@self_pipe ||= create_self_pipe
34+
end
35+
36+
def create_self_pipe
37+
reader, writer = IO.pipe
38+
{ reader: reader, writer: writer }
39+
end
40+
end
41+
end

0 commit comments

Comments
 (0)