Skip to content

Commit a3f8191

Browse files
committed
refine code
1 parent 11501ff commit a3f8191

File tree

2 files changed

+5
-11
lines changed

2 files changed

+5
-11
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3305,8 +3305,7 @@ fn preserve_ordering_for_streaming_sorted_aggregate() -> Result<()> {
33053305

33063306
let test_config = TestConfig::default().with_query_execution_partitions(2);
33073307

3308-
let plan_distrib =
3309-
test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
3308+
let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
33103309
assert_plan!(plan_distrib, @r"
33113310
AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
33123311
RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC
@@ -3340,8 +3339,7 @@ fn preserve_ordering_for_streaming_partially_sorted_aggregate() -> Result<()> {
33403339

33413340
let test_config = TestConfig::default().with_query_execution_partitions(2);
33423341

3343-
let plan_distrib =
3344-
test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
3342+
let plan_distrib = test_config.to_plan(physical_plan.clone(), &DISTRIB_DISTRIB_SORT);
33453343
assert_plan!(plan_distrib, @r"
33463344
AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[COUNT(c)], ordering_mode=PartiallySorted([0])
33473345
RepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,6 +1381,9 @@ pub fn ensure_distribution(
13811381
}
13821382
};
13831383

1384+
let streaming_benefit = child.data
1385+
&& preserving_order_enables_streaming(&plan, &child.plan);
1386+
13841387
// There is an ordering requirement of the operator:
13851388
if let Some(required_input_ordering) = required_input_ordering {
13861389
// Either:
@@ -1392,8 +1395,6 @@ pub fn ensure_distribution(
13921395
.equivalence_properties()
13931396
.ordering_satisfy_requirement(sort_req.clone())?;
13941397

1395-
let streaming_benefit = child.data
1396-
&& preserving_order_enables_streaming(&plan, &child.plan);
13971398
if (!ordering_satisfied || !order_preserving_variants_desirable)
13981399
&& !streaming_benefit
13991400
&& child.data
@@ -1420,11 +1421,6 @@ pub fn ensure_distribution(
14201421
match requirement {
14211422
// Operator requires specific distribution.
14221423
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
1423-
let streaming_benefit = child.data
1424-
&& preserving_order_enables_streaming(
1425-
&plan,
1426-
&child.plan,
1427-
);
14281424
if !streaming_benefit {
14291425
child =
14301426
replace_order_preserving_variants(child)?;

0 commit comments

Comments
 (0)