Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Calcite by default and refactor all related ITs #3468

Merged
merged 4 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
@ToString
public class Alias extends UnresolvedExpression {

/** The name to be associated with the result of computing delegated expression. */
/**
* The name to be associated with the result of computing delegated expression. In OpenSearch ppl,
* the name is the actual alias of an expression
*/
private final String name;

/** Expression aliased. */
private final UnresolvedExpression delegated;

/** TODO. Optional field alias. */
/** TODO. Optional field alias. This field is OpenSearch SQL-only */
private String alias;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.executor.QueryType;

/** Explain Statement. */
@Data
@EqualsAndHashCode(callSuper = false)
public class Explain extends Statement {

private final Statement statement;
private final QueryType queryType;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.executor.QueryType;

/** Query Statement. */
@Getter
Expand All @@ -26,6 +27,7 @@ public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;
private final QueryType queryType;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
Expand All @@ -29,7 +30,8 @@ public AggCall analyze(UnresolvedExpression unresolved, CalcitePlanContext conte
@Override
public AggCall visitAlias(Alias node, CalcitePlanContext context) {
AggCall aggCall = analyze(node.getDelegated(), context);
return aggCall.as(node.getName());
// Only OpenSearch SQL uses node.getAlias, OpenSearch PPL uses node.getName.
return aggCall.as(Strings.isEmpty(node.getAlias()) ? node.getName() : node.getAlias());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.And;
Expand Down Expand Up @@ -267,7 +268,9 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context
@Override
public RexNode visitAlias(Alias node, CalcitePlanContext context) {
RexNode expr = analyze(node.getDelegated(), context);
return context.relBuilder.alias(expr, node.getName());
// Only OpenSearch SQL uses node.getAlias, OpenSearch PPL uses node.getName.
return context.relBuilder.alias(
expr, Strings.isEmpty(node.getAlias()) ? node.getName() : node.getAlias());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.CalciteUnsupportedException;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPlan;
Expand Down Expand Up @@ -68,13 +69,17 @@ public class QueryService {
* @param listener {@link ResponseListener}
*/
public void execute(
UnresolvedPlan plan, ResponseListener<ExecutionEngine.QueryResponse> listener) {
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
try {
boolean calciteEnabled = false;
if (settings != null) {
calciteEnabled = settings.getSettingValue(Settings.Key.CALCITE_ENGINE_ENABLED);
}
if (!calciteEnabled || relNodeVisitor == null) {
// TODO https://github.com/opensearch-project/sql/issues/3457
// Calcite is not available for SQL query now. Maybe release in 3.1.0?
if (!calciteEnabled || relNodeVisitor == null || queryType == QueryType.SQL) {
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
} else {
try {
Expand All @@ -86,15 +91,21 @@ public void execute(
executePlanByCalcite(analyze(plan, context), context, listener);
return null;
});
} catch (Exception e) {
} catch (Throwable t) {
boolean fallbackAllowed = true;
if (settings != null) {
fallbackAllowed = settings.getSettingValue(Settings.Key.CALCITE_FALLBACK_ALLOWED);
}
if (!fallbackAllowed) {
listener.onFailure(e);
if (t instanceof Error) {
// Calcite may throw AssertError during query execution.
// Convert them to CalciteUnsupportedException.
listener.onFailure(new CalciteUnsupportedException(t.getMessage()));
} else {
listener.onFailure((Exception) t);
}
}
LOG.warn("Fallback to V2 query engine since got exception", e);
LOG.warn("Fallback to V2 query engine since got exception", t);
executePlan(analyze(plan), PlanContext.emptyPlanContext(), listener);
}
}
Expand Down Expand Up @@ -168,7 +179,9 @@ private static RelNode convertToCalcitePlan(RelNode osPlan) {
* @param listener {@link ResponseListener} for explain response
*/
public void explain(
UnresolvedPlan plan, ResponseListener<ExecutionEngine.ExplainResponse> listener) {
UnresolvedPlan plan,
QueryType queryType,
ResponseListener<ExecutionEngine.ExplainResponse> listener) {
try {
executionEngine.explain(plan(analyze(plan)), listener);
} catch (Exception e) {
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/QueryType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor;

public enum QueryType {
PPL,
SQL
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryType;

/** AbstractPlan represent the execution entity of the Statement. */
@RequiredArgsConstructor
Expand All @@ -21,6 +22,8 @@ public abstract class AbstractPlan {
/** Uniq query id. */
@Getter private final QueryId queryId;

@Getter private final QueryType queryType;

/** Start query execution. */
public abstract void execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.QueryType;

/**
* Query plan which does not reflect a search query being executed. It contains a command or an
Expand All @@ -31,18 +32,19 @@ public class CommandPlan extends AbstractPlan {
/** Constructor. */
public CommandPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
super(queryId, queryType);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
}

@Override
public void execute() {
queryService.execute(plan, listener);
queryService.execute(plan, getQueryType(), listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryType;

/** Explain plan. */
public class ExplainPlan extends AbstractPlan {
Expand All @@ -22,9 +23,10 @@ public class ExplainPlan extends AbstractPlan {
/** Constructor. */
public ExplainPlan(
QueryId queryId,
QueryType queryType,
AbstractPlan plan,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
super(queryId);
super(queryId, queryType);
this.plan = plan;
this.explainListener = explainListener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.QueryType;

/** Query plan which includes a <em>select</em> query. */
public class QueryPlan extends AbstractPlan {
Expand All @@ -33,10 +34,11 @@ public class QueryPlan extends AbstractPlan {
/** Constructor. */
public QueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
super(queryId, queryType);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
Expand All @@ -46,11 +48,12 @@ public QueryPlan(
/** Constructor with page size. */
public QueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
int pageSize,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
super(queryId, queryType);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
Expand All @@ -60,9 +63,9 @@ public QueryPlan(
@Override
public void execute() {
if (pageSize.isPresent()) {
queryService.execute(new Paginate(pageSize.get(), plan), listener);
queryService.execute(new Paginate(pageSize.get(), plan), getQueryType(), listener);
} else {
queryService.execute(plan, listener);
queryService.execute(plan, getQueryType(), listener);
}
}

Expand All @@ -73,7 +76,7 @@ public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener)
new NotImplementedException(
"`explain` feature for paginated requests is not implemented yet."));
} else {
queryService.explain(plan, listener);
queryService.explain(plan, getQueryType(), listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.executor.pagination.CanPaginateVisitor;

/** QueryExecution Factory. */
Expand Down Expand Up @@ -72,11 +73,14 @@ public AbstractPlan create(
public AbstractPlan create(
String cursor,
boolean isExplain,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
QueryId queryId = QueryId.queryId();
var plan = new QueryPlan(queryId, new FetchCursor(cursor), queryService, queryResponseListener);
return isExplain ? new ExplainPlan(queryId, plan, explainListener) : plan;
var plan =
new QueryPlan(
queryId, queryType, new FetchCursor(cursor), queryService, queryResponseListener);
return isExplain ? new ExplainPlan(queryId, queryType, plan, explainListener) : plan;
}

boolean canConvertToCursor(UnresolvedPlan plan) {
Expand All @@ -85,9 +89,12 @@ boolean canConvertToCursor(UnresolvedPlan plan) {

/** Creates a {@link CloseCursor} command on a cursor. */
public AbstractPlan createCloseCursor(
String cursor, ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener) {
String cursor,
QueryType queryType,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener) {
return new CommandPlan(
QueryId.queryId(),
queryType,
new CloseCursor().attach(new FetchCursor(cursor)),
queryService,
queryResponseListener);
Expand All @@ -107,6 +114,7 @@ public AbstractPlan visitQuery(
if (canConvertToCursor(node.getPlan())) {
return new QueryPlan(
QueryId.queryId(),
node.getQueryType(),
node.getPlan(),
node.getFetchSize(),
queryService,
Expand All @@ -117,7 +125,11 @@ public AbstractPlan visitQuery(
}
} else {
return new QueryPlan(
QueryId.queryId(), node.getPlan(), queryService, context.getLeft().get());
QueryId.queryId(),
node.getQueryType(),
node.getPlan(),
queryService,
context.getLeft().get());
}
}

Expand All @@ -133,6 +145,7 @@ public AbstractPlan visitExplain(

return new ExplainPlan(
QueryId.queryId(),
node.getQueryType(),
create(node.getStatement(), Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
context.getRight().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
import org.opensearch.sql.executor.streaming.MicroBatchStreamingExecution;
import org.opensearch.sql.executor.streaming.StreamingSource;
Expand All @@ -36,11 +37,12 @@ public class StreamingQueryPlan extends QueryPlan {
/** constructor. */
public StreamingQueryPlan(
QueryId queryId,
QueryType queryType,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener,
ExecutionStrategy executionStrategy) {
super(queryId, plan, queryService, listener);
super(queryId, queryType, plan, queryService, listener);

this.executionStrategy = executionStrategy;
}
Expand Down
Loading
Loading