
[SPARK-56034][SQL] Push down Join through Union when the right side is broadcastable #54865

Draft
LuciferYang wants to merge 2 commits into apache:master from LuciferYang:SPARK-56034

Conversation

Contributor

@LuciferYang LuciferYang commented Mar 17, 2026

What changes were proposed in this pull request?

This PR adds a new optimizer rule PushDownJoinThroughUnion that transforms:

  Join(Union(c1, c2, ..., cN), right, joinType, cond) 

into:

  Union(Join(c1, right, joinType, cond1), Join(c2, right, joinType, cond2), ...)         

when the right side of the join is small enough to broadcast (by size statistics or explicit BROADCAST hints).

The rule applies to Inner and LeftOuter joins and is gated by a new config spark.sql.optimizer.pushDownJoinThroughUnion.enabled (default true). It is placed after the "Early Filter and Projection Push-Down" batch in the optimizer to ensure accurate data source statistics are available.
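The correctness of the rewrite rests on join distributing over union. As a sanity check, here is a plain-Python model over bags of rows (illustrative only, not Spark internals) showing the equivalence for both supported join types:

```python
# Simplified model of the rewrite: joining the concatenation of two
# relations equals concatenating the per-branch joins, for both inner
# and left-outer joins. Rows are dicts; `key` is the equi-join column.

def inner_join(left, right, key):
    return [dict(l, **r) for l in left for r in right if l[key] == r[key]]

def left_outer_join(left, right, key):
    out = []
    for l in left:
        matches = [r for r in right if r[key] == l[key]]
        if matches:
            out.extend(dict(l, **r) for r in matches)
        else:
            out.append(dict(l))  # unmatched left rows are preserved
    return out

c1 = [{"k": 1, "a": "x"}, {"k": 2, "a": "y"}]       # first Union branch
c2 = [{"k": 2, "a": "z"}, {"k": 3, "a": "w"}]       # second Union branch
right = [{"k": 1, "d": "jan"}, {"k": 2, "d": "feb"}]  # small dimension side

# Join(Union(c1, c2), right) == Union(Join(c1, right), Join(c2, right))
assert inner_join(c1 + c2, right, "k") == \
    inner_join(c1, right, "k") + inner_join(c2, right, "k")
assert left_outer_join(c1 + c2, right, "k") == \
    left_outer_join(c1, right, "k") + left_outer_join(c2, right, "k")
```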

Key implementation details:

  • Uses the "fake self-join + DeduplicateRelations" pattern (same as InlineCTE) to create independent copies of the right subtree with fresh ExprIds for each Union branch.
  • Join condition attributes referencing Union output are rewritten to the corresponding child's output attributes.
  • Conservatively excludes right subtrees containing subqueries to avoid potential issues with DeduplicateRelations and correlated references.
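To illustrate the second bullet, a toy model of the condition rewrite — attribute names like `k#10` mimic Spark's `name#exprId` notation, and the `rewrite` helper is purely illustrative, not Spark API:

```python
# Toy model of the join-condition rewrite: attributes in the condition
# that reference the Union output are remapped to the corresponding
# child's own attributes; right-side attributes are left untouched.

def rewrite(cond_attrs, union_output, child_output):
    # union_output[i] corresponds positionally to child_output[i]
    mapping = dict(zip(union_output, child_output))
    return [mapping.get(a, a) for a in cond_attrs]

union_output = ["k#10", "a#11"]   # attributes exposed by the Union
child1_output = ["k#1", "a#2"]    # first branch's own attributes
cond = ["k#10", "d#20"]           # condition: one Union attr, one right-side attr

assert rewrite(cond, union_output, child1_output) == ["k#1", "d#20"]
```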

Why are the changes needed?

This is a common pattern in TPC-DS queries (e.g., q2, q5, q54, q5a) and real-world analytics workloads: a large fact table is formed by UNION ALL of multiple sources and then joined with a small dimension table.

By pushing the broadcast join into each Union branch, each branch can independently join with the (small) dimension table and produce a narrower, filtered result before being combined by the Union. This provides the following benefits:

  1. Earlier data reduction per branch: When the join is selective (e.g., a date dimension filter), each branch produces fewer rows earlier in the pipeline, which can reduce the data volume flowing into subsequent operators such as aggregations or further joins.

  2. Independent per-branch optimization: Each branch becomes a self-contained subplan, enabling the optimizer and AQE to make better per-branch decisions (e.g., different join strategies, custom shuffle readers) based on each branch's actual data characteristics.

Note that since the original join is already a BroadcastHashJoin (the right side is broadcastable), the Union result was not being shuffled before this transformation. This rule does not eliminate any shuffle exchange. The broadcast exchange for the right side is materialized once and reused across branches via the existing ReuseExchangeAndSubquery physical rule, so the broadcast overhead is minimal.
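A toy illustration of the reuse argument — the broadcast side is built once and shared by every branch, so pushing the join down does not multiply the broadcast cost (plain Python; `broadcast_right` is a stand-in for the broadcast hash relation, not Spark API):

```python
# Model of exchange reuse: the first branch materializes the broadcast
# side; subsequent branches hit the cache instead of rebuilding it.
from functools import lru_cache

build_count = 0

@lru_cache(maxsize=None)
def broadcast_right():
    global build_count
    build_count += 1           # counts how many times we materialize
    return (("k", 1), ("k", 2))  # stand-in for the broadcast relation

branches = ["c1", "c2", "c3"]
joined = [(b, broadcast_right()) for b in branches]  # each branch "joins"

assert len(joined) == 3
assert build_count == 1  # built once, reused across all branches
```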

Known limitation: This transformation inserts a Union node between a Join and its parent Aggregate in some query patterns (e.g., TPC-DS q2). Since UnionEstimation does not currently propagate distinctCount through Union ([SPARK-XXXXX]), AggregateEstimation CBO may fall back to a less accurate size estimate, potentially causing the physical planner to choose a suboptimal join strategy for parent joins. This is a pre-existing gap in UnionEstimation that affects any GROUP BY ... FROM (... UNION ALL ...) pattern with CBO stats, and should be addressed in a separate PR.

Does this PR introduce any user-facing change?

Yes. A new SQL configuration is added:

  • spark.sql.optimizer.pushDownJoinThroughUnion.enabled (default: true): When true, pushes down Join through Union when the join's right side is small enough to broadcast.

Query plans for affected patterns (e.g., TPC-DS q2, q5, q54, q5a) will change — the Join is pushed below the Union, and ReusedExchange replaces duplicate broadcast exchanges within each Union branch.

How was this patch tested?

  1. Added PushDownJoinThroughUnionSuite in sql/catalyst (14 test cases): verifies plan transformation for Inner/LeftOuter joins, attribute rewriting, ExprId uniqueness across Union branches, negative cases (unsupported join types, no condition, Union on right side, config disabled, right side too large), and complex right-side subtrees (Filter+Project, Generate/Explode, SubqueryAlias, Aggregate).
  2. Added PushDownJoinThroughUnionSuite in sql/core (7 test cases): end-to-end correctness tests including 2-way and 3-way UNION ALL with broadcast join, LeftOuter join, optimization enabled vs disabled comparison, column pruning, and predicate push-down interaction.
  3. Updated TPC-DS plan stability golden files for affected queries: q2, q5, q54 (approved-plans-v1_4) and q5a (approved-plans-v2_7), including both sf100 and default variants.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

@LuciferYang LuciferYang marked this pull request as draft March 17, 2026 12:42
.createWithDefault(true)

val PUSH_DOWN_JOIN_THROUGH_UNION_ENABLED =
buildConf("spark.sql.optimizer.pushDownJoinThroughUnion.enabled")
Contributor Author

If it needs to be set to false by default, please let me know.

Member

+1 for true by default because this configuration is only a safe-guard for any future regression.

Member

According to the code, we can use spark.sql.optimizer.excludedRules instead of this, right? Is there any difference?

Contributor Author

Good point @dongjoon-hyun. You're right — spark.sql.optimizer.excludedRules already provides a general mechanism to disable any optimizer rule, and adding a dedicated config for each rule would lead to config proliferation. I'll remove the dedicated config spark.sql.optimizer.pushDownJoinThroughUnion.enabled and rely on excludedRules instead. Thanks for the suggestion!
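For reference, once the dedicated flag is removed, the rule could be disabled per-session via the general exclusion mechanism; the fully-qualified rule name below is an assumption based on the rule's class name and the usual Catalyst optimizer package:

```sql
-- Hypothetical: disable the new rule via the general exclusion config
SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushDownJoinThroughUnion;
```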

@LuciferYang LuciferYang marked this pull request as ready for review March 17, 2026 12:44
@dongjoon-hyun
Member

cc @yaooqinn and @peter-toth, too.

@peter-toth
Contributor

peter-toth commented Mar 17, 2026

I might be missing something but I don't get this part:

Without this optimization, Spark must first shuffle the entire Union result before performing the join. With this rule, each Union branch joins independently with the broadcasted right side, eliminating the expensive shuffle of the Union result.

Why does Spark need to shuffle the union result if the right side is small enough to be broadcasted (i.e. the original join was a broadcast join)? Is there a TPCDS plan where an exchange is removed by this PR?

* Sort (59)
+- Exchange (58)
+- * Project (57)
+- * SortMergeJoin Inner (56)
Contributor

This was BroadcastHashJoin before this PR. Why do we have SortMergeJoin now?

Contributor Author

@LuciferYang LuciferYang Mar 18, 2026

Great catch @peter-toth. I investigated the root cause and it turns out to be a statistics estimation degradation chain triggered by a pre-existing gap in UnionEstimation.

Root cause: UnionEstimation only propagates min/max and nullCount through Union — it does not propagate distinctCount. When PushDownJoinThroughUnion transforms the plan from Aggregate(Join(Union, date_dim)) to Aggregate(Union(Join, Join)), the d_week_seq column loses its distinctCount after passing through the new Union node, which causes AggregateEstimation CBO to fail (since hasCountStats requires both distinctCount and nullCount), falling back to SizeInBytesOnlyStatsPlanVisitor with a vastly inflated estimate.

I wrote a simplified reproduction test using TPC-DS sf100 stats to measure the actual impact:

| Metric | BEFORE: Agg(Join(Union, dd)) | AFTER: Agg(Union(Join, Join)) |
| --- | --- | --- |
| d_week_seq distinctCount | Some(10010) | None (lost by Union) |
| d_week_seq hasCountStats | true | false |
| Aggregate rowCount | Some(10010) | None (CBO failed) |
| Aggregate sizeInBytes | 195KB | 4.1GB (~21,000x inflation) |

This inflated estimate (4.1GB) far exceeds the broadcast threshold (default 10MB), causing the top-level self-join (year-over-year comparison) to fall from BroadcastHashJoin to SortMergeJoin.

Notably, this UnionEstimation gap is pre-existing — any GROUP BY ... FROM (... UNION ALL ...) pattern with CBO column stats will lose distinctCount through the Union. I can try to create a case that reproduces this issue without this PR and attempt to fix it separately first.
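The estimation chain above can be modeled in a few lines (simplified Python, not Spark's estimator; `aggregate_row_count` is an illustrative stand-in for AggregateEstimation's hasCountStats check):

```python
# Model of the degradation: AggregateEstimation can only produce a row
# count when the grouping column carries both distinctCount and
# nullCount; otherwise the planner falls back to a size-only estimate.

def aggregate_row_count(col_stats):
    has_count_stats = (col_stats.get("distinctCount") is not None
                       and col_stats.get("nullCount") is not None)
    return col_stats["distinctCount"] if has_count_stats else None

# Before the rewrite: column stats flow straight from the Join.
before = {"distinctCount": 10010, "nullCount": 0}
assert aggregate_row_count(before) == 10010

# After the rewrite: the new Union drops distinctCount (UnionEstimation
# only propagates min/max and nullCount), so CBO estimation fails and
# the planner falls back to the inflated size-only estimate.
after = {"distinctCount": None, "nullCount": 0}
assert aggregate_row_count(after) is None
```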

) match {
case Join(_, deduped, _, _, _) => deduped
case other =>
throw SparkException.internalError(
Member

Do any other optimizations throw bug-like errors such as this?

Contributor Author

Thanks @yaooqinn. Yes, SparkException.internalError is used in several optimizer rules as a defensive guard for "should-never-happen" plan shapes, for example:

  • NestedColumnAliasing: "Unreasonable plan after optimization: $other"
  • PushExtraPredicateThroughJoin / Optimizer: "Unexpected join type: $other"
  • DecorrelateInnerQuery: "Unexpected domain join type $o"
  • subquery.scala: "Unexpected plan when optimizing one row relation subquery: $o"

The dedupRight method here follows the same pattern — it guards against the (theoretically impossible) case where DeduplicateRelations changes the Join plan shape.

That said, InlineCTE uses the same "fake self-join + DeduplicateRelations" approach and simply calls .children(1) directly without any defensive check. I can align with InlineCTE and remove the explicit throw if you think that's cleaner. Alternatively, I could keep the pattern match but return the original plan unchanged in the fallback case (skipping the dedup rather than failing). Which approach would you prefer?

hint: JoinHint): Boolean = {
canBroadcastBySize(right, conf) ||
hint.rightHint.exists(_.strategy.contains(BROADCAST)) ||
(joinType == Inner && hint.leftHint.exists(_.strategy.contains(BROADCAST)))
Member

the right side is broadcastable

Is this out-of-scope?

Contributor Author

Good catch @yaooqinn. You're right — this leftHint check on line 111 is problematic and should be removed.

@LuciferYang LuciferYang marked this pull request as draft March 17, 2026 17:44
@LuciferYang
Contributor Author

@dongjoon-hyun @yaooqinn @peter-toth Thank you for your comments. I will carefully review the issues mentioned tomorrow.

@LuciferYang
Contributor Author

I might be missing something but I don't get this part:

Without this optimization, Spark must first shuffle the entire Union result before performing the join. With this rule, each Union branch joins independently with the broadcasted right side, eliminating the expensive shuffle of the Union result.

Why does Spark need to shuffle the union result if the right side is small enough to be broadcasted (i.e. the original join was a broadcast join)? Is there a TPCDS plan where an exchange is removed by this PR?

Thanks @peter-toth, you are absolutely right. The PR description was incorrect — since the right side is already broadcastable, the original join is a BroadcastHashJoin, and the Union result (left side) is not shuffled. I apologize for the misleading description.

if conf.getConf(SQLConf.PUSH_DOWN_JOIN_THROUGH_UNION_ENABLED) &&
(joinType == Inner || joinType == LeftOuter) &&
joinCond.isDefined &&
isBroadcastable(joinType, right, hint) &&
Contributor

In PushDownLeftSemiAntiJoin we use canPlanAsBroadcastHashJoin(), can you please check if we could use that here as well?

Contributor Author

I think that's feasible. Let me give it a try.
