Coalesce batches inside hash-repartition #18572
Conversation
Force-pushed from 872b975 to de2def6
Pull Request Overview
This PR adds batch coalescing to hash repartitioning to improve performance by reducing the number of small batches sent across partitions. The implementation uses BatchCoalescer from Arrow to buffer and combine small batches into larger ones (target size of 4096 rows) before sending them to output channels.
Key Changes
- Adds BatchCoalescer usage specifically for hash partitioning operations
- Implements buffering logic that accumulates batches until reaching the target size
- Includes flush logic at the end of stream processing to send remaining buffered data
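The buffer-and-flush flow summarized above can be illustrated with a small standalone sketch. This is not the PR's code: the column and batch sizes are invented, and the `push_batch` / `next_completed_batch` / `finish_buffered_batch` method names follow my understanding of arrow-rs's `BatchCoalescer` (re-exported from `arrow::compute` in recent versions), so treat them as assumptions.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::compute::BatchCoalescer;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    // The PR keeps one coalescer per output partition; 4096 is its target row count.
    let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 4096);

    // Hash repartitioning tends to produce many small per-partition batches.
    for start in (0..20_000).step_by(500) {
        let col = Int32Array::from_iter_values(start..start + 500);
        let small = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(col)])?;
        coalescer.push_batch(small)?;

        // Only forward data once a full target-sized batch has accumulated.
        while let Some(big) = coalescer.next_completed_batch() {
            println!("completed batch with {} rows", big.num_rows());
        }
    }

    // End of stream: flush whatever is still buffered so no rows are lost.
    coalescer.finish_buffered_batch()?;
    while let Some(rest) = coalescer.next_completed_batch() {
        println!("flushed remaining {} rows", rest.num_rows());
    }
    Ok(())
}
```

In the PR, the `println!` steps correspond to sending the completed batch to the partition's output channel.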
```rust
if is_hash_partitioning {
    for _ in 0..partitioner.num_partitions() {
        coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096));
```
Copilot AI (Nov 9, 2025)
The hardcoded batch size of 4096 should use the configured batch_size from the session config. Other uses of BatchCoalescer::new in the codebase use context.session_config().batch_size() or config.execution.batch_size. Consider passing the batch_size as a parameter to pull_from_input from the caller (consume_input_streams) which has access to the context: Arc<TaskContext>.
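A rough sketch of that suggestion follows. `make_coalescers` is a hypothetical helper (not code from this PR), and the import paths assume the top-level `datafusion` crate; inside the physical-plan crate the context type would come from `datafusion_execution` instead.

```rust
use std::sync::Arc;

use arrow::compute::BatchCoalescer;
use arrow::datatypes::SchemaRef;
use datafusion::execution::TaskContext;

/// Hypothetical helper: build the per-partition coalescers from the configured
/// batch size instead of hardcoding 4096. The caller (consume_input_streams)
/// holds the TaskContext and could pass the size down to pull_from_input.
fn make_coalescers(
    context: &Arc<TaskContext>,
    schema: SchemaRef,
    num_partitions: usize,
) -> Vec<BatchCoalescer> {
    let batch_size = context.session_config().batch_size();
    (0..num_partitions)
        .map(|_| BatchCoalescer::new(Arc::clone(&schema), batch_size))
        .collect()
}
```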
```rust
    }
};

if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
```
Copilot AI (Nov 9, 2025)
The send timer metric (metrics.send_time[partition]) is not being tracked for these final flush batches, unlike the main sending logic at line 1245. This will result in inaccurate metrics for hash partitioning operations as the time spent sending flushed batches won't be recorded.
Pull Request Overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.
```rust
let (batch_to_send, is_memory_batch) =
    match channel.reservation.lock().try_grow(size) {
        Ok(_) => {
            // Memory available - send in-memory batch
            (RepartitionBatch::Memory(batch), true)
        }
        Err(_) => {
            // We're memory limited - spill to SpillPool
            // SpillPool handles file handle reuse and rotation
            channel.spill_writer.push_batch(&batch)?;
            // Send marker indicating batch was spilled
            (RepartitionBatch::Spilled, false)
        }
    };

if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
    // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
    // Only shrink memory if it was a memory batch
    if is_memory_batch {
        channel.reservation.lock().shrink(size);
    }
    output_channels.remove(&partition);
}
```
Copilot AI (Nov 9, 2025)
The batch sending logic in this flush section duplicates the logic from lines 1244-1272. This code duplication makes maintenance harder and increases the risk of inconsistencies. Consider extracting this logic into a helper function that both the main loop and flush section can use. The function could be named something like send_batch_to_channel and accept parameters for the batch, partition, channel, and metrics.
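A self-contained sketch of that extraction is below. The types here (`RepartitionBatch`, `Channel`, `FakeReservation`) are simplified stand-ins for the PR's internal types, the spill-writer and metrics handling are elided, and `send_batch_to_channel` is the hypothetical name from this comment, so this shows the shape of the refactor rather than the actual code.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use arrow::record_batch::RecordBatch;
use tokio::sync::mpsc::Sender;

// Simplified stand-ins for the PR's internal types.
enum RepartitionBatch {
    Memory(RecordBatch),
    Spilled,
}

struct FakeReservation {
    used: usize,
    limit: usize,
}

impl FakeReservation {
    fn try_grow(&mut self, size: usize) -> Result<(), ()> {
        if self.used + size <= self.limit {
            self.used += size;
            Ok(())
        } else {
            Err(())
        }
    }

    fn shrink(&mut self, size: usize) {
        self.used -= size;
    }
}

struct Channel {
    sender: Sender<Option<Result<RepartitionBatch, String>>>,
    reservation: Arc<Mutex<FakeReservation>>,
    // spill_writer elided: on try_grow failure the real code writes the batch
    // to the SpillPool before sending RepartitionBatch::Spilled.
}

/// Hypothetical helper shared by the main send loop and the end-of-stream flush:
/// account for memory, send the batch, and drop the channel if the receiver has
/// hung up (early shutdown, e.g. a LIMIT upstream).
async fn send_batch_to_channel(
    partition: usize,
    batch: RecordBatch,
    output_channels: &mut HashMap<usize, Channel>,
) {
    let Some(channel) = output_channels.get(&partition) else {
        return;
    };
    let size = batch.get_array_memory_size();
    let (batch_to_send, is_memory_batch) =
        match channel.reservation.lock().unwrap().try_grow(size) {
            Ok(_) => (RepartitionBatch::Memory(batch), true),
            Err(_) => (RepartitionBatch::Spilled, false), // real code spills the batch first
        };
    if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
        // Receiver is gone: undo the reservation if it was grown, stop sending here.
        if is_memory_batch {
            channel.reservation.lock().unwrap().shrink(size);
        }
        output_channels.remove(&partition);
    }
}
```

Both the main loop and the flush section would then reduce to starting the send timer and calling this helper, which would also keep the metrics handling in one place.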
(Same flush-section code context as shown above.)
Copilot AI (Nov 9, 2025)
The flush section is missing the send_time timing metrics that are recorded in the main loop (line 1246). For consistency and proper performance monitoring, consider wrapping the sending logic with let timer = metrics.send_time[partition].timer(); ... timer.done(); similar to the main loop.
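For illustration, a minimal sketch of that wrapping (`Time::new()` is used here as a stand-in for the real `metrics.send_time[partition]` entry; the actual send logic is elided):

```rust
use datafusion::physical_plan::metrics::Time;

fn main() {
    // Stand-in for `metrics.send_time[partition]` from the PR.
    let send_time = Time::new();

    let timer = send_time.timer();
    // ... send the flushed batch to the partition's output channel here ...
    timer.done();
}
```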
Hmm, looks like this is not always faster. Perhaps for skewed data it can make up for it by pushing the coalesce into another thread 🤔
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?