Skip to content

Commit 9fdb7b3

Browse files
committed
CoalescePartitionsExec fetch is not consistent with one partition and more than one partition (apache#18245)
## Which issue does this PR close? - Closes [apache#18244](apache#18244) ## Rationale for this change In our internal project, the limit will not return right number when CoalescePartitionsExec follow up by our customer operator which is only one partition output. After my investigation i found: CoalescePartitionsExec fetch is not consistent with one partition and more than one partition. ## What changes are included in this PR? Make CoalescePartitionsExec fetch should be consistent when the partition number changes. ## Are these changes tested? Yes ## Are there any user-facing changes? No (cherry picked from commit be85bf4)
1 parent ab03bab commit 9fdb7b3

File tree

1 file changed

+118
-2
lines changed

1 file changed

+118
-2
lines changed

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,18 @@ impl ExecutionPlan for CoalescePartitionsExec {
170170
"CoalescePartitionsExec requires at least one input partition"
171171
),
172172
1 => {
173-
// bypass any threading / metrics if there is a single partition
174-
self.input.execute(0, context)
173+
// single-partition path: execute child directly, but ensure fetch is respected
174+
// (wrap with ObservedStream only if fetch is present so we don't add overhead otherwise)
175+
let child_stream = self.input.execute(0, context)?;
176+
if self.fetch.is_some() {
177+
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
178+
return Ok(Box::pin(ObservedStream::new(
179+
child_stream,
180+
baseline_metrics,
181+
self.fetch,
182+
)));
183+
}
184+
Ok(child_stream)
175185
}
176186
_ => {
177187
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -361,4 +371,110 @@ mod tests {
361371

362372
collect(coalesce_partitions_exec, task_ctx).await.unwrap();
363373
}
374+
375+
#[tokio::test]
376+
async fn test_single_partition_with_fetch() -> Result<()> {
377+
let task_ctx = Arc::new(TaskContext::default());
378+
379+
// Use existing scan_partitioned with 1 partition (returns 100 rows per partition)
380+
let input = test::scan_partitioned(1);
381+
382+
// Test with fetch=3
383+
let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(3));
384+
385+
let stream = coalesce.execute(0, task_ctx)?;
386+
let batches = common::collect(stream).await?;
387+
388+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
389+
assert_eq!(row_count, 3, "Should only return 3 rows due to fetch=3");
390+
391+
Ok(())
392+
}
393+
394+
#[tokio::test]
395+
async fn test_multi_partition_with_fetch_one() -> Result<()> {
396+
let task_ctx = Arc::new(TaskContext::default());
397+
398+
// Create 4 partitions, each with 100 rows
399+
// This simulates the real-world scenario where each partition has data
400+
let input = test::scan_partitioned(4);
401+
402+
// Test with fetch=1 (the original bug: was returning multiple rows instead of 1)
403+
let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(1));
404+
405+
let stream = coalesce.execute(0, task_ctx)?;
406+
let batches = common::collect(stream).await?;
407+
408+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
409+
assert_eq!(
410+
row_count, 1,
411+
"Should only return 1 row due to fetch=1, not one per partition"
412+
);
413+
414+
Ok(())
415+
}
416+
417+
#[tokio::test]
418+
async fn test_single_partition_without_fetch() -> Result<()> {
419+
let task_ctx = Arc::new(TaskContext::default());
420+
421+
// Use scan_partitioned with 1 partition
422+
let input = test::scan_partitioned(1);
423+
424+
// Test without fetch (should return all rows)
425+
let coalesce = CoalescePartitionsExec::new(input);
426+
427+
let stream = coalesce.execute(0, task_ctx)?;
428+
let batches = common::collect(stream).await?;
429+
430+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
431+
assert_eq!(
432+
row_count, 100,
433+
"Should return all 100 rows when fetch is None"
434+
);
435+
436+
Ok(())
437+
}
438+
439+
#[tokio::test]
440+
async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
441+
let task_ctx = Arc::new(TaskContext::default());
442+
443+
// Use scan_partitioned with 1 partition (returns 100 rows)
444+
let input = test::scan_partitioned(1);
445+
446+
// Test with fetch larger than available rows
447+
let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(200));
448+
449+
let stream = coalesce.execute(0, task_ctx)?;
450+
let batches = common::collect(stream).await?;
451+
452+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
453+
assert_eq!(
454+
row_count, 100,
455+
"Should return all available rows (100) when fetch (200) is larger"
456+
);
457+
458+
Ok(())
459+
}
460+
461+
#[tokio::test]
462+
async fn test_multi_partition_fetch_exact_match() -> Result<()> {
463+
let task_ctx = Arc::new(TaskContext::default());
464+
465+
// Create 4 partitions, each with 100 rows
466+
let num_partitions = 4;
467+
let csv = test::scan_partitioned(num_partitions);
468+
469+
// Test with fetch=400 (exactly all rows)
470+
let coalesce = CoalescePartitionsExec::new(csv).with_fetch(Some(400));
471+
472+
let stream = coalesce.execute(0, task_ctx)?;
473+
let batches = common::collect(stream).await?;
474+
475+
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
476+
assert_eq!(row_count, 400, "Should return exactly 400 rows");
477+
478+
Ok(())
479+
}
364480
}

0 commit comments

Comments
 (0)