Skip to content

Commit 1a2bfe2

Browse files
authored
Support combination of group field and span in stats command (opensearch-project#417)
* Support combination of group field and span in stats command Signed-off-by: penghuo <[email protected]>
1 parent 2714c2f commit 1a2bfe2

File tree

19 files changed

+637
-602
lines changed

19 files changed

+637
-602
lines changed

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,14 +185,19 @@ public LogicalPlan visitAggregation(Aggregation node, AnalysisContext context) {
185185
aggregatorBuilder
186186
.add(new NamedAggregator(aggExpr.getNameOrAlias(), (Aggregator) aggExpr.getDelegated()));
187187
}
188-
ImmutableList<NamedAggregator> aggregators = aggregatorBuilder.build();
189188

190189
ImmutableList.Builder<NamedExpression> groupbyBuilder = new ImmutableList.Builder<>();
190+
// Span should be first expression if exist.
191+
if (node.getSpan() != null) {
192+
groupbyBuilder.add(namedExpressionAnalyzer.analyze(node.getSpan(), context));
193+
}
194+
191195
for (UnresolvedExpression expr : node.getGroupExprList()) {
192196
groupbyBuilder.add(namedExpressionAnalyzer.analyze(expr, context));
193197
}
194198
ImmutableList<NamedExpression> groupBys = groupbyBuilder.build();
195199

200+
ImmutableList<NamedAggregator> aggregators = aggregatorBuilder.build();
196201
// new context
197202
context.push();
198203
TypeEnvironment newEnv = context.peek();

core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,17 @@ public static UnresolvedPlan agg(
9292
List<UnresolvedExpression> sortList,
9393
List<UnresolvedExpression> groupList,
9494
List<Argument> argList) {
95-
return new Aggregation(aggList, sortList, groupList, argList).attach(input);
95+
return new Aggregation(aggList, sortList, groupList, null, argList).attach(input);
96+
}
97+
98+
public static UnresolvedPlan agg(
99+
UnresolvedPlan input,
100+
List<UnresolvedExpression> aggList,
101+
List<UnresolvedExpression> sortList,
102+
List<UnresolvedExpression> groupList,
103+
UnresolvedExpression span,
104+
List<Argument> argList) {
105+
return new Aggregation(aggList, sortList, groupList, span, argList).attach(input);
96106
}
97107

98108
public static UnresolvedPlan rename(UnresolvedPlan input, Map... maps) {

core/src/main/java/org/opensearch/sql/ast/tree/Aggregation.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,17 @@ public class Aggregation extends UnresolvedPlan {
2828
private List<UnresolvedExpression> aggExprList;
2929
private List<UnresolvedExpression> sortExprList;
3030
private List<UnresolvedExpression> groupExprList;
31+
private UnresolvedExpression span;
3132
private List<Argument> argExprList;
3233
private UnresolvedPlan child;
3334

3435
/**
35-
* Aggregation Constructor without argument.
36+
* Aggregation Constructor without span and argument.
3637
*/
3738
public Aggregation(List<UnresolvedExpression> aggExprList,
3839
List<UnresolvedExpression> sortExprList,
3940
List<UnresolvedExpression> groupExprList) {
40-
this(aggExprList, sortExprList, groupExprList, Collections.emptyList());
41+
this(aggExprList, sortExprList, groupExprList, null, Collections.emptyList());
4142
}
4243

4344
/**
@@ -46,10 +47,12 @@ public Aggregation(List<UnresolvedExpression> aggExprList,
4647
public Aggregation(List<UnresolvedExpression> aggExprList,
4748
List<UnresolvedExpression> sortExprList,
4849
List<UnresolvedExpression> groupExprList,
50+
UnresolvedExpression span,
4951
List<Argument> argExprList) {
5052
this.aggExprList = aggExprList;
5153
this.sortExprList = sortExprList;
5254
this.groupExprList = groupExprList;
55+
this.span = span;
5356
this.argExprList = argExprList;
5457
}
5558

core/src/main/java/org/opensearch/sql/planner/physical/AggregationOperator.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import org.opensearch.sql.expression.aggregation.Aggregator;
1919
import org.opensearch.sql.expression.aggregation.NamedAggregator;
2020
import org.opensearch.sql.expression.span.SpanExpression;
21-
import org.opensearch.sql.planner.physical.bucket.Group;
22-
import org.opensearch.sql.planner.physical.bucket.SpanBucket;
21+
import org.opensearch.sql.planner.physical.collector.Collector;
2322
import org.opensearch.sql.storage.bindingtuple.BindingTuple;
2423

2524
/**
@@ -35,8 +34,13 @@ public class AggregationOperator extends PhysicalPlan {
3534
private final List<NamedAggregator> aggregatorList;
3635
@Getter
3736
private final List<NamedExpression> groupByExprList;
37+
@Getter
38+
private final NamedExpression span;
39+
/**
40+
* {@link BindingTuple} Collector.
41+
*/
3842
@EqualsAndHashCode.Exclude
39-
private final Group group;
43+
private final Collector collector;
4044
@EqualsAndHashCode.Exclude
4145
private Iterator<ExprValue> iterator;
4246

@@ -51,9 +55,14 @@ public AggregationOperator(PhysicalPlan input, List<NamedAggregator> aggregatorL
5155
List<NamedExpression> groupByExprList) {
5256
this.input = input;
5357
this.aggregatorList = aggregatorList;
54-
this.groupByExprList = groupByExprList;
55-
this.group = groupBySpan(groupByExprList) ? new SpanBucket(aggregatorList, groupByExprList)
56-
: new Group(aggregatorList, groupByExprList);
58+
if (hasSpan(groupByExprList)) {
59+
this.span = groupByExprList.get(0);
60+
this.groupByExprList = groupByExprList.subList(1, groupByExprList.size());
61+
} else {
62+
this.span = null;
63+
this.groupByExprList = groupByExprList;
64+
}
65+
this.collector = Collector.Builder.build(this.span, this.groupByExprList, this.aggregatorList);
5766
}
5867

5968
@Override
@@ -81,14 +90,13 @@ public ExprValue next() {
8190
public void open() {
8291
super.open();
8392
while (input.hasNext()) {
84-
group.push(input.next());
93+
collector.collect(input.next().bindingTuples());
8594
}
86-
iterator = group.result().iterator();
95+
iterator = collector.results().iterator();
8796
}
8897

89-
private boolean groupBySpan(List<NamedExpression> namedExpressionList) {
90-
return namedExpressionList.size() == 1
98+
private boolean hasSpan(List<NamedExpression> namedExpressionList) {
99+
return !namedExpressionList.isEmpty()
91100
&& namedExpressionList.get(0).getDelegated() instanceof SpanExpression;
92101
}
93-
94102
}

core/src/main/java/org/opensearch/sql/planner/physical/bucket/Group.java

Lines changed: 0 additions & 106 deletions
This file was deleted.

core/src/main/java/org/opensearch/sql/planner/physical/bucket/SpanBucket.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

0 commit comments

Comments
 (0)