feat(autopipeline): #3 wire into Client and ClusterClient#3867
feat(autopipeline): #3 wire into Client and ClusterClient#3867ndyakov wants to merge 21 commits into
Conversation
🛡️ Jit Security Scan Results✅ No security findings were detected in this PR
Security scan by Jit
|
There was a problem hiding this comment.
Pull request overview
This PR makes the new autopipelining engine usable from application code by exposing AutoPipeline() (blocking) and AsyncAutoPipeline() (deferred) on both *Client and *ClusterClient, adds cluster-aware sharding/slot memoization, and introduces an optional dedicated connection pool for pipelining with configurable (larger) buffers. It also wires observability/maintnotifications hooks into the extra pool, updates command accessors to support deferred execution, and adds extensive tests + documentation/examples.
Changes:
- Add public
AutoPipeline()/AsyncAutoPipeline()APIs onClientandClusterClient, with lifecycle wiring (cache + Close) and cluster slot-based sharding. - Add optional dedicated pipeline connection pool (
Pipeline*options) and integrate it with OTel metrics, maintnotifications, and streaming credentials hooks. - Introduce deferred-command readiness (
await) and update many command types + tests/docs/examples to support async autopipeline behavior.
Reviewed changes
Copilot reviewed 45 out of 46 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| timeseries_commands.go | Add await() to TS command accessors for deferred execution. |
| sentinel.go | Plumb pipeline-pool sizing/buffer options through failover options; create pipeline pool. |
| search_commands.go | Add await() to several search/FT command accessors; minor formatting cleanup. |
| ring.go | Add pipeline-pool option fields to RingOptions and pass through to Options. |
| redis.go | Add optional pipeline pool, withPipelineConn, OTel registration, autopipeliner caching/Close wiring, and route pipeline execution via pipeline pool. |
| README.md | Document new autopipeline APIs with usage examples. |
| probabilistic.go | Add await() to probabilistic module command accessors. |
| pipeline.go | Add putPipeliner to clear pipeline references post-batch for GC friendliness. |
| pipeline_buffer_test.go | New tests covering pipeline pool existence/stats and backwards-compat behavior. |
| osscluster.go | Add pipeline pool options to ClusterOptions, add cluster autopipeline APIs, slot caching, and sharding function. |
| osscluster_autopipeline_test.go | New basic cluster autopipeline tests (routing/concurrency/cross-slot). |
| osscluster_autopipeline_correctness_test.go | New correctness tests for cross-slot routing and per-goroutine order on cluster. |
| osscluster_autopipeline_buffer_test.go | New cluster test for zero-copy buffer ops via autopipeline. |
| options.go | Add PipelineReadBufferSize, PipelineWriteBufferSize, PipelinePoolSize, and AutoPipelineConfig documentation/config fields. |
| maintnotifications/manager.go | Support attaching independent maintnotifications hooks to additional pools and cleanly detaching on Close. |
| maintnotifications/additional_pool_hook_test.go | New tests for additional-pool hook attachment/detachment behavior. |
| json.go | Add await() to JSON command accessors; avoid deadlock by using rawErr() in readReply. |
| internal/proto/reader.go | Optimize ReadStringInto to read payload+CRLF in one read when capacity allows. |
| internal/proto/reader_test.go | New tests validating ReadStringInto fast/slow paths keep stream aligned. |
| internal/pool/pool.go | Add PipelineStats to pool stats struct for optional pipeline pool. |
| internal/otel/metrics.go | Extend pool registration/unregistration to include optional pipeline pool. |
| example/autopipeline/main.go | New runnable benchmark/example comparing normal vs autopipeline approaches. |
| example/autopipeline/go.sum | Module sums for the new example module. |
| example/autopipeline/go.mod | New example module definition (with replace to local repo). |
| command.go | Add slot caching + async readiness hooks; update error helpers to use rawErr() to avoid deadlocks. |
| autopipeline.go | New autopipeliner implementation (sharded queues, batching, concurrency control, delayed flush, etc.). |
| autopipeline_typed_test.go | New tests for typed commands through autopipeliner. |
| autopipeline_test.go | New comprehensive autopipeline tests (batching, timing, concurrency, closing, etc.). |
| autopipeline_singleton_test.go | New test guarding concurrent first-call singleton behavior. |
| autopipeline_sequential_test.go | New tests for sequential usage patterns. |
| autopipeline_faces_test.go | New tests intended for “faces” behavior and ordering semantics. |
| autopipeline_faces_split_test.go | New tests explicitly covering blocking vs async face behavior. |
| autopipeline_correctness_test.go | New correctness tests (no cross-talk, ordering, no lost cmds, error isolation). |
| autopipeline_config_test.go | New test ensuring caller config isn’t mutated by default-filling. |
| autopipeline_cmdable_test.go | New tests verifying autopipeliner implements Cmdable surface. |
| autopipeline_close_wiring_test.go | New test ensuring client.Close closes shared autopipeliner. |
| autopipeline_close_race_test.go | New stress test for Close vs concurrent submissions. |
| autopipeline_buffer_test.go | New test for zero-copy buffer ops via autopipeline. |
| autopipeline_blocking_test.go | New tests verifying blocking commands are not autopipelined. |
| autopipeline_accessor_test.go | New test intended to verify secondary accessors block correctly via await(). |
| adaptive_delay_test.go | New unit tests/benchmarks for adaptive delay calculation. |
| .golangci.yml | Narrow unused exemptions for stacked PR development of autopipeline/pipeline symbols. |
| .github/workflows/test-e2e.yml | Expand PR branch patterns to include feature/**. |
| .github/workflows/govulncheck.yml | Expand PR branch patterns to include feature/**. |
| .github/workflows/codeql-analysis.yml | Expand PR branch patterns to include feature/**. |
| .github/workflows/build.yml | Expand PR branch patterns to include feature/**. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Blocking face: drop-in for a normal client, batched under the hood. | ||
| ap := rdb.AutoPipeline() | ||
| defer ap.Close() |
| Both faces take an optional `*AutoPipelineConfig` (e.g. | ||
| `rdb.AsyncAutoPipeline(&redis.AutoPipelineConfig{MaxConcurrentBatches: 80, Unordered: true})`) | ||
| and work on `ClusterClient` too: commands are routed to the correct shard per | ||
| key, so a single batch may span many slots. Autopipelining is only a win under |
| ap := rdb.AutoPipeline() // blocking face (default parallel-batch config) | ||
| defer ap.Close() |
| // Default async config is ordered (MaxConcurrentBatches=1). | ||
| return benchReadLater(ctx, rdb, rdb.AsyncAutoPipeline()) |
| c := redis.NewClient(&redis.Options{Addr: ":6379"}) | ||
| defer c.Close() | ||
| c.FlushDB(ctx) | ||
| fap, err := c.AutoPipeline() |
| c := redis.NewClient(&redis.Options{Addr: ":6379"}) | ||
| defer c.Close() | ||
| c.FlushDB(ctx) | ||
| fap, err := c.AutoPipeline() |
| }) | ||
| defer c.Close() | ||
| c.FlushDB(ctx) | ||
| fap, err := c.AutoPipeline() |
| var queueSlicePool = sync.Pool{ | ||
| New: func() interface{} { s := make([]Cmder, 0, 100); return &s }, | ||
| } | ||
|
|
||
| func getQueueSlice(capacity int) []Cmder { | ||
| slice := (*queueSlicePool.Get().(*[]Cmder))[:0] | ||
| if cap(slice) < capacity { | ||
| queueSlicePool.Put(&slice) | ||
| return make([]Cmder, 0, capacity) | ||
| } | ||
| return slice | ||
| } | ||
|
|
||
| func putQueueSlice(slice []Cmder) { | ||
| if cap(slice) <= 1000 { | ||
| full := slice[:cap(slice)] | ||
| for i := range full { | ||
| full[i] = nil | ||
| } | ||
| queueSlicePool.Put(&slice) | ||
| } | ||
| } |
| // cachedSlot/setCachedSlot memoize the cluster slot so it is computed once | ||
| // (in the autopipeline shard router) and reused at pipeline-flush routing. | ||
| cachedSlot() (int, bool) | ||
| setCachedSlot(int) |
7b07700 to
cd7654f
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 46 out of 47 changed files in this pull request and generated 13 comments.
Comments suppressed due to low confidence (1)
search_commands.go:2967
- FTHybridCmd.Result/Val await deferred execution, but CursorResult/CursorVal and RawVal/RawResult still return fields without awaiting. In AsyncAutoPipeline mode, callers can observe unset cursor/raw values (or race). These accessors should call await() like Val/Result.
| cmd := NewCmd(ctx, args...) | ||
| if len(args) == 0 { | ||
| cmd.SetErr(ErrClosed) | ||
| return cmd | ||
| } |
| // Blocking face: drop-in for a normal client, batched under the hood. | ||
| ap := rdb.AutoPipeline() | ||
| defer ap.Close() | ||
|
|
| ap := rdb.AsyncAutoPipeline() // ordered by default | ||
| defer ap.Close() | ||
|
|
| Both faces take an optional `*AutoPipelineConfig` (e.g. | ||
| `rdb.AsyncAutoPipeline(&redis.AutoPipelineConfig{MaxConcurrentBatches: 80, Unordered: true})`) | ||
| and work on `ClusterClient` too: commands are routed to the correct shard per |
| c := redis.NewClient(&redis.Options{Addr: ":6379"}) | ||
| defer c.Close() | ||
| c.FlushDB(ctx) | ||
| fap, err := c.AutoPipeline() | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| defer fap.Close() |
| func (cmd *AggregateCmd) Val() *FTAggregateResult { | ||
| cmd.await() | ||
| return cmd.val | ||
| } | ||
|
|
||
| func (cmd *AggregateCmd) Result() (*FTAggregateResult, error) { | ||
| cmd.await() | ||
| return cmd.val, cmd.err |
| func (cmd *FTInfoCmd) Result() (FTInfoResult, error) { | ||
| cmd.await() | ||
| return cmd.val, cmd.err | ||
| } | ||
|
|
||
| func (cmd *FTInfoCmd) Val() FTInfoResult { | ||
| cmd.await() | ||
| return cmd.val |
| func (cmd *FTSpellCheckCmd) Result() ([]SpellCheckResult, error) { | ||
| cmd.await() | ||
| return cmd.val, cmd.err | ||
| } | ||
|
|
||
| func (cmd *FTSpellCheckCmd) Val() []SpellCheckResult { | ||
| cmd.await() | ||
| return cmd.val |
| func (cmd *FTSearchCmd) Result() (FTSearchResult, error) { | ||
| cmd.await() | ||
| return cmd.val, cmd.err | ||
| } | ||
|
|
||
| func (cmd *FTSearchCmd) Val() FTSearchResult { | ||
| cmd.await() | ||
| return cmd.val | ||
| } |
| func (cmd *FTSynDumpCmd) Val() []FTSynDumpResult { | ||
| cmd.await() | ||
| return cmd.val | ||
| } | ||
|
|
||
| func (cmd *FTSynDumpCmd) Result() ([]FTSynDumpResult, error) { | ||
| cmd.await() | ||
| return cmd.val, cmd.err | ||
| } |
AutoPipeline()/AsyncAutoPipeline() return (*AutoPipeliner, error); the README snippets showed single-value calls that wouldn't compile. Capture and check err. Reported by Copilot on PR #3867.
cd7654f to
cd450c6
Compare
AutoPipeline()/AsyncAutoPipeline() return (*AutoPipeliner, error); the README snippets showed single-value calls that wouldn't compile. Capture and check err. Reported by Copilot on PR #3867.
cd450c6 to
6436549
Compare
| // cachedSlot/setCachedSlot memoize the cluster slot so it is computed once | ||
| // (in the autopipeline shard router) and reused at pipeline-flush routing. | ||
| cachedSlot() (int, bool) | ||
| setCachedSlot(int) | ||
|
|
||
| readTimeout() *time.Duration | ||
| readReply(rd *proto.Reader) error | ||
| readRawReply(rd *proto.Reader) error | ||
| SetErr(error) | ||
| Err() error | ||
|
|
||
| // setReady marks a command as asynchronously pending (autopipeline async | ||
| // faces); await blocks the public accessors until it has executed; rawErr | ||
| // reads the error without awaiting (internal execution path). | ||
| setReady(<-chan struct{}) | ||
| await() | ||
| rawErr() error |
| func cmdString(cmd Cmder, val interface{}) string { | ||
| b := make([]byte, 0, 64) | ||
|
|
||
| for i, arg := range cmd.Args() { | ||
| if i > 0 { | ||
| b = append(b, ' ') | ||
| } | ||
| b = internal.AppendArg(b, arg) | ||
| } | ||
|
|
||
| if err := cmd.Err(); err != nil { | ||
| if err := cmd.rawErr(); err != nil { | ||
| b = append(b, ": "...) | ||
| b = append(b, err.Error()...) |
| // pipelinePool is an optional separate connection pool for pipelining | ||
| // operations, used when PipelineReadBufferSize/PipelineWriteBufferSize is | ||
| // set so pipelines can use large buffers without bloating the main pool. | ||
| // nil means pipelines use connPool. | ||
| pipelinePool pool.Pooler |
| // PipelineReadBufferSize, PipelineWriteBufferSize and PipelinePoolSize | ||
| // configure an optional separate connection pool used for pipelining, with | ||
| // its own (typically larger) buffers. See the same-named fields on Options | ||
| // for details. Zero values disable the separate pool. |
| // PipelineReadBufferSize, PipelineWriteBufferSize and PipelinePoolSize | ||
| // configure an optional separate connection pool used for pipelining on | ||
| // each shard, with its own (typically larger) buffers. See the same-named | ||
| // fields on Options for details. Zero values disable the separate pool. |
| // PipelineReadBufferSize, PipelineWriteBufferSize and PipelinePoolSize | ||
| // configure an optional separate connection pool used for pipelining on | ||
| // each node, with its own (typically larger) buffers. See the same-named | ||
| // fields on Options for details. Zero values disable the separate pool. |
| ```go | ||
| rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) | ||
| defer rdb.Close() | ||
|
|
| ```go | ||
| ap, err := rdb.AsyncAutoPipeline() // ordered by default | ||
| if err != nil { | ||
| log.Fatal(err) | ||
| } | ||
| defer ap.Close() | ||
|
|
AutoPipeline()/AsyncAutoPipeline() return (*AutoPipeliner, error); the README snippets showed single-value calls that wouldn't compile. Capture and check err. Reported by Copilot on PR #3867.
6436549 to
f7b225c
Compare
AutoPipeline()/AsyncAutoPipeline() return (*AutoPipeliner, error); the README snippets showed single-value calls that wouldn't compile. Capture and check err. Reported by Copilot on PR #3867.
f7b225c to
0331ead
Compare
… window accumulateBatch waited the full defaultAccumulateWindow (200µs) whenever the queue was below MaxBatchSize, even for a single blocking caller with nothing left to coalesce, so every low-concurrency command paid the whole window. The doc described a "stops growing" early exit that was never implemented. Implement it for the implicit default window only: a burst timer fires when no new command arrives for defaultAccumulateGap (20µs); each enqueue resets it, so concurrent load keeps coalescing up to the window cap while a lone caller flushes ~one gap after enqueuing. An explicit MaxFlushDelay / AdaptiveDelay is an intentional accumulation window and is still waited in full. defaultAccumulateWindow becomes a var so tests can enlarge it deterministically.
…ch stream DefaultBlockingAutoPipelineConfig used MaxConcurrentBatches:50 "for throughput", but that is pessimal. With many flush permits, each command finds a free permit and flushes on its own before others accumulate, collapsing batch size toward one command per round-trip — i.e. non-pipelined throughput at high latency. Backpressure from a single in-flight batch (MaxConcurrentBatches:1) instead lets callers whose commands return re-enqueue and flush together as the next batch, so batches stay deep (a near-continuous, double-buffered pipeline). Measured (get=80/set=20, 64B, localhost, 128 blocking workers): before (b=50): ~59k ops/s, p50 2.9ms, avg batch 1.6 after (b=1): ~420k ops/s, p50 0.30ms, avg batch ~94 Higher throughput AND ~10x lower latency, and it is naturally ordered. The async default (DefaultAutoPipelineConfig) is already MaxConcurrentBatches:1; for maximum throughput use AsyncAutoPipeline with a window of in-flight commands (inflight>1).
Add an opt-in separate connection pool for pipeline/autopipeline execution
(PipelinePoolSize), so heavy pipelining doesn't starve the regular command
pool. withPipelineConn draws from it when configured and falls back to the
main pool otherwise, draining push notifications before returning a connection.
Attach the existing per-pool listeners to the dedicated pool as well, so its
connections get the same lifecycle handling as the main pool:
- streaming credentials re-auth hook
- maintnotifications handoff hook (MOVING/MIGRATING). Because that hook's
failed-handoff removal target is its own single pool field, the pipeline
pool gets an independent hook (Manager.InitPoolHookForPool) bound to it,
rather than sharing the main pool's hook — otherwise a pipeline-pool
connection that failed handoff would be removed from the wrong pool. The
extra hook shares the Manager, so MOVING/MIGRATING tracking stays central,
and is shut down and detached on Manager.Close.
The recommended 512 KiB overstated the benefit. Benchmarks show pipeline throughput climbs from the 32 KiB default to ~64 KiB and then plateaus; beyond ~128 KiB there is no further gain and very large buffers (>=512 KiB) can regress throughput and waste memory. The only effect of the buffer is holding one batch's wire bytes so it flushes in a single syscall, so size it to roughly MaxBatchSize x average-command-bytes. Update the recommendation, example, and memory-impact figures accordingly.
The optional pipeline connection pool was created, used and closed but never registered with the OTel recorder, so its connection-count / pending-request metrics were invisible. Add a pipelinePool param to otel.RegisterPools / UnregisterPools (registered as a regular pool under a _pipeline name suffix) and pass c.pipelinePool from NewClient and the client teardown. nil when no pipeline pool is configured, so default clients are unaffected.
Address Copilot review on PR #3865: - withPipelineConn now honors opt.Limiter (Allow() before acquiring, ReportResult() on every exit incl. the early init/re-acquire failures), mirroring getConn/releaseConn. Enabling the pipeline pool no longer silently bypasses throttling and failure reporting. - InitPoolHookForPool now holds hooksMu across the closed-check, append and AddPoolHook, closing a (construction-only) window where Close could tear the hook down between append and attach, leaking it. AddPoolHook is lock-free and never re-enters the manager, so this is deadlock-safe. - pipeline_buffer_test.go uses the shared redisAddr instead of a hardcoded localhost:6379 so it respects the env/CI Redis address.
Shard count was derived as min(GOMAXPROCS, MaxConcurrentBatches, 16), which silently fragmented the command queue as soon as a user raised MaxConcurrentBatches for throughput: 16 shards each accumulate a sliver of the load, so batches collapse toward one command per flush and pipelining disappears. Measured on a 14-core host (get=80/set=20, 64B, 128 goroutines, async window 8): MaxConcurrentBatches=50 meant 16 shards and 413k ops/s at 1.4ms p50, while the identical permit budget on ONE shard reaches 1.75M ops/s at 0.51ms p50 — the fragmentation, not the permit count, was the regression. Add AutoPipelineConfig.NumShards (0 = auto: one shard) and stop deriving the shard count from the permit budget. One deep queue plus 2-4 permits is the sweet spot: batch N+1 accumulates and executes while batch N's replies are in flight (async, window 8: 1.78M ops/s at 0.53ms p50 versus 1.0M at 1.0ms with a single permit). Every shard keeps at least one permit, so the effective global concurrency is max(NumShards, MaxConcurrentBatches). numAutoPipelineShards stays as the cluster-wiring default, where slot-routed shards keep each batch on one node.
Add Client.AutoPipeline() and ClusterClient.AutoPipeline() returning a deferred *AutoPipeliner, plus WithAutoPipeline() and propagation of the pipeline-pool options. Options gains AutoPipelineConfig.
…outing Standalone and cluster tests for the deferred AutoPipeline API: per-goroutine ordering, no cross-talk or lost commands, error isolation, config validation (panic on MaxConcurrentBatches>1 without Unordered), zero-copy GetToBuffer/SetFromBuffer, adaptive delay, and cluster cross-slot routing.
README documents the deferred AutoPipeline API and the ordered-default / Unordered-for-parallel rule. example/autopipeline benchmarks four usage styles (normal blocking, ordered blocking-read, ordered read-later, unordered read-later) and prints a decision guide with throughput and ordering.
The four AutoPipeline / AsyncAutoPipeline methods (Client and ClusterClient) now return (*AutoPipeliner, error) instead of *AutoPipeliner. These run from post-init calls, so an invalid config must surface as a returned error the caller can handle rather than a panic. On error no instance is cached, so a later call with a valid config still succeeds. Tests updated to the two-value form; the unsafe-config test now asserts an error instead of a panic.
The (*AutoPipeliner).AutoPipeline() stub was removed (orphan, satisfied no interface); drop the test that asserted it returns itself.
…ructor NewAutoPipeliner is now unexported; the unsafe-config test calls c.AsyncAutoPipeline (same error-on-bad-config contract). Update a stale doc comment reference too.
Remove a stray double blank line after the AsyncAutoPipeline methods so golangci-lint's gofmt check passes.
The README autopipeline section uses 'autopipelining' and 'goroutine(s)', which aren't in the spellcheck dictionary, failing check-spelling on the PR. Add them to .github/wordlist.txt.
Three independent test-job failures, all in test code / the example module: - Blocking-face ginkgo specs panicked: ap.Do returns a generic *redis.Cmd, but the specs type-asserted *StringSliceCmd/*StringCmd. Use *redis.Cmd with its StringSlice()/Text() accessors. - Cluster autopipeline tests failed in the standalone test-ce job: the :7000 tests had no skip guard, and the :16600 tests guarded only on Ping (which passes against unjoined nodes that then return CLUSTERDOWN/MOVED). Add a shared skipIfClusterUnhealthy that gates on CLUSTER INFO cluster_state:ok, and use it in all cluster autopipeline tests. - example/autopipeline didn't compile (govulncheck 'all modules' load error): it called AutoPipeline()/AsyncAutoPipeline() in single-value context after they gained the error return. Assign (ap, err) and panic on err.
The README autopipeline section uses 'async' and 'runnable' in prose; add them to .github/wordlist.txt. Verified the full README passes pyspelling via the exact CI image (jonasbn/github-action-spellcheck:0.62.0).
AutoPipeline()/AsyncAutoPipeline() return (*AutoPipeliner, error); the README snippets showed single-value calls that wouldn't compile. Capture and check err. Reported by Copilot on PR #3867.
… gating Two white-box tests for the default-window stops-growing behavior: - StopsGrowingDefaultWindow enlarges defaultAccumulateWindow to 1s and asserts a lone caller still flushes in well under it (deterministic, no sub-ms timing). - ExplicitDelayWaitsFullWindow guards the gating: an explicit MaxFlushDelay is still waited in full, so stops-growing never short-circuits a configured window. Config is passed as an argument to AutoPipeline (the blocking face does not read Options.AutoPipelineConfig).
…not parallel batches
The engine now defaults to a single shard (deep queue) for standalone clients. Cluster clients still want several shards — commands are routed to shards by slot so each batch stays on one node — so the cluster wiring fills in the numAutoPipelineShards default when the user leaves NumShards at 0, copying the config rather than mutating the caller's struct. Tests cover: standalone defaulting to one shard regardless of the permit budget, an explicit NumShards override, negative-NumShards validation, and the cluster default (copy semantics + explicit passthrough).
0331ead to
776f522
Compare
Wires the AutoPipeliner engine (#1) into the public API. Adds
AutoPipeline()andAsyncAutoPipeline()on both*Clientand*ClusterClient, plus cluster slot-sharding and the full test suite + README.This is where autopipelining becomes usable from application code.
Two faces, one engine:
AutoPipeline()— blocking, drop-in. Eachap.Set(...)call blocks untilexecuted, exactly like a plain client, but concurrent callers' commands are
batched into pipelines behind the scenes. Per-goroutine order is preserved.
AsyncAutoPipeline()— deferred. Calls return immediately; reading theresult (
Val/Result/Err) blocks until the batch executes. Submit a windowof commands, then drain results, for the highest throughput.
Both return
(*AutoPipeliner, error)— an invalid config (e.g.MaxConcurrentBatches > 1withoutUnordered) is a returned error, never apanic, since these are post-init calls. The instance is cached and shared per
client (first call's config wins) and is closed when the client closes.
Cluster:
ClusterClientinstalls slot-based shard routing so each shard'sbatch lands on a single master node, keeping per-node pipelines deep instead of
splitting every batch across all nodes at flush. The exact slot is computed once
per command and reused at flush.
Depends on #1 (engine) and #2 (optional pipeline pool).