Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions ruby/lib/ci/queue/redis/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,22 @@ def build
@build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
end

# Retry queue is pre-populated with failed test entries from the previous run.
# Don't replace them with the full preresolved/lazy test list.
# QueuePopulationStrategy#configure_lazy_queue will still set entry_resolver,
# so poll uses LazyEntryResolver to lazily load test files on demand.
# The random/batch_size params are intentionally ignored since we keep
# the existing queue contents as-is.
#
# Note: populate (non-stream) is intentionally NOT overridden here.
# RSpec and non-lazy Minitest retries call populate to build the
# @index mapping test IDs to runnable objects, which poll needs to
# yield proper test/example instances. In those paths, @queue contains
# bare test IDs that match @index keys, so populate works correctly.
def stream_populate(tests, random: nil, batch_size: nil)
self
end

private

attr_reader :redis
Expand Down
7 changes: 4 additions & 3 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ def retrying?
def retry_queue
failures = build.failed_tests.to_set
log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1)
log = log.map { |entry| CI::Queue::QueueEntry.test_id(entry) }
log.select! { |test_id| failures.include?(test_id) }
log.uniq!
# Keep full entries (test_id + file_path) so lazy loading can resolve them.
# Filter by test_id against failures without stripping file paths.
log.select! { |entry| failures.include?(CI::Queue::QueueEntry.test_id(entry)) }
log.uniq! { |entry| CI::Queue::QueueEntry.test_id(entry) }
log.reverse!
Retry.new(log, config, redis: redis)
end
Expand Down
49 changes: 45 additions & 4 deletions ruby/lib/ci/queue/static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def from_uri(uri, config)
TEN_MINUTES = 60 * 10

attr_reader :progress, :total
attr_accessor :entry_resolver

def initialize(tests, config)
@queue = tests
Expand Down Expand Up @@ -50,6 +51,16 @@ def populate(tests, random: nil)
self
end

# Support lazy loading mode: accept an enumerator of entries and
# store them in queue order (no shuffling). This preserves the
# exact order from the input file for local reproduction.
def stream_populate(tests, random: nil, batch_size: nil)
@queue = []
tests.each { |entry| @queue << entry }
@total = @queue.size
self
end

def with_heartbeat(id, lease: nil)
yield
end
Expand Down Expand Up @@ -79,11 +90,15 @@ def expired?
end

def populated?
!!defined?(@index)
!!defined?(@index) || @queue.any?
end

def to_a
@queue.map { |i| index.fetch(i) }
if defined?(@index) && @index
@queue.map { |i| index.fetch(i) }
else
@queue.dup
end
end

def size
Expand All @@ -101,9 +116,28 @@ def running
def poll
while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && reserved_test = @queue.shift
reserved_tests << reserved_test
yield index.fetch(reserved_test)
if entry_resolver
resolved = entry_resolver.call(reserved_test)
# Track the original queue entry so requeue can push it back
# with its full payload (file path, load-error data, etc.).
reserved_entries[resolved.id] = reserved_test if resolved.respond_to?(:id)
yield resolved
elsif defined?(@index) && @index
# Queue entries may be JSON-formatted (with test_id + file_path) while
# the index is keyed by bare test_id from populate. Try the raw entry
# first, then fall back to extracting the test_id.
test_id = begin
CI::Queue::QueueEntry.test_id(reserved_test)
rescue JSON::ParserError
reserved_test
end
yield index.fetch(test_id)
else
yield reserved_test
end
end
reserved_tests.clear
reserved_entries.clear
end

def exhausted?
Expand Down Expand Up @@ -134,7 +168,10 @@ def requeue(entry)
return false unless should_requeue?(test_id)

requeues[test_id] += 1
@queue.unshift(test_id)
# Push back the original queue entry (with file path / load-error payload)
# so entry_resolver can fully resolve it on the next poll iteration.
original_entry = reserved_entries.delete(test_id) || test_id
@queue.unshift(original_entry)
true
end

Expand All @@ -150,6 +187,10 @@ def requeues
@requeues ||= Hash.new(0)
end

def reserved_entries
@reserved_entries ||= {}
end

def reserved_tests
@reserved_tests ||= Concurrent::Set.new
end
Expand Down
143 changes: 143 additions & 0 deletions ruby/test/ci/queue/redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,149 @@ def test_retry_queue_with_all_tests_passing_2
assert_equal retry_test_order, retry_test_order
end

def test_retry_queue_preserves_full_entries_with_file_paths
# Use stream_populate with file-path entries (as in preresolved mode),
# then verify retry_queue preserves the full entry including the file path.
@redis.flushdb
build_id = 'retry-file-paths'
leader = worker(1, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: build_id)
consumer = worker(2, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: build_id)
consumer.entry_resolver = ->(entry) { entry }

