Skip to content

Commit 82ccc8d

Browse files
authored
Support parse command with Calcite (#3474)
--------- Signed-off-by: Lantao Jin <[email protected]>
1 parent 347fa55 commit 82ccc8d

File tree

11 files changed

+314
-86
lines changed

11 files changed

+314
-86
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

+78-62
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.calcite.rex.RexLiteral;
3232
import org.apache.calcite.rex.RexNode;
3333
import org.apache.calcite.rex.RexWindowBounds;
34+
import org.apache.calcite.sql.fun.SqlLibraryOperators;
3435
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
3536
import org.apache.calcite.tools.RelBuilder;
3637
import org.apache.calcite.tools.RelBuilder.AggCall;
@@ -42,6 +43,8 @@
4243
import org.opensearch.sql.ast.expression.Argument;
4344
import org.opensearch.sql.ast.expression.Field;
4445
import org.opensearch.sql.ast.expression.Let;
46+
import org.opensearch.sql.ast.expression.Literal;
47+
import org.opensearch.sql.ast.expression.ParseMethod;
4548
import org.opensearch.sql.ast.expression.UnresolvedExpression;
4649
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
4750
import org.opensearch.sql.ast.tree.AD;
@@ -73,6 +76,7 @@
7376
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
7477
import org.opensearch.sql.exception.CalciteUnsupportedException;
7578
import org.opensearch.sql.exception.SemanticCheckException;
79+
import org.opensearch.sql.utils.ParseUtils;
7680

7781
public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalcitePlanContext> {
7882

@@ -244,69 +248,86 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
244248
}
245249

246250
@Override
247-
public RelNode visitEval(Eval node, CalcitePlanContext context) {
251+
public RelNode visitParse(Parse node, CalcitePlanContext context) {
248252
visitChildren(node, context);
249253
List<String> originalFieldNames = context.relBuilder.peek().getRowType().getFieldNames();
250-
List<RexNode> evalList =
251-
node.getExpressionList().stream()
254+
RexNode sourceField = rexVisitor.analyze(node.getSourceField(), context);
255+
ParseMethod parseMethod = node.getParseMethod();
256+
java.util.Map<String, Literal> arguments = node.getArguments();
257+
String pattern = (String) node.getPattern().getValue();
258+
List<String> groupCandidates =
259+
ParseUtils.getNamedGroupCandidates(parseMethod, pattern, arguments);
260+
List<RexNode> newFields =
261+
groupCandidates.stream()
252262
.map(
253-
expr -> {
254-
boolean containsSubqueryExpression = containsSubqueryExpression(expr);
255-
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
256-
if (containsSubqueryExpression) {
257-
context.relBuilder.variable(v::set);
258-
context.pushCorrelVar(v.get());
259-
}
260-
RexNode eval = rexVisitor.analyze(expr, context);
261-
if (containsSubqueryExpression) {
262-
// RelBuilder.projectPlus doesn't have a parameter with variablesSet:
263-
// projectPlus(Iterable<CorrelationId> variablesSet, RexNode... nodes)
264-
context.relBuilder.project(
265-
Iterables.concat(context.relBuilder.fields(), ImmutableList.of(eval)),
266-
ImmutableList.of(),
267-
false,
268-
ImmutableList.of(v.get().id));
269-
context.popCorrelVar();
270-
} else {
271-
context.relBuilder.projectPlus(eval);
272-
}
273-
return eval;
274-
})
275-
.collect(Collectors.toList());
276-
// Overriding the existing field if the alias has the same name with original field name. For
277-
// example, eval field = 1
278-
List<String> overriding =
279-
evalList.stream()
280-
.filter(expr -> expr.getKind() == AS)
281-
.map(
282-
expr ->
283-
((RexLiteral) ((RexCall) expr).getOperands().get(1)).getValueAs(String.class))
284-
.collect(Collectors.toList());
285-
overriding.retainAll(originalFieldNames);
286-
if (!overriding.isEmpty()) {
287-
List<RexNode> toDrop = context.relBuilder.fields(overriding);
288-
context.relBuilder.projectExcept(toDrop);
289-
290-
// the overriding field in Calcite will add a numeric suffix, for example:
291-
// `| eval SAL = SAL + 1` creates a field SAL0 to replace SAL, so we rename it back to SAL,
292-
// or query `| eval SAL=SAL + 1 | where exists [ source=DEPT | where emp.SAL=HISAL ]` fails.
293-
List<String> newNames =
294-
context.relBuilder.peek().getRowType().getFieldNames().stream()
295-
.map(
296-
cur -> {
297-
String noNumericSuffix = cur.replaceAll("\\d", "");
298-
if (overriding.contains(noNumericSuffix)) {
299-
return noNumericSuffix;
300-
} else {
301-
return cur;
302-
}
303-
})
304-
.toList();
305-
context.relBuilder.rename(newNames);
306-
}
263+
group ->
264+
context.rexBuilder.makeCall(
265+
SqlLibraryOperators.REGEXP_EXTRACT,
266+
sourceField,
267+
context.rexBuilder.makeLiteral(pattern)))
268+
.toList();
269+
projectPlusOverriding(newFields, groupCandidates, context);
307270
return context.relBuilder.peek();
308271
}
309272

273+
@Override
274+
public RelNode visitEval(Eval node, CalcitePlanContext context) {
275+
visitChildren(node, context);
276+
List<String> originalFieldNames = context.relBuilder.peek().getRowType().getFieldNames();
277+
node.getExpressionList()
278+
.forEach(
279+
expr -> {
280+
boolean containsSubqueryExpression = containsSubqueryExpression(expr);
281+
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
282+
if (containsSubqueryExpression) {
283+
context.relBuilder.variable(v::set);
284+
context.pushCorrelVar(v.get());
285+
}
286+
RexNode eval = rexVisitor.analyze(expr, context);
287+
if (containsSubqueryExpression) {
288+
// RelBuilder.projectPlus doesn't have a parameter with variablesSet:
289+
// projectPlus(Iterable<CorrelationId> variablesSet, RexNode... nodes)
290+
context.relBuilder.project(
291+
Iterables.concat(context.relBuilder.fields(), ImmutableList.of(eval)),
292+
ImmutableList.of(),
293+
false,
294+
ImmutableList.of(v.get().id));
295+
context.popCorrelVar();
296+
} else {
297+
// Overriding the existing field if the alias has the same name with original field.
298+
String alias =
299+
((RexLiteral) ((RexCall) eval).getOperands().get(1)).getValueAs(String.class);
300+
projectPlusOverriding(List.of(eval), List.of(alias), context);
301+
}
302+
});
303+
return context.relBuilder.peek();
304+
}
305+
306+
private void projectPlusOverriding(
307+
List<RexNode> newFields, List<String> newNames, CalcitePlanContext context) {
308+
List<String> originalFieldNames = context.relBuilder.peek().getRowType().getFieldNames();
309+
List<RexNode> toOverrideList =
310+
originalFieldNames.stream()
311+
.filter(newNames::contains)
312+
.map(a -> (RexNode) context.relBuilder.field(a))
313+
.toList();
314+
// 1. add the new fields, For example "age0, country0"
315+
context.relBuilder.projectPlus(newFields);
316+
// 2. drop the overriding field list, it's duplicated now. For example "age, country"
317+
if (!toOverrideList.isEmpty()) {
318+
context.relBuilder.projectExcept(toOverrideList);
319+
}
320+
// 3. get current fields list, the "age0, country0" should include in it.
321+
List<String> currentFields = context.relBuilder.peek().getRowType().getFieldNames();
322+
int length = currentFields.size();
323+
// 4. add new names "age, country" to the end of rename list.
324+
List<String> expectedRenameFields =
325+
new ArrayList<>(currentFields.subList(0, length - newNames.size()));
326+
expectedRenameFields.addAll(newNames);
327+
// 5. rename
328+
context.relBuilder.rename(expectedRenameFields);
329+
}
330+
310331
@Override
311332
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
312333
visitChildren(node, context);
@@ -604,11 +625,6 @@ public RelNode visitFillNull(FillNull fillNull, CalcitePlanContext context) {
604625
throw new CalciteUnsupportedException("FillNull command is unsupported in Calcite");
605626
}
606627

607-
@Override
608-
public RelNode visitParse(Parse node, CalcitePlanContext context) {
609-
throw new CalciteUnsupportedException("Parse command is unsupported in Calcite");
610-
}
611-
612628
@Override
613629
public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
614630
throw new CalciteUnsupportedException("Rare and Top commands are unsupported in Calcite");

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

-18
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import java.math.BigDecimal;
1414
import java.util.List;
15-
import java.util.Map;
1615
import java.util.stream.Collectors;
1716
import lombok.RequiredArgsConstructor;
1817
import org.apache.calcite.rel.RelNode;
@@ -261,16 +260,6 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context
261260
.peekCorrelVar()
262261
.map(correlVar -> context.relBuilder.field(correlVar, qualifiedName))
263262
.orElseGet(() -> context.relBuilder.field(qualifiedName));
264-
}
265-
// 3. resolve overriding fields, for example, `eval SAL = SAL + 1` will delete the original SAL
266-
// and add a SAL0. SAL0 in currentFields, but qualifiedName is SAL.
267-
// TODO now we cannot handle the case using a overriding fields in subquery, for example
268-
// source = EMP | eval DEPTNO = DEPTNO + 1 | where exists [ source = DEPT | where emp.DEPTNO =
269-
// DEPTNO ]
270-
Map<String, String> fieldMap =
271-
currentFields.stream().collect(Collectors.toMap(s -> s.replaceAll("\\d", ""), s -> s));
272-
if (fieldMap.containsKey(qualifiedName)) {
273-
return context.relBuilder.field(fieldMap.get(qualifiedName));
274263
} else {
275264
throw new IllegalArgumentException(
276265
String.format(
@@ -326,13 +315,6 @@ private boolean isTimeBased(SpanUnit unit) {
326315
return !(unit == NONE || unit == UNKNOWN);
327316
}
328317

329-
// @Override
330-
// public RexNode visitAggregateFunction(AggregateFunction node, Context context) {
331-
// RexNode field = analyze(node.getField(), context);
332-
// AggregateCall aggregateCall = translateAggregateCall(node, field, relBuilder);
333-
// return new MyAggregateCall(aggregateCall);
334-
// }
335-
336318
@Override
337319
public RexNode visitLet(Let node, CalcitePlanContext context) {
338320
RexNode expr = analyze(node.getExpression(), context);

integ-test/src/test/java/org/opensearch/sql/calcite/remote/nonfallback/NonFallbackCalciteParseCommandIT.java

-2
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55

66
package org.opensearch.sql.calcite.remote.nonfallback;
77

8-
import org.junit.Ignore;
98
import org.opensearch.sql.calcite.remote.fallback.CalciteParseCommandIT;
109

11-
@Ignore("https://github.com/opensearch-project/sql/issues/3463")
1210
public class NonFallbackCalciteParseCommandIT extends CalciteParseCommandIT {
1311
@Override
1412
public void init() throws Exception {

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDedupIT.java integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.json.JSONObject;
1616
import org.junit.jupiter.api.Test;
1717

18-
public class CalciteDedupIT extends CalcitePPLIntegTestCase {
18+
public class CalcitePPLDedupIT extends CalcitePPLIntegTestCase {
1919

2020
@Override
2121
public void init() throws IOException {

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDedupPushdownIT.java integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupPushdownIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import org.opensearch.sql.common.setting.Settings;
99

10-
public class CalciteDedupPushdownIT extends CalciteDedupIT {
10+
public class CalcitePPLDedupPushdownIT extends CalcitePPLDedupIT {
1111

1212
@Override
1313
protected Settings getSettings() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.standalone;
7+
8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
9+
import static org.opensearch.sql.util.MatcherUtils.rows;
10+
import static org.opensearch.sql.util.MatcherUtils.schema;
11+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
12+
import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains;
13+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
14+
15+
import java.io.IOException;
16+
import org.json.JSONObject;
17+
import org.junit.Test;
18+
import org.opensearch.client.Request;
19+
20+
public class CalcitePPLParseIT extends CalcitePPLIntegTestCase {
21+
@Override
22+
public void init() throws IOException {
23+
super.init();
24+
25+
loadIndex(Index.BANK);
26+
loadIndex(Index.BANK_WITH_NULL_VALUES);
27+
}
28+
29+
@Test
30+
public void testParseEmail() {
31+
JSONObject result =
32+
executeQuery(
33+
String.format(
34+
"""
35+
source = %s | parse email '.+@(?<host>.+)' | fields email, host
36+
""",
37+
TEST_INDEX_BANK));
38+
verifySchema(result, schema("email", "string"), schema("host", "string"));
39+
verifyDataRows(
40+
result,
41+
rows("[email protected]", "pyrami.com"),
42+
rows("[email protected]", "netagy.com"),
43+
rows("[email protected]", "quility.com"),
44+
rows("[email protected]", "boink.com"),
45+
rows("[email protected]", "scentric.com"),
46+
rows("[email protected]", "filodyne.com"),
47+
rows("[email protected]", "quailcom.com"));
48+
}
49+
50+
@Test
51+
public void testParseOverriding() {
52+
JSONObject result =
53+
executeQuery(
54+
String.format(
55+
"""
56+
source = %s | parse email '.+@(?<email>.+)' | fields email
57+
""",
58+
TEST_INDEX_BANK));
59+
verifySchema(result, schema("email", "string"));
60+
verifyDataRows(
61+
result,
62+
rows("pyrami.com"),
63+
rows("netagy.com"),
64+
rows("quility.com"),
65+
rows("boink.com"),
66+
rows("scentric.com"),
67+
rows("filodyne.com"),
68+
rows("quailcom.com"));
69+
}
70+
71+
@Test
72+
public void testParseEmailCountByHost() {
73+
JSONObject result =
74+
executeQuery(
75+
String.format(
76+
"""
77+
source = %s | parse email '.+@(?<host>.+)' | stats count() by host
78+
""",
79+
TEST_INDEX_BANK));
80+
verifySchema(result, schema("count()", "long"), schema("host", "string"));
81+
verifyDataRows(
82+
result,
83+
rows(1, "pyrami.com"),
84+
rows(1, "netagy.com"),
85+
rows(1, "quility.com"),
86+
rows(1, "boink.com"),
87+
rows(1, "scentric.com"),
88+
rows(1, "filodyne.com"),
89+
rows(1, "quailcom.com"));
90+
}
91+
92+
@Test
93+
public void testParseStreetNumber() {
94+
JSONObject result =
95+
executeQuery(
96+
String.format(
97+
"""
98+
source = %s | parse address '(?<streetNumber>\\d+)'
99+
| eval streetNumberInt = cast(streetNumber as integer)
100+
| where streetNumberInt > 500
101+
| sort streetNumberInt
102+
| fields streetNumberInt, address
103+
""",
104+
TEST_INDEX_BANK));
105+
verifySchema(result, schema("streetNumberInt", "integer"), schema("address", "string"));
106+
verifyDataRows(
107+
result,
108+
rows(671, "671 Bristol Street"),
109+
rows(702, "702 Quentin Street"),
110+
rows(789, "789 Madison Street"),
111+
rows(880, "880 Holmes Lane"));
112+
}
113+
114+
// TODO Multiple capturing groups are not allowed in Calcite REGEXP_EXTRACT function.
115+
// https://github.com/opensearch-project/sql/issues/3472
116+
@Test
117+
public void testParseMultipleGroups() {
118+
RuntimeException e =
119+
assertThrows(
120+
RuntimeException.class,
121+
() ->
122+
executeQuery(
123+
String.format(
124+
"""
125+
source = %s | parse address '(?<streetNumber>\\d+) (?<street>.+)'
126+
| fields streetNumber, street
127+
""",
128+
TEST_INDEX_BANK)));
129+
verifyErrorMessageContains(
130+
e, "Multiple capturing groups (count=2) not allowed in regex input for REGEXP_EXTRACT");
131+
}
132+
133+
@Test
134+
public void testParseOverriding2() throws IOException {
135+
Request request1 = new Request("PUT", "/test/_doc/1?refresh=true");
136+
request1.setJsonEntity(
137+
"{\"email\": \"[email protected]\", \"email0\": \"[email protected]\", \"email1\": \"[email protected]\"}");
138+
client().performRequest(request1);
139+
JSONObject result;
140+
result =
141+
executeQuery(
142+
"source = test | parse email '.+@(?<email0>.+)' | fields email, email0, email1");
143+
verifyDataRows(result, rows("[email protected]", "a.com", "[email protected]"));
144+
result =
145+
executeQuery(
146+
"source = test | parse email '.+@(?<email>.+)' | fields email, email0, email1");
147+
verifyDataRows(result, rows("a.com", "[email protected]", "[email protected]"));
148+
}
149+
}

0 commit comments

Comments
 (0)