feat(redis_admin): chunked celery broker clear with progress bar + wildcard sentinel fix #904
Conversation
Codecov Report

❌ Your patch check has failed because the patch coverage (88.18%) is below the target coverage (90.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files

@@ Coverage Diff @@
## main #904 +/- ##
==========================================
- Coverage 91.91% 91.89% -0.02%
==========================================
Files 1316 1316
Lines 50380 50586 +206
Branches 1625 1625
==========================================
+ Hits 46305 46485 +180
- Misses 3769 3795 +26
Partials 306 306
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
The cap split in this branch left the streaming clear bound to `CELERY_BROKER_SCAN_LIMIT` (100_000), the same window the chart sampler uses. On a queue saturated past 2x the cap, pass 1 and pass 2 each tombstoned exactly that many matches, and the `prev_lset == matches_lset` plateau check spuriously declared convergence, leaving the rest of the queue in place.

Fold in #904's fix: drop the cap from `_streaming_celery_clear` so each pass walks `LLEN(queue)`. The plateau check stays for genuine consumer-side drift. Update the `CELERY_BROKER_SCAN_LIMIT` docstring + README to reflect that the clear is intentionally unbounded and only the chart aggregator uses the sample window.

Adds the regression test from #904 (renamed to drop the now-irrelevant `MAX_ITEMS_PER_KEY` reference): seeds 41k matching messages and asserts `LLEN == 1` after a single clear pass — fails on the pre-fold #903 (cap = 100k matches) and passes after.
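As a rough illustration of the post-fix shape (a sketch assuming a plain redis-py client and a caller-supplied `matches` predicate; the real `_streaming_celery_clear` reads LRANGE chunks and keeps the cross-pass plateau check, and the tombstone value here is made up):

```python
TOMBSTONE = b"__redis_admin_cleared__"  # placeholder value; the real sentinel differs

def clear_pass_unbounded(r, queue, matches):
    """One clear pass over the full queue: LSET matching entries to a
    tombstone, then LREM the tombstones. Returns how many were tombstoned."""
    total = r.llen(queue)              # full queue length -- no scan cap
    tombstoned = 0
    for index in range(total):
        raw = r.lindex(queue, index)   # the real code reads LRANGE chunks instead
        if raw is not None and matches(raw):
            r.lset(queue, index, TOMBSTONE)
            tombstoned += 1
    if tombstoned:
        r.lrem(queue, 0, TOMBSTONE)    # count=0: remove every tombstone
    return tombstoned
```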
…nd-to-end

PR #903's clear-queue path has regressed twice (folded fixes: PR #904 -- streaming clear walks full queue, no artificial cap; and the synthetic-target fix -- clear-by-filter uses a single synthetic target so deep-queue clears don't silently no-op). Both regressions were silent at the existing-test level because the assertions inferred drain success from `total_lset`/`stats.total_found` rather than walking the post-call queue.

Add a small focused module that:

* Asserts `LLEN(queue) == initial_total - matches` explicitly before/after `celery_broker_clear`.
* Walks the full post-call queue with `parse_celery_envelope` and *positively* counts how many envelopes still match the filter triple, so any future regression that silently leaves matches behind (capped scan, dropped tail, etc.) fails immediately (see the sketch below).
* Exercises queues with `LLEN > CELERY_BROKER_DISPLAY_LIMIT` for every scenario so a queryset-bounded clear can never sneak back in.

Three scenarios:

* `test_celery_broker_clear_drains_deep_queue_end_to_end` -- CELERY_BROKER_DISPLAY_LIMIT + 500 matching envelopes, single synthetic target through `celery_broker_clear(dry_run=False, keep_one=False)`, asserts `LLEN == 0` and zero remaining matches via the full-queue walk.
* `test_celery_broker_clear_keep_one_deep_queue_leaves_exactly_one_at_head` -- CELERY_BROKER_DISPLAY_LIMIT + 100 matching envelopes, `keep_one=True`, asserts exactly one survivor sitting at index 0 (head-of-queue, the next message a worker would BLPOP).
* `test_celery_broker_clear_mixed_content_deep_queue_clears_only_filter` -- three task buckets (A: target, B: same repoid+commit but different task, C: same task but different repoid+commit), B + C seeded both before A (head-of-queue) and after A (past the display window) so the clear must walk the full LLEN. Asserts zero A survivors, every B and C survives.

Lives in its own module to insulate from churn on the much larger `test_celery_broker_queue.py` and uses `fakeredis` + `SimpleNamespace` so it runs cleanly under `pytest -m 'not django_db'`.
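A trimmed sketch of the positive full-queue walk the new assertions rely on; `parse_envelope` stands in for the repo's `parse_celery_envelope` (import omitted), and the triple it returns here is illustrative:

```python
def count_surviving_matches(r, queue, filter_triple, parse_envelope):
    """Walk the full post-call queue and positively count envelopes that still
    match the (task_name, repoid, commitid) filter triple."""
    survivors = 0
    for raw in r.lrange(queue, 0, -1):  # full LLEN walk, not a display window
        task_name, repoid, commitid = parse_envelope(raw)
        if (task_name, repoid, commitid) == filter_triple:
            survivors += 1
    return survivors

# Usage in a test (sketch):
#   assert r.llen(queue) == initial_total - matches
#   assert count_surviving_matches(r, queue, target_triple, parse) == 0
```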
Bugbot review on PR #903 caught a high-severity over-broaden in the wildcard fix from ebbb6cf: `_envelope_matches_any_filter` treated `None` slots in any filter tuple as wildcards, but the per-message clear paths (`clear_selected`, `clear_dry_run`, `delete_model`, `delete_queryset`) build filter tuples from materialised rows whose envelope legitimately carries `None` in a slot (e.g. a `sync_repos` task with `repoid=None` and `commitid=None`). Under the prior wildcard semantic, single-row clear of one such message would have tombstoned EVERY message sharing the task name regardless of repoid / commit -- a real data-loss regression in production.

Fix introduces a dedicated `_FILTER_ANY` sentinel (a `_FilterWildcard` singleton) for "do not constrain on this slot" that is *distinct from* `None`. The match logic checks `is _FILTER_ANY` (identity) for the wildcard branch and falls through to `==` for everything else, so:

* Operator-input paths (`streaming_celery_count`, the `clear_by_filter_view` synthetic target) substitute `_FILTER_ANY` for unset slots and get wildcard semantics.
* Per-message paths keep passing real envelope values -- including legitimate `None` slots -- and get exact-tuple membership semantics (`(task, None, None)` matches only `(task, None, None)` envelopes, not every envelope sharing the task name).

Adds a regression test (`test_celery_broker_clear_per_message_target_does_not_overmatch_on_none_slots`) seeding two `(sync_repos, None, None)` envelopes alongside three `(sync_repos, repoid, commitid)` envelopes; clearing a single materialised target with `(repoid, commitid) = (None, None)` must tombstone exactly the two matching envelopes, not all five. Pre-fix, that test would have asserted `result.count == 5` and `llen == 0` -- a 3-message data loss for the operator who only wanted to clear two stuck messages.

Also updates the existing wildcard regression test (`test_celery_broker_clear_synthetic_target_with_wildcard_task_name_drains_all_matches`) to construct its synthetic target with `task_name=_FILTER_ANY` instead of `None`, matching the new contract the `clear_by_filter_view` synthetic target uses.
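A minimal sketch of the sentinel and the identity-vs-equality split; the names `_FilterWildcard`, `_FILTER_ANY`, and the `is _FILTER_ANY` check come from the PR description, while the function bodies here are illustrative:

```python
class _FilterWildcard:
    """Singleton marker: 'do not constrain on this slot', distinct from None."""

_FILTER_ANY = _FilterWildcard()

def _slot_matches(filter_value, envelope_value):
    # Identity check for the wildcard branch; plain equality otherwise, so a
    # legitimate None in an envelope slot only matches a None filter slot.
    return filter_value is _FILTER_ANY or filter_value == envelope_value

def _filter_matches_envelope(filter_triple, envelope_triple):
    return all(_slot_matches(f, e) for f, e in zip(filter_triple, envelope_triple))

# Per-message path: exact-tuple semantics.
assert _filter_matches_envelope(("sync_repos", None, None), ("sync_repos", None, None))
assert not _filter_matches_envelope(("sync_repos", None, None), ("sync_repos", 42, "abc"))
# Operator-input path: wildcard semantics.
assert _filter_matches_envelope(("sync_repos", _FILTER_ANY, _FILTER_ANY), ("sync_repos", 42, "abc"))
```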
Force-pushed from 720a993 to 8566f3f.
…ncel

Layered on top of the wildcard sentinel fix from 77047ba so PR #904 ships both the high-severity data-loss fix from #903's bugbot review AND the next round of clear-queue UX work in a single PR.

Decouples the destructive clear's wall-clock from the HTTP request lifetime: a confirmed `clear_all` / `clear_keep_one` submit on `clear-by-filter/` no longer runs the streaming clear synchronously inside the gunicorn worker -- it spawns a background daemon thread tracked under `redis_admin:celery_clear_job:<uuid>` in the cache Redis (kept off the broker so a clear that drains the broker can't evict its own progress hash) and 302s the operator to a progress page that polls a status JSON view every ~1s. The page renders a `<progress>` bar, matched/total/drift/pass counters, a status pill (neutral / blue / green / amber / red per state), and a Cancel button that lands at the next chunk boundary (default 1000 messages per LRANGE chunk in the chunked variant -- the synchronous path keeps the existing 10k chunk size unchanged).

Service layer
-------------

- `start_celery_broker_clear_job(queue, *, user, task_name, repoid, commitid, keep_one, dry_run) -> str` snapshots `LLEN(queue)` as `total_estimated`, writes the initial job hash with `status=pending`, sets a 24h TTL so abandoned jobs auto-cleanup, then starts a daemon thread named `redis_admin.clear_job.<8hex>` (sketched below).
- `get_celery_broker_clear_job(job_id)` reads the hash; the status JSON view casts numerics to int before serialising for a stable wire shape pinned by tests.
- `request_cancel_celery_broker_clear_job(job_id)` flips `cancel_requested=1` (idempotent; returns False on unknown id).

`_streaming_celery_clear` gains two optional kwargs -- `chunk_size` and `progress_callback` -- so the chunked-job worker and the synchronous `celery_broker_clear` share the same chunk inner-loop. The callback receives a `_ChunkProgress` snapshot (per-chunk deltas + running totals) at every chunk boundary; returning True triggers a clean cancel: in-flight tombstones for the current pass are LREMmed (the queue is left clean, not a partial graveyard) and `_StreamingClearStats.cancelled=True`. Cancel only ever lands at chunk boundaries, never mid-LSET, so a tombstone we already wrote is always paired with the matching LREM at pass-end or cancel-drain.

Worker exceptions are swallowed: `log.exception(...)` so Sentry captures, plus `status=failed` + `error=<str(exc)>` on the hash so the operator's progress page shows the failure. The daemon thread never propagates to the gunicorn worker.

Audit log: a `LogEntry` row is written at job-completion (not job-start) with `mode` ∈ `chunked-dry-run` / `chunked-all-from-bucket` / `chunked-all-but-first` (vs synchronous `dry-run` / `all-from-bucket` / `all-but-first`) plus `job_id`, `passes_run`, `total_drifted`, `cancelled` extras so queries can distinguish chunked from synchronous clears.

Why threading and not Celery: the queue we're clearing is the Celery broker, so submitting a control task to it creates a circular dependency, and the operator pod isn't part of the worker fleet so the task wouldn't get picked up anyway. Restart-safety trade-off documented in the README -- a worker killed mid-job leaves in-flight tombstones in the queue; the operator can re-run (the new pass walks the full queue and skips them) or `LREM` by hand.
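A condensed sketch of the job-start shape described under "Service layer" (key name, TTL, and thread naming follow the description above; the hash fields and the `run_job` hand-off are simplified stand-ins for the real worker body):

```python
import threading
import uuid

JOB_KEY_TEMPLATE = "redis_admin:celery_clear_job:{job_id}"
JOB_TTL_SECONDS = 24 * 60 * 60  # abandoned jobs auto-expire after 24h

def start_clear_job(cache_redis, broker_redis, queue, run_job):
    """Write the pending job hash, then hand off to a daemon thread.
    `run_job(job_id)` stands in for the real chunked-clear worker body."""
    job_id = str(uuid.uuid4())
    key = JOB_KEY_TEMPLATE.format(job_id=job_id)
    cache_redis.hset(key, mapping={
        "status": "pending",
        "total_estimated": broker_redis.llen(queue),  # LLEN snapshot at submit time
        "cancel_requested": 0,
    })
    cache_redis.expire(key, JOB_TTL_SECONDS)
    threading.Thread(
        target=run_job,
        args=(job_id,),
        name=f"redis_admin.clear_job.{job_id[:8]}",  # first 8 hex chars of the uuid
        daemon=True,
    ).start()
    return job_id
```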
Admin views
-----------

Three new superuser-only URLs under `clear-by-filter/job/<uuid>/`:

- `clear_by_filter_progress_view` -- renders the progress page HTML with an initial server-side snapshot so the page is informative even before JS loads (matched / processed / drift / pass / status pill / timestamps).
- `clear_by_filter_status_view` -- JSON snapshot of the job hash. 404s on unknown / expired ids (see the Python sketch below).
- `clear_by_filter_cancel_view` -- POST-only (405 on GET). Sets `cancel_requested=1` and returns 202 with the latest snapshot.

`clear_by_filter_view` rewired so confirmed destructive submits now spawn a job and 302 to the progress page; dry-run keeps the synchronous shape (audit-log + re-rendered preview) since the streaming-count walk is fast.

Frontend
--------

CSP-compliant -- all CSS / JS loads via `{% static %}`, no inline `<script>` / `<style>`. Mirrors the same approach as PR #902's chart fragment loader (`celery_chart_fragment.{js,css}`).

- `clear_by_filter_progress.html` -- card layout, native `<progress>` element so screen readers announce progress, real `<form>` for the cancel button so the page degrades gracefully without JS.
- `celery_clear_progress.js` -- IIFE, no globals. Polls every 1s, backs off (1s -> 2s -> 5s, capped) on errors. Stops on terminal status, switches the layout (Cancel button reads "Done" / "Cancelled" / "Failed", back link enables). Cancel button hijacks form submit, fires `fetch()` with the CSRF token, prevents double-clicks.
- `celery_clear_progress.css` -- Django-admin theme variable driven (light + dark), `prefers-reduced-motion` fallback.

Tests
-----

12 new tests in `redis_admin/tests/test_celery_broker_clear_job.py`, 11 of which run under `-m 'not django_db'` (the audit-log test needs `LogEntry` and runs in CI under `@pytest.mark.django_db`):

- service: hash shape pinned, deep-queue drain end-to-end (CELERY_BROKER_DISPLAY_LIMIT + 500 messages -> queue empty), cancel halts mid-pass with remaining matches intact, keep_one leaves exactly one survivor at index 0, monotonic progress callback, failure swallows exception + records `status=failed`.
- views: status JSON shape pinned, cancel rejects GET (405), unknown id returns 404 JSON, confirmed submit on `clear_by_filter_view` redirects to the progress page, dry-run preview unchanged (no chunked job spawned).
- audit (django_db): chunked-mode payload contains `mode`, `job_id`, `cancelled`, `passes_run`.

Request timeouts
----------------

The in-repo gunicorn `--timeout` is at `apps/codecov-api/api.sh:118`, defaulting to 600s (env-overridable via `GUNICORN_TIMEOUT`). The user-observed 120s ceiling is NOT in this repo; the most likely sources are external (Cloudflare's free-tier 100s ceiling, GCP HTTPS LB Backend Service timeout, or k8s ingress annotations) and need an infra-side investigation. The knob is documented in the README; this PR doesn't bump it (deploy-tuning decision). Long-term, the chunked-clear flow itself sidesteps the entire upstream-timeout class -- the POST that starts the clear spawns a thread + 302s in well under a second, and each subsequent poll is sub-millisecond, so no HTTP request ever sits on the actual clearing work.
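An illustrative shape for `clear_by_filter_status_view` as described under "Admin views" above; the int-cast field list is a guess, and the import of `get_celery_broker_clear_job` (the repo helper) is omitted:

```python
from django.http import JsonResponse

# Hypothetical set of hash fields that should come back as ints; the real
# list lives next to the job-hash definition in services.py.
_INT_FIELDS = {"total_estimated", "total_matched", "total_processed", "passes_run"}

def clear_by_filter_status_view(request, job_id):
    job = get_celery_broker_clear_job(str(job_id))  # repo helper named above
    if job is None:
        # Unknown or expired id: JSON 404 rather than an HTML error page.
        return JsonResponse({"error": "unknown job"}, status=404)
    payload = {
        key: int(value) if key in _INT_FIELDS else value
        for key, value in job.items()
    }
    return JsonResponse(payload)  # stable wire shape for the JS poller
```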
The chunked clear job already decouples the clear's wall-clock from the HTTP request lifetime, so the gunicorn `--timeout` knob is no longer relevant to whether a clear succeeds. Drop the README + PR-body discussion of `apps/codecov-api/api.sh:118`, `GUNICORN_TIMEOUT`, Cloudflare's per-request ceiling, the GCP HTTPS LB Backend Service `timeoutSec`, and the ingress annotations — operators don't need to reason about any of those to use the new progress-bar flow. Trim the corresponding "Why this exists" paragraph in the README and the analogous header comment in services.py to just the design intent.
Two Bugbot findings on PR #904:

* MEDIUM (`celery_clear_progress.js`): `setStatusPill` filtered classes by the `celery-clear-status-` prefix, which dropped the base `celery-clear-status-pill` class along with the previous state class. After the first poll the pill lost all base styling (display, padding, border-radius, font-weight, etc.) and only the colour overrides remained. Replace the prefix-strip with an explicit `STATE_CLASSES` enumeration so only state classes are toggled and the base class survives.
* LOW (`services.py`): `_ChunkProgress.chunk_index` was documented as 0-based but incremented BEFORE being passed to the callback, effectively making it 1-based (first chunk reported 1, last reported `chunks_total`). Move the increment to AFTER the callback so the snapshot describes the chunk that just finished and the documented contract holds. Adds a regression assertion that `snapshots[*].chunk_index == [0, 1, ..., chunks_total - 1]` (see the sketch below).
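A self-contained illustration of the LOW fix's ordering contract (not the repo's actual loop): the callback sees the index of the chunk that just finished, so the observed indices run 0 .. chunks_total - 1:

```python
def run_chunks(chunks, progress_callback):
    """Toy chunk loop demonstrating the fixed ordering."""
    chunk_index = 0
    for chunk in chunks:
        # ... per-chunk LRANGE / LSET work would happen here ...
        progress_callback(chunk_index)  # snapshot describes the chunk that just finished
        chunk_index += 1                # increment AFTER the callback (the fix)
    return chunk_index

# Regression-style check: observed indices are [0, 1, ..., chunks_total - 1].
seen = []
run_chunks([["a"], ["b"], ["c"]], seen.append)
assert seen == [0, 1, 2]
```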
CI failure on PR #904 traced to a Django + threading transaction-visibility issue, not a logic regression: the chunked-mode audit-log test creates a `User` via `UserFactory()` from the test thread, then `start_celery_broker_clear_job` spawns the daemon worker which opens its own DB connection and tries to insert a `LogEntry` referring to that user. With plain `@pytest.mark.django_db`, the test thread's writes live inside an uncommitted transaction the worker thread can't see, so the FK to `users.id` fails: `ForeignKeyViolation: Key (user_id)=(N) is not present in table "users".`

Fix:

* Switch the test to `@pytest.mark.django_db(transaction=True)` so the user is committed before the worker thread reads it (Django's `TransactionTestCase` semantics).
* Add `close_old_connections()` to the worker thread's `finally` block so the lazily-opened ORM connection drops at thread exit (see the sketch below). Without that, `transaction=True`'s truncate-on-teardown would block on our idle connection, and a long-lived gunicorn worker would slowly leak one connection per chunked clear that ever ran on it. `close_old_connections` is the documented helper for this exact background-thread cleanup case.

No production-code logic change. The non-DB tests continue to pass (11/11); the `transaction=True` test will be validated by CI's API CI / Test job since this sandbox can't reach Postgres.
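A sketch of the worker-thread wrapper shape implied by the fix; only `close_old_connections` and the `finally` placement are the point, the rest is illustrative:

```python
from django.db import close_old_connections

def _clear_job_worker(run_body, job_id):
    """Thread target wrapper (sketch). The finally block is the fix: drop the
    ORM connection this thread lazily opened so transaction=True teardown
    doesn't block on it and long-lived workers don't leak one connection per
    chunked clear."""
    try:
        run_body(job_id)
    finally:
        close_old_connections()
```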
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 564b40c.
…lper

Bugbot review on PR #904 (LOW): the operator-input filter-tuple construction `(task_name if task_name else _FILTER_ANY, repoid if repoid is not None else _FILTER_ANY, commitid if commitid else _FILTER_ANY)` was duplicated identically in three sites: `streaming_celery_count`, `_run_celery_broker_clear_job_body`, and `clear_by_filter_view`. If the rule diverged (new filter field, or a slot's truthiness check loosened), the dry-run count, the chunked clear, and the synchronous preview would silently disagree on which envelopes match — a data-loss vector. Extract a `_substitute_filter_any(task_name, repoid, commitid)` helper next to the `_FILTER_ANY` sentinel definition so all three call sites share one rule, and add a regression test that pins the contract (both directly and against the literal inline expression the three sites used to repeat).
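The extracted helper is essentially the quoted expression given a name; a sketch, reusing the `_FILTER_ANY` sentinel from the earlier fix:

```python
def _substitute_filter_any(task_name, repoid, commitid):
    """One shared rule for turning operator input into a filter tuple:
    unset slots become the _FILTER_ANY wildcard instead of None."""
    return (
        task_name if task_name else _FILTER_ANY,
        repoid if repoid is not None else _FILTER_ANY,
        commitid if commitid else _FILTER_ANY,
    )
```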

Summary
Re-purposes the (already-approved) PR #904 for this next round of redis_admin work, layered on top of #903's just-merged `main`:

HIGH-severity wildcard data-loss fix (cherry-pick of 77047ba8 from the now-orphaned post-"feat(redis_admin): split celery_broker caps into display vs scan limits" #903 branch). `_envelope_matches_any_filter` was treating `None` slots in any filter tuple as wildcards, but per-message clear paths build filter tuples directly from materialised rows whose envelope legitimately carries `None` (e.g. a `sync_repos` task with `repoid=None, commitid=None`). Single-row clear of one such message would have tombstoned EVERY message sharing the task name. Fix introduces a dedicated `_FILTER_ANY` sentinel singleton distinct from `None`; per-message paths keep exact-tuple semantics, operator-input paths (`streaming_celery_count`, `clear_by_filter_view`'s synthetic target) substitute `_FILTER_ANY` for unset slots and get wildcard semantics.

Chunked clear with progress bar.
`clear_by_filter_view`'s confirmed-submit no longer runs the streaming clear synchronously inside the HTTP request — it spawns a background daemon thread tracked in the Redis cache connection under `redis_admin:celery_clear_job:<uuid>` and 302s the browser to a progress page that polls a status JSON endpoint every 1s. The page renders a live `<progress>` bar, matched / total / drift / pass counters, a status pill (neutral / blue / green / amber / red), and a Cancel button that drops out at the next chunk boundary (default 1000 messages per chunk in the chunked variant; the synchronous path keeps its existing 10k chunk size). Job hashes get a 24h TTL so abandoned ones auto-cleanup.

Why this lives on PR #904 (not a fresh PR)
PR #904 was already approved before #903 merged. We've reset its branch to `main`, cherry-picked the data-loss sentinel fix on top, and rebuilt the chunked-progress diff there so we can ride the existing approval through the merge queue rather than block on a fresh review cycle. Approval may auto-invalidate on the force-push depending on branch protection — re-request review if so.

Folded historical PRs
b96458ca9. Its diff (streaming chart + clear + orjson + display vs scan caps) is already on `main`.

Call sites
- `redis_admin/admin.py` (`clear_by_filter_view` confirmed submit) -- `services.py:~2059`, `start_celery_broker_clear_job(...)`, then 302 to the progress page
- `redis_admin/admin.py` (`get_urls`) ~1785–1798 -- `clear-by-filter/job/<uuid:job_id>/` routes
- `redis_admin/admin.py` (`clear_by_filter_progress_view`) ~2180 -- `get_celery_broker_clear_job(job_id_str)`
- `redis_admin/admin.py` (`clear_by_filter_status_view`) ~2228 -- `get_celery_broker_clear_job(job_id_str)`
- `redis_admin/admin.py` (`clear_by_filter_cancel_view`) ~2277 -- `request_cancel_celery_broker_clear_job(job_id_str)`, then `get_celery_broker_clear_job`
- `redis_admin/services.py` (`_run_celery_broker_clear_job_body`) -- `_streaming_celery_clear(..., chunk_size=_CELERY_CLEAR_CHUNK_JOB, progress_callback=...)`, `_record_audit(..., extra={"mode": "chunked-...", "job_id": ..., "cancelled": ...})`
- `redis_admin/services.py` (`_streaming_celery_clear`) -- `celery_broker_clear` continues to call without `chunk_size` / `progress_callback`; default behaviour preserved (no test regressions).

Test plan
- `sync_repos`-shaped envelope tombstones only the selected message (covered by tests cherry-picked in 8566f3f83).
- `test_clear_job_drains_deep_queue_end_to_end`
- `test_clear_job_cancel_stops_at_chunk_boundary`
- `test_clear_job_keep_one_leaves_one_at_head`
- `test_clear_job_writes_audit_row_on_completion_with_chunked_mode` (django_db)
- `status=failed`, doesn't bubble to caller (`test_clear_job_failure_records_status_failed_and_swallows_exception`)
- `test_clear_job_status_view_returns_json_for_running_job`
- `test_clear_job_cancel_view_requires_post`
- `test_clear_job_status_view_404_for_unknown_id`
- `test_clear_by_filter_view_redirects_to_progress_page_on_confirmed_submit`, `test_clear_by_filter_view_dry_run_preview_unchanged`
- Tests already on `main` still pass (drain verification, sort order, family filter dropdown, dry-run preview, streaming-clear keep_one, etc).

What's in this PR (file-level)
- `apps/codecov-api/redis_admin/services.py` — `_streaming_celery_clear` accepts `chunk_size` + `progress_callback`; new `start_` / `get_` / `request_cancel_celery_broker_clear_job` + the daemon worker; new `_StreamingClearStats.cancelled` field; `_FILTER_ANY` sentinel from the cherry-pick.
- `apps/codecov-api/redis_admin/admin.py` — three new URL patterns + views (`clear_by_filter_progress_view`, `clear_by_filter_status_view`, `clear_by_filter_cancel_view`); `clear_by_filter_view` rewired so destructive actions spawn a chunked job; dry-run unchanged.
- `apps/codecov-api/redis_admin/templates/admin/redis_admin/celerybrokerqueue/clear_by_filter_progress.html` — new (CSP-compliant; loads CSS / JS via `{% static %}`).
- `apps/codecov-api/redis_admin/static/redis_admin/js/celery_clear_progress.js` — new (IIFE, polling + cancel + 1s/2s/5s backoff).
- `apps/codecov-api/redis_admin/static/redis_admin/css/celery_clear_progress.css` — new (Django admin theme variables, `prefers-reduced-motion` fallback).
- `apps/codecov-api/redis_admin/tests/test_celery_broker_clear_job.py` — new (12 tests, 11 of which run under `-m 'not django_db'`).
- `apps/codecov-api/redis_admin/README.md` — new "Chunked clear jobs" section; updated URL table.

Things this PR explicitly does NOT do
- `redis-admin/split-celery-broker-caps` branch (the cherry-pick source).

Note
Medium Risk
Adds a new background-thread, Redis-backed job system for clearing Celery broker queues and changes the destructive clear-by-filter flow to run asynchronously, which could impact operational queue-clearing behavior if bugs exist. Risk is mitigated by superuser-only gating and extensive new tests, plus a fix to prevent over-broad deletes when filter fields are `None`.

Overview
Updates the `celerybrokerqueue` admin clear-by-filter destructive actions to run as an asynchronous, chunked clear job instead of blocking the HTTP request, redirecting operators to a new progress page that polls a status JSON endpoint and supports cancellation.

Introduces a Redis cache–stored job state hash + worker thread implementation, extends the streaming clear to support per-chunk progress/cancel callbacks, and adds new admin routes/templates/static assets for the progress UI.
Fixes a high-severity filtering bug by introducing a `_FILTER_ANY` sentinel (distinct from `None`) so operator-supplied "unset" filters act as wildcards without causing per-message clears of `(task, None, None)` envelopes to over-match and delete unrelated messages; adds comprehensive unit tests and docs for both changes.

Reviewed by Cursor Bugbot for commit 4492208.