diff --git a/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsSimpleGroupByAggregation.java b/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsSimpleGroupByAggregation.java new file mode 100644 index 00000000000..3b5aa06742a --- /dev/null +++ b/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsSimpleGroupByAggregation.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark; + +import ai.rapids.cudf.GroupByAggregationOnColumn; + +/** + * Standard CUDF-based aggregation step that uses built-in CUDF aggregation + * operations. This handles the most common aggregation patterns and provides + * the best performance. + */ +public interface RapidsSimpleGroupByAggregation extends RapidsUDAFGroupByAggregation { + /** + * The main aggregation step that uses built-in CUDF GroupBy operations. + * + * @param inputIndices An array of ints, which are the indices of the input + * columns. + * @return An array of CUDF {@code GroupByAggregationOnColumn} instances. + */ + GroupByAggregationOnColumn[] aggregate(int[] inputIndices); +} diff --git a/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAF.java b/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAF.java new file mode 100644 index 00000000000..b8fa8e79a3a --- /dev/null +++ b/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAF.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark; + +import ai.rapids.cudf.ColumnVector; +import ai.rapids.cudf.Scalar; +import org.apache.spark.sql.types.DataType; + +/** + * An interface for a GPU-accelerated User Defined Aggregate Function (UDAF). + * This provides the necessary methods to perform distributed group-by and + * reduction aggregations using CUDF. + */ +public interface RapidsUDAF { + /** + * Provides an array of default values for the aggregation result. This is + * used when a reduction aggregation does not have any rows to aggregate. + * <p>
+ * The returned Scalars will be closed automatically. + *
+ * @return An array of cudf Scalars representing the output of the + * {@code updateAggregation} stage of processing. This output + * may still be merged with the output of other tasks. + */ + Scalar[] getDefaultValue(); + + /** + * Returns the {@code RapidsUDAFGroupByAggregation} that defines the + * logic for the initial aggregation. + *
+ * @return A RapidsUDAFGroupByAggregation that defines the aggregation logic. + */ + RapidsUDAFGroupByAggregation updateAggregation(); + + /** + * Returns the {@code RapidsUDAFGroupByAggregation} that defines how to + * merge two sets of aggregation results. This is used in distributed + * aggregation scenarios where intermediate results from different + * partitions are combined. + *
+ * @return A RapidsUDAFGroupByAggregation that defines the merge logic. + */ + RapidsUDAFGroupByAggregation mergeAggregation(); + + /** + * A last step that takes the result of the merged aggregation + * and performs any necessary transformations before returning the final + * result. This method returns a single ColumnVector, which is the final + * result of the aggregation. + * <p>
+ * Implementations should close the input columns to avoid a GPU memory leak, but the + * returned column will be closed automatically. + *
+ * @param numRows The number of rows in the aggregated data. + * @param args An array of ColumnVector arguments from the final aggregation step. + * @param outType The final data type of this UDAF. + * @return A single ColumnVector representing the final UDAF result. + */ + ColumnVector getResult(int numRows, ColumnVector[] args, DataType outType); + + /** + * Returns the data types of the aggregate buffer. + * <p>
+ * These should align with the "bufferSchema" of "UserDefinedAggregateFunction"; otherwise + * data corruption is likely to happen when some operations of this aggregation fall + * back to the CPU, e.g. the partial aggregate runs on the CPU but the final aggregate runs + * on the GPU, or vice versa. This is rare, but possible. + */ + DataType[] bufferTypes(); +} diff --git a/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAFGroupByAggregation.java b/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAFGroupByAggregation.java new file mode 100644 index 00000000000..6c162085988 --- /dev/null +++ b/sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAFGroupByAggregation.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark; + +import ai.rapids.cudf.ColumnVector; +import ai.rapids.cudf.Scalar; + +/** + * Base interface for GPU-accelerated UDAF aggregation implementations. This provides + * the contract for different aggregation strategies. + * <p>

+ * Please do not implement this interface directly. + * {@code RapidsSimpleGroupByAggregation} is the sub-interface currently supported for + * direct implementation. More may be added in the future. + */ +public interface RapidsUDAFGroupByAggregation { + /** + * An optional pre-step for the aggregation. By default, this is a no-op + * and will just return the arguments passed in. + * <p>
+ * When overriding, close the input columns to avoid a GPU memory leak; the + * returned columns will be closed automatically. + *
+ * @param numRows The number of rows. + * @param args An array of input ColumnVectors. + * @return An array of ColumnVectors. + */ + default ColumnVector[] preStep(int numRows, ColumnVector[] args) { + return args; + } + + /** + * Performs a reduction on the pre-step output (no keys). The + * output of this will be turned into a ColumnVector and possibly + * combined with other rows before being processed further. + * <p>
+ * Both the input columns and returned Scalars will be closed automatically. + * + * @param numRows The number of rows to process. + * @param preStepData The output from the preStep method. + * @return An array of cudf Scalars representing the reduced data. + */ + Scalar[] reduce(int numRows, ColumnVector[] preStepData); + + /** + * An optional post-process step for the aggregation. It takes the output of the + * aggregations and performs any processing needed to make it match the + * input to the merge aggregation. By default, it returns the input unchanged. + * <p>
+ * When overriding, close the input columns to avoid a GPU memory leak; the + * returned columns will be closed automatically. + * + * @param numRows The number of rows in the aggregated data. + * @param aggregatedData The output from the aggregation step. + * @return An array of ColumnVectors compatible with the merge step. + */ + default ColumnVector[] postStep(int numRows, ColumnVector[] aggregatedData) { + return aggregatedData; + } +}