Skip to content

Fix/aggregate output ordering streaming#21107

Open
xudong963 wants to merge 4 commits intoapache:mainfrom
xudong963:agg_order
Open

Fix/aggregate output ordering streaming#21107
xudong963 wants to merge 4 commits intoapache:mainfrom
xudong963:agg_order

Conversation

@xudong963
Copy link
Member

@xudong963 xudong963 commented Mar 23, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

What changes are included in this PR?

This PR updates EnforceDistribution to keep order-preserving repartition variants when preserving input ordering allows the parent operator to remain incremental/streaming.

Previously, order-preserving variants could be removed when prefer_existing_sort = false or when there was no explicit ordering requirement, even if dropping the ordering would force a parent operator such as AggregateExec to fall back to blocking execution. This change adds a targeted preserving_order_enables_streaming check and uses it to avoid replacing RepartitionExec(..., preserve_order=true) / SortPreservingMergeExec when that preserved ordering is what enables streaming behavior.

As a result, the optimizer now prefers keeping order-preserving repartitioning in these cases, and the updated sqllogictests reflect the new physical plans: instead of inserting a SortExec above a plain repartition, plans now retain RepartitionExec(... preserve_order=true) so sorted or partially sorted aggregates can continue running incrementally.

Are these changes tested?

Are there any user-facing changes?

No extra sort needed for these cases

@xudong963 xudong963 marked this pull request as draft March 23, 2026 05:36
@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Mar 23, 2026
@xudong963 xudong963 changed the title Fix/aggregate output ordering streaming (#33) Fix/aggregate output ordering streaming Mar 23, 2026
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions github-actions bot added the core Core DataFusion crate label Mar 23, 2026
@xudong963 xudong963 added the performance Make DataFusion faster label Mar 23, 2026
@xudong963 xudong963 marked this pull request as ready for review March 23, 2026 09:33
@xudong963 xudong963 requested a review from alamb March 23, 2026 09:33
@alamb
Copy link
Contributor

alamb commented Mar 24, 2026

Previously, order-preserving variants could be removed when prefer_existing_sort = false or when there was no explicit ordering requirement, even if dropping the ordering would force a parent operator such as AggregateExec to fall back to blocking execution.

I am not sure this is a "bug" necessarily -- more like a tradeoff. I believe the enforce_distribution plan will attempt to increase plan parallelism even if it has to resort by default

My understanding is that this is what the prefer_existing_sort setting controls

prefer_existing_sort = false

So if you want plans to keep existing sorts and not increase parallelism in that case, you should set prefer_existing_sort = true -- this is what we do in InfluxDB FWIW

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @xudong963 -- this change looks good to me. As I understand it it avoids adding a HashRepartitioning (and uses a more memory efficient operator) so that sounds like a win all around

I was worried that this change would result in trading off "more sortedness" for "less parallelism" but from what I can see that is not the case.

One thing we may want to consider it gating this behavior behind the "prefer existing sort" flag -- that way

I don't think we should be ignoring the errors, but otherwise the code and tests look good to me.

10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
03)----RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4, preserve_order=true, sort_exprs=generated_id@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this plan looks better to me (it still is fully parallelized and now uses avoids an unecessary hash repartition

// Parent is blocking even with ordering — no benefit
return false;
}
// Build parent with an unordered child (simulating CoalescePartitionsExec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this commet is strange to me -- the code adds a CoalescePartitionsExec -- so I don't think it is "simulating" anything

let with_ordered =
match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
Ok(p) => p,
Err(_) => return false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should ignore the Err here or below as it could mask real errors / a bug with this code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate optimizer Optimizer rules performance Make DataFusion faster sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants