Skip to content

Commit 5bae8ec

Browse files
committed
Drafting SumOverTime transform for timeseries
1 parent f0d7ec4 commit 5bae8ec

File tree

6 files changed

+119
-1
lines changed

6 files changed

+119
-1
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,23 @@ max_cost:double | cluster:keyword | time_bucket:datetime
274274
11.625 | prod | 2024-05-10T00:12:00.000Z
275275
11.5 | staging | 2024-05-10T00:16:00.000Z
276276
;
277+
278+
sum_over_time
279+
required_capability: metrics_command
280+
required_capability: sum_over_time
281+
282+
TS k8s | STATS cost=sum(sum_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT cost DESC, time_bucket DESC, cluster | LIMIT 10;
283+
284+
cost:double | cluster:keyword | time_bucket:datetime
285+
12.375 | prod | 2024-05-10T00:17:00.000Z
286+
12.375 | qa | 2024-05-10T00:01:00.000Z
287+
12.25 | prod | 2024-05-10T00:19:00.000Z
288+
12.125 | qa | 2024-05-10T00:07:00.000Z
289+
12.125 | staging | 2024-05-10T00:03:00.000Z
290+
11.875 | prod | 2024-05-10T00:15:00.000Z
291+
11.875 | qa | 2024-05-10T00:09:00.000Z
292+
11.75 | qa | 2024-05-10T00:06:00.000Z
293+
11.625 | prod | 2024-05-10T00:12:00.000Z
294+
11.5 | staging | 2024-05-10T00:16:00.000Z
295+
;
296+

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,6 +1066,11 @@ public enum Cap {
10661066
*/
10671067
FIRST_OVER_TIME(Build.current().isSnapshot()),
10681068

1069+
/**
1070+
* Support sum_over_time aggregation that gets evaluated per time-series
1071+
*/
1072+
SUM_OVER_TIME(Build.current().isSnapshot()),
1073+
10691074
/**
10701075
* Resolve groupings before resolving references to groupings in the aggregations.
10711076
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialExtent;
3838
import org.elasticsearch.xpack.esql.expression.function.aggregate.StdDev;
3939
import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
40+
import org.elasticsearch.xpack.esql.expression.function.aggregate.SumOverTime;
4041
import org.elasticsearch.xpack.esql.expression.function.aggregate.Top;
4142
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
4243
import org.elasticsearch.xpack.esql.expression.function.aggregate.WeightedAvg;
@@ -449,6 +450,7 @@ private static FunctionDefinition[][] snapshotFunctions() {
449450
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
450451
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
451452
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
453+
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
452454
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
453455
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
454456
def(FirstOverTime.class, FirstOverTime::withUnresolvedTimestamp, "first_over_time"),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
3636
AvgOverTime.ENTRY,
3737
LastOverTime.ENTRY,
3838
FirstOverTime.ENTRY,
39+
SumOverTime.ENTRY,
3940
// internal functions
4041
ToPartial.ENTRY,
4142
FromPartial.ENTRY,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.aggregate;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.Literal;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
18+
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
19+
import org.elasticsearch.xpack.esql.expression.function.Param;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
24+
import static java.util.Collections.emptyList;
25+
26+
/**
27+
* Similar to {@link Sum}, but it is used to calculate the sum of values over a time series from the given field.
28+
*/
29+
public class SumOverTime extends TimeSeriesAggregateFunction {
30+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
31+
Expression.class,
32+
"SumOverTime",
33+
SumOverTime::new
34+
);
35+
36+
@FunctionInfo(
37+
returnType = { "double", "integer", "long" },
38+
description = "The sum over time value of a field.",
39+
type = FunctionType.AGGREGATE
40+
)
41+
public SumOverTime(
42+
Source source,
43+
@Param(name = "field", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field
44+
) {
45+
this(source, field, Literal.TRUE);
46+
}
47+
48+
public SumOverTime(Source source, Expression field, Expression filter) {
49+
super(source, field, filter, emptyList());
50+
}
51+
52+
private SumOverTime(StreamInput in) throws IOException {
53+
super(in);
54+
}
55+
56+
@Override
57+
public String getWriteableName() {
58+
return ENTRY.name;
59+
}
60+
61+
@Override
62+
public SumOverTime withFilter(Expression filter) {
63+
return new SumOverTime(source(), field(), filter);
64+
}
65+
66+
@Override
67+
protected NodeInfo<SumOverTime> info() {
68+
return NodeInfo.create(this, SumOverTime::new, field(), filter());
69+
}
70+
71+
@Override
72+
public SumOverTime replaceChildren(List<Expression> newChildren) {
73+
return new SumOverTime(source(), newChildren.get(0), newChildren.get(1));
74+
}
75+
76+
@Override
77+
protected TypeResolution resolveType() {
78+
return perTimeSeriesAggregation().resolveType();
79+
}
80+
81+
@Override
82+
public DataType dataType() {
83+
return perTimeSeriesAggregation().dataType();
84+
}
85+
86+
@Override
87+
public Sum perTimeSeriesAggregation() {
88+
return new Sum(source(), field(), filter());
89+
}
90+
}

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ setup:
3333
path: /_query
3434
parameters: []
3535
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
36-
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time]
36+
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, first_over_time, sum_over_time]
3737
reason: "Test that should only be executed on snapshot versions"
3838

3939
- do: {xpack.usage: {}}

0 commit comments

Comments
 (0)