fix(ci): prevent kafka consumer teardown hang with xdist #111261

Open
mchen-sentry wants to merge 3 commits into master from
mingchen/di-1753-fix-kafka-ci-shard-hang

mchen-sentry commented Mar 20, 2026

We have been seeing CI shards hang at 99% completion since moving to xdist. I reproduced the hang on a test branch, and a thread dump of the hung process showed:

File "arroyo/backends/kafka/consumer.py", line 348 in assignment_callback
File "arroyo/backends/kafka/consumer.py", line 719 in close
File "arroyo/processing/processor.py", line 567 in _shutdown
File "arroyo/processing/processor.py", line 353 in run
File "sentry/testutils/pytest/kafka.py", line 87 in scope_consumers
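
For anyone who wants to capture a similar dump: one option is the standard-library `faulthandler` module, which prints frames in this `File "...", line N in func` format. A minimal sketch, assuming you can add a few lines to the process under test; the helper names `dump_all_threads` and `arm_watchdog` are made up for illustration and are not part of this PR:

```python
import faulthandler
import sys
import tempfile


def dump_all_threads() -> str:
    """Return the current stack of every thread as text."""
    # faulthandler writes straight to a file descriptor, so use a real file.
    with tempfile.TemporaryFile(mode="w+") as f:
        faulthandler.dump_traceback(file=f, all_threads=True)
        f.seek(0)
        return f.read()


def arm_watchdog(seconds: float) -> None:
    """Dump all thread stacks to stderr if the process is still alive after `seconds`."""
    # exit=False keeps the process running, so the hang can still be inspected.
    faulthandler.dump_traceback_later(seconds, exit=False, file=sys.stderr)


def disarm_watchdog() -> None:
    """Cancel a pending watchdog dump once the risky section has finished."""
    faulthandler.cancel_dump_traceback_later()
```

Calling `arm_watchdog(...)` right before the teardown and `disarm_watchdog()` after it would print the stacks only when the teardown overruns.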

Let's run consumer.run() on a daemon thread with a 5-second join timeout. This still attempts a graceful shutdown, but bails out if the broker is slow. If the thread outlives the join, it dies on process exit and the broker detects the dead consumer via heartbeat timeout.
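
A minimal sketch of the pattern, not the actual diff: `run_with_timeout` is a hypothetical helper, and in the fixture the target would be `consumer.run` with a timeout of 5 seconds.

```python
import threading
import time


def run_with_timeout(target, timeout: float) -> bool:
    """Run target() on a daemon thread and wait at most `timeout` seconds.

    Returns True if target() finished in time, False if it is still running.
    """
    # daemon=True means a stuck thread cannot keep the process alive at exit.
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    return not thread.is_alive()


# A fast target finishes within the timeout.
finished_fast = run_with_timeout(lambda: None, timeout=1.0)

# A slow target (standing in for a close() that blocks) is abandoned.
finished_slow = run_with_timeout(lambda: time.sleep(2), timeout=0.1)
```

The main thread only ever waits for the join timeout, so a blocked `close()` costs at most that long per consumer instead of hanging the shard.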

Tested that hangs no longer happen with the fix.

Alternatively, we could remove consumer.run() entirely and skip graceful shutdown: the fixture is session-scoped, so teardown runs right before process exit.

mchen-sentry requested a review from a team as a code owner March 20, 2026 22:26
github-actions bot added the "Scope: Backend" label (automatically applied to PRs that change backend components) Mar 20, 2026
cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Missing try/except lets one consumer failure skip others
    • Added try/except block around signal_shutdown() call to ensure one consumer's failure doesn't prevent cleanup of remaining consumers in the loop.

Or push these changes by commenting:

@cursor push bf4c8e5e4f
Preview (bf4c8e5e4f)
diff --git a/src/sentry/testutils/pytest/kafka.py b/src/sentry/testutils/pytest/kafka.py
--- a/src/sentry/testutils/pytest/kafka.py
+++ b/src/sentry/testutils/pytest/kafka.py
@@ -81,7 +81,10 @@
 
     for consumer_name, consumer in all_consumers.items():
         if consumer is not None:
-            consumer.signal_shutdown()
+            try:
+                consumer.signal_shutdown()
+            except Exception:
+                _log.warning("Could not shutdown consumer %s", consumer_name)
 
 
 @pytest.fixture(scope="function")

cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Run `consumer.run()` on a daemon thread with a 5s join timeout
instead of calling it directly in the `scope_consumers` teardown.

The old teardown called `signal_shutdown()` then `run()` on the main
thread. `run()` calls `_shutdown()` → `consumer.close()`, which sends
a LeaveGroup to the broker and blocks in the rebalance callback.
With xdist, multiple workers tear down simultaneously, the broker
gets slow, and `close()` blocks indefinitely — hanging the CI shard
at ~99%.

The daemon thread still attempts a graceful shutdown but bails out
after 5 seconds. If the thread outlives the join, it dies on process
exit and the broker detects the dead consumer via heartbeat timeout.

Add temporary instrumentation to understand why `consumer.close()`
hangs during `scope_consumers` teardown with xdist. It logs the
member_id and the time since the last poll, and probes `committed()`
before shutdown.
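
A rough sketch of what such instrumentation could look like, not the code in this commit. It assumes an object exposing `memberid()`, `assignment()` and `committed(partitions, timeout=...)` in the style of `confluent_kafka.Consumer`; how the fixture reaches that underlying consumer, and the name `log_consumer_state`, are assumptions made for illustration.

```python
import logging
import time

logger = logging.getLogger(__name__)


def log_consumer_state(consumer, last_poll: float, probe_timeout: float = 1.0) -> dict:
    """Log the member id, time since last poll, and the result of a committed() probe.

    `last_poll` is a time.monotonic() timestamp recorded by the caller at the
    most recent poll; the consumer itself is not assumed to track it.
    """
    state = {
        "member_id": consumer.memberid(),
        "seconds_since_last_poll": time.monotonic() - last_poll,
    }
    started = time.monotonic()
    try:
        # A bounded broker round trip: if this is slow or fails, close() is
        # likely to struggle as well.
        consumer.committed(consumer.assignment(), timeout=probe_timeout)
        state["committed_probe"] = "ok"
    except Exception as exc:  # diagnostics must never break teardown
        state["committed_probe"] = f"failed: {exc!r}"
    state["committed_probe_seconds"] = time.monotonic() - started
    logger.info("consumer state before shutdown: %s", state)
    return state
```

Calling this immediately before `signal_shutdown()` would put the broker-side state next to the hang in the CI logs.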