
CometHashJoin always selects BuildRight which causes potential performance regression #1382

Open
hayman42 opened this issue Feb 10, 2025 · 10 comments
Labels
bug Something isn't working


@hayman42

Describe the bug

First of all, thank you for such a great project. I am currently doing some research to see if our team can make use of DataFusion Comet for our workload.

As mentioned in the hash join documentation, it is important to keep the build-side table as small as possible. I am not sure if it is intended, but Comet's current implementation always chooses BuildRight unless building the right side is impossible. This causes a performance regression for queries like TPC-H Q9.

Additionally, I tried to modify RewriteJoin so that build-side selection is based on the size of each table, but then other issues appeared.
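The core of what I tried looks roughly like this (a minimal sketch of the idea, not the exact code in my fork linked below; SizeBasedBuildSide and estimatedSize are illustrative names):

// Illustrative sketch only: pick the build side of a shuffled hash join from the
// estimated size of each child instead of always defaulting to BuildRight.
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.SparkPlan

object SizeBasedBuildSide {
  // Estimated output size in bytes, taken from the statistics of the linked logical plan.
  private def estimatedSize(plan: SparkPlan): Option[BigInt] =
    plan.logicalLink.map(_.stats.sizeInBytes)

  // Build on the smaller side when both estimates are available;
  // otherwise keep the current default of BuildRight.
  def choose(left: SparkPlan, right: SparkPlan): BuildSide =
    (estimatedSize(left), estimatedSize(right)) match {
      case (Some(l), Some(r)) if l < r => BuildLeft
      case _                           => BuildRight
    }
}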

Below are the metrics from CometHashJoin.

Before

BuildRight is selected even though the right table is much larger.

[image: CometHashJoin metrics, BuildRight selected]

After modification (I referred to Gluten's source code; my modified RewriteJoin is at https://github.com/hayman42/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala)

BuildLeft is selected and, as a result, CometHashJoin has become faster.

[image: CometHashJoin metrics, BuildLeft selected]

However, afterwards I found other issues that make the job even slower. I made the changes without a deep understanding of this project, so I think that is the reason why.

Steps to reproduce

Run TPC-H SF200 Q9 with the following configs:

    --master yarn\
    --deploy-mode cluster\
    --driver-memory 4G\
    --executor-memory 10G\
    --executor-cores 4\
    --num-executors 8\
    --conf spark.executor.memoryOverhead=0g \
    --conf spark.sql.shuffle.partitions=2000 \
    --conf spark.eventLog.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=7g \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.comet.exec.shuffle.mode=auto \
    --conf spark.comet.exec.shuffle.compression.codec=lz4 \
    --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
    --conf spark.comet.exec.sort.enabled=true \
    --conf spark.comet.exec.replaceSortMergeJoin=true

Most of our workload is TB~PB scale, so I used multiple executors to test scalability.

I added --conf spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=64MB for the vanilla Spark test so that SHJ is always selected.

Expected behavior

Ideally it should behave like vanilla Spark's SHJ. The query with Spark SHJ is almost 8x faster in the setting above.

Additional context

Below are the details for each setting

Comet (BuildRight - BuildRight - BuildRight)

[image not included]

Spark (BuildLeft - BuildRight - BuildLeft)

[image not included]

Comet with custom RewriteRule (BuildLeft - BuildRight - BuildRight; the left table has become larger for an unknown reason)

[image not included]

hayman42 added the bug label on Feb 10, 2025
@parthchandra (Contributor)

@hayman42 what a great find. I have not observed this myself even at SF10000, probably because by default we were falling back to SMJ. Would you be able to compare the plan with Comet shuffle disabled?

@hayman42 (Author)

@parthchandra With Comet shuffle disabled, the plan is almost the same as vanilla Spark's because it replaces Comet SHJ with Spark SHJ, and thus preserves Spark's performance. Here is the plan with Comet shuffle disabled.

Comet shuffle disabled (the image is not attached, so I am pasting the plan as text instead):

+- == Final Plan ==
   Execute InsertIntoHadoopFsRelationCommand (66)
   +- WriteFiles (65)
      +- * Sort (64)
         +- AQEShuffleRead (63)
            +- ShuffleQueryStage (62), Statistics(sizeInBytes=9.9 KiB, rowCount=175)
               +- Exchange (61)
                  +- * HashAggregate (60)
                     +- AQEShuffleRead (59)
                        +- ShuffleQueryStage (58), Statistics(sizeInBytes=878.7 KiB, rowCount=1.37E+4)
                           +- Exchange (57)
                              +- * HashAggregate (56)
                                 +- * Project (55)
                                    +- * BroadcastHashJoin Inner BuildRight (54)
                                       :- * Project (48)
                                       :  +- * ShuffledHashJoin Inner BuildLeft (47)
                                       :     :- AQEShuffleRead (40)
                                       :     :  +- ShuffleQueryStage (39), Statistics(sizeInBytes=3.4 GiB, rowCount=6.53E+7)
                                       :     :     +- Exchange (38)
                                       :     :        +- * Project (37)
                                       :     :           +- * ShuffledHashJoin Inner BuildLeft (36)
                                       :     :              :- AQEShuffleRead (29)
                                       :     :              :  +- ShuffleQueryStage (28), Statistics(sizeInBytes=3.9 GiB, rowCount=6.53E+7)
                                       :     :              :     +- Exchange (27)
                                       :     :              :        +- * Project (26)
                                       :     :              :           +- * ShuffledHashJoin Inner BuildRight (25)
                                       :     :              :              :- AQEShuffleRead (18)
                                       :     :              :              :  +- ShuffleQueryStage (17), Statistics(sizeInBytes=3.4 GiB, rowCount=6.53E+7)
                                       :     :              :              :     +- Exchange (16)
                                       :     :              :              :        +- * Project (15)
                                       :     :              :              :           +- * ShuffledHashJoin Inner BuildLeft (14)
                                       :     :              :              :              :- AQEShuffleRead (7)
                                       :     :              :              :              :  +- ShuffleQueryStage (6), Statistics(sizeInBytes=33.2 MiB, rowCount=2.18E+6)
                                       :     :              :              :              :     +- Exchange (5)
                                       :     :              :              :              :        +- * CometColumnarToRow (4)
                                       :     :              :              :              :           +- CometProject (3)
                                       :     :              :              :              :              +- CometFilter (2)
                                       :     :              :              :              :                 +- CometScan parquet  (1)
                                       :     :              :              :              +- AQEShuffleRead (13)
                                       :     :              :              :                 +- ShuffleQueryStage (12), Statistics(sizeInBytes=62.6 GiB, rowCount=1.20E+9)
                                       :     :              :              :                    +- Exchange (11)
                                       :     :              :              :                       +- * CometColumnarToRow (10)
                                       :     :              :              :                          +- CometFilter (9)
                                       :     :              :              :                             +- CometScan parquet  (8)
                                       :     :              :              +- AQEShuffleRead (24)
                                       :     :              :                 +- ShuffleQueryStage (23), Statistics(sizeInBytes=45.8 MiB, rowCount=2.00E+6)
                                       :     :              :                    +- Exchange (22)
                                       :     :              :                       +- * CometColumnarToRow (21)
                                       :     :              :                          +- CometFilter (20)
                                       :     :              :                             +- CometScan parquet  (19)
                                       :     :              +- AQEShuffleRead (35)
                                       :     :                 +- ShuffleQueryStage (34), Statistics(sizeInBytes=4.8 GiB, rowCount=1.60E+8)
                                       :     :                    +- Exchange (33)
                                       :     :                       +- * CometColumnarToRow (32)
                                       :     :                          +- CometFilter (31)
                                       :     :                             +- CometScan parquet  (30)
                                       :     +- AQEShuffleRead (46)
                                       :        +- ShuffleQueryStage (45), Statistics(sizeInBytes=6.7 GiB, rowCount=3.00E+8)
                                       :           +- Exchange (44)
                                       :              +- * CometColumnarToRow (43)
                                       :                 +- CometFilter (42)
                                       :                    +- CometScan parquet  (41)
                                       +- BroadcastQueryStage (53), Statistics(sizeInBytes=1024.2 KiB, rowCount=25)
                                          +- BroadcastExchange (52)
                                             +- * CometColumnarToRow (51)
                                                +- CometFilter (50)
                                                   +- CometScan parquet  (49)

I have another question regarding your comment. Is it OK to use Comet SMJ on a large dataset, or did you just disable Comet shuffle? I observed that Comet SMJ is way slower than Spark, which is why I am trying to use SHJ.

@parthchandra (Contributor)

> @parthchandra With Comet shuffle disabled, the plan is almost the same as vanilla Spark's because it replaces Comet SHJ with Spark SHJ, and thus preserves Spark's performance.

Yeah, I was afraid that would be the case. Interesting that Spark gets the plan right but it gets messed up with Comet. Afaik, Comet itself does not do any of the build side planning. Maybe it should, which is what you've tried to do here. I'm not the expert on this, I'm afraid. @viirya any thoughts?

> I have another question regarding your comment. Is it OK to use Comet SMJ on a large dataset, or did you just disable Comet shuffle? I observed that Comet SMJ is way slower than Spark, which is why I am trying to use SHJ.

It should be OK to use Comet SMJ. We may be spilling too soon for Comet SMJ, causing the slower performance. @kazuyukitanimura any thoughts on this?

@kazuyukitanimura (Contributor)

We have not enabled SHJ by default because we haven't implemented spilling, IIRC.
Regardless, thank you for reporting this @hayman42. Did you have a chance to dig into where the slowness comes from after your change?

@hayman42 (Author)

@kazuyukitanimura I am not sure, but I think the slowness comes from the CometExchange that is executed after the join with BuildLeft.

This is for the original Comet:

CometExchange

shuffle records written: 65,254,713
number of spills: 2,619
shuffle write time total (min, med, max): 29.7 s (2 ms, 43 ms, 138 ms)
number of input batches: 100,000
records read: 65,254,713
memory pool time total (min, med, max): 1.2 m (25 ms, 113 ms, 183 ms)
local bytes read total (min, med, max): 513.1 MiB (4.3 MiB, 7.7 MiB, 9.5 MiB)
fetch wait time total (min, med, max): 0 ms (0 ms, 0 ms, 0 ms)
remote bytes read total (min, med, max): 3.4 GiB (37.6 MiB, 52.2 MiB, 54.1 MiB)
repartition time total (min, med, max): 50.8 s (21 ms, 53 ms, 315 ms)
decoding and decompression time total (min, med, max): 3.3 m (1.6 s, 2.4 s, 8.6 s)
local blocks read: 171,154
spilled bytes: 2,250,148,478,976
remote blocks read: 1,160,828
data size total (min, med, max): 4.4 GiB (4.5 MiB, 6.8 MiB, 7.2 MiB)
native shuffle writer time total (min, med, max): 4.8 m (100 ms, 341 ms, 1.5 s)
number of partitions: 2,000
encoding and compression time total (min, med, max): 1.9 m (34 ms, 102 ms, 744 ms)
remote reqs duration total (min, med, max): 1.2 m (339 ms, 639 ms, 2.2 s)
shuffle bytes written total (min, med, max): 3.9 GiB (2.6 MiB, 6.2 MiB, 6.8 MiB)

And here are the metrics with my change:

CometExchange

shuffle records written: 65,254,713
number of spills: 17,160
shuffle write time total (min, med, max): 3.0 m (2 ms, 243 ms, 878 ms)
number of input batches: 11,485,874
records read: 65,254,713
memory pool time total (min, med, max): 8.9 m (36 ms, 782 ms, 1.7 s)
local bytes read total (min, med, max): 748.1 MiB (1611.3 KiB, 7.9 MiB, 9.8 MiB)
fetch wait time total (min, med, max): 0 ms (0 ms, 0 ms, 0 ms)
remote bytes read total (min, med, max): 4.8 GiB (12.7 MiB, 52.0 MiB, 55.9 MiB)
repartition time total (min, med, max): 4.2 m (62 ms, 294 ms, 2.0 s)
decoding and decompression time total (min, med, max): 18.8 m (7.4 s, 9.9 s, 36.4 s)
local blocks read: 174,515
spilled bytes: 16,134,291,652,608
remote blocks read: 1,157,467
data size total (min, med, max): 5.9 GiB (6.0 MiB, 9.1 MiB, 9.6 MiB)
native shuffle writer time total (min, med, max): 22.1 m (152 ms, 1.7 s, 6.7 s)
number of partitions: 2,000
encoding and compression time total (min, med, max): 3.7 m (21 ms, 244 ms, 2.0 s)
remote reqs duration total (min, med, max): 57.0 s (71 ms, 337 ms, 1.6 s)
shuffle bytes written total (min, med, max): 5.6 GiB (2.1 MiB, 8.7 MiB, 9.2 MiB)

It is strange that most of the metrics, including spill size and execution time, are 7-8x higher. I don't know why that happens, but I am trying to figure it out.

@kazuyukitanimura (Contributor)

Spilling greatly affects the speed. We need to understand why that is the case.

@viirya (Member) commented Feb 11, 2025

> Yeah, I was afraid that would be the case. Interesting that Spark gets the plan right but it gets messed up with Comet. Afaik, Comet itself does not do any of the build side planning. Maybe it should, which is what you've tried to do here. I'm not the expert on this, I'm afraid. @viirya any thoughts?

I remember that Comet query planning doesn't change the build side on HashJoin:

.setBuildSide(
  if (join.buildSide == BuildLeft) BuildSide.BuildLeft else BuildSide.BuildRight)

If you turn off AQE, will it affect the build side?
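For a quick check, the standard Spark flag should be enough:

--conf spark.sql.adaptive.enabled=false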

@hayman42 (Author)

@viirya It seems AQE does not affect the build side. I confirmed both Spark and Comet behave the same as before with AQE disabled.

@andygrove (Member)

Thanks for writing this up @hayman42. I opened PR #1424 to update the RewriteRule to match the latest version in Gluten, and I am seeing a large improvement in performance for TPC-H (more than 10% faster).

@andygrove (Member)

> @kazuyukitanimura I am not sure, but I think the slowness comes from the CometExchange that is executed after the join with BuildLeft ... It is strange that most of the metrics, including spill size and execution time, are 7-8x higher. I don't know why that happens, but I am trying to figure it out.

Spilling 16 TB (16,134,291,652,608 bytes) is definitely not desirable. I suspect that our spilling is currently too aggressive. Could you try allocating more off-heap memory to avoid spilling and see how the performance looks?
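For example, keeping everything else the same and only raising the off-heap allocation (16g here is just an illustrative value, not a tuned recommendation):

--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=16g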
