-
Notifications
You must be signed in to change notification settings - Fork 264
Reimplement min_by and max_by aggregations
#13603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Nghia Truong <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR reimplements the min_by and max_by aggregation functions by switching from a struct-based approach to using CUDF's argMin and argMax operations. The new implementation is more efficient as it directly finds the index of the minimum/maximum ordering value and then gathers the corresponding value.
Key changes:
- Replaced struct-based aggregation with argMin/argMax operations
- Introduced a new
GpuGatherexpression for extracting values by index - Simplified the aggregation logic by eliminating complex struct creation and extraction
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| dataExtractors.scala | Adds new GpuGather expression for gathering values by indices |
| aggregateFunctions.scala | Reimplements min_by/max_by using argMin/argMax operations instead of struct-based approach |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| override lazy val initialValues: Seq[Expression] = Seq(GpuLiteral(null, valueExpr.dataType)) | ||
|
|
||
| override lazy val initialValues: Seq[Expression] = Seq( | ||
| GpuLiteral(null, valueExpr.dataType), GpuLiteral(null, orderingExpr.dataType)) | ||
| // The ordering column is used as input for reduction/groupby to find the argmin/argmax index. | ||
| override lazy val inputProjection: Seq[Expression] = Seq(orderingExpr) | ||
| override lazy val updateAggregates: Seq[CudfAggregate] = Seq(cudfArgMinMaxAggregate) | ||
|
|
||
| override lazy val inputProjection: Seq[Expression] = Seq( | ||
| createStructExpression(orderingExpr, valueExpr)) | ||
| override lazy val updateAggregates: Seq[CudfAggregate] = Seq(cudfMaxMinByAggregate) | ||
| override lazy val postUpdate: Seq[Expression] = extractChildren | ||
| // Extract the extremum value from argmin/argmax index. | ||
| override lazy val postUpdate: Seq[Expression] = | ||
| Seq(GpuGather(bufferValue, cudfArgMinMaxAggregate.attr)) | ||
|
|
||
| override lazy val preMerge: Seq[Expression] = Seq( | ||
| createStructExpression(bufferOrdering, bufferValue)) | ||
| override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(cudfMaxMinByAggregate) | ||
| override lazy val postMerge: Seq[Expression] = extractChildren | ||
| override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(cudfArgMinMaxAggregate) |
Copilot
AI
Oct 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GpuGather operation is being called with bufferValue as the values column, but bufferValue is not being populated during the update phase. The initialValues only initializes it with null, and there's no mechanism to collect the actual values that correspond to the ordering column indices.
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala
Show resolved
Hide resolved
Signed-off-by: Nghia Truong <[email protected]>
Signed-off-by: Nghia Truong <[email protected]>
Signed-off-by: Nghia Truong <[email protected]>
|
NOTE: release/25.12 has been created from main. Please retarget your PR to release/25.12 if it should be included in the release. |
This optimizes
min_byandmax_byaggregations by avoiding data copy and replacing the expensive operations (min/maxon structs column) by the much cheaper ones (argmin/argmaxon the inputorderingcolumn then gather).