
fix: AQE creating a non-supported Final HashAggregate post-shuffle #1390

Open
wants to merge 6 commits into base: main

Conversation

EmilyMatt
Contributor

What issue does this close?

Closes #1389.

Rationale for this change

As described in the issue, we want to prevent situations where the Partial aggregate is supported and converted, and the shuffle is supported and converted, but the Final aggregate is not converted because its result expressions are unsupported.
This leads to an unrecoverable state: Spark expects an aggregate buffer to have been created by the Partial HashAggregate, and it doesn't exist.

What changes are included in this PR?

I've moved the conversion of the hash aggregate into a separate function (I believe everything should be separated, honestly; it's very hard to manage right now), which also reports whether the result expressions were converted. When they are not, we create a new ProjectExec with those result expressions, convert the HashAggregate without them, and place a conversion between the two. That way we can ensure a valid state at all times.
This behavior can be bypassed by enforcing result conversion, using "spark.comet.exec.aggregate.enforceResults=true";
result enforcement is disabled by default.
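A rough sketch of the idea, for illustration only (the helpers isExprSupported, toCometHashAggregate, and columnarToRow below are hypothetical stand-ins for Comet's real serde and transition logic, not the actual functions in this PR):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

// Hypothetical placeholders for the real conversion/transition logic.
def isExprSupported(e: Expression): Boolean = ???
def toCometHashAggregate(agg: HashAggregateExec): SparkPlan = ???
def columnarToRow(plan: SparkPlan): SparkPlan = ???

def convertFinalHashAggregate(agg: HashAggregateExec): SparkPlan = {
  if (agg.resultExpressions.forall(isExprSupported)) {
    // Everything is supported: convert the whole aggregate, result expressions included.
    toCometHashAggregate(agg)
  } else {
    // Convert the aggregate without its result expressions, emitting the grouping keys
    // and aggregate attributes directly...
    val passThrough = agg.groupingExpressions.map(_.toAttribute) ++ agg.aggregateAttributes
    val nativeAgg = toCometHashAggregate(agg.copy(resultExpressions = passThrough))
    // ...then evaluate the unsupported result expressions in a Spark ProjectExec,
    // with a columnar-to-row transition between the native aggregate and the project.
    ProjectExec(agg.resultExpressions, columnarToRow(nativeAgg))
  }
}

And the escape hatch, if the previous behavior is preferred, is the config named above, set like any other Spark conf:

import org.apache.spark.sql.SparkSession

// Enforce result-expression conversion again (disabled by default in this PR).
val spark = SparkSession.builder()
  .config("spark.comet.exec.aggregate.enforceResults", "true")
  .getOrCreate()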

How are these changes tested?

Essentially, a lot of the stability tests will have a new plan where the aggregate is completed natively and the ProjectExec runs in Spark, instead of the current situation where the final stage of the HashAggregate is done entirely in Spark.
Those tests currently fail because I am unable to run them with SPARK_GENERATE_GOLDEN_FILES; might be a skill issue.

@EmilyMatt changed the title from "Ok push for now until I understand how to regenerate golden files" to "fix: AQE creating a non-supported Final HashAggregate post-shuffle" on Feb 12, 2025
@andygrove
Member

Thanks @EmilyMatt. Would it be possible to add a test to reproduce the issue?

@codecov-commenter

codecov-commenter commented Feb 12, 2025

Codecov Report

Attention: Patch coverage is 70.29703% with 30 lines in your changes missing coverage. Please review.

Project coverage is 38.82%. Comparing base (f09f8af) to head (f9c133c).
Report is 25 commits behind head on main.

Files with missing lines Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 61.19% 18 Missing and 8 partials ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala 86.66% 0 Missing and 4 partials ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##               main    #1390       +/-   ##
=============================================
- Coverage     56.12%   38.82%   -17.31%     
- Complexity      976     2005     +1029     
=============================================
  Files           119      262      +143     
  Lines         11743    60643    +48900     
  Branches       2251    12897    +10646     
=============================================
+ Hits           6591    23544    +16953     
- Misses         4012    32598    +28586     
- Partials       1140     4501     +3361     

☔ View full report in Codecov by Sentry.

@EmilyMatt
Contributor Author

@andygrove I've updated "test final min/max/count with result expressions" in the Aggregate suite to verify this more deeply, and I think many of the queries in the stability tests also have this issue.
The current implementation does not crash; it just converts the intermediate results back to Spark so they can be used in the Spark HashAggregate.
With this PR the aggregate will be fully native, so hopefully this will be a performance boost.
I think a crash will be encountered in any aggregate that uses an aggregate buffer, as the Partial CometHashAggregate will not generate the IntermediateAggBuffer representation Spark expects, and possibly not even the same data type.
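For context, a minimal query of the shape that test exercises might look like the following (illustrative only; the exact expressions that trigger the fallback depend on what Comet supports, and spark is assumed to be an active SparkSession):

import org.apache.spark.sql.functions._

val df = spark.range(0, 1000)
  .selectExpr("id % 10 AS k", "id AS v")
  .groupBy("k")
  // min/max/count populate the aggregate buffer in the Partial stage; the arithmetic
  // combining them becomes a result expression of the Final-mode HashAggregate.
  .agg((min(col("v")) + max(col("v"))).as("span"), count(col("v")).as("cnt"))

df.collect()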

Contributor

@kazuyukitanimura left a comment


Still reviewing

Comment on lines +221 to +226
.doc("Whether to enforce converting results in the Final stage of a HashAggregate, " +
"When enabled, Final-mode hashAggregates will not be converted to Comet, this can cause " +
"issues when native shuffle is enabled. " +
"If this is disabled, unsupported result expressions will be " +
"separated into a ProjectExec to allow HashAggregate to complete natively. " +
"This is disabled by default.")
Contributor


I would like to understand this more.
"Final-mode hashAggregates will not be converted to Comet": so final aggregation falls back to Spark?
"this can cause issues when native shuffle is enabled": why is that the case?
And when should we enable this option?

Contributor


I guess when the result expression of a Final-mode HashAggregate is unsupported, the entire HashAggregate will fall back to Spark with this option enabled?

Contributor Author


@kazuyukitanimura
Apologies, I believe I intended to say when native shuffle is disabled (or more correctly, when columnar shuffle is used).
Essentially, whenever the schema would come from Spark and not from the data/batch itself, we will start having problems.

Looking at the tpcds tests, we can see an example:
HashAggregate [cp_catalog_page_id,sum,sum,isEmpty,sum,isEmpty] [sum(UnscaledValue(cs_ext_sales_price)),sum(coalesce(cast(cr_return_amount as decimal(12,2)), 0.00)),sum((cs_net_profit - coalesce(cast(cr_net_loss as decimal(12,2)), 0.00))),sales,returns,profit,channel,id,sum,sum,isEmpty,sum,isEmpty]
  CometColumnarToRow
    InputAdapter
      CometExchange [cp_catalog_page_id] #10
        CometHashAggregate

See how the Partial HA is a Comet one, while the Final one is a Spark HashAggregate with a conversion in between.
This is the current implementation, and it works for both shuffles, as Comet still produces the data Spark expects.
However, the moment we output something Spark does not expect (like the partial results of a CollectSet), the columnar shuffle will crash due to the mismatch in data types.

The issue with the shuffle can probably be circumvented by shuffling the aggregate buffer as a binary column regardless of its data type, then reforming it in the Final aggregate; that way both shuffles would function.
However, as discussed in the issue, this will only delay the inevitable in cases such as an unsupported result expression, as a CometColumnarToRow will not recreate the data type a regular HashAggregate expects, and there is no other path forward but to let the Comet aggregate expressions run their course.
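To make the CollectSet case above concrete, a query of roughly this shape is where the intermediate buffer differs from the final output type (illustrative only; spark is an assumed active SparkSession):

import org.apache.spark.sql.functions._

// The Partial stage produces collect_set's intermediate buffer, whose layout is not the
// final ArrayType column; a shuffle that takes its schema from the Spark plan rather than
// from the incoming batches can therefore see a type it did not expect.
val grouped = spark.range(0, 100)
  .selectExpr("id % 5 AS k", "id % 7 AS v")
  .groupBy("k")
  .agg(collect_set(col("v")).as("vs"))

grouped.collect()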

Contributor Author


Will update the documentation for the configuration option; hopefully I'll be able to regenerate the tpcds plans.

@kazuyukitanimura
Contributor

#1389 mentioned

// When Comet shuffle is disabled, we don't want to transform the HashAggregate
// to CometHashAggregate. Otherwise, we probably get partial Comet aggregation
// and final Spark aggregation.
isCometShuffleEnabled(conf) =>

I believe I've seen a few tests that are ignored because of this.
I don't think this is a valid situation; we should not crash based on previously Comet-run operators if they were successful.

Are we planning to remove this comet shuffle requirement with this PR?

@EmilyMatt
Contributor Author

#1389 mentioned

// When Comet shuffle is disabled, we don't want to transform the HashAggregate
// to CometHashAggregate. Otherwise, we probably get partial Comet aggregation
// and final Spark aggregation.
isCometShuffleEnabled(conf) =>

I believe I've seen a few tests that are ignored because of this.
I don't think this is a valid situation; we should not crash based on previously Comet-run operators if they were successful.

Are we planning to remove this comet shuffle requirement with this PR?

I don't really know why this restriction was placed here, so I can't really offer an opinion on a direction; I only saw the comment and figured it is relevant, as it speaks to this specific issue.
Despite the comment and restriction, this still occurs many times in our tests, so I am unsure of its importance.

Successfully merging this pull request may close these issues.

AQE may materialize a non-supported Final-mode HashAggregate