From e0d3a94e684d793b2e18f701c1d02064d533a66d Mon Sep 17 00:00:00 2001
From: "xudong.w"
Date: Thu, 5 Mar 2026 13:36:01 +0100
Subject: [PATCH 1/3] Fix/aggregate output ordering streaming (#33)

Co-authored-by: Claude Opus 4.6
---
 .../src/enforce_distribution.rs               | 50 ++++++++++++++++++-
 .../test_files/agg_func_substitute.slt        | 27 +++++-----
 .../sqllogictest/test_files/group_by.slt      | 18 +++----
 .../test_files/preserve_file_partitioning.slt | 26 +++++-----
 .../repartition_subset_satisfaction.slt       | 24 ++++-----
 datafusion/sqllogictest/test_files/unnest.slt | 17 +++----
 6 files changed, 100 insertions(+), 62 deletions(-)

diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 0ea1b766cd257..9cc597d3f5c8d 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -928,6 +928,43 @@ fn add_hash_on_top(
 ///
 /// * `input`: Current node.
 ///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns `Ok(false)` for
+/// multi-child operators (e.g. joins) where child substitution semantics are
+/// ambiguous.
+fn preserving_order_enables_streaming(
+    parent: &Arc<dyn ExecutionPlan>,
+    ordered_child: &Arc<dyn ExecutionPlan>,
+) -> Result<bool> {
+    // Only applicable to single-child operators that maintain input order
+    // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+    // maintain input order (e.g. SortExec) handle ordering themselves;
+    // preserving SPM for them is unnecessary.
+    if parent.children().len() != 1 {
+        return Ok(false);
+    }
+    if !parent.maintains_input_order()[0] {
+        return Ok(false);
+    }
+    // Build parent with the ordered child.
+    let with_ordered =
+        Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
+    if with_ordered.pipeline_behavior() == EmissionType::Final {
+        // Parent is blocking even with ordering; no benefit.
+        return Ok(false);
+    }
+    // Build parent with an unordered child via CoalescePartitionsExec.
+    let unordered_child: Arc<dyn ExecutionPlan> =
+        Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
+    let without_ordered = Arc::clone(parent).with_new_children(vec![unordered_child])?;
+    Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
+}
+
 /// # Returns
 ///
 /// Updated node with an execution plan, where the desired single distribution
@@ -1351,7 +1388,13 @@ pub fn ensure_distribution(
                     .equivalence_properties()
                     .ordering_satisfy_requirement(sort_req.clone())?;
 
+                let streaming_benefit = if child.data {
+                    preserving_order_enables_streaming(&plan, &child.plan)?
+                } else {
+                    false
+                };
                 if (!ordering_satisfied || !order_preserving_variants_desirable)
+                    && !streaming_benefit
                     && child.data
                 {
                     child = replace_order_preserving_variants(child)?;
@@ -1372,6 +1415,11 @@ pub fn ensure_distribution(
                 // Stop tracking distribution changing operators
                 child.data = false;
             } else {
+                let streaming_benefit = if child.data {
+                    preserving_order_enables_streaming(&plan, &child.plan)?
+                } else {
+                    false
+                };
                 // no ordering requirement
                 match requirement {
                     // Operator requires specific distribution.
                     Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
                         // Since there is no ordering requirement, preserving
                         // ordering is pointless.
However, if it does maintain // input order, we keep order-preserving variants so // ordering can flow through to ancestors that need it. - if !maintains { + if !maintains && !streaming_benefit { child = replace_order_preserving_variants(child)?; } } diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt index 2b33452184bc0..e0199c82501e4 100644 --- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt +++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt @@ -45,11 +45,10 @@ logical_plan physical_plan 01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] 02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true +03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST +04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT @@ -64,11 +63,10 @@ logical_plan physical_plan 01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] 02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true +03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST +04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result @@ -82,11 +80,10 @@ logical_plan physical_plan 01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] 02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true +03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST +04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true query II SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 4b1f663bb8164..366326479dab1 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3971,11 +3971,10 @@ logical_plan 02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d] physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 -04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true +02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST 
+03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) +04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true # drop table multiple_ordered_table_with_pk statement ok @@ -4011,11 +4010,10 @@ logical_plan 02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d] physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8 -04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) -05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true +02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST +03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) +04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true statement ok set datafusion.execution.target_partitions = 1; diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index cfa71ddfccada..7c24a6bd2a5db 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -288,10 +288,9 @@ physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)] 03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted -04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true] -05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3 -06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted -07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, 
projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet +04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST +05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet # Verify results without optimization query TIR @@ -361,16 +360,15 @@ physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env), max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*), sum(f.value)@4 as sum(f.value)] 03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted -04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true] -05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3 -06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted -07)------------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service] -08)--------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4] -09)----------------CoalescePartitionsExec -10)------------------FilterExec: service@2 = log -11)--------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 -12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST +05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted +06)----------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service] +07)------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, 
f_dkey@4] +08)--------------CoalescePartitionsExec +09)----------------FilterExec: service@2 = log +10)------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 +11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +12)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify results without optimization query TTTIR rowsort diff --git a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt index ac2463237bd32..fd49fd900488a 100644 --- a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt +++ b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt @@ -162,10 +162,9 @@ physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as avg(fact_table_ordered.value)] 03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted -04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST], preserve_partitioning=[true] -05)--------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3), input_partitions=3 -06)----------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted -07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet +04)------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST +05)--------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet # Verify results without subset satisfaction query TPIR rowsort @@ -375,15 +374,14 @@ physical_plan 06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[avg(a.max_bin_value)] 07)------------ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value] 08)--------------AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@2 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) -09)----------------SortExec: expr=[f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 ASC NULLS LAST], preserve_partitioning=[true] -10)------------------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1, env@2], 3), input_partitions=3 -11)--------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) -12)----------------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value] -13)------------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4] -14)--------------------------CoalescePartitionsExec 
-15)----------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] -16)------------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] -17)--------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +09)----------------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1, env@2], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 ASC NULLS LAST +10)------------------AggregateExec: mode=Partial, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) +11)--------------------ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value] +12)----------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4] +13)------------------------CoalescePartitionsExec +14)--------------------------FilterExec: service@1 = log, projection=[env@0, d_dkey@2] +15)----------------------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=A/data.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=D/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/dimension/d_dkey=C/data.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)] +16)------------------------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify results without subset satisfaction query TPR rowsort diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index ba499679a9a80..8cfc01380d4b2 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -987,15 +987,14 @@ logical_plan physical_plan 01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)] 02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted -03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4 -05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted -06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar] -07)------------UnnestExec -08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true -10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] -11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192] +03)----RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4, preserve_order=true, sort_exprs=generated_id@0 ASC NULLS LAST +04)------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted +05)--------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar] +06)----------UnnestExec +07)------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] +10)------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192] # Unnest array where data is already ordered by column2 (100, 200, 300, 400) statement ok From 
6d9430d11b5946dad61b51a705d6b26e01dafb54 Mon Sep 17 00:00:00 2001
From: "xudong.w"
Date: Mon, 23 Mar 2026 17:20:26 +0800
Subject: [PATCH 2/3] add ut

