feat: FlushGate to limit concurrent flush operations#31
Merged
Conversation
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
With 100+ tables syncing, all flush threads can execute flush_buffer() concurrently, causing peak memory spikes and DuckLake commit lock contention. FlushGate is a global-per-group semaphore that limits concurrent flushes so only N threads flush at once while the rest continue buffering in low-memory mode. - FlushGate struct with try_acquire/release/set_max/active_count - Drain requests (TRUNCATE) bypass the gate for correctness - GUC: duckpipe.max_concurrent_flushes (default 4, range 1-1000, SIGHUP) - CLI: --max-concurrent-flushes for daemon - Dynamic update: GUC changes take effect each cycle without restart Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
77066a0 to
9622bef
Compare
…etric FlushGate improvements: - Exponential retry speedup: each consecutive denial halves the retry interval (5s → 2.5s → 1.25s → 625ms), so denied threads check more frequently as buffered data grows - Blocking acquire after MAX_GATE_RETRIES (4) denials: thread blocks on Condvar until a slot opens, guaranteeing progress and preventing unbounded buffering - release() notifies waiting threads via Condvar::notify_one() Code cleanup from review: - FlushGate visibility: pub → private (only used internally) - release(): saturating_sub → debug_assert + plain subtraction - Removed dead defensive check in gate-full branch Expose active_flushes metric: - GroupMetricsSlot gains active_flushes field (SHM) - worker_status() SQL function returns active_flushes column - metrics() JSON includes active_flushes in groups - Daemon MetricsCache + /metrics endpoint include active_flushes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the exponential-speedup retry approach with a strict FIFO ticket queue. Each flush thread takes a ticket on arrival and waits until its ticket is served — no starvation possible regardless of timing or concurrency patterns. On timeout, the thread forfeits its ticket so it doesn't block the queue. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
FlushGate acquire timeout now adapts to the median of the last 64 flush durations (floor 1s), falling back to flush_interval until enough data is collected. This ensures threads wait proportionally to actual flush costs rather than an arbitrary fixed interval. Also updates CODE_WALKTHROUGH.md, PARALLELISM.md, and PROGRESS.md to document FlushGate, ticket-based FIFO, adaptive timeout, active_flushes metric, and all new GUCs/CLI args. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add gate_wait_avg_ms and gate_timeouts metrics to FlushGate, exposed in worker_status(), metrics(), and daemon /metrics endpoint. These show flush thread contention (avg time waiting for a gate slot) and how often threads give up waiting (timeouts). Refactor SHM metrics from anonymous tuples to named GroupMetrics and TableMetrics structs defined in duckpipe-core, re-exported by duckpipe-pg. Eliminates unmaintainable HashMap<i32, (i64, i64, ...)> patterns across the codebase. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…teStats struct - Move adaptive timeout computation (median flush duration) inside acquire() so callers no longer need to manage timeout logic - Merge record_duration() into release(flush_duration) to eliminate a separate lock acquisition per flush - Replace bare (i64, i64) return from gate_stats() with named GateStats struct for self-documenting field access Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
FlushGatesemaphore toFlushCoordinatorthat limits how many flush threads can executeflush_buffer()concurrently per sync groupduckpipe.max_concurrent_flushes(default 4, range 1–1000, SIGHUP-level) with dynamic runtime updates--max-concurrent-flushesfor the standalone daemonTest plan
cargo check— no errorscargo fmt— formattedmake installcheck— all 31 regression tests passALTER SYSTEM SET duckpipe.max_concurrent_flushes = 1; SELECT pg_reload_conf();— verify only 1 flush at a time in debug logs🤖 Generated with Claude Code