Describe the bug
I am running a modified version of TPC-H query 10. I've removed the filters to stress Comet and see how it behaves when processing a large amount of data:
-- SQLBench-H query 10 derived from TPC-H query 10 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
select
    c_custkey,
    c_name,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    c_acctbal,
    n_name,
    c_address,
    c_phone,
    c_comment
from
    customer,
    orders,
    lineitem,
    nation
where
    c_custkey = o_custkey
    and l_orderkey = o_orderkey
    and c_nationkey = n_nationkey
group by
    c_custkey,
    c_name,
    c_acctbal,
    c_phone,
    n_name,
    c_address,
    c_comment
order by
    revenue desc
limit 20;
The Spark SQL metrics page showed that the CometSortExec operators return 100 rows and 0 rows respectively, and that no spill is triggered. The SMJ node reports 0 output rows, which is certainly not the case.
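For cross-checking, the JVM-side metrics can also be dumped programmatically from the executed plan instead of being read off the UI. A minimal sketch, assuming an active SparkSession named spark with Comet enabled, the TPC-H tables registered, and the query text above stored in a string query10:

// Sketch: print the JVM-side SQL metrics for every operator in the executed
// plan, to cross-check against the native metrics that Comet logs.
// `spark` is an active SparkSession with Comet enabled; `query10` holds the
// SQL text shown above (both are assumptions of this sketch).
val df = spark.sql(query10)
df.collect() // execute the query so the metrics are populated

df.queryExecution.executedPlan.foreach { op =>
  val metrics = op.metrics
    .map { case (name, m) => s"$name=${m.value}" }
    .mkString(", ")
  println(s"${op.nodeName}: $metrics")
}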
I've enabled spark.comet.explain.native.enabled and saw that the native execution plan with metrics showed somewhat reasonable numbers:
24/10/08 13:34:59 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (stage: 13 task: 42):
AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 as col_6], aggr=[sum], metrics=[output_rows=2553986, elapsed_compute=3.777709066s]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, col_8@8 as col_7, col_1@10 as col_8], metrics=[output_rows=14996536, elapsed_compute=2.170829ms]
    ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1], metrics=[output_rows=14996536, elapsed_compute=2.538593ms]
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_3@3)], metrics=[output_rows=14996536, input_batches=1831, build_input_rows=25, build_input_batches=1, output_batches=1831, input_rows=14996536, build_mem_used=1392, build_time=53.125µs, join_time=856.613437ms]
        CopyExec [UnpackOrDeepCopy], metrics=[output_rows=25, elapsed_compute=5.209µs]
          ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int64, col_1: Utf8], metrics=[output_rows=25, elapsed_compute=1.168µs, cast_time=1ns]
        CopyExec [UnpackOrClone], metrics=[output_rows=14996536, elapsed_compute=2.237373ms]
          ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as col_6, col_1@9 as col_7, col_2@10 as col_8], metrics=[output_rows=14996536, elapsed_compute=2.037619ms]
            SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)], metrics=[output_rows=14996536, spill_count=0, spilled_bytes=0, spilled_rows=0, input_batches=2289, input_rows=18746334, output_batches=1831, peak_mem_used=918320, join_time=4.79088405s]
              SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false], metrics=[output_rows=3749798, elapsed_compute=1.841784352s, spill_count=3, spilled_bytes=586947488, spilled_rows=3144844]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=3749798, elapsed_compute=66.035032ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: Decimal128(12, 2), col_6: Utf8, col_7: Int64], metrics=[output_rows=3749798, elapsed_compute=411.397µs, cast_time=1ns]
              SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false], metrics=[output_rows=14996536, elapsed_compute=2.318524006s, spill_count=4, spilled_bytes=590603456, spilled_rows=14752112]
                CopyExec [UnpackOrDeepCopy], metrics=[output_rows=14996536, elapsed_compute=32.209479ms]
                  ScanExec: source=[Exchange (unknown)], schema=[col_0: Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)], metrics=[output_rows=14996536, elapsed_compute=472.861µs, cast_time=1ns]
The SortExec operators in this task produced up to roughly 15 million rows and spilled 3 to 4 times, none of which is reflected in the Spark SQL metrics page.
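For reference, the dump above comes from the native explain flag mentioned earlier. A minimal sketch of turning it on at the session level (assuming the flag can be set at runtime; otherwise pass it with --conf at startup):

// Turn on Comet's logging of the native plan with metrics, then re-run the
// query so the plan is printed from the native side (core/src/execution/jni_api.rs).
// `spark` and `query10` are the assumptions from the earlier sketch.
spark.conf.set("spark.comet.explain.native.enabled", "true")
spark.sql(query10).collect()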
Steps to reproduce
Run the SQL query mentioned above on TPC-H data with scale factor = 10.
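A rough sketch of the reproduction, with the Parquet layout and base path as assumptions (any SF=10 TPC-H dataset readable by Spark should do):

// Sketch: register SF=10 TPC-H tables as temp views and run the query above.
// The base path and the Parquet format are assumptions; adjust to your data.
val tpchPath = "/path/to/tpch-sf10"
Seq("customer", "orders", "lineitem", "nation").foreach { table =>
  spark.read.parquet(s"$tpchPath/$table").createOrReplaceTempView(table)
}

// `query10` holds the modified query 10 text shown at the top of this report.
spark.sql(query10).show(truncate = false)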
Expected behavior
The metrics shown on the Spark SQL page should be consistent with the native DataFusion metrics.
Additional context
This issue was reproduced using Spark 3.5 with master=local[4]; the Comet build is a slightly modified version of commit 3413397.
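For context, a minimal sketch of a local[4] session with Comet enabled; this is not the exact configuration used for this report, config names follow the Comet documentation and the values (such as the off-heap size) are illustrative assumptions:

import org.apache.spark.sql.SparkSession

// Illustrative local[4] session with Comet enabled; NOT the exact configuration
// used for this report. Config names are the standard Comet ones; values are
// placeholders.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("comet-tpch-q10")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  .config("spark.comet.exec.shuffle.enabled", "true")
  .config("spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .config("spark.comet.explain.native.enabled", "true")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "4g")
  .getOrCreate()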