Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use datafusion_common::tree_node::{
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_expr::{JoinType, Operator};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, binary, lit};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{
Expand Down Expand Up @@ -462,6 +464,71 @@ fn aggregate_exec_with_alias(
)
}

/// Builds a two-phase COUNT aggregation (Partial + FinalPartitioned) over
/// `input`, grouping by the given `(column, alias)` pairs and counting
/// `count_column`.
fn partitioned_count_aggregate_exec(
    input: Arc<dyn ExecutionPlan>,
    group_alias_pairs: Vec<(String, String)>,
    count_column: &str,
) -> Arc<dyn ExecutionPlan> {
    let input_schema = input.schema();

    // Partial phase groups on the source columns directly.
    let mut partial_groups: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
    for (column, alias) in &group_alias_pairs {
        let expr = col(column, &input_schema).unwrap() as Arc<dyn PhysicalExpr>;
        partial_groups.push((expr, alias.clone()));
    }

    // Final phase references the partial output columns by alias/position.
    let final_groups: Vec<(Arc<dyn PhysicalExpr>, String)> = partial_groups
        .iter()
        .enumerate()
        .map(|(idx, (_expr, alias))| {
            let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new(alias, idx));
            (expr, alias.clone())
        })
        .collect();

    let partial_group_by = PhysicalGroupBy::new_single(partial_groups);
    let final_group_by = PhysicalGroupBy::new_single(final_groups);

    // COUNT(count_column), aliased the way it will appear in plan output.
    let count_expr = AggregateExprBuilder::new(
        count_udaf(),
        vec![col(count_column, &input_schema).unwrap()],
    )
    .schema(Arc::clone(&input_schema))
    .alias(format!("COUNT({count_column})"))
    .build()
    .unwrap();
    let aggr_expr = vec![Arc::new(count_expr)];

    let partial = Arc::new(
        AggregateExec::try_new(
            AggregateMode::Partial,
            partial_group_by,
            aggr_expr.clone(),
            vec![None],
            input,
            Arc::clone(&input_schema),
        )
        .unwrap(),
    );
    let partial_schema = partial.schema();

    Arc::new(
        AggregateExec::try_new(
            AggregateMode::FinalPartitioned,
            final_group_by,
            aggr_expr,
            vec![None],
            partial as _,
            partial_schema,
        )
        .unwrap(),
    )
}

