Reconnection bug #92
Open
appliedfunctor wants to merge 5 commits into
The v4 fs2-rabbit backend ran the consumer stream in a .background
fiber and silently ignored the fiber outcome (.flatMap { _ => ... }).
When a network blip caused the underlying channel to close, the stream
terminated with an error that was swallowed and the consumer was never
restarted.
The v3 Java backend used DefaultConsumer on an AutorecoveringChannel;
the Java AMQP client transparently re-registers those consumers after
connection recovery. The fs2-rabbit backend had no equivalent
mechanism.
Two bugs are fixed:
1. Connection recovery: registerConsumer now wraps runConsumer in a
recursive handleErrorWith retry loop (consumerWithRecovery) that
sleeps for config.networkRecoveryInterval then creates a fresh
channel and re-registers the consumer. A fresh channel is used on
each retry to avoid the dead internal fs2-rabbit queue that the
auto-recovered channel would otherwise accumulate messages into.
2. Handler exceptions: previously Async[F].fromEither(res) propagated
handler errors through the stream and killed it. Now handler
exceptions are logged and resolved via exceptionalAction, keeping
the stream alive.
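
The retry loop described in item 1 can be sketched roughly as follows. This is a minimal illustration assuming cats-effect 3, not the code from the PR: the `consumerWithRecovery` signature is taken from the regression test quoted in the review, while `RecoverySketch`, `demo`, and the failing stand-in consumer are hypothetical names invented for the example.

```scala
import cats.effect.{IO, IOApp, Ref}
import scala.concurrent.duration._

object RecoverySketch extends IOApp.Simple {

  // Assumed shape of the helper: run the consumer and, if it fails, log,
  // wait `delay`, then run it again. In the PR, `runConsumer` is described as
  // creating a fresh channel on each attempt; here it is an opaque IO.
  def consumerWithRecovery(runConsumer: IO[Unit], delay: FiniteDuration): IO[Unit] =
    runConsumer.handleErrorWith { err =>
      IO.println(s"Consumer failed (${err.getMessage}); retrying in $delay") *>
        IO.sleep(delay) *>
        consumerWithRecovery(runConsumer, delay)
    }

  // Hypothetical stand-in for a consumer whose channel drops twice before
  // the third attempt succeeds. Returns the number of attempts made.
  def demo: IO[Int] =
    for {
      attempts <- Ref[IO].of(0)
      runConsumer = attempts.updateAndGet(_ + 1).flatMap { n =>
        if (n < 3) IO.raiseError[Unit](new RuntimeException(s"channel closed on attempt $n"))
        else IO.unit
      }
      _     <- consumerWithRecovery(runConsumer, 50.millis)
      total <- attempts.get
    } yield total

  def run: IO[Unit] = demo.flatMap(n => IO.println(s"attempts = $n"))
}
```

The recursive call sits inside the `handleErrorWith` callback, so each retry is only constructed after a failure. The loop can therefore run indefinitely without growing the stack.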
Also adds config: AmqpClientConfig to Fs2RabbitAmqpClient so the
recovery delay can be driven by networkRecoveryInterval, and
StrictLogging for recovery/error logging.
Regression tests added in ConsumerConnectionRecoverySpec.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Adds a '4.0.3 and above' section to the README explaining the bug (the consumer stream silently dying on connection drop with no restart), which backend is affected (Fs2RabbitAmqpClient only, not JavaBackendAmqpClient), and how it was fixed, and directing users to upgrade to v4.0.3.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Restores sonatypeCentralRelease, reverting an accidental change to sonatypeCentralUpload that was included in the connection recovery commit. sonatypeCentralRelease uploads and automatically promotes the artifact; sonatypeCentralUpload only uploads and requires a manual release step.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
Fixes a reconnection failure mode in the fs2-rabbit backend (Fs2RabbitAmqpClient.registerConsumer) where consumer-stream failures could be swallowed and the consumer never restarted after an AMQP network blip. It also prevents handler exceptions from terminating the consumer stream.
Changes:
- Add an error-recovering consumer loop that recreates the channel/consumer and retries after networkRecoveryInterval.
- Convert handler exceptions into exceptionalAction rather than propagating them through the stream.
- Add README migration note and regression tests describing the bug/fix.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| README.md | Documents the v4.0.0–4.0.2 fs2-rabbit reconnection bug and upgrade guidance for 4.0.3. |
| backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala | Implements retry-based recovery with fresh channel creation and handler-exception handling. |
| backendFs2Rabbit/src/test/scala/fs2rabbit/ConsumerConnectionRecoverySpec.scala | Adds regression tests for consumer recovery and handler-exception behavior. |
Comment on lines +74 to +90
    "not kill the consumer stream when a handler throws an exception" in {
      for {
        processedCount <- IO.ref(0)
        attempt        <- IO.ref(0)

        // First call simulates a handler exception; second call succeeds.
        // With the fix the stream restarts and processes messages.
        runConsumer: IO[Unit] = attempt.updateAndGet(_ + 1).flatMap {
          case 1 => IO.raiseError(new RuntimeException("Handler blew up"))
          case _ => processedCount.update(_ + 1)
        }

        _ <- consumerWithRecovery(runConsumer, delay = 50.millis)

        count <- processedCount.get
      } yield count shouldBe 1
    }
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
JosBogan approved these changes Jun 26, 2026
Extract acquireConsumerStream as a protected override point in Fs2RabbitAmqpClient so tests can inject controlled in-memory streams without a real RabbitMQ connection. Tests now subclass Fs2RabbitAmqpClient and override acquireConsumerStream to return a sequence of (acker, stream) pairs from a Ref, exercising the real registerConsumer implementation:
- retry loop (consumerWithRecovery) recovers after a stream failure
- handler exceptions are caught and routed to exceptionalAction rather than killing the stream
- Ack/DeadLetter/RequeueImmediately map to the correct AckResult
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
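
The override-point testing approach in this commit can be illustrated with a deliberately simplified sketch, assuming cats-effect 3. Everything here is hypothetical apart from the method names `acquireConsumerStream` and `registerConsumer`: the real method is described as yielding (acker, stream) pairs, whereas this stand-in models a "stream" as a plain `IO[Unit]`, and `SketchClient` and `OverrideSketch` are invented for the example.

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration._

// Hypothetical, simplified client. The default acquisition would need a
// broker, so it fails here; tests are expected to override it.
class SketchClient(delay: FiniteDuration) {
  protected def acquireConsumerStream: IO[IO[Unit]] =
    IO.raiseError(new IllegalStateException("would need a real broker"))

  // Acquire a stream, run it, and on any failure wait and start over.
  def registerConsumer: IO[Unit] =
    acquireConsumerStream
      .flatMap(stream => stream)
      .handleErrorWith(_ => IO.sleep(delay) *> registerConsumer)
}

object OverrideSketch {
  // Test double: serves scripted streams from a Ref, in order, and counts
  // how many times a stream was acquired.
  def scripted(script: Ref[IO, List[IO[Unit]]], acquired: Ref[IO, Int]): SketchClient =
    new SketchClient(10.millis) {
      override protected def acquireConsumerStream: IO[IO[Unit]] =
        acquired.update(_ + 1) *> script.modify {
          case next :: rest => (rest, next)
          case Nil          => (Nil, IO.unit)
        }
    }

  // The first scripted stream fails and the second completes normally.
  // Returns the number of acquisitions made.
  val demo: IO[Int] =
    for {
      script <- Ref[IO].of(
                  List[IO[Unit]](IO.raiseError(new RuntimeException("connection dropped")), IO.unit)
                )
      acquired <- Ref[IO].of(0)
      _        <- scripted(script, acquired).registerConsumer
      n        <- acquired.get
    } yield n
}
```

Only the acquisition step is replaced, so the retry logic in `registerConsumer` runs unmodified against the scripted failures.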
Root Cause
The bug was in Fs2RabbitAmqpClient.registerConsumer in the v4 fs2-rabbit backend.
The old code ran the consumer stream in a .background fiber and silently discarded the fiber outcome (.flatMap { _ => ... }).
When a network blip caused the AMQP channel to close, the fs2-rabbit stream terminated with an error. That error was never observed, and no restart was attempted.
v3 didn't have this problem because it used DefaultConsumer on a Java AutorecoveringChannel, which the AMQP Java client automatically re-registers after recovery, so no application-level retry was needed. v4 switched to an fs2 stream-based consumer, which has no such built-in recovery.
A secondary bug: handler exceptions propagated via Async[F].fromEither(res) and also killed the stream instead of using exceptionalAction.
Fix
1. Connection recovery: wrapped the consumer loop in a recursive handleErrorWith retry (consumerWithRecovery) that sleeps for config.networkRecoveryInterval then creates a fresh channel and consumer on each attempt. A fresh channel is critical because the auto-recovered channel's dead internal fs2-rabbit queue would otherwise accumulate un-acked messages.
2. Handler exceptions: now caught and resolved via exceptionalAction (logged + the action applied) rather than crashing the stream.
config: AmqpClientConfig added to the class so the recovery delay tracks networkRecoveryInterval.
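
The handler-exception change can be sketched as below, assuming cats-effect 3. This illustrates the idea and is not the PR's implementation: the `ConsumeAction` values are local stand-ins named after the actions mentioned in this PR, and `safeHandle` and `HandlerSketch` are hypothetical names.

```scala
import cats.effect.IO

// Local stand-ins for the consume actions named in this PR; the library's
// own types are assumed to look different in detail.
sealed trait ConsumeAction
case object Ack                extends ConsumeAction
case object DeadLetter         extends ConsumeAction
case object RequeueImmediately extends ConsumeAction

object HandlerSketch {

  // Hypothetical wrapper: run the handler and, if it throws or returns a
  // failed IO, log the error and resolve to `exceptionalAction` instead of
  // letting the error propagate into the consumer stream.
  def safeHandle[A](handler: A => IO[ConsumeAction], exceptionalAction: ConsumeAction)(
      message: A
  ): IO[ConsumeAction] =
    IO.defer(handler(message)).handleErrorWith { err =>
      IO.println(s"Handler failed (${err.getMessage}); applying $exceptionalAction")
        .as(exceptionalAction)
    }
}
```

`IO.defer` is used so that a handler which throws synchronously, before returning an `IO`, is caught in the same way as one that returns a failed `IO`.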
Tests
Three regression tests added in ConsumerConnectionRecoverySpec: