
AQE may materialize a non-supported Final-mode HashAggregate #1389

Open
EmilyMatt opened this issue Feb 12, 2025 · 4 comments · May be fixed by #1390
Labels
bug Something isn't working

Comments


EmilyMatt commented Feb 12, 2025

Describe the bug

In cases where we support a HashAggregate's aggregate functions, we convert the Partial-mode HashAggregate, execute it in DataFusion, and then use native shuffle to forward the results. The next stage, however, does not materialize immediately, because AQE waits for the shuffle and its statistics.
Then, if the agg() call wrapped the aggregates themselves in unsupported expressions, we cannot convert the Final-mode HashAggregate.
This causes the Spark HashAggregate to crash: it attempts to access fields that don't exist, and even when they do exist it expects its own aggregate-buffer representation, which we don't have (the error thrown is "Not supported on CometListVector"). I'm not sure why the InputAdapter doesn't always work here; I think this can only be supported in cases where no aggregate buffer exists.
This comment implies the possible issue:

```scala
// When Comet shuffle is disabled, we don't want to transform the HashAggregate
// to CometHashAggregate. Otherwise, we probably get partial Comet aggregation
// and final Spark aggregation.
isCometShuffleEnabled(conf) =>
```

I believe I've seen a few tests that are ignored because of this.
I don't think this is a valid situation: we should not crash based on previously Comet-run operators if they were successful.

Steps to reproduce

Have a group_by/aggregate that needs either a shuffle or an aggregate buffer, like:

```scala
.agg(
  collect_set(col("my_column"))
)
```

but wrap that collect_set with an unsupported expression, a cast, or similar.
(I am not sure, but I believe I saw a similar behaviour that can be triggered using Decimal avg: the Partial aggregate is supported but generates a Sum and a Count, while generating results from that intermediate data is not supported natively. That may no longer be relevant, though.)
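The Decimal avg case can be pictured with a toy two-stage aggregation: the Partial stage emits a (sum, count) pair per group instead of the final average, and the Final stage merges those pairs. This is a pure-Python sketch of the general mechanism, not Comet or Spark code; all names are illustrative:

```python
from collections import defaultdict
from decimal import Decimal

def partial_avg(rows):
    # Partial stage: per group, emit the intermediate (sum, count) pair
    # instead of the final average -- this is what gets shuffled.
    acc = defaultdict(lambda: [Decimal(0), 0])
    for key, value in rows:
        acc[key][0] += value
        acc[key][1] += 1
    return {k: (s, c) for k, (s, c) in acc.items()}

def final_avg(partials):
    # Final stage: merge the (sum, count) pairs from all partitions,
    # then produce sum / count per group.
    merged = defaultdict(lambda: [Decimal(0), 0])
    for part in partials:
        for key, (s, c) in part.items():
            merged[key][0] += s
            merged[key][1] += c
    return {k: s / c for k, (s, c) in merged.items()}

p1 = partial_avg([("a", Decimal("1.0")), ("a", Decimal("3.0"))])
p2 = partial_avg([("a", Decimal("5.0")), ("b", Decimal("2.0"))])
print(final_avg([p1, p2]))  # 'a' -> 3, 'b' -> 2
```

The failure mode described above corresponds to the Partial stage running natively but the Final-stage merge (or the expressions wrapping it) having no native implementation.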

For example:

```scala
.agg(
  concat(flatten(collect_set(col("my_column"))))
)
```

can create this behaviour with AQE on. I'm not sure whether it is datatype-related.

Expected behavior

Comet should either not convert a Partial HashAggregate whose Final stage cannot be converted, or, if it already did, should gracefully execute the Final aggregation and let Spark finish the work without breaking the plan.

Additional context

I believe I have a non-invasive solution: if the result expressions are not supported, we move them into a separate ProjectExec, which becomes the parent of the CometHashAggregateExec. The CometHashAggregateExec then has no result expressions (just like the Partial stage) and exposes the grouping + aggregate attributes as its output.
We then convert to rows and run a ProjectExec with the unsupported expression, ensuring that even if the rest of the stage cannot run in Comet, we don't break an already-running workload.
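The proposed split can be pictured outside Spark as two stand-in steps: an aggregate that outputs only grouping + aggregate attributes, and a separate projection applying the unsupported result expression afterwards. A hypothetical pure-Python sketch (function names are illustrative, not Comet APIs):

```python
def hash_aggregate(rows):
    # Stand-in for a CometHashAggregateExec with no result expressions:
    # output only the grouping key and the raw aggregate value,
    # the way a Partial-mode aggregate does.
    out = {}
    for key, value in rows:
        out.setdefault(key, set()).add(value)  # collect_set
    return [(k, sorted(v)) for k, v in out.items()]

def project(rows, result_expr):
    # Stand-in for the separate ProjectExec that applies the (possibly
    # Comet-unsupported) result expression row by row on the Spark side.
    return [(k, result_expr(v)) for k, v in rows]

agg_out = hash_aggregate([("a", "x"), ("a", "y"), ("a", "x")])
# Unsupported expression wrapping the aggregate, e.g. concat(flatten(...)):
final = project(agg_out, lambda vs: "".join(vs))
print(final)  # [('a', 'xy')]
```

The point of the split is that the aggregate itself stays native, and only the wrapping expression falls back to a row-based projection.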

Will open a PR shortly

EmilyMatt added the bug (Something isn't working) label Feb 12, 2025

kazuyukitanimura commented Feb 15, 2025

cc @viirya
I forgot the context why we did this in #991

```scala
// When Comet shuffle is disabled, we don't want to transform the HashAggregate
// to CometHashAggregate. Otherwise, we probably get partial Comet aggregation
// and final Spark aggregation.
isCometShuffleEnabled(conf) =>
```


viirya commented Feb 15, 2025

Because of this?

```scala
// When Comet shuffle is disabled, we don't want to transform the HashAggregate
// to CometHashAggregate. Otherwise, we probably get partial Comet aggregation
// and final Spark aggregation.
```

Comet partial aggregation operator might return different data than Spark partial aggregation operator.

kazuyukitanimura commented

Thanks @viirya

@EmilyMatt Did you mean that we should be able to run Comet aggregation even when Comet shuffle is disabled, based on:

> I believe I've seen a few tests that are ignored because of this.
> I don't think this is a valid situation: we should not crash based on previously Comet-run operators if they were successful.

EmilyMatt (Author) commented

The issue can be dissected as follows:

a. There is no reason to convert a Partial aggregate but not the Final one, regardless of shuffle. If we support the aggregate expressions, why should we run the Final stage in Spark when it is supported natively? We should get the performance benefit of running the final aggregate natively as well, and if something does have to run in Spark, it should be only the result expressions.
This is the less critical aspect.

b. Currently, all the supported aggregate expressions are "naive": they produce some Spark primitive and are performed using a primitive aggregate. Even ones such as Avg are represented as a Sum and a Count. But what happens when Spark uses an aggregate buffer that it passes between stages, as with CollectSet and CollectList?
Although the expected result is probably, for example, an ArrayType(StringType, false), Spark passes the serialized intermediate representation between stages, which can be observed as the DataType being BinaryType (the attribute is usually called "buf#" or similar).
When we reach something like this using columnar shuffle, the shuffle throws an error, because we output an unexpected intermediate type (ArrayType of non-nullable string) while it expects a BinaryType.
Say we fixed this by iterating over the outputs in CometHashAggregateExec and correcting the DataType to what Comet outputs in the previous stage. The shuffle would then work correctly, calling the correct method on the ColumnarBatch and writing it to the shuffle file. However, the Final-stage HashAggregate would still attempt to read the aggregate buffer it expects (binary), not an intermediate value of the result type.
This will then cause a crash.
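The mismatch can be simulated in plain Python (illustrative only — Spark's actual buffer encoding is not pickle): a Spark-style Final stage that expects a serialized binary buffer blows up when handed the already-materialized array a Comet-style Partial stage produces:

```python
import pickle

def spark_partial_collect_set(values):
    # Spark-style Partial stage: ships an opaque serialized buffer
    # (an attribute typed BinaryType, e.g. "buf#...").
    return pickle.dumps(set(values))

def comet_partial_collect_set(values):
    # Comet-style Partial stage: ships the materialized intermediate
    # value instead (an array of non-nullable strings).
    return sorted(set(values))

def spark_final_collect_set(buffers):
    # Spark-style Final stage: assumes every input is a binary buffer
    # it can deserialize and merge.
    merged = set()
    for buf in buffers:
        merged |= pickle.loads(buf)
    return sorted(merged)

# Spark partial -> Spark final: fine.
ok = spark_final_collect_set([spark_partial_collect_set(["x", "y"])])

# Comet partial -> Spark final: the Final stage reads data that is not
# the buffer representation it expects, and crashes.
try:
    spark_final_collect_set([comet_partial_collect_set(["x", "y"])])
except TypeError as e:
    print("crash:", e)
```

This mirrors the sequence above: even after the shuffle is made to accept the Comet intermediate type, the Final-stage consumer still fails on the representation mismatch.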

This cannot be detected before stage materialization, since the plan doesn't exist yet and we don't know whether the result expressions are unsupported. So I think allowing a Spark Final stage with a Comet Partial stage is slightly shortsighted, unless in the future we write a conversion (maybe using codegen) between the Comet output and Spark's binary aggregate-buffer representation.
I think a better solution is to separate out the result expressions in cases where they are not supported.
