-
Notifications
You must be signed in to change notification settings - Fork 284
rapids shuffle manager V2 phase 1: writer use as much memory as allowed and pipelined write #13724
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 65 commits
b8938b5
79a0ef3
b44abce
b585d53
e8abcdd
5c96d7d
1ce3011
1b45069
8aec476
e827c6b
9a72121
8ae0564
c0fabcb
a2e1046
f63abff
38effe1
9c295ee
b322eb8
f898e39
8247995
5c6e21d
a274008
9180f51
e96db1c
cb3441f
f718d84
719db76
ff73518
1a5514f
049f27b
fcef7ae
7ecfc9d
9ebb8ec
08dcc92
d13f2d5
2e3002c
4f02ffb
d1d4ec3
f4dc778
cacabe8
430bf83
3c2ccf2
d8bc949
505bd71
fa823d1
213f4f6
36c2fc1
dec3087
ee84bf1
ac94597
7fb1fcb
370b160
b5cd6a0
ae6dc4a
777746a
42ed602
d62c3f1
750105b
49a144c
12dc848
5c21dde
fff9edf
bae6683
2d54f91
7c83565
d721c3f
bd7072a
58593d2
d4bded4
d475dd6
6ccca45
95a20ce
f4647c1
1ea67d8
440d9e7
873be38
2a4ce4b
28b631c
ed6753f
c01e26f
9a34a53
5084dff
50131e6
d90050e
55af52b
8a651c8
9add7a8
acfc2d5
bfa042b
72fa08a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -546,6 +546,41 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") | |
| .bytesConf(ByteUnit.BYTE) | ||
| .createWithDefault(-1) | ||
|
|
||
| val PARTIAL_FILE_BUFFER_INITIAL_SIZE = | ||
| conf("spark.rapids.memory.host.partialFileBufferInitialSize") | ||
| .doc("The initial size in bytes for a host memory buffer used by " + | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would like to understand where you got the default values, and it would be nice to call out how this and the other configs interact with each other. It is also not clear how these relate to shuffle. I get that these are internal configs, but just from the description I don't know anything about what they do. I don't knwo what a SpillablePartialFileHandle is.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added more comments, please check |
||
| "SpillablePartialFileHandle. The buffer can expand dynamically up to " + | ||
| "partialFileBufferMaxSize. A smaller initial size reduces upfront memory " + | ||
| "allocation but may require more expansions.") | ||
| .startupOnly() | ||
| .internal() | ||
| .bytesConf(ByteUnit.BYTE) | ||
| .createWithDefault(1024L * 1024 * 1024) // 1GB default | ||
|
binmahone marked this conversation as resolved.
Outdated
|
||
|
|
||
| val PARTIAL_FILE_BUFFER_MAX_SIZE = | ||
| conf("spark.rapids.memory.host.partialFileBufferMaxSize") | ||
| .doc("The maximum size in bytes for a single host memory buffer used by " + | ||
| "SpillablePartialFileHandle. When a buffer needs to expand beyond this limit, " + | ||
| "it will be spilled to disk instead. This prevents excessive memory usage " + | ||
| "for large shuffle partitions.") | ||
| .startupOnly() | ||
| .internal() | ||
| .bytesConf(ByteUnit.BYTE) | ||
| .createWithDefault(8L * 1024 * 1024 * 1024) // 8GB | ||
|
|
||
| val PARTIAL_FILE_BUFFER_MEMORY_THRESHOLD = | ||
| conf("spark.rapids.memory.host.partialFileBufferMemoryThreshold") | ||
| .doc("The host memory usage threshold (as a fraction from 0.0 to 1.0) for deciding " + | ||
| "whether to use memory-based buffering for partial files. When host memory usage " + | ||
| "exceeds this threshold, file-based storage will be used directly. This threshold " + | ||
| "also applies when expanding buffers dynamically.") | ||
| .startupOnly() | ||
| .internal() | ||
| .doubleConf | ||
| .checkValue(v => v > 0.0 && v <= 1.0, | ||
| "The memory threshold must be in the range (0.0, 1.0]") | ||
| .createWithDefault(0.5) | ||
|
|
||
| val UNSPILL = conf("spark.rapids.memory.gpu.unspill.enabled") | ||
| .doc("When a spilled GPU buffer is needed again, should it be unspilled, or only copied " + | ||
| "back into GPU memory temporarily. Unspilling may be useful for GPU buffers that are " + | ||
|
|
@@ -3281,6 +3316,12 @@ class RapidsConf(conf: Map[String, String]) extends Logging { | |
|
|
||
| lazy val hostSpillStorageSize: Long = get(HOST_SPILL_STORAGE_SIZE) | ||
|
|
||
| lazy val partialFileBufferInitialSize: Long = get(PARTIAL_FILE_BUFFER_INITIAL_SIZE) | ||
|
|
||
| lazy val partialFileBufferMaxSize: Long = get(PARTIAL_FILE_BUFFER_MAX_SIZE) | ||
|
|
||
| lazy val partialFileBufferMemoryThreshold: Double = get(PARTIAL_FILE_BUFFER_MEMORY_THRESHOLD) | ||
|
|
||
| lazy val isUnspillEnabled: Boolean = get(UNSPILL) | ||
|
|
||
| lazy val needDecimalGuarantees: Boolean = get(NEED_DECIMAL_OVERFLOW_GUARANTEES) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -526,17 +526,19 @@ object SpillableHostBuffer { | |
| * must be <= than buffer.getLength, otherwise this function throws | ||
| * and closes `buffer` | ||
| * @param buffer the buffer to make spillable | ||
| * @param priority the initial spill priority of this buffer | ||
| * @param overrideTaskPriority optional task priority to override the default task priority | ||
| */ | ||
| def apply(buffer: HostMemoryBuffer, | ||
| length: Long, | ||
| priority: Long): SpillableHostBuffer = { | ||
| overrideTaskPriority: Option[Long] = None): SpillableHostBuffer = { | ||
| closeOnExcept(buffer) { _ => | ||
| require(length <= buffer.getLength, | ||
| s"Attempted to add a host spillable with a length ${length} B which is " + | ||
| s"greater than the backing host buffer length ${buffer.getLength} B") | ||
| } | ||
| new SpillableHostBuffer(SpillableHostBufferHandle(buffer), length) | ||
| val handle = SpillableHostBufferHandle(buffer) | ||
| overrideTaskPriority.foreach(priority => handle.taskPriority = priority) | ||
|
binmahone marked this conversation as resolved.
Outdated
|
||
| new SpillableHostBuffer(handle, length) | ||
| } | ||
|
Comment on lines
531
to
540
|
||
|
|
||
| def sliceWithRetry(shb: SpillableHostBuffer, start: Long, len: Long): HostMemoryBuffer = { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.