Skip to content

Commit 41917ef

Browse files
authored
Unified OpenSearch PPL Data Type (#3345)
--------- Signed-off-by: Peng Huo <[email protected]>
1 parent ccae017 commit 41917ef

File tree

36 files changed

+592
-214
lines changed

36 files changed

+592
-214
lines changed

core/src/main/java/org/opensearch/sql/analysis/AnalysisContext.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010
import java.util.Objects;
1111
import lombok.Getter;
12+
import org.opensearch.sql.executor.QueryType;
1213
import org.opensearch.sql.expression.NamedExpression;
1314
import org.opensearch.sql.expression.function.FunctionProperties;
1415

@@ -21,8 +22,12 @@ public class AnalysisContext {
2122

2223
@Getter private final FunctionProperties functionProperties;
2324

25+
public AnalysisContext(QueryType queryType) {
26+
this(new TypeEnvironment(null), queryType);
27+
}
28+
2429
public AnalysisContext() {
25-
this(new TypeEnvironment(null));
30+
this(new TypeEnvironment(null), QueryType.SQL);
2631
}
2732

2833
/**
@@ -31,9 +36,13 @@ public AnalysisContext() {
3136
* @param environment Env to set to a new instance.
3237
*/
3338
public AnalysisContext(TypeEnvironment environment) {
39+
this(environment, QueryType.SQL);
40+
}
41+
42+
public AnalysisContext(TypeEnvironment environment, QueryType queryType) {
3443
this.environment = environment;
3544
this.namedParseExpressions = new ArrayList<>();
36-
this.functionProperties = new FunctionProperties();
45+
this.functionProperties = new FunctionProperties(queryType);
3746
}
3847

3948
/** Push a new environment. */

core/src/main/java/org/opensearch/sql/executor/QueryService.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void execute(
8080
// TODO https://github.com/opensearch-project/sql/issues/3457
8181
// Calcite is not available for SQL query now. Maybe release in 3.1.0?
8282
if (!calciteEnabled || relNodeVisitor == null || queryType == QueryType.SQL) {
83-
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
83+
executePlan(analyze(plan, queryType), PlanContext.emptyPlanContext(), listener);
8484
} else {
8585
try {
8686
AccessController.doPrivileged(
@@ -106,7 +106,7 @@ public void execute(
106106
}
107107
}
108108
LOG.warn("Fallback to V2 query engine since got exception", t);
109-
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
109+
executePlan(analyze(plan, queryType), PlanContext.emptyPlanContext(), listener);
110110
}
111111
}
112112
} catch (Exception e) {
@@ -183,15 +183,15 @@ public void explain(
183183
QueryType queryType,
184184
ResponseListener<ExecutionEngine.ExplainResponse> listener) {
185185
try {
186-
executionEngine.explain(plan(analyze(plan)), listener);
186+
executionEngine.explain(plan(analyze(plan, queryType)), listener);
187187
} catch (Exception e) {
188188
listener.onFailure(e);
189189
}
190190
}
191191

192192
/** Analyze {@link UnresolvedPlan}. */
193-
public LogicalPlan analyze(UnresolvedPlan plan) {
194-
return analyzer.analyze(plan, new AnalysisContext());
193+
public LogicalPlan analyze(UnresolvedPlan plan, QueryType queryType) {
194+
return analyzer.analyze(plan, new AnalysisContext(queryType));
195195
}
196196

197197
public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) {

core/src/main/java/org/opensearch/sql/executor/execution/AbstractPlan.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public abstract class AbstractPlan {
2222
/** Uniq query id. */
2323
@Getter private final QueryId queryId;
2424

25-
@Getter private final QueryType queryType;
25+
@Getter protected final QueryType queryType;
2626

2727
/** Start query execution. */
2828
public abstract void execute();

core/src/main/java/org/opensearch/sql/executor/execution/StreamingQueryPlan.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public StreamingQueryPlan(
5050
@Override
5151
public void execute() {
5252
try {
53-
LogicalPlan logicalPlan = queryService.analyze(plan);
53+
LogicalPlan logicalPlan = queryService.analyze(plan, queryType);
5454
StreamingSource streamingSource = buildStreamingSource(logicalPlan);
5555
streamingExecution =
5656
new MicroBatchStreamingExecution(

core/src/main/java/org/opensearch/sql/expression/function/FunctionProperties.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,33 @@
1010
import java.time.Instant;
1111
import java.time.ZoneId;
1212
import lombok.EqualsAndHashCode;
13-
import lombok.RequiredArgsConstructor;
13+
import lombok.Getter;
14+
import org.opensearch.sql.executor.QueryType;
1415

15-
@RequiredArgsConstructor
1616
@EqualsAndHashCode
1717
public class FunctionProperties implements Serializable {
1818

1919
private final Instant nowInstant;
2020
private final ZoneId currentZoneId;
21+
@Getter private final QueryType queryType;
2122

2223
/** By default, use current time and current timezone. */
2324
public FunctionProperties() {
24-
nowInstant = Instant.now();
25-
currentZoneId = ZoneId.systemDefault();
25+
this(QueryType.SQL);
26+
}
27+
28+
public FunctionProperties(QueryType queryType) {
29+
this(Instant.now(), ZoneId.systemDefault(), queryType);
30+
}
31+
32+
public FunctionProperties(Instant nowInstant, ZoneId currentZoneId) {
33+
this(nowInstant, currentZoneId, QueryType.SQL);
34+
}
35+
36+
public FunctionProperties(Instant nowInstant, ZoneId currentZoneId, QueryType queryType) {
37+
this.nowInstant = nowInstant;
38+
this.currentZoneId = currentZoneId;
39+
this.queryType = queryType;
2640
}
2741

2842
/**

core/src/main/java/org/opensearch/sql/expression/system/SystemFunctions.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
package org.opensearch.sql.expression.system;
77

88
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
9+
import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC;
910

11+
import java.util.Locale;
1012
import lombok.experimental.UtilityClass;
1113
import org.apache.commons.lang3.tuple.Pair;
1214
import org.opensearch.sql.data.model.ExprStringValue;
1315
import org.opensearch.sql.data.model.ExprValue;
1416
import org.opensearch.sql.data.type.ExprType;
17+
import org.opensearch.sql.executor.QueryType;
1518
import org.opensearch.sql.expression.Expression;
1619
import org.opensearch.sql.expression.FunctionExpression;
1720
import org.opensearch.sql.expression.env.Environment;
@@ -41,7 +44,12 @@ public Pair<FunctionSignature, FunctionBuilder> resolve(
4144
new FunctionExpression(BuiltinFunctionName.TYPEOF.getName(), arguments) {
4245
@Override
4346
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
44-
return new ExprStringValue(getArguments().get(0).type().legacyTypeName());
47+
ExprType type = getArguments().get(0).type();
48+
return new ExprStringValue(
49+
(functionProperties.getQueryType() == QueryType.PPL
50+
? PPL_SPEC.typeName(type)
51+
: type.legacyTypeName())
52+
.toUpperCase(Locale.ROOT));
4553
}
4654

4755
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.lang;
7+
8+
import static org.opensearch.sql.executor.QueryType.PPL;
9+
10+
import org.opensearch.sql.data.type.ExprType;
11+
import org.opensearch.sql.executor.QueryType;
12+
13+
/**
14+
* Represents a language specification for query processing.
15+
*
16+
* <p>This interface defines basic methods for language-specific behaviors, such as determining the
17+
* language type and mapping expression types to type names. Two language specifications are
18+
* provided: SQL and PPL.
19+
*/
20+
public interface LangSpec {
21+
22+
/** The default SQL language specification instance. */
23+
LangSpec SQL_SPEC = new LangSpec() {};
24+
25+
/**
26+
* Returns a language specification instance based on the provided language name.
27+
*
28+
* @param language the name of the language, case-insensitive.
29+
* @return the PPL language specification if the language is PPL (ignoring case); otherwise, the
30+
* SQL language specification.
31+
*/
32+
static LangSpec fromLanguage(String language) {
33+
if (PPL.name().equalsIgnoreCase(language)) {
34+
return PPLLangSpec.PPL_SPEC;
35+
} else {
36+
return SQL_SPEC;
37+
}
38+
}
39+
40+
/**
41+
* Returns the language type of this specification.
42+
*
43+
* <p>By default, the language is considered SQL.
44+
*
45+
* @return the language type, SQL by default.
46+
*/
47+
default QueryType language() {
48+
return QueryType.SQL;
49+
}
50+
51+
/**
52+
* Returns the type name for the given expression type.
53+
*
54+
* <p>This default implementation returns the result of {@code exprType.typeName()}.
55+
*
56+
* @param exprType the expression type.
57+
* @return the type name of the expression.
58+
*/
59+
default String typeName(ExprType exprType) {
60+
return exprType.typeName();
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.lang;
7+
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import org.opensearch.sql.data.type.ExprCoreType;
11+
import org.opensearch.sql.data.type.ExprType;
12+
import org.opensearch.sql.executor.QueryType;
13+
14+
/**
15+
* PPL language specification implementation.
16+
*
17+
* <p>This class provides a singleton implementation of {@link LangSpec} for PPL. It defines a
18+
* custom mapping from expression types to PPL type names.
19+
*/
20+
public class PPLLangSpec implements LangSpec {
21+
22+
public static final PPLLangSpec PPL_SPEC = new PPLLangSpec();
23+
24+
private static Map<ExprType, String> exprTypeToPPLType = new HashMap<>();
25+
26+
static {
27+
exprTypeToPPLType.put(ExprCoreType.BYTE, "tinyint");
28+
exprTypeToPPLType.put(ExprCoreType.SHORT, "smallint");
29+
exprTypeToPPLType.put(ExprCoreType.INTEGER, "int");
30+
exprTypeToPPLType.put(ExprCoreType.LONG, "bigint");
31+
}
32+
33+
private PPLLangSpec() {}
34+
35+
@Override
36+
public QueryType language() {
37+
return QueryType.PPL;
38+
}
39+
40+
/**
41+
* Returns the corresponding PPL type name for the given expression type. If the expression type
42+
* is not mapped, it returns the default type name.
43+
*
44+
* @param exprType the expression type.
45+
* @return the PPL type name associated with the expression type, or the default type name.
46+
*/
47+
@Override
48+
public String typeName(ExprType exprType) {
49+
return exprTypeToPPLType.getOrDefault(exprType, exprType.typeName());
50+
}
51+
}

core/src/main/java/org/opensearch/sql/utils/SystemIndexUtils.java

+50-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import lombok.Getter;
99
import lombok.RequiredArgsConstructor;
1010
import lombok.experimental.UtilityClass;
11+
import org.opensearch.sql.lang.LangSpec;
1112

1213
/** System Index Utils. Todo. Find the better name for this class. */
1314
@UtilityClass
@@ -39,7 +40,47 @@ public static Boolean isSystemIndex(String indexName) {
3940
* @return system mapping table.
4041
*/
4142
public static String mappingTable(String indexName) {
42-
return String.join(".", indexName, SYS_MAPPINGS_SUFFIX);
43+
return mappingTable(indexName, LangSpec.SQL_SPEC);
44+
}
45+
46+
public static String mappingTable(String indexName, LangSpec langSpec) {
47+
48+
return String.join(".", indexName, encodeLangSpec(langSpec));
49+
}
50+
51+
/**
52+
* Encodes the language specification into a system mappings suffix.
53+
*
54+
* <p>The returned suffix is composed of the language name (e.g., "SQL" or "PPL") concatenated
55+
* with an underscore and the system mappings suffix constant. For example:
56+
* "SQL_MAPPINGS_ODFE_SYS_TABLE".
57+
*
58+
* @param spec the language specification.
59+
* @return the encoded system mappings suffix.
60+
*/
61+
public static String encodeLangSpec(LangSpec spec) {
62+
return spec.language().name() + "_" + SYS_MAPPINGS_SUFFIX;
63+
}
64+
65+
/**
66+
* Extracts the language specification from a given system mappings suffix.
67+
*
68+
* <p>This method expects the suffix to start with the language name followed by an underscore.
69+
* For example, given "SQL_MAPPINGS_ODFE_SYS_TABLE", it extracts "SQL" and returns the
70+
* corresponding language specification via {@link LangSpec#fromLanguage(String)}. If the expected
71+
* format is not met, the default SQL specification is returned.
72+
*
73+
* @param systemMappingsSuffix the system mappings suffix.
74+
* @return the language specification extracted from the suffix, or {@link LangSpec#SQL_SPEC} if
75+
* the format is invalid.
76+
*/
77+
public static LangSpec extractLangSpec(String systemMappingsSuffix) {
78+
int underscoreIndex = systemMappingsSuffix.indexOf('_');
79+
if (underscoreIndex <= 0) {
80+
return LangSpec.SQL_SPEC;
81+
}
82+
String langName = systemMappingsSuffix.substring(0, underscoreIndex);
83+
return LangSpec.fromLanguage(langName);
4384
}
4485

4586
/**
@@ -52,10 +93,10 @@ public static SystemTable systemTable(String indexName) {
5293
String suffix = indexName.substring(lastDot + 1);
5394
String tableName = indexName.substring(0, lastDot).replace("%", "*");
5495

55-
if (suffix.equalsIgnoreCase(SYS_META_SUFFIX)) {
96+
if (suffix.endsWith(SYS_META_SUFFIX)) {
5697
return new SystemInfoTable(tableName);
57-
} else if (suffix.equalsIgnoreCase(SYS_MAPPINGS_SUFFIX)) {
58-
return new MetaInfoTable(tableName);
98+
} else if (suffix.endsWith(SYS_MAPPINGS_SUFFIX)) {
99+
return new MetaInfoTable(tableName, extractLangSpec(suffix));
59100
} else {
60101
throw new IllegalStateException("Invalid system index name: " + indexName);
61102
}
@@ -66,6 +107,10 @@ public interface SystemTable {
66107

67108
String getTableName();
68109

110+
default LangSpec getLangSpec() {
111+
return LangSpec.SQL_SPEC;
112+
}
113+
69114
default boolean isSystemInfoTable() {
70115
return false;
71116
}
@@ -93,6 +138,7 @@ public boolean isSystemInfoTable() {
93138
public static class MetaInfoTable implements SystemTable {
94139

95140
private final String tableName;
141+
private final LangSpec langSpec;
96142

97143
public boolean isMetaInfoTable() {
98144
return true;

core/src/test/java/org/opensearch/sql/executor/execution/StreamingQueryPlanTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public Helper() {
9090

9191
Helper streamingSource() {
9292
when(table.asStreamingSource()).thenReturn(streamingSource);
93-
when(queryService.analyze(any()))
93+
when(queryService.analyze(any(), any(QueryType.class)))
9494
.thenReturn(
9595
LogicalPlanDSL.project(
9696
LogicalPlanDSL.relation(tableName, table),
@@ -100,13 +100,14 @@ Helper streamingSource() {
100100

101101
Helper nonStreamingSource() {
102102
when(table.asStreamingSource()).thenThrow(UnsupportedOperationException.class);
103-
when(queryService.analyze(any())).thenReturn(LogicalPlanDSL.relation(tableName, table));
103+
when(queryService.analyze(any(), any(QueryType.class)))
104+
.thenReturn(LogicalPlanDSL.relation(tableName, table));
104105

105106
return this;
106107
}
107108

108109
Helper withoutSource() {
109-
when(queryService.analyze(any())).thenReturn(LogicalPlanDSL.values());
110+
when(queryService.analyze(any(), any(QueryType.class))).thenReturn(LogicalPlanDSL.values());
110111

111112
return this;
112113
}

0 commit comments

Comments
 (0)