diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb
index f2207691..7d010593 100644
--- a/lib/solid_queue/supervisor.rb
+++ b/lib/solid_queue/supervisor.rb
@@ -172,8 +172,11 @@ def replace_fork(pid, status)
         end
       end
 
+      # When a supervised fork crashes or exits we need to mark all the
+      # executions it had claimed as failed so that they can be retried
+      # by some other worker.
       def handle_claimed_jobs_by(terminated_fork, status)
-        if registered_process = process.supervisees.find_by(name: terminated_fork.name)
+        if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
           error = Processes::ProcessExitError.new(status)
           registered_process.fail_all_claimed_executions_with(error)
         end
diff --git a/test/test_helper.rb b/test/test_helper.rb
index f54b73f2..c2e3ead3 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -97,4 +97,16 @@ def silent_on_thread_error_for(exceptions, on_thread_error)
         end
       end
     end
+
+    # Waits until the given block returns truthy, raising Timeout::Error if the
+    # timeout elapses first. Similar to other helper methods in this file but
+    # waits *for* a condition instead of *while* it is true.
+    def wait_for(timeout: 1.second, interval: 0.05)
+      Timeout.timeout(timeout) do
+        loop do
+          break if skip_active_record_query_cache { yield }
+          sleep interval
+        end
+      end
+    end
 end
diff --git a/test/test_helpers/processes_test_helper.rb b/test/test_helpers/processes_test_helper.rb
index 9a6d0f65..927ddfde 100644
--- a/test/test_helpers/processes_test_helper.rb
+++ b/test/test_helpers/processes_test_helper.rb
@@ -62,14 +62,19 @@ def find_processes_registered_as(kind)
 
     def terminate_process(pid, timeout: 10, signal: :TERM)
       signal_process(pid, signal)
-      wait_for_process_termination_with_timeout(pid, timeout: timeout)
+      wait_for_process_termination_with_timeout(pid, timeout: timeout, signaled: signal)
     end
 
-    def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0)
+    def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0, signaled: nil)
       Timeout.timeout(timeout) do
         if process_exists?(pid)
-          Process.waitpid(pid)
-          assert exitstatus, $?.exitstatus
+          begin
+            status = Process.waitpid2(pid).last
+            assert_equal exitstatus, status.exitstatus, "Expected pid #{pid} to exit with status #{exitstatus}" if status.exitstatus
+            assert_equal signaled, Signal.list.key(status.termsig)&.to_sym, "Expected pid #{pid} to be terminated with signal #{signaled}" if status.termsig
+          rescue Errno::ECHILD
+            # Not our child, or already reaped elsewhere: no status to assert on
+          end
         end
       end
     rescue Timeout::Error
diff --git a/test/unit/process_recovery_test.rb b/test/unit/process_recovery_test.rb
new file mode 100644
index 00000000..ec777fa7
--- /dev/null
+++ b/test/unit/process_recovery_test.rb
@@ -0,0 +1,84 @@
+# frozen_string_literal: true
+
+require "test_helper"
+
+class ProcessRecoveryTest < ActiveSupport::TestCase
+  self.use_transactional_tests = false
+
+  setup do
+    @pid = nil
+    JobResult.delete_all
+  end
+
+  teardown do
+    terminate_process(@pid) if @pid
+    JobResult.delete_all
+  end
+
+  test "supervisor handles missing process record and fails claimed executions properly" do
+    # Start a supervisor with one worker
+    @pid = run_supervisor_as_fork(workers: [ { queues: "*", polling_interval: 0.1, processes: 1 } ])
+    wait_for_registered_processes(2, timeout: 1.second) # Supervisor + 1 worker
+
+    supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor", pid: @pid)
+    assert supervisor_process
+
+    worker_process = SolidQueue::Process.find_by(kind: "Worker")
+    assert worker_process
+
+    # Enqueue a job and manually claim it for the worker to avoid timing races
+    enqueue_store_result_job(42)
+    claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process.id).first
+    assert claimed_execution.present?
+    assert_equal worker_process.id, claimed_execution.process_id
+
+    # Simulate supervisor process record disappearing
+    supervisor_process.delete
+    assert_nil SolidQueue::Process.find_by(id: supervisor_process.id)
+
+    # Terminate the worker process
+    worker_pid = worker_process.pid
+    terminate_process(worker_pid, signal: :KILL)
+
+
+    # Wait for the supervisor to reap the worker and fail the job
+    wait_for_failed_executions(1, timeout: 5.seconds)
+
+    # Assert the execution is failed
+    failed_execution = SolidQueue::FailedExecution.last
+    assert failed_execution.present?
+    assert_equal "SolidQueue::Processes::ProcessExitError", failed_execution.exception_class
+
+    # Ensure supervisor replaces the worker (even though its own record was missing)
+    wait_for_registered_processes(2, timeout: 5.seconds)
+    assert_operator SolidQueue::Process.where(kind: "Worker").count, :>=, 1
+  end
+
+  private
+    def assert_registered_workers_for(*queues, supervisor_pid: nil)
+      workers = find_processes_registered_as("Worker")
+      registered_queues = workers.map { |process| process.metadata["queues"] }.compact
+      assert_equal queues.map(&:to_s).sort, registered_queues.sort
+      if supervisor_pid
+        assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq
+      end
+    end
+
+    def enqueue_store_result_job(value, queue_name = :default, **options)
+      StoreResultJob.set(queue: queue_name).perform_later(value, **options)
+    end
+
+    def assert_no_claimed_jobs
+      skip_active_record_query_cache do
+        assert_empty SolidQueue::ClaimedExecution.all
+      end
+    end
+
+    def wait_for_claimed_executions(count, timeout: 1.second)
+      wait_for(timeout: timeout) { SolidQueue::ClaimedExecution.count == count }
+    end
+
+    def wait_for_failed_executions(count, timeout: 1.second)
+      wait_for(timeout: timeout) { SolidQueue::FailedExecution.count == count }
+    end
+end
diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb
index 108ebb6f..7a531ad2 100644
--- a/test/unit/supervisor_test.rb
+++ b/test/unit/supervisor_test.rb
@@ -185,6 +185,33 @@ class SupervisorTest < ActiveSupport::TestCase
     end
   end
 
+  # Regression test for supervisor failing to handle claimed jobs when its own
+  # process record has been pruned (NoMethodError in #handle_claimed_jobs_by).
+  test "handle_claimed_jobs_by fails claimed executions even if supervisor record is missing" do
+    worker_name = "worker-test-#{SecureRandom.hex(4)}"
+
+    worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name)
+
+    StoreResultJob.perform_later(42)
+    claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first
+
+    terminated_fork = Struct.new(:name).new(worker_name)
+
+    dummy_status_class = Struct.new(:pid, :exitstatus) do
+      def signaled? = false
+      def termsig = nil
+    end
+    status = dummy_status_class.new(worker_process.pid, 1)
+
+    supervisor = SolidQueue::Supervisor.allocate
+
+    supervisor.send(:handle_claimed_jobs_by, terminated_fork, status)
+
+    failed = SolidQueue::FailedExecution.find_by(job_id: claimed_execution.job_id)
+    assert failed.present?
+    assert_equal "SolidQueue::Processes::ProcessExitError", failed.exception_class
+  end
+
   private
     def assert_registered_workers(supervisor_pid: nil, count: 1)
       assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)