fn hash_join_exec(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -3322,6 +3389,71 @@ fn preserve_ordering_through_repartition() -> Result<()> {
Ok(())
}

#[test]
fn preserve_ordering_for_streaming_sorted_aggregate() -> Result<()> {
    // Source: parquet scan with two partitions, each sorted by `a`.
    let schema = schema();
    let sort_key: LexOrdering = [PhysicalSortExpr {
        expr: col("a", &schema)?,
        options: SortOptions::default(),
    }]
    .into();
    let input = parquet_exec_multiple_sorted(vec![sort_key]);
    // Two-phase COUNT(b) GROUP BY a. Grouping on the sort key is what allows
    // the aggregates to run in streaming `Sorted` mode.
    let physical_plan = partitioned_count_aggregate_exec(
        input,
        vec![("a".to_string(), "a".to_string())],
        "b",
    );

    let test_config = TestConfig::default().with_query_execution_partitions(2);

    // EnforceDistribution should keep the order-preserving hash repartition
    // (preserve_order=true) rather than a plain repartition, so the final
    // aggregate stays in streaming `Sorted` mode (no SortExec inserted).
    let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
    assert_plan!(plan_distrib, @r"
    AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
      RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC
        AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
          DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
    ");

    // Running the rules in the opposite order must yield the same plan.
    let plan_sort = test_config.to_plan(physical_plan, &SORT_DISTRIB_DISTRIB);
    assert_plan!(plan_distrib, plan_sort);

    Ok(())
}

#[test]
fn preserve_ordering_for_streaming_partially_sorted_aggregate() -> Result<()> {
    // Source: parquet scan with two partitions, each sorted by `a` only.
    let schema = schema();
    let sort_key: LexOrdering = [PhysicalSortExpr {
        expr: col("a", &schema)?,
        options: SortOptions::default(),
    }]
    .into();
    let input = parquet_exec_multiple_sorted(vec![sort_key]);
    // Two-phase COUNT(c) GROUP BY a, b. Only the first group key (`a`) matches
    // the input ordering, so the aggregates run in `PartiallySorted([0])` mode.
    let physical_plan = partitioned_count_aggregate_exec(
        input,
        vec![
            ("a".to_string(), "a".to_string()),
            ("b".to_string(), "b".to_string()),
        ],
        "c",
    );

    let test_config = TestConfig::default().with_query_execution_partitions(2);

    // Even with only a prefix of the group keys sorted, the order-preserving
    // repartition should be kept so the final aggregate can stream.
    let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
    assert_plan!(plan_distrib, @r"
    AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[COUNT(c)], ordering_mode=PartiallySorted([0])
      RepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC
        AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[COUNT(c)], ordering_mode=PartiallySorted([0])
          DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
    ");

    // Running the rules in the opposite order must yield the same plan.
    let plan_sort = test_config.to_plan(physical_plan, &SORT_DISTRIB_DISTRIB);
    assert_plan!(plan_distrib, plan_sort);

    Ok(())
}

#[test]
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
Expand Down
51 changes: 50 additions & 1 deletion datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,43 @@ fn add_hash_on_top(
///
/// * `input`: Current node.
///
/// Checks whether preserving the child's ordering enables the parent to
/// run in streaming mode. Compares the parent's pipeline behavior with
/// the ordered child vs. an unordered (coalesced) child. If removing the
/// ordering would cause the parent to switch from streaming to blocking,
/// keeping the order-preserving variant is beneficial.
///
/// Only applicable to single-child operators; returns `Ok(false)` for
/// multi-child operators (e.g. joins) where child substitution semantics are
/// ambiguous.
fn preserving_order_enables_streaming(
    parent: &Arc<dyn ExecutionPlan>,
    ordered_child: &Arc<dyn ExecutionPlan>,
) -> Result<bool> {
    // Restrict to single-child parents that maintain their input order
    // (e.g. AggregateExec in PartiallySorted mode). Operators that do not
    // maintain input order (e.g. SortExec) establish their own ordering, so
    // preserving an order-preserving variant below them buys nothing.
    let applicable =
        parent.children().len() == 1 && parent.maintains_input_order()[0];
    if !applicable {
        return Ok(false);
    }
    // If the parent is blocking even when fed the ordered child, keeping the
    // ordering cannot help.
    let ordered_parent =
        Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
    if ordered_parent.pipeline_behavior() == EmissionType::Final {
        return Ok(false);
    }
    // Simulate losing the ordering by coalescing the child's partitions, then
    // check whether the parent would degrade from streaming to blocking.
    let coalesced: Arc<dyn ExecutionPlan> =
        Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
    let unordered_parent = Arc::clone(parent).with_new_children(vec![coalesced])?;
    Ok(unordered_parent.pipeline_behavior() == EmissionType::Final)
}

/// # Returns
///
/// Updated node with an execution plan, where the desired single distribution
Expand Down Expand Up @@ -1340,6 +1377,12 @@ pub fn ensure_distribution(
}
};

let streaming_benefit = if child.data {
preserving_order_enables_streaming(&plan, &child.plan)?
} else {
false
};

// There is an ordering requirement of the operator:
if let Some(required_input_ordering) = required_input_ordering {
// Either:
Expand All @@ -1352,6 +1395,7 @@ pub fn ensure_distribution(
.ordering_satisfy_requirement(sort_req.clone())?;

if (!ordering_satisfied || !order_preserving_variants_desirable)
&& !streaming_benefit
&& child.data
{
child = replace_order_preserving_variants(child)?;
Expand All @@ -1372,6 +1416,11 @@ pub fn ensure_distribution(
// Stop tracking distribution changing operators
child.data = false;
} else {
let streaming_benefit = if child.data {
preserving_order_enables_streaming(&plan, &child.plan)?
} else {
false
};
// no ordering requirement
match requirement {
// Operator requires specific distribution.
Expand All @@ -1380,7 +1429,7 @@ pub fn ensure_distribution(
// ordering is pointless. However, if it does maintain
// input order, we keep order-preserving variants so
// ordering can flow through to ancestors that need it.
if !maintains {
if !maintains && !streaming_benefit {
child = replace_order_preserving_variants(child)?;
}
}
Expand Down
27 changes: 12 additions & 15 deletions datafusion/sqllogictest/test_files/agg_func_substitute.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true


query TT
Expand All @@ -64,11 +63,10 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true

query TT
EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
Expand All @@ -82,11 +80,10 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true

query II
SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result
Expand Down
18 changes: 8 additions & 10 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3971,11 +3971,10 @@ logical_plan
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true

# drop table multiple_ordered_table_with_pk
statement ok
Expand Down Expand Up @@ -4011,11 +4010,10 @@ logical_plan
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
physical_plan
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true

statement ok
set datafusion.execution.target_partitions = 1;
Expand Down
Loading
Loading