Skip to content

Unified OpenSearch PPL Data Type #3345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Objects;
import lombok.Getter;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.function.FunctionProperties;

Expand All @@ -21,8 +22,12 @@ public class AnalysisContext {

@Getter private final FunctionProperties functionProperties;

public AnalysisContext(QueryType queryType) {
this(new TypeEnvironment(null), queryType);
}

public AnalysisContext() {
this(new TypeEnvironment(null));
this(new TypeEnvironment(null), QueryType.SQL);
}

/**
Expand All @@ -31,9 +36,13 @@ public AnalysisContext() {
* @param environment Env to set to a new instance.
*/
public AnalysisContext(TypeEnvironment environment) {
this(environment, QueryType.SQL);
}

public AnalysisContext(TypeEnvironment environment, QueryType queryType) {
this.environment = environment;
this.namedParseExpressions = new ArrayList<>();
this.functionProperties = new FunctionProperties();
this.functionProperties = new FunctionProperties(queryType);
}

/** Push a new environment. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void execute(
// TODO https://github.com/opensearch-project/sql/issues/3457
// Calcite is not available for SQL query now. Maybe release in 3.1.0?
if (!calciteEnabled || relNodeVisitor == null || queryType == QueryType.SQL) {
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
executePlan(analyze(plan, queryType), PlanContext.emptyPlanContext(), listener);
} else {
try {
AccessController.doPrivileged(
Expand All @@ -106,7 +106,7 @@ public void execute(
}
}
LOG.warn("Fallback to V2 query engine since got exception", t);
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
executePlan(analyze(plan, queryType), PlanContext.emptyPlanContext(), listener);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -183,15 +183,15 @@ public void explain(
QueryType queryType,
ResponseListener<ExecutionEngine.ExplainResponse> listener) {
try {
executionEngine.explain(plan(analyze(plan)), listener);
executionEngine.explain(plan(analyze(plan, queryType)), listener);
} catch (Exception e) {
listener.onFailure(e);
}
}

/** Analyze {@link UnresolvedPlan}. */
public LogicalPlan analyze(UnresolvedPlan plan) {
return analyzer.analyze(plan, new AnalysisContext());
public LogicalPlan analyze(UnresolvedPlan plan, QueryType queryType) {
return analyzer.analyze(plan, new AnalysisContext(queryType));
}

public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public abstract class AbstractPlan {
/** Uniq query id. */
@Getter private final QueryId queryId;

@Getter private final QueryType queryType;
@Getter protected final QueryType queryType;

/** Start query execution. */
public abstract void execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public StreamingQueryPlan(
@Override
public void execute() {
try {
LogicalPlan logicalPlan = queryService.analyze(plan);
LogicalPlan logicalPlan = queryService.analyze(plan, queryType);
StreamingSource streamingSource = buildStreamingSource(logicalPlan);
streamingExecution =
new MicroBatchStreamingExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,33 @@
import java.time.Instant;
import java.time.ZoneId;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.Getter;
import org.opensearch.sql.executor.QueryType;

@RequiredArgsConstructor
@EqualsAndHashCode
public class FunctionProperties implements Serializable {

private final Instant nowInstant;
private final ZoneId currentZoneId;
@Getter private final QueryType queryType;

/** By default, use current time and current timezone. */
public FunctionProperties() {
nowInstant = Instant.now();
currentZoneId = ZoneId.systemDefault();
this(QueryType.SQL);
}

public FunctionProperties(QueryType queryType) {
this(Instant.now(), ZoneId.systemDefault(), queryType);
}

public FunctionProperties(Instant nowInstant, ZoneId currentZoneId) {
this(nowInstant, currentZoneId, QueryType.SQL);
}

public FunctionProperties(Instant nowInstant, ZoneId currentZoneId, QueryType queryType) {
this.nowInstant = nowInstant;
this.currentZoneId = currentZoneId;
this.queryType = queryType;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
package org.opensearch.sql.expression.system;

import static org.opensearch.sql.data.type.ExprCoreType.STRING;
import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC;

import java.util.Locale;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.env.Environment;
Expand Down Expand Up @@ -41,7 +44,12 @@ public Pair<FunctionSignature, FunctionBuilder> resolve(
new FunctionExpression(BuiltinFunctionName.TYPEOF.getName(), arguments) {
@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
return new ExprStringValue(getArguments().get(0).type().legacyTypeName());
ExprType type = getArguments().get(0).type();
return new ExprStringValue(
(functionProperties.getQueryType() == QueryType.PPL
? PPL_SPEC.typeName(type)
: type.legacyTypeName())
.toUpperCase(Locale.ROOT));
}

@Override
Expand Down
62 changes: 62 additions & 0 deletions core/src/main/java/org/opensearch/sql/lang/LangSpec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.lang;

import static org.opensearch.sql.executor.QueryType.PPL;

import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.QueryType;

/**
* Represents a language specification for query processing.
*
* <p>This interface defines basic methods for language-specific behaviors, such as determining the
* language type and mapping expression types to type names. Two language specifications are
* provided: SQL and PPL.
*/
public interface LangSpec {

/** The default SQL language specification instance. */
LangSpec SQL_SPEC = new LangSpec() {};

/**
* Returns a language specification instance based on the provided language name.
*
* @param language the name of the language, case-insensitive.
* @return the PPL language specification if the language is PPL (ignoring case); otherwise, the
* SQL language specification.
*/
static LangSpec fromLanguage(String language) {
if (PPL.name().equalsIgnoreCase(language)) {
return PPLLangSpec.PPL_SPEC;
} else {
return SQL_SPEC;
}
}

/**
* Returns the language type of this specification.
*
* <p>By default, the language is considered SQL.
*
* @return the language type, SQL by default.
*/
default QueryType language() {
return QueryType.SQL;
}

/**
* Returns the type name for the given expression type.
*
* <p>This default implementation returns the result of {@code exprType.typeName()}.
*
* @param exprType the expression type.
* @return the type name of the expression.
*/
default String typeName(ExprType exprType) {
return exprType.typeName();
}
}
51 changes: 51 additions & 0 deletions core/src/main/java/org/opensearch/sql/lang/PPLLangSpec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.lang;

import java.util.HashMap;
import java.util.Map;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.QueryType;

/**
* PPL language specification implementation.
*
* <p>This class provides a singleton implementation of {@link LangSpec} for PPL. It defines a
* custom mapping from expression types to PPL type names.
*/
public class PPLLangSpec implements LangSpec {

public static final PPLLangSpec PPL_SPEC = new PPLLangSpec();

private static Map<ExprType, String> exprTypeToPPLType = new HashMap<>();

static {
exprTypeToPPLType.put(ExprCoreType.BYTE, "tinyint");
exprTypeToPPLType.put(ExprCoreType.SHORT, "smallint");
exprTypeToPPLType.put(ExprCoreType.INTEGER, "int");
exprTypeToPPLType.put(ExprCoreType.LONG, "bigint");
}
Comment on lines +26 to +31
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a breaking change, why don't we directly change the old type to the new type, instead of introducing LangSpec? Isn't this unnecessarily adding complexity?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. The core type was not upgraded because I intended for the data type changes to affect only PPL and not SQL. This PR focuses on unifying PPL data types, while SQL data types can be addressed in a separate issue, as changes there would impact JDBC, ODBC, and CLI.

Ideally, the query engine should use well-defined data types, with LangSpec serving as the protocol for translating these engine types to language-specific types. Once the Calcite implementation is complete, CalciteDataType will translate to ExprDataType, and LangSpec will translate from ExprDataType to the PPL response data type.


private PPLLangSpec() {}

@Override
public QueryType language() {
return QueryType.PPL;
}

/**
* Returns the corresponding PPL type name for the given expression type. If the expression type
* is not mapped, it returns the default type name.
*
* @param exprType the expression type.
* @return the PPL type name associated with the expression type, or the default type name.
*/
@Override
public String typeName(ExprType exprType) {
return exprTypeToPPLType.getOrDefault(exprType, exprType.typeName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.lang.LangSpec;

/** System Index Utils. Todo. Find the better name for this class. */
@UtilityClass
Expand Down Expand Up @@ -39,7 +40,47 @@ public static Boolean isSystemIndex(String indexName) {
* @return system mapping table.
*/
public static String mappingTable(String indexName) {
return String.join(".", indexName, SYS_MAPPINGS_SUFFIX);
return mappingTable(indexName, LangSpec.SQL_SPEC);
}

public static String mappingTable(String indexName, LangSpec langSpec) {

return String.join(".", indexName, encodeLangSpec(langSpec));
}

/**
* Encodes the language specification into a system mappings suffix.
*
* <p>The returned suffix is composed of the language name (e.g., "SQL" or "PPL") concatenated
* with an underscore and the system mappings suffix constant. For example:
* "SQL_MAPPINGS_ODFE_SYS_TABLE".
*
* @param spec the language specification.
* @return the encoded system mappings suffix.
*/
public static String encodeLangSpec(LangSpec spec) {
return spec.language().name() + "_" + SYS_MAPPINGS_SUFFIX;
}

/**
* Extracts the language specification from a given system mappings suffix.
*
* <p>This method expects the suffix to start with the language name followed by an underscore.
* For example, given "SQL_MAPPINGS_ODFE_SYS_TABLE", it extracts "SQL" and returns the
* corresponding language specification via {@link LangSpec#fromLanguage(String)}. If the expected
* format is not met, the default SQL specification is returned.
*
* @param systemMappingsSuffix the system mappings suffix.
* @return the language specification extracted from the suffix, or {@link LangSpec#SQL_SPEC} if
* the format is invalid.
*/
public static LangSpec extractLangSpec(String systemMappingsSuffix) {
int underscoreIndex = systemMappingsSuffix.indexOf('_');
if (underscoreIndex <= 0) {
return LangSpec.SQL_SPEC;
}
String langName = systemMappingsSuffix.substring(0, underscoreIndex);
return LangSpec.fromLanguage(langName);
}

/**
Expand All @@ -52,10 +93,10 @@ public static SystemTable systemTable(String indexName) {
String suffix = indexName.substring(lastDot + 1);
String tableName = indexName.substring(0, lastDot).replace("%", "*");

if (suffix.equalsIgnoreCase(SYS_META_SUFFIX)) {
if (suffix.endsWith(SYS_META_SUFFIX)) {
return new SystemInfoTable(tableName);
} else if (suffix.equalsIgnoreCase(SYS_MAPPINGS_SUFFIX)) {
return new MetaInfoTable(tableName);
} else if (suffix.endsWith(SYS_MAPPINGS_SUFFIX)) {
return new MetaInfoTable(tableName, extractLangSpec(suffix));
} else {
throw new IllegalStateException("Invalid system index name: " + indexName);
}
Expand All @@ -66,6 +107,10 @@ public interface SystemTable {

String getTableName();

default LangSpec getLangSpec() {
return LangSpec.SQL_SPEC;
}

default boolean isSystemInfoTable() {
return false;
}
Expand Down Expand Up @@ -93,6 +138,7 @@ public boolean isSystemInfoTable() {
public static class MetaInfoTable implements SystemTable {

private final String tableName;
private final LangSpec langSpec;

public boolean isMetaInfoTable() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public Helper() {

Helper streamingSource() {
when(table.asStreamingSource()).thenReturn(streamingSource);
when(queryService.analyze(any()))
when(queryService.analyze(any(), any(QueryType.class)))
.thenReturn(
LogicalPlanDSL.project(
LogicalPlanDSL.relation(tableName, table),
Expand All @@ -100,13 +100,14 @@ Helper streamingSource() {

Helper nonStreamingSource() {
when(table.asStreamingSource()).thenThrow(UnsupportedOperationException.class);
when(queryService.analyze(any())).thenReturn(LogicalPlanDSL.relation(tableName, table));
when(queryService.analyze(any(), any(QueryType.class)))
.thenReturn(LogicalPlanDSL.relation(tableName, table));

return this;
}

Helper withoutSource() {
when(queryService.analyze(any())).thenReturn(LogicalPlanDSL.values());
when(queryService.analyze(any(), any(QueryType.class))).thenReturn(LogicalPlanDSL.values());

return this;
}
Expand Down
Loading
Loading