fix(rust/sedona-spatial-join): wrap probe-side repartition in ProbeShuffleExec to prevent optimizer stripping#677
Conversation
… prevent optimizer stripping DataFusion's EnforceDistribution and EnforceSorting optimizer passes recognize and can strip RepartitionExec nodes that the spatial join planner inserts for round-robin repartitioning of the probe side. This causes the probe side to lose its intended parallelism. Introduce ProbeShuffleExec, a thin wrapper around RepartitionExec that is opaque to DataFusion's optimizer passes, preserving the repartitioning.
There was a problem hiding this comment.
Pull request overview
This PR addresses a performance regression in probe-side auto-repartitioning for spatial joins by making the probe-side round-robin shuffle opaque to DataFusion’s EnforceDistribution / EnforceSorting optimizer passes, so it can’t be stripped during optimization.
Changes:
- Introduces
ProbeShuffleExec, a wrapper around an internalRepartitionExecused to preserve probe-side round-robin repartitioning through physical optimization. - Updates the spatial join physical planner to insert
ProbeShuffleExecinstead of a bareRepartitionExecon the probe side. - Extends integration tests to assert
ProbeShuffleExecpresence on optimized spatial join probe sides across the integration suite.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| rust/sedona-spatial-join/src/planner/probe_shuffle_exec.rs | Adds the new ProbeShuffleExec wrapper implementation delegating to an internal RepartitionExec. |
| rust/sedona-spatial-join/src/planner/physical_planner.rs | Switches probe-side auto-repartition insertion to use ProbeShuffleExec. |
| rust/sedona-spatial-join/src/planner.rs | Exposes the new planner submodule for ProbeShuffleExec. |
| rust/sedona-spatial-join/src/lib.rs | Re-exports ProbeShuffleExec for external visibility (tests/other crates). |
| rust/sedona-spatial-join/tests/spatial_join_integration.rs | Adds assertions to verify probe-side shuffling via ProbeShuffleExec when enabled. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
paleolimbot
left a comment
There was a problem hiding this comment.
Thank you!
Is there a specific test case or benchmark that you used to determine this was an issue? (And if so, can/should it be added to either the tests or benchmarks in this repo?)
|
It applies to all the test cases, the original implementation didn't work and I have not added assertions to verify that RepartitionExec should be present on the probe side. This patch updated the common test method for running spatial join to verify that
I have also observed increase in network bandwidth utilization for some spatial join queries using OvertureMaps data. This is because RepartitionExec uses channels to parallelize the execution of downstream and upstream executors. The network I/O could be scheduled and complete in the background when the tokio worker is busy computing inside SpatialJoinExec. |
Problem
We have tested the release candidate 0.3.0 and found that the probe side auto-repartitioning introduced by #610 didn't work as expected. This is because DataFusion's
EnforceDistributionoptimization pass recognizeRepartitionExecnodes and can strip or replace them. When the spatial join planner inserts aRepartitionExec(RoundRobinBatch)on the probe side to ensure workload balance and eliminate data distribution skewness, these optimizer passes could remove it, causing the probe side to lose its intended repartitioning.This problem does not have impact on the correctness of query results, it affects the balancing of spatial join workloads across worker threads and the CPU utilization.
Solution
ProbeShuffleExecwraps aRepartitionExecinternally but presents itself as a differentExecutionPlantype. Since the optimizer passes only look forRepartitionExecnodes by type, they leaveProbeShuffleExecuntouched. The wrapper delegates allExecutionPlantrait methods to the innerRepartitionExec.Changes
ProbeShuffleExec, a thin wrapper aroundRepartitionExecthat is opaque to DataFusion'sEnforceDistribution/EnforceSortingoptimizer passes, preventing them from stripping the round-robin repartitioning that the spatial join planner inserts on the probe side.ProbeShuffleExecpresence assertion intorun_spatial_join_queryso it runs on every optimized spatial join integration test (~174 cases) rather than only in standalone tests.