Skip to content

Commit 1ce1895

Browse files
authored
Rapids shuffle manager V2 phase 2 : skips the partial file merge if possible (#14090)
- phase 1 design: https://github.com/NVIDIA/spark-rapids/pull/14090/changes#diff-986110ce9e01fa823d715caefb69f3cbebbd64c9915740bb5369d733ee5b1edf - phase 2 design: https://github.com/NVIDIA/spark-rapids/pull/14090/changes#diff-8dcf252d92655b955712e184b4c31a59611ca6ef763368bbec0f7d95cc71828e Actually, we'll check in phase 1 work with #13724 first. After #13724 is checked in, this PR will has much less changes than now. # Improvement phase 1 vs. main : NDS overall **8.7%** improvement on dataproc (details in https://github.com/binmahone/mahone-dataproc-1224-formal-ab-4w8e-mtread20-8ssd-results/blob/main/REPORT.md) phase 1 + phase 2 vs. main: about **18%** NDS overall improvement based on early results, detailed report on the way. <img width="1800" height="1500" alt="image" src="https://github.com/user-attachments/assets/048f6f7d-dc80-442d-ac21-0270c6828d90" /> --------- Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org> Signed-off-by: Hongbin Ma <mahongbin@apache.org>
1 parent bcf497e commit 1ce1895

21 files changed

Lines changed: 3783 additions & 392 deletions

docs/design/rapids_shuffle_manager_v2_phase1_design.md

Lines changed: 649 additions & 0 deletions
Large diffs are not rendered by default.

docs/design/rapids_shuffle_manager_v2_phase2_design.md

Lines changed: 529 additions & 0 deletions
Large diffs are not rendered by default.

docs/dev/shuffle-metrics.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Shuffle Metrics: SparkRapidsShuffleDiskSavingsEvent
2+
3+
When using MULTITHREADED shuffle mode with `spark.rapids.shuffle.multithreaded.skipMerge=true`,
4+
the RAPIDS Accelerator emits `SparkRapidsShuffleDiskSavingsEvent` to the Spark event log.
5+
This document explains how to interpret and aggregate these events.
6+
7+
## Event Format
8+
9+
Each executor posts its own event when it cleans up shuffle data. A single shuffle may have
10+
multiple events in the eventlog (one per executor that participated in the shuffle write).
11+
12+
Event format in eventlog (JSON):
13+
```json
14+
{"Event":"com.nvidia.spark.rapids.SparkRapidsShuffleDiskSavingsEvent",
15+
"shuffleId":0,"bytesFromMemory":7868,"bytesFromDisk":0}
16+
```
17+
18+
## Why Custom Events Instead of Task Metrics
19+
20+
Spark task metrics are committed when a task completes. However, shuffle data lifecycle
21+
extends beyond task completion - buffers may be spilled to disk after a task finishes but
22+
before the shuffle data is read. The final `bytesFromMemory` vs `bytesFromDisk` statistics
23+
can only be determined when shuffle cleanup occurs (after the SQL query completes), at
24+
which point task metrics are no longer updatable.
25+
26+
## Field Descriptions
27+
28+
| Field | Description |
29+
|-------|-------------|
30+
| `shuffleId` | The Spark shuffle ID |
31+
| `bytesFromMemory` | Bytes kept in memory and never written to disk (actual disk I/O savings) |
32+
| `bytesFromDisk` | Bytes spilled to disk due to memory pressure |
33+
34+
The sum of `bytesFromMemory` across all events should approximately match the total
35+
"Shuffle Bytes Written" reported in task metrics.
36+
37+
## Aggregating Events
38+
39+
To get application-wide totals from an eventlog:
40+
41+
```bash
42+
grep "SparkRapidsShuffleDiskSavingsEvent" eventlog | \
43+
jq -s '{
44+
totalBytesFromMemory: (map(.bytesFromMemory) | add),
45+
totalBytesFromDisk: (map(.bytesFromDisk) | add),
46+
diskSavingsBytes: (map(.bytesFromMemory) | add)
47+
}'
48+
```
49+
50+
## Timing Considerations
51+
52+
The cleanup mechanism uses a polling model where executors poll the driver every 1 second.
53+
For short-running applications or scripts, the session may exit before executors have a
54+
chance to poll and report their statistics.
55+
56+
To ensure all events are captured, add a short delay before exiting:
57+
58+
```scala
59+
// After your last query completes
60+
Thread.sleep(2000) // Wait for executor cleanup polling
61+
spark.stop()
62+
```
63+
64+
For long-running applications or interactive sessions (spark-shell, notebooks), this is
65+
typically not an issue as there is enough time between queries for cleanup to complete.

sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/RapidsShuffleHeartbeatHandler.scala

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023, NVIDIA CORPORATION.
2+
* Copyright (c) 2023-2026, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,3 +41,47 @@ trait RapidsShuffleHeartbeatHandler {
4141
/** Called when a new peer is seen via heartbeats */
4242
def addPeer(peer: BlockManagerId): Unit
4343
}
44+
45+
// ============================================================================
46+
// Shuffle Cleanup RPC Messages
47+
// ============================================================================
48+
49+
/**
50+
* Statistics for a single shuffle cleanup operation.
51+
*
52+
* @param shuffleId the shuffle ID that was cleaned up
53+
* @param bytesFromMemory bytes that were read from memory (never spilled to disk)
54+
* @param bytesFromDisk bytes that were read from disk (spilled at some point)
55+
* @param numExpansions number of buffer expansions that occurred
56+
* @param numSpills number of buffers that were spilled to disk
57+
* @param numForcedFileOnly number of buffers that used forced file-only mode
58+
*/
59+
case class ShuffleCleanupStats(
60+
shuffleId: Int,
61+
bytesFromMemory: Long,
62+
bytesFromDisk: Long,
63+
numExpansions: Int = 0,
64+
numSpills: Int = 0,
65+
numForcedFileOnly: Int = 0) extends Serializable
66+
67+
/**
68+
* Executor polls driver for shuffles that need to be cleaned up.
69+
*
70+
* @param executorId identifier for the executor
71+
*/
72+
case class RapidsShuffleCleanupPollMsg(executorId: String)
73+
74+
/**
75+
* Driver response with shuffle IDs that need cleanup.
76+
*
77+
* @param shuffleIds list of shuffle IDs to clean up
78+
*/
79+
case class RapidsShuffleCleanupResponseMsg(shuffleIds: Array[Int])
80+
81+
/**
82+
* Executor reports cleanup statistics to driver.
83+
*
84+
* @param executorId identifier for the executor
85+
* @param stats cleanup statistics for each shuffle
86+
*/
87+
case class RapidsShuffleCleanupStatsMsg(executorId: String, stats: Array[ShuffleCleanupStats])

0 commit comments

Comments
 (0)