tests = [
EntryTest.new('ATest#test_foo', CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')),
EntryTest.new('ATest#test_bar', CI::Queue::QueueEntry.format('ATest#test_bar', '/tmp/a_test.rb')),
]

leader_thread = Thread.new do
leader.stream_populate(tests, random: Random.new(0), batch_size: 10)
end

timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 2
loop do
status = @redis.get(leader.send(:key, 'master-status'))
break if status == 'ready'
raise "streaming status not set" if Process.clock_gettime(Process::CLOCK_MONOTONIC) > timeout_at
sleep 0.01
end

# Consumer polls all tests, failing the first one
failed_entry = nil
consumer.poll do |entry|
if failed_entry.nil?
failed_entry = entry
consumer.report_failure!
# record_error calls acknowledge internally
consumer.build.record_error(entry, 'Failed')
else
consumer.report_success!
consumer.acknowledge(entry)
end
end

leader_thread.join(2)

retry_queue = consumer.retry_queue
refute_predicate retry_queue, :exhausted?

retry_entries = retry_queue.instance_variable_get(:@queue).dup
assert_equal 1, retry_entries.size
# The critical assertion: retry entry must be a JSON entry with file_path,
# not just the bare test ID. A regression in retry_queue would strip this.
parsed = CI::Queue::QueueEntry.parse(retry_entries.first)
assert parsed[:file_path], "Retry entry should preserve the full entry with file path"
failed_test_id = CI::Queue::QueueEntry.test_id(failed_entry)
assert_equal failed_test_id, CI::Queue::QueueEntry.test_id(retry_entries.first)
ensure
leader_thread&.kill
end

def test_retry_queue_stream_populate_is_noop
target = shuffled_test_list.first
@queue.poll do |test|
if test == target
@queue.report_failure!
# record_error calls acknowledge internally
@queue.build.record_error(test.queue_entry, 'Failed')
else
@queue.report_success!
@queue.acknowledge(test.queue_entry)
end
end

retry_queue = @queue.retry_queue
original_queue_contents = retry_queue.instance_variable_get(:@queue).dup
refute_empty original_queue_contents

# stream_populate should NOT replace the retry queue's contents
dummy_entries = Enumerator.new do |yielder|
yielder << CI::Queue::QueueEntry.format("ZTest#test_zzz", "/tmp/z_test.rb")
end
retry_queue.stream_populate(dummy_entries, random: Random.new(0))

assert_equal original_queue_contents, retry_queue.instance_variable_get(:@queue),
"stream_populate should not replace retry queue contents"
end

def test_retry_queue_works_with_entry_resolver
# Fail a test, then verify retry queue works with entry_resolver (lazy loading)
target = shuffled_test_list.first
@queue.poll do |test|
if test == target
@queue.report_failure!
# record_error calls acknowledge internally
@queue.build.record_error(test.queue_entry, 'Failed')
else
@queue.report_success!
@queue.acknowledge(test.queue_entry)
end
end

retry_queue = @queue.retry_queue

# Set up entry_resolver (as configure_lazy_queue would do)
resolved_entries = []
retry_queue.entry_resolver = ->(entry) {
resolved_entries << entry
entry
}

# stream_populate is a no-op, preserving the retry entries
retry_queue.stream_populate(Enumerator.new { |y| }, random: Random.new(0))

# Poll should use entry_resolver, not index.fetch — no KeyError crash
polled = []
retry_queue.poll do |test|
polled << test
retry_queue.acknowledge(test)
end

assert_equal retry_queue.total, polled.size
assert_equal polled.size, resolved_entries.size,
"All polled entries should have gone through entry_resolver"
end

def test_retry_queue_with_multiple_failures_deduplicates
# Fail multiple tests, verify retry queue deduplicates by test_id
failed_ids = []
@queue.poll do |test|
@queue.report_failure!
@queue.build.record_error(test.queue_entry, 'Failed')
failed_ids << test.id
end

assert_operator failed_ids.size, :>=, 2, "Need multiple failures for this test"

retry_queue = @queue.retry_queue
retry_entries = retry_queue.instance_variable_get(:@queue).dup

# Each failed test should appear exactly once (no duplicates from requeues)
retry_test_ids = retry_entries.map { |e| CI::Queue::QueueEntry.test_id(e) }
assert_equal retry_test_ids.uniq, retry_test_ids,
"Retry queue should not contain duplicate test IDs"
assert_equal failed_ids.uniq.sort, retry_test_ids.sort
end

def test_shutdown
poll(@queue) do
@queue.shutdown!
Expand Down
Loading
Loading