Replies: 6 comments 4 replies
-
Thanks for putting together the detailed design plan. +1 for adding the feature.
-
Thanks, Wei, for the well-written design. For the point:
-
Would it make sense to have average_merge_final (an aggregation that starts with partial results and ends up with final results without having to call extract)?
-
The document does not mention how addSingleGroupIntermediateResults is going to be created for the new aggregate function.
-
Looks like we can also push
-
(With @mbasmanova)
We are designing a UDAF adapter to allow more flexible combination of different aggregation steps in aggregation queries. Potential usages include rolling aggregation and the support for Spark query plans for aggregation with one distinct (#4412). We discuss our work in progress in this discussion.
Use Cases
Some Velox users are interested in computing aggregation on a rolling basis, e.g., aggregate events happening within a 30-day window and update the aggregation result every day.
A naive approach is to first filter records by date stamp, keeping only those within the last 30 days, and then aggregate the remaining events.
However, computing such rolling aggregation every day from scratch is expensive and a waste of resources because the aggregation for events within the past few days won’t change as the window slides.
Alternatively, one can pre-aggregate events every day to intermediate states and then aggregate these daily states together when computing a 30-day aggregation. In the example below, avg_partial computes intermediate states from input events, then avg_merge aggregates partial states into another intermediate state, and finally, avg_extract converts the intermediate state into an aggregation result.
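To make the three phases concrete, here is a minimal C++ sketch of what avg_partial, avg_merge, and avg_extract compute for a single group. It is not the original example query and not Velox code: the AvgState struct and the avgPartial, avgMerge, and avgExtract helpers are hypothetical names, and the state layout simply mirrors the row(double, bigint) intermediate type of avg().

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for avg's intermediate state, row(double, bigint).
struct AvgState {
  double sum = 0.0;
  int64_t count = 0;
};

// Analogue of avg_partial: raw inputs -> intermediate state.
AvgState avgPartial(const std::vector<double>& values) {
  AvgState state;
  for (double value : values) {
    state.sum += value;
    ++state.count;
  }
  return state;
}

// Analogue of avg_merge: several intermediate states -> one intermediate state.
AvgState avgMerge(const std::vector<AvgState>& states) {
  AvgState merged;
  for (const AvgState& state : states) {
    merged.sum += state.sum;
    merged.count += state.count;
  }
  return merged;
}

// Analogue of avg_extract: intermediate state -> final result.
// The caller is assumed to pass a state with at least one input.
double avgExtract(const AvgState& state) {
  assert(state.count > 0);
  return state.sum / static_cast<double>(state.count);
}
```

In this sketch, each day's events would go through avgPartial once, and a 30-day result would come from avgExtract(avgMerge(dailyStates)) without touching the raw events again.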
This approach requires more flexible computation of different phases of an aggregation. Presto supports this by providing companion aggregate and scalar functions for a UDAF that allows aggregating raw inputs to intermediate states, combining intermediate states, and extracting the aggregation result from an intermediate state separately. But this is only supported for a small set of Presto UDAFs right now, such as approx_distinct() via HyperLogLog functions and approx_percentile() via Q-Digest and T-Digest functions.
This alternative approach is better, but it still incurs unnecessary computation when aggregating daily states from DATEID-29 to DATEID-1 because these do not change from the previous 30-day aggregation to the current one. An even better approach is to subtract the daily aggregation of the oldest day from the previous 30-day aggregation and add the newest daily aggregation to it.
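The subtract-then-add update can be sketched as follows for avg. This is an illustration under stated assumptions, not the proposed Velox implementation: RollingAvgState, addState, retractState, and slideWindow are hypothetical names, and the state is again assumed to be a (sum, count) pair.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical (sum, count) state for a rolling average.
struct RollingAvgState {
  double sum = 0.0;
  int64_t count = 0;
};

// Adds a slide-size (daily) state to a window-size state.
RollingAvgState addState(RollingAvgState window, const RollingAvgState& daily) {
  window.sum += daily.sum;
  window.count += daily.count;
  return window;
}

// Retracts a daily state that was previously added to the window state.
RollingAvgState retractState(RollingAvgState window, const RollingAvgState& daily) {
  assert(window.count >= daily.count);
  window.sum -= daily.sum;
  window.count -= daily.count;
  return window;
}

// New window = previous window - oldest day + newest day.
RollingAvgState slideWindow(
    const RollingAvgState& previousWindow,
    const RollingAvgState& oldestDay,
    const RollingAvgState& newestDay) {
  return addState(retractState(previousWindow, oldestDay), newestDay);
}
```

Each slide costs two state operations regardless of the window length. One trade-off worth noting is that repeated floating-point subtraction can accumulate rounding error that a full re-merge would not.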
Below is an example query for rolling average over 30 days. In this example, window-size states are stored in the longterm table and slide-size states are stored in the daily table.
Goal
We believe this is a general use case that would benefit from a generic solution. Therefore, we propose to enhance Velox to automatically generate companion functions for all aggregate functions. Given a Velox UDAF, e.g., avg, Velox will automatically provide two companion scalar functions and two companion aggregate functions, together with their registration code.
Design
This goal has two pieces:
For the first piece, Velox will extend the vector-based UDAF authoring interface with two new optional APIs to allow defining how raw input and slide-size state can be retracted from a window-size state. UDAF authors will write UDAFs using this vector-based UDAF interface and Velox will provide an adapter to auto-generate companion functions for it.
For the second piece, the original registerAggregateFunction() API will be extended to accept an additional boolean flag indicating whether all companion functions will be registered together.
Current Velox UDAF interface and registration
The current Velox UDAF authoring interface consists of seven core methods for aggregation that UDAF authors need to define.
In addition to these methods, the UDAF author defines all supported signatures for this UDAF, including their input types, intermediate types, and the result types. These signatures are used to resolve data types at runtime. The author also defines a factory function that takes the input type, result type, and the current aggregation step (e.g., core::AggregationNode::Step::kPartial), and returns an instance of the UDAF that supports this use case.
More details about how to implement a Velox UDAF can be found here: https://facebookincubator.github.io/velox/develop/aggregate-functions.html.
Planned extensions
To support creation of the retract function, Velox will introduce two optional member functions to the UDAF authoring interface, namely retractRawInput and retractIntermediateResults. retractIntermediateResults will be used to generate the retract companion function, and retractRawInput will be used to optimize the computation of window functions.
The registration API will register all supported companion functions by default. The code that UDAF authors write is the same as before, but the partial, merge, and extract companion functions are registered automatically.
Not all UDAFs support the retract operation, e.g., min() and max(), hence we don’t register the retract companion function by default. If a UDAF supports retracting, the author should provide an AggregateFunctionMetadata object specifying this support when registering the UDAF. An extra retract function will then be registered together.
An additional API will be provided for retrieving signatures of companion functions of a given UDAF. If a UDAF with the given name doesn’t exist, this API returns std::nullopt.
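The registration and lookup behavior described above can be mimicked with a toy registry. Everything in this sketch is hypothetical (ToyAggregateMetadata, ToyRegistry, registerAggregate, companionsOf, and the _retract suffix); the proposal names an AggregateFunctionMetadata object but does not list its fields here, and the real API returns signatures rather than the plain names used below.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical metadata: only models the opt-in retract support.
struct ToyAggregateMetadata {
  bool supportsRetract = false;
};

// Toy registry that tracks companion function names per UDAF.
class ToyRegistry {
 public:
  // Registers partial, merge, and extract companions by default; the retract
  // companion is added only when the metadata says it is supported.
  void registerAggregate(
      const std::string& name,
      ToyAggregateMetadata metadata = {}) {
    assert(!name.empty());
    std::vector<std::string> companions = {
        name + "_partial", name + "_merge", name + "_extract"};
    if (metadata.supportsRetract) {
      companions.push_back(name + "_retract");
    }
    companions_[name] = std::move(companions);
  }

  // Returns std::nullopt when no UDAF with the given name is registered.
  std::optional<std::vector<std::string>> companionsOf(
      const std::string& name) const {
    auto it = companions_.find(name);
    if (it == companions_.end()) {
      return std::nullopt;
    }
    return it->second;
  }

 private:
  std::map<std::string, std::vector<std::string>> companions_;
};
```

Returning std::optional lets callers distinguish "no such UDAF" from "a UDAF with an empty companion list" without exceptions.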
Known limitations
There are two known limitations with the current design. However, we expect that most UDAFs either do not have these issues or can avoid them with simple changes to their implementations.
Generate companion functions
The table below shows how the companion functions are composed of the member functions from the original UDAF. Let’s assume the original UDAF is avg() that is implemented through a class AverageAggregate.
Aggregate companion functions
Scalar companion functions
Retract companion function (adds the first intermediate state, retracts the second, and extracts the accumulator):
    AverageAggregate::initializeNewGroups(groups, range);
    AverageAggregate::addIntermediateResults(groups, rows, {args[0]});
    AverageAggregate::retractIntermediateResults(groups, rows, {args[1]});
    AverageAggregate::extractAccumulators(groups, rows.size(), args);
    freeGroups(groups);
Extract companion function (adds the intermediate state and extracts the final value):
    AverageAggregate::initializeNewGroups(groups, range);
    AverageAggregate::addIntermediateResults(groups, rows, args);
    AverageAggregate::extractValues(groups, rows.size(), result);
    freeGroups(groups);
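The call sequences listed above can be imitated on a toy, single-group aggregate to show how an adapter might compose scalar companions from the member functions of the original class. This is a simplified sketch: ToyAverageAggregate, ToyAvgAccumulator, retractCompanion, and extractCompanion are hypothetical, and real Velox UDAFs operate on vectors and many groups at once rather than on one in-memory state.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical (sum, count) accumulator.
struct ToyAvgAccumulator {
  double sum = 0.0;
  int64_t count = 0;
};

// Toy single-group analogue of a UDAF class such as AverageAggregate.
class ToyAverageAggregate {
 public:
  using State = ToyAvgAccumulator;

  void initializeNewGroup() { acc_ = State{}; }

  void addIntermediateResult(const State& state) {
    acc_.sum += state.sum;
    acc_.count += state.count;
  }

  void retractIntermediateResult(const State& state) {
    assert(acc_.count >= state.count);
    acc_.sum -= state.sum;
    acc_.count -= state.count;
  }

  State extractAccumulator() const { return acc_; }

  double extractValue() const {
    assert(acc_.count > 0);
    return acc_.sum / static_cast<double>(acc_.count);
  }

 private:
  State acc_;
};

// Retract companion: initialize, add the first state, retract the second,
// then return the accumulator.
template <typename Aggregate>
typename Aggregate::State retractCompanion(
    const typename Aggregate::State& first,
    const typename Aggregate::State& second) {
  Aggregate aggregate;
  aggregate.initializeNewGroup();
  aggregate.addIntermediateResult(first);
  aggregate.retractIntermediateResult(second);
  return aggregate.extractAccumulator();
}

// Extract companion: initialize, add the state, then return the final value.
template <typename Aggregate>
double extractCompanion(const typename Aggregate::State& state) {
  Aggregate aggregate;
  aggregate.initializeNewGroup();
  aggregate.addIntermediateResult(state);
  return aggregate.extractValue();
}
```

Because the companions are templates over the aggregate class, the same two functions would work for any toy aggregate that exposes these five members, which is the idea behind generating companions generically instead of writing them per UDAF.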
To generate these UDFs and UDAFs, Velox will build an adapter that takes the original UDAF and creates all the companion functions. A prototype of the adapter can be found here:
velox/velox/exec/AggregateFunctionAdapter.h
Lines 27 to 293 in fc41b12
Generate registration code
A UDAF author only needs to write the registration code for the original UDAF; the registration of the companion functions is handled automatically by a Velox adapter. Specifically, the author needs to define all supported function signatures and a factory function that creates a std::unique_ptr to an instance of the original UDAF given the input type, result type, and aggregation step (example).
We extend the registerAggregateFunction() API so that companion functions are registered together with the original UDAF:
velox/velox/exec/Aggregate.cpp
Lines 55 to 80 in fc41b12
The new API for retrieving companion function signatures is added alongside it:
velox/velox/exec/Aggregate.cpp
Lines 106 to 162 in fc41b12
The RegisterAdapter generates factories that create an instance of the original UDAF and pass it to the companion function constructors. A prototype can be found here:
velox/velox/exec/AggregateFunctionAdapter.h
Lines 295 to 480 in fc41b12
Finally, #4489 gives an example of the aforementioned adapters together with unit tests for the auto-generated and auto-registered companion functions for avg().
Footnotes
The extract function name may have a suffix of the result type name if the original UDAF has multiple signatures with the same intermediate type but different result types. E.g., avg() has two signatures, real -> row(double, bigint) -> real and double -> row(double, bigint) -> double. Their extract function signatures are row(double, bigint) -> real and row(double, bigint) -> double. However, Velox requires an expression's result type to be inferable from the function name and its argument types. Therefore, we append a suffix of the result type name to the extract functions to differentiate them, i.e., avg_extract_real() and avg_extract_double().
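The naming rule in this footnote can be sketched as a small helper. ToySignature and extractFunctionNames are hypothetical names, and the sketch assumes the suffix is applied only to signatures whose intermediate type maps to more than one result type; the actual adapter may decide this differently.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical signature: input -> intermediate -> result, as type names.
struct ToySignature {
  std::string inputType;
  std::string intermediateType;
  std::string resultType;
};

// Returns one extract function name per signature, appending the result type
// name when the intermediate type alone cannot determine the result type.
std::vector<std::string> extractFunctionNames(
    const std::string& name,
    const std::vector<ToySignature>& signatures) {
  assert(!name.empty());

  // Collect the distinct result types seen for each intermediate type.
  std::map<std::string, std::set<std::string>> resultsByIntermediate;
  for (const ToySignature& signature : signatures) {
    resultsByIntermediate[signature.intermediateType].insert(
        signature.resultType);
  }

  std::vector<std::string> names;
  for (const ToySignature& signature : signatures) {
    const bool ambiguous =
        resultsByIntermediate[signature.intermediateType].size() > 1;
    names.push_back(
        ambiguous ? name + "_extract_" + signature.resultType
                  : name + "_extract");
  }
  return names;
}
```

With the two avg() signatures from the footnote, this helper yields avg_extract_real and avg_extract_double, while a UDAF with a single result type per intermediate type keeps the plain _extract name.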