---
 .../enforce_distribution.rs | 134 ++++++++++++++++++
 1 file changed, 134 insertions(+)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index e14dc389d1b14..ab796098325ed 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -46,6 +46,8 @@ use datafusion_common::tree_node::{
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_expr::{JoinType, Operator};
+use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, binary, lit};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{
@@ -462,6 +464,71 @@ fn aggregate_exec_with_alias(
     )
 }
 
+fn partitioned_count_aggregate_exec(
+    input: Arc<dyn ExecutionPlan>,
+    group_alias_pairs: Vec<(String, String)>,
+    count_column: &str,
+) -> Arc<dyn ExecutionPlan> {
+    let input_schema = input.schema();
+    let group_by_expr = group_alias_pairs
+        .iter()
+        .map(|(column, alias)| {
+            (
+                col(column, &input_schema).unwrap() as Arc<dyn PhysicalExpr>,
+                alias.clone(),
+            )
+        })
+        .collect::<Vec<_>>();
+    let partial_group_by = PhysicalGroupBy::new_single(group_by_expr.clone());
+    let final_group_by = PhysicalGroupBy::new_single(
+        group_by_expr
+            .iter()
+            .enumerate()
+            .map(|(idx, (_expr, alias))| {
+                (
+                    Arc::new(Column::new(alias, idx)) as Arc<dyn PhysicalExpr>,
+                    alias.clone(),
+                )
+            })
+            .collect::<Vec<_>>(),
+    );
+
+    let aggr_expr = vec![Arc::new(
+        AggregateExprBuilder::new(
+            count_udaf(),
+            vec![col(count_column, &input_schema).unwrap()],
+        )
+        .schema(Arc::clone(&input_schema))
+        .alias(format!("COUNT({count_column})"))
+        .build()
+        .unwrap(),
+    )];
+
+    let partial = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            partial_group_by,
+            aggr_expr.clone(),
+            vec![None],
+            input,
+            Arc::clone(&input_schema),
+        )
+        .unwrap(),
+    );
+
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::FinalPartitioned,
+            final_group_by,
+            aggr_expr,
+            vec![None],
+            Arc::clone(&partial) as _,
+            partial.schema(),
+        )
+        .unwrap(),
+    )
+}
+
 fn hash_join_exec(
     left: Arc<dyn ExecutionPlan>,
     right: Arc<dyn ExecutionPlan>,
@@ -3322,6 +3389,73 @@ fn preserve_ordering_through_repartition() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn preserve_ordering_for_streaming_sorted_aggregate() -> Result<()> {
+    let schema = schema();
+    let sort_key: LexOrdering = [PhysicalSortExpr {
+        expr: col("a", &schema)?,
+        options: SortOptions::default(),
+    }]
+    .into();
+    let input = parquet_exec_multiple_sorted(vec![sort_key]);
+    let physical_plan = partitioned_count_aggregate_exec(
+        input,
+        vec![("a".to_string(), "a".to_string())],
+        "b",
+    );
+
+    let test_config = TestConfig::default().with_query_execution_partitions(2);
+
+    let plan_distrib =
+        test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
+    assert_plan!(plan_distrib, @r"
+    AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
+    RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC
+    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
+    DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, 
e], output_ordering=[a@0 ASC], file_type=parquet + "); + + let plan_sort = test_config.to_plan(physical_plan, &SORT_DISTRIB_DISTRIB); + assert_plan!(plan_distrib, plan_sort); + + Ok(()) +} + +#[test] +fn preserve_ordering_for_streaming_partially_sorted_aggregate() -> Result<()> { + let schema = schema(); + let sort_key: LexOrdering = [PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions::default(), + }] + .into(); + let input = parquet_exec_multiple_sorted(vec![sort_key]); + let physical_plan = partitioned_count_aggregate_exec( + input, + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "b".to_string()), + ], + "c", + ); + + let test_config = TestConfig::default().with_query_execution_partitions(2); + + let plan_distrib = + test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT); + assert_plan!(plan_distrib, @r" + AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[COUNT(c)], ordering_mode=PartiallySorted([0]) + RepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC + AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[COUNT(c)], ordering_mode=PartiallySorted([0]) + DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + "); + + let plan_sort = test_config.to_plan(physical_plan, &SORT_DISTRIB_DISTRIB); + assert_plan!(plan_distrib, plan_sort); + + Ok(()) +} + #[test] fn do_not_preserve_ordering_through_repartition() -> Result<()> { let schema = schema(); From 592b879678144b0efed3acfab88e758b7f3c553a Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 23 Mar 2026 17:30:34 +0800 Subject: [PATCH 3/3] refine code --- .../tests/physical_optimizer/enforce_distribution.rs | 6 ++---- .../physical-optimizer/src/enforce_distribution.rs | 11 ++++++----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index ab796098325ed..3a6106c45356f 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3406,8 +3406,7 @@ fn preserve_ordering_for_streaming_sorted_aggregate() -> Result<()> { let test_config = TestConfig::default().with_query_execution_partitions(2); - let plan_distrib = - test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT); + let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT); assert_plan!(plan_distrib, @r" AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC @@ -3441,8 +3440,7 @@ fn preserve_ordering_for_streaming_partially_sorted_aggregate() -> Result<()> { let test_config = TestConfig::default().with_query_execution_partitions(2); - let plan_distrib = - test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT); + let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT); assert_plan!(plan_distrib, @r" AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[COUNT(c)], ordering_mode=PartiallySorted([0]) RepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs 
b/datafusion/physical-optimizer/src/enforce_distribution.rs index 9cc597d3f5c8d..504197a2ded5e 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1377,6 +1377,12 @@ pub fn ensure_distribution( } }; + let streaming_benefit = if child.data { + preserving_order_enables_streaming(&plan, &child.plan)? + } else { + false + }; + // There is an ordering requirement of the operator: if let Some(required_input_ordering) = required_input_ordering { // Either: @@ -1388,11 +1394,6 @@ pub fn ensure_distribution( .equivalence_properties() .ordering_satisfy_requirement(sort_req.clone())?; - let streaming_benefit = if child.data { - preserving_order_enables_streaming(&plan, &child.plan)? - } else { - false - }; if (!ordering_satisfied || !order_preserving_variants_desirable) && !streaming_benefit && child.data