Skip to content

Add sum_over_time aggregation #128413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,24 @@ max_cost:double | cluster:keyword | time_bucket:datetime
11.625 | prod | 2024-05-10T00:12:00.000Z
11.5 | staging | 2024-05-10T00:16:00.000Z
;

sum_over_time
required_capability: metrics_command
required_capability: sum_over_time

TS k8s | STATS sum_cost=sum(sum_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT sum_cost DESC, time_bucket DESC, cluster | LIMIT 10;

sum_cost:double | cluster:keyword | time_bucket:datetime
67.625 | qa | 2024-05-10T00:17:00.000Z
65.75 | staging | 2024-05-10T00:09:00.000Z
48.125 | qa | 2024-05-10T00:09:00.000Z
48.125 | qa | 2024-05-10T00:06:00.000Z
41.25 | qa | 2024-05-10T00:11:00.000Z
38.875 | qa | 2024-05-10T00:04:00.000Z
38.0 | qa | 2024-05-10T00:15:00.000Z
37.5 | prod | 2024-05-10T00:09:00.000Z
36.75 | qa | 2024-05-10T00:08:00.000Z
32.25 | qa | 2024-05-10T00:18:00.000Z

;

Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,11 @@ public enum Cap {
*/
FIRST_OVER_TIME(Build.current().isSnapshot()),

/**
* Support sum_over_time aggregation that gets evaluated per time-series
*/
SUM_OVER_TIME(Build.current().isSnapshot()),

/**
* Resolve groupings before resolving references to groupings in the aggregations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialExtent;
import org.elasticsearch.xpack.esql.expression.function.aggregate.StdDev;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
import org.elasticsearch.xpack.esql.expression.function.aggregate.SumOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Top;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
import org.elasticsearch.xpack.esql.expression.function.aggregate.WeightedAvg;
Expand Down Expand Up @@ -453,6 +454,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
AvgOverTime.ENTRY,
LastOverTime.ENTRY,
FirstOverTime.ENTRY,
SumOverTime.ENTRY,
// internal functions
ToPartial.ENTRY,
FromPartial.ENTRY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.aggregate;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.Param;

import java.io.IOException;
import java.util.List;

import static java.util.Collections.emptyList;

/**
* Similar to {@link Sum}, but it is used to calculate the sum of values over a time series from the given field.
*/
public class SumOverTime extends TimeSeriesAggregateFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"SumOverTime",
SumOverTime::new
);

@FunctionInfo(
returnType = { "double", "integer", "long" },
description = "The sum over time value of a field.",
type = FunctionType.AGGREGATE
)
public SumOverTime(
Source source,
@Param(name = "field", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field
) {
this(source, field, Literal.TRUE);
}

public SumOverTime(Source source, Expression field, Expression filter) {
super(source, field, filter, emptyList());
}

private SumOverTime(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public SumOverTime withFilter(Expression filter) {
return new SumOverTime(source(), field(), filter);
}

@Override
protected NodeInfo<SumOverTime> info() {
return NodeInfo.create(this, SumOverTime::new, field(), filter());
}

@Override
public SumOverTime replaceChildren(List<Expression> newChildren) {
return new SumOverTime(source(), newChildren.get(0), newChildren.get(1));
}

@Override
protected TypeResolution resolveType() {
return perTimeSeriesAggregation().resolveType();
}

@Override
public DataType dataType() {
return perTimeSeriesAggregation().dataType();
}

@Override
public Sum perTimeSeriesAggregation() {
return new Sum(source(), field(), filter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ setup:
path: /_query
parameters: []
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time]
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time, sum_over_time]
reason: "Test that should only be executed on snapshot versions"

- do: {xpack.usage: {}}
Expand Down Expand Up @@ -123,7 +123,7 @@ setup:
- match: {esql.functions.coalesce: $functions_coalesce}
- gt: {esql.functions.categorize: $functions_categorize}
# Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation.
- length: {esql.functions: 143} # check the "sister" test below for a likely update to the same esql.functions length check
- length: {esql.functions: 144} # check the "sister" test below for a likely update to the same esql.functions length check

---
"Basic ESQL usage output (telemetry) non-snapshot version":
Expand Down