Emit multiple batches from GpuProjectExec split-retry instead of concatenating#14877

Open
thirtiseven wants to merge 6 commits into NVIDIA:main from thirtiseven:project_split_retry_streaming

@thirtiseven
Collaborator

@thirtiseven thirtiseven commented May 25, 2026

Fixes #14868.

Description

Follow-up to #14724. GpuProjectExec.runWithSplitRetry previously concatenated the sub-batches produced by row-split retry back into a single output batch (via ConcatAndConsumeAll.buildNonEmptyBatchFromTypes). That concat exists only to preserve the single-batch contract of projectAndCloseWithRetrySingleBatch. It has two costs:

  • Peak memory during concat is sum(pieces) + concatenated output, so a workload that only fits when split can still OOM at concat time. The withRetryNoSplit wrap can spill but cannot split the concat.
  • GpuProjectExec already returns Iterator[ColumnarBatch], so for the operator itself there is no reason to recombine.
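
As a rough worked example of the first cost above, the peak during concat can be modelled with plain arithmetic. This is a sketch under a stated assumption: the concatenated output is taken to be exactly as large as the pieces combined, and the object and method names are hypothetical, not part of the plugin.

```scala
// Illustrative model only: sizes are inputs you choose, not measurements.
object ConcatPeakSketch {
  // Bytes resident while concatenating: every piece plus the combined output.
  // ASSUMPTION: the output is as large as all the pieces put together.
  def peakDuringConcat(pieceBytes: Seq[Long]): Long = pieceBytes.sum * 2

  // Largest single projected piece, i.e. what one split attempt had to produce.
  def largestPiece(pieceBytes: Seq[Long]): Long =
    if (pieceBytes.isEmpty) 0L else pieceBytes.max
}
```

Under this model, four 1 GiB pieces need about 8 GiB at concat time even though each split attempt only had to produce 1 GiB.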

This PR adds a streaming variant that returns the per-piece iterator from withRetry directly and switches GpuProjectExec.internalDoExecuteColumnar to flatMap over it. Other callers of projectAndCloseWithRetrySingleBatch (GpuExpandExec, GpuGenerateExec, GpuAggregateExec pre/post-step, GpuTakeOrderedAndProjectExec, GpuArrowEvalPythonExec, GpuBroadcastHashJoinExecBase) are left unchanged — those operators embed a project inside a larger flow that aligns one output batch to one input batch (projection-index alignment in GpuExpandIterator, the SpillableColumnarBatch -> withRetry-split shape inside GpuAggregateIterator, etc.), and migrating them is case-by-case work tracked under the umbrella issue #7866.
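
The difference between the two shapes can be sketched without Spark or a GPU. In this minimal model, `Batch`, `projectStreaming`, `projectSingleBatch`, and `execute` are hypothetical stand-ins, and a row cap imitates the split that a GPU OOM would trigger in the real retry framework.

```scala
object StreamingProjectSketch {
  // Hypothetical stand-in for a ColumnarBatch: just a block of rows.
  final case class Batch(rows: Vector[Int])

  // Stand-in for the split-retry path: lazily yields one projected piece per
  // sub-batch. The "split" is forced by a row cap (maxRows must be > 0)
  // rather than by a real OOM.
  def projectStreaming(in: Batch, maxRows: Int)(f: Int => Int): Iterator[Batch] =
    in.rows.grouped(maxRows).map(piece => Batch(piece.map(f)))

  // Single-batch shape: drain the pieces and concatenate them into one batch.
  def projectSingleBatch(in: Batch, maxRows: Int)(f: Int => Int): Batch =
    Batch(projectStreaming(in, maxRows)(f).flatMap(_.rows).toVector)

  // Streaming operator shape: flatMap so each piece flows downstream on its own.
  def execute(input: Iterator[Batch], maxRows: Int)(f: Int => Int): Iterator[Batch] =
    input.flatMap(b => projectStreaming(b, maxRows)(f))
}
```

Callers that need one output per input keep calling the single-batch form, while the operator itself consumes the iterator directly.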

Implementation

  • New GpuProjectExec.runStreamingWithSplitRetry returns Iterator[ColumnarBatch] (no concat).
  • Existing GpuProjectExec.runWithSplitRetry is reduced to a thin wrapper: drain the streaming iterator and concat — behavior is unchanged for all current callers other than GpuProjectExec itself.
  • New GpuTieredProject.projectAndCloseStreamingWithSplitRetry dispatches to the streaming variant when areAllRetryable && PROJECT_SPLIT_RETRY_ENABLED; otherwise wraps the single-batch fallback in Iterator.single so the caller can uniformly flatMap.
  • GpuProjectExec.internalDoExecuteColumnar now flatMaps over the streaming iterator. The NVTX/opTime range is split into two parts so coverage matches the prior code: an outer range around spillable construction + retry-framework setup (which also captures the eager fallback projection), and an inner range around each lazy pieces.next() on the streaming path. closeOnExcept(sb) was added around the streaming-entry call to defend against a synchronous failure in addTaskCompletionListener on a cancelled task.
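
The dispatch in the third bullet can be sketched as follows. This is a simplified model, not the plugin code: `Batch`, `streaming`, `singleBatch`, and `projectStreamingOrFallback` are hypothetical names, and the two boolean parameters stand in for `areAllRetryable` and `PROJECT_SPLIT_RETRY_ENABLED`.

```scala
object DispatchSketch {
  final case class Batch(rows: Vector[Int])

  // Hypothetical streaming path: splits the input in half to imitate a single
  // split-retry, and yields each non-empty half as its own piece.
  def streaming(in: Batch): Iterator[Batch] = {
    val (a, b) = in.rows.splitAt(in.rows.length / 2)
    Iterator(a, b).filter(_.nonEmpty).map(Batch(_))
  }

  // Hypothetical single-batch fallback: always returns exactly one batch.
  def singleBatch(in: Batch): Batch = in

  // Both branches return an Iterator, so the caller can flatMap uniformly.
  def projectStreamingOrFallback(
      in: Batch,
      allRetryable: Boolean,
      splitRetryEnabled: Boolean): Iterator[Batch] =
    if (allRetryable && splitRetryEnabled) streaming(in)
    else Iterator.single(singleBatch(in))
}
```

Note that `Iterator.single` takes its argument by value, so in this shape the fallback projection runs when the iterator is built rather than on `next()`.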

Behavior change to flag

numOutputBatches is now incremented once per emitted piece instead of once per input batch, matching the long-standing per-batch counting in GpuFilterExec.filterAndCloseWithRetry. An input batch that is split into N pieces under split-retry now reports N output batches instead of one; any tooling that compares input and output batch counts on Project should be updated.
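
A minimal sketch of per-piece counting, assuming a hypothetical `Counter` class in place of the real SQL metric and a hypothetical `counted` helper:

```scala
object OutputBatchCountSketch {
  // Hypothetical stand-in for a metric such as numOutputBatches.
  final class Counter {
    var value: Long = 0L
    def add(n: Long): Unit = { value += n }
  }

  // Wrap an iterator so the counter is bumped once per emitted element.
  // The count is lazy: it only advances as the consumer pulls pieces.
  def counted[A](pieces: Iterator[A], numOutputBatches: Counter): Iterator[A] =
    pieces.map { p =>
      numOutputBatches.add(1L)
      p
    }
}
```

With one input batch split into three pieces, the counter ends at 3 after the iterator is drained, where the concat-based path would have reported 1.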

Happy path perf tests:

| Config | Warmup (s) | Run 1 (s) | Run 2 (s) | Run 3 (s) | Avg (s) | Speedup vs off |
| --- | --- | --- | --- | --- | --- | --- |
| spark.rapids.sql.projectExec.splitRetry.enabled=false | 4.425 | 1.751 | 1.586 | 1.540 | 1.626 | 1.000x |
| spark.rapids.sql.projectExec.splitRetry.enabled=true | 1.458 | 1.519 | 1.493 | 1.488 | 1.500 | 1.084x |

Checklists

Documentation

  • Updated for new or modified user-facing features or behaviors
  • No user-facing change

Testing

  • Added or modified tests to cover new code paths
  • Covered by existing tests
  • Not required

Performance

  • Tests ran and results are added in the PR description
  • Issue filed with a link in the PR description
  • Not required

thirtiseven and others added 2 commits May 25, 2026 16:26
Adds a streaming variant of the split-retry path so that on GPU OOM the
projection's sub-batches flow downstream as separate batches instead of
being concatenated. The old runWithSplitRetry is preserved as a thin
single-batch wrapper around the new streaming entry for callers that
still need the single-batch contract (joins, aggregates, expand, etc.).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile-apps
Contributor

greptile-apps Bot commented May 25, 2026

Greptile Summary

This PR removes the concat step in GpuProjectExec.runWithSplitRetry by introducing a streaming variant (runStreamingWithSplitRetry) that returns the per-piece iterator from withRetry directly, and wires GpuProjectExec.internalDoExecuteColumnar to flatMap over it. The backward-compatible runWithSplitRetry is kept as a thin concat wrapper for other callers.

  • GpuProjectExec.runStreamingWithSplitRetry returns Iterator[ColumnarBatch] directly, with a task-completion guard that closes sb if the iterator is abandoned before the first next() call.
  • GpuTieredProject.projectAndCloseStreamingWithSplitRetry dispatches to the streaming path when areAllRetryable && PROJECT_SPLIT_RETRY_ENABLED, and wraps the single-batch fallback in a lazy one-shot iterator so all callers can uniformly flatMap; allowMultipleOutputBatches = localEnablePreSplit preserves the one-output-per-input invariant needed by Window operations.
  • numOutputBatches is now incremented per emitted piece (matching the existing behavior in GpuFilterExec), so any tooling comparing input vs output batch counts on Project should be updated.

Confidence Score: 5/5

The streaming iterator change is well-scoped: it only affects GpuProjectExec's own execution path, backward-compat callers are explicitly left unchanged, and the allowMultipleOutputBatches guard preserves the one-output-per-input invariant for Window operations.

The core logic of routing split-retry pieces directly downstream instead of concatenating them is sound. Resource management for the normal execution path is correct. The two observations raised are narrow edge cases that do not affect correctness for any currently exercised code path.

No files require special attention beyond the two minor points noted.

Important Files Changed

| Filename | Overview |
| --- | --- |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala | Core streaming split-retry logic. New Iterator wrappers use @volatile guards and onTaskCompletion for sb cleanup when abandoned; resource management is correct for the normal case; previous review threads covered the main edge-case risks. |
| tests/src/test/scala/com/nvidia/spark/rapids/ProjectSplitRetrySuite.scala | Five new unit tests cover streaming split-retry output, no-split single piece, single-piece preservation for non-retryable callers, abandonment cleanup, and non-retryable fallback; coverage of the non-retryable path abandonment case is absent. |

Sequence Diagram

sequenceDiagram
    participant RDD as RDD partition
    participant Exec as GpuProjectExec
    participant Tier as GpuTieredProject
    participant Stream as runStreamingWithSplitRetry
    participant Retry as withRetry framework

    RDD->>Exec: flatMap(split)
    Exec->>Exec: SpillableColumnarBatch(split)
    Exec->>Tier: projectAndCloseStreamingWithSplitRetry(sb, allowMultiple)

    alt "areAllRetryable && PROJECT_SPLIT_RETRY_ENABLED && allowMultiple"
        Tier->>Stream: runStreamingWithSplitRetry(sb, retryables, project)
        Stream->>Retry: "withRetry(sb, splitSpillableInHalfByRows) { project }"
        Stream-->>Tier: Iterator[ColumnarBatch] (lazy, task-completion guarded)
    else fallback single-batch
        Tier-->>Tier: wrap projectAndCloseWithRetrySingleBatch in lazy one-shot Iterator
    end

    Tier-->>Exec: pieces: Iterator[ColumnarBatch]
    Exec->>Exec: wrap in metric-tracking Iterator

    loop for each piece
        Exec->>Exec: "NvtxIdWithMetrics { pieces.next() }"
        Exec->>Retry: retryIter.next()
        alt OOM on first attempt
            Retry->>Retry: splitSpillableInHalfByRows(sb)
            Retry->>Retry: project(half1), project(half2)
            Retry-->>Exec: half1 batch
            Note over Exec,Retry: half2 yielded on next iteration
        else no OOM
            Retry-->>Exec: full projected batch
        end
        Exec->>Exec: "numOutputBatches += 1"
        Exec-->>RDD: ColumnarBatch
    end

Reviews (5): Last reviewed commit: "simplify"

Comment thread sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala Outdated
thirtiseven and others added 4 commits May 25, 2026 17:03
The non-retryable fallback returned Iterator.single(projectAndClose(...))
which evaluated the projection eagerly. That combined with the outer
closeOnExcept(sb) to double-close sb when the projection threw (sb was
already closed by withResource inside projectAndCloseWithRetrySingleBatch),
relying on SpillableColumnarBatch.close idempotency (see issue NVIDIA#10161).
The eager result also had no cleanup hook if the task was cancelled
between hasNext returning true and the consumer's next() call.

The fallback now returns a lazy one-shot iterator that defers the
projection until next(), and installs an onTaskCompletion guard that
closes sb if the iterator is abandoned before being iterated. Both
branches of projectAndCloseStreamingWithSplitRetry are now lazy and
self-own sb, so the outer closeOnExcept in internalDoExecuteColumnar
is no longer needed. The inner NvtxIdWithMetrics now correctly covers
the projection work on both branches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
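
The lazy one-shot fallback described in the commit message above can be sketched roughly as below. This is a simplified model under stated assumptions: `Resource` stands in for the spillable input, `LazyOneShot` is a hypothetical class, and `abandon()` plays the role of the task-completion callback; none of these names come from the plugin.

```scala
object LazyOneShotSketch {
  // Hypothetical stand-in for a closeable input such as a spillable batch.
  final class Resource extends AutoCloseable {
    var closed: Boolean = false
    override def close(): Unit = { closed = true }
  }

  // Lazy one-shot iterator: `compute` runs only inside next(), and the input
  // is closed exactly once, either after the computation or on abandonment.
  final class LazyOneShot[A](input: Resource)(compute: Resource => A)
      extends Iterator[A] {
    private var started = false

    override def hasNext: Boolean = !started

    override def next(): A = {
      if (started) throw new NoSuchElementException("already consumed")
      started = true
      try compute(input) finally input.close()
    }

    // Stand-in for a task-completion callback: closes the input only if the
    // iterator was never consumed.
    def abandon(): Unit = if (!started) {
      started = true
      input.close()
    }
  }
}
```

Because the iterator owns the input on both the consumed and the abandoned path, the caller does not need an outer guard around it.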
@thirtiseven thirtiseven requested a review from revans2 May 29, 2026 09:09
new Iterator[ColumnarBatch] {
  @volatile private var started = false
  private val onClose = Option(TaskContext.get()).map { tc =>
    onTaskCompletion(tc) {
Collaborator

nit: This looks like it is working around a design gap in RmmRapidsRetryIterator.withRetry(input: T, ...).

For the single-input overload, the input is wrapped by SingleItemAutoCloseableIteratorInternal, which knows how to close the input if it was never pulled. However, the task-completion callback in AutoCloseableAttemptSpliterator only closes attemptStack; before the first next(), the single input has not yet been pushed onto that stack (push happens here).

A centralized fix there would also protect other lazy single-input retry users, e.g. GpuFilter, GpuColumnarToRowExec, GpuSortExec, and GpuRunningWindowExec.

Could you either file a follow-up issue, or fix it here if you want to make this common? I think the Project-specific guard is reasonable for this PR, but the streaming withRetry(sb, ...) case should ideally not need every caller to remember this ownership edge.

Development

Successfully merging this pull request may close these issues.

[FEA][Follow-up] Emit multiple batches from GpuProjectExec split-retry instead of concatenating

3 participants