Skip to content

Commit 57ce303

Browse files
Max KsyunzYury-FridlyandGabeFernandez310acarbonetto
authored
Support for pagination in v2 engine of SELECT * FROM <table> queries (#1666)
v2 SQL engine can now paginate simple queries. Pagination is initiated by setting fetch_size property in the request JSON. Pagination is implemented using the OpenSearch Scroll API. Please see pagination-v2.md for implementation details. --------- Signed-off-by: MaxKsyunz <[email protected]> Signed-off-by: Yury-Fridlyand <[email protected]> Signed-off-by: Max Ksyunz <[email protected]> Co-authored-by: Yury-Fridlyand <[email protected]> Co-authored-by: GabeFernandez310 <[email protected]> Co-authored-by: Andrew Carbonetto <[email protected]>
1 parent 986db39 commit 57ce303

File tree

141 files changed

+6383
-1507
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

141 files changed

+6383
-1507
lines changed

Diff for: core/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ dependencies {
5757
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
5858
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
5959
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
60+
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
6061
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
6162
}
6263

Diff for: core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.opensearch.sql.analysis;
88

9+
import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
910
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
1011
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
1112
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
@@ -42,13 +43,16 @@
4243
import org.opensearch.sql.ast.expression.UnresolvedExpression;
4344
import org.opensearch.sql.ast.tree.AD;
4445
import org.opensearch.sql.ast.tree.Aggregation;
46+
import org.opensearch.sql.ast.tree.CloseCursor;
4547
import org.opensearch.sql.ast.tree.Dedupe;
4648
import org.opensearch.sql.ast.tree.Eval;
49+
import org.opensearch.sql.ast.tree.FetchCursor;
4750
import org.opensearch.sql.ast.tree.Filter;
4851
import org.opensearch.sql.ast.tree.Head;
4952
import org.opensearch.sql.ast.tree.Kmeans;
5053
import org.opensearch.sql.ast.tree.Limit;
5154
import org.opensearch.sql.ast.tree.ML;
55+
import org.opensearch.sql.ast.tree.Paginate;
5256
import org.opensearch.sql.ast.tree.Parse;
5357
import org.opensearch.sql.ast.tree.Project;
5458
import org.opensearch.sql.ast.tree.RareTopN;
@@ -80,12 +84,15 @@
8084
import org.opensearch.sql.expression.parse.ParseExpression;
8185
import org.opensearch.sql.planner.logical.LogicalAD;
8286
import org.opensearch.sql.planner.logical.LogicalAggregation;
87+
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
8388
import org.opensearch.sql.planner.logical.LogicalDedupe;
8489
import org.opensearch.sql.planner.logical.LogicalEval;
90+
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
8591
import org.opensearch.sql.planner.logical.LogicalFilter;
8692
import org.opensearch.sql.planner.logical.LogicalLimit;
8793
import org.opensearch.sql.planner.logical.LogicalML;
8894
import org.opensearch.sql.planner.logical.LogicalMLCommons;
95+
import org.opensearch.sql.planner.logical.LogicalPaginate;
8996
import org.opensearch.sql.planner.logical.LogicalPlan;
9097
import org.opensearch.sql.planner.logical.LogicalProject;
9198
import org.opensearch.sql.planner.logical.LogicalRareTopN;
@@ -208,7 +215,6 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
208215
tableFunctionImplementation.applyArguments());
209216
}
210217

211-
212218
@Override
213219
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
214220
LogicalPlan child = node.getChild().get(0).accept(this, context);
@@ -561,6 +567,23 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
561567
return new LogicalML(child, node.getArguments());
562568
}
563569

570+
@Override
571+
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
572+
LogicalPlan child = paginate.getChild().get(0).accept(this, context);
573+
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
574+
}
575+
576+
@Override
577+
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
578+
return new LogicalFetchCursor(cursor.getCursor(),
579+
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
580+
}
581+
582+
@Override
583+
public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext context) {
584+
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
585+
}
586+
564587
/**
565588
* The first argument is always "asc", others are optional.
566589
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
@@ -576,5 +599,4 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
576599
}
577600
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
578601
}
579-
580602
}

Diff for: core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

+15
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,16 @@
4141
import org.opensearch.sql.ast.statement.Statement;
4242
import org.opensearch.sql.ast.tree.AD;
4343
import org.opensearch.sql.ast.tree.Aggregation;
44+
import org.opensearch.sql.ast.tree.CloseCursor;
4445
import org.opensearch.sql.ast.tree.Dedupe;
4546
import org.opensearch.sql.ast.tree.Eval;
47+
import org.opensearch.sql.ast.tree.FetchCursor;
4648
import org.opensearch.sql.ast.tree.Filter;
4749
import org.opensearch.sql.ast.tree.Head;
4850
import org.opensearch.sql.ast.tree.Kmeans;
4951
import org.opensearch.sql.ast.tree.Limit;
5052
import org.opensearch.sql.ast.tree.ML;
53+
import org.opensearch.sql.ast.tree.Paginate;
5154
import org.opensearch.sql.ast.tree.Parse;
5255
import org.opensearch.sql.ast.tree.Project;
5356
import org.opensearch.sql.ast.tree.RareTopN;
@@ -294,4 +297,16 @@ public T visitQuery(Query node, C context) {
294297
public T visitExplain(Explain node, C context) {
295298
return visitStatement(node, context);
296299
}
300+
301+
public T visitPaginate(Paginate paginate, C context) {
302+
return visitChildren(paginate, context);
303+
}
304+
305+
public T visitFetchCursor(FetchCursor cursor, C context) {
306+
return visitChildren(cursor, context);
307+
}
308+
309+
public T visitCloseCursor(CloseCursor closeCursor, C context) {
310+
return visitChildren(closeCursor, context);
311+
}
297312
}

Diff for: core/src/main/java/org/opensearch/sql/ast/statement/Query.java

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
public class Query extends Statement {
2828

2929
protected final UnresolvedPlan plan;
30+
protected final int fetchSize;
3031

3132
@Override
3233
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import java.util.List;
9+
import org.opensearch.sql.ast.AbstractNodeVisitor;
10+
import org.opensearch.sql.ast.Node;
11+
12+
/**
13+
* AST node to represent close cursor operation.
14+
* Actually a wrapper to the AST.
15+
*/
16+
public class CloseCursor extends UnresolvedPlan {
17+
18+
/**
19+
* An instance of {@link FetchCursor}.
20+
*/
21+
private UnresolvedPlan cursor;
22+
23+
@Override
24+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
25+
return nodeVisitor.visitCloseCursor(this, context);
26+
}
27+
28+
@Override
29+
public UnresolvedPlan attach(UnresolvedPlan child) {
30+
this.cursor = child;
31+
return this;
32+
}
33+
34+
@Override
35+
public List<? extends Node> getChild() {
36+
return List.of(cursor);
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import lombok.EqualsAndHashCode;
9+
import lombok.Getter;
10+
import lombok.RequiredArgsConstructor;
11+
import org.opensearch.sql.ast.AbstractNodeVisitor;
12+
13+
/**
14+
* An unresolved plan that represents fetching the next
15+
* batch in paginationed plan.
16+
*/
17+
@RequiredArgsConstructor
18+
@EqualsAndHashCode(callSuper = false)
19+
public class FetchCursor extends UnresolvedPlan {
20+
@Getter
21+
final String cursor;
22+
23+
@Override
24+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
25+
return nodeVisitor.visitFetchCursor(this, context);
26+
}
27+
28+
@Override
29+
public UnresolvedPlan attach(UnresolvedPlan child) {
30+
throw new UnsupportedOperationException("Cursor unresolved plan does not support children");
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import java.util.List;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.Getter;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.ToString;
13+
import org.opensearch.sql.ast.AbstractNodeVisitor;
14+
import org.opensearch.sql.ast.Node;
15+
16+
/**
17+
* AST node to represent pagination operation.
18+
* Actually a wrapper to the AST.
19+
*/
20+
@RequiredArgsConstructor
21+
@EqualsAndHashCode(callSuper = false)
22+
@ToString
23+
public class Paginate extends UnresolvedPlan {
24+
@Getter
25+
private final int pageSize;
26+
private UnresolvedPlan child;
27+
28+
public Paginate(int pageSize, UnresolvedPlan child) {
29+
this.pageSize = pageSize;
30+
this.child = child;
31+
}
32+
33+
@Override
34+
public List<? extends Node> getChild() {
35+
return List.of(child);
36+
}
37+
38+
@Override
39+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
40+
return nodeVisitor.visitPaginate(this, context);
41+
}
42+
43+
@Override
44+
public UnresolvedPlan attach(UnresolvedPlan child) {
45+
this.child = child;
46+
return this;
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.exception;
7+
8+
/**
9+
* This should be thrown on serialization of a PhysicalPlan tree if paging is finished.
10+
* Processing of such exception should outcome of responding no cursor to the user.
11+
*/
12+
public class NoCursorException extends RuntimeException {
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.exception;
7+
8+
/**
9+
* This should be thrown by V2 engine to support fallback scenario.
10+
*/
11+
public class UnsupportedCursorRequestException extends RuntimeException {
12+
}

Diff for: core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.sql.common.response.ResponseListener;
1515
import org.opensearch.sql.data.model.ExprValue;
1616
import org.opensearch.sql.data.type.ExprType;
17+
import org.opensearch.sql.executor.pagination.Cursor;
1718
import org.opensearch.sql.planner.physical.PhysicalPlan;
1819

1920
/**
@@ -53,6 +54,7 @@ void execute(PhysicalPlan plan, ExecutionContext context,
5354
class QueryResponse {
5455
private final Schema schema;
5556
private final List<ExprValue> results;
57+
private final Cursor cursor;
5658
}
5759

5860
@Data
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.sql.executor.execution;
10+
11+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
12+
import org.opensearch.sql.common.response.ResponseListener;
13+
import org.opensearch.sql.executor.ExecutionEngine;
14+
import org.opensearch.sql.executor.QueryId;
15+
import org.opensearch.sql.executor.QueryService;
16+
17+
/**
18+
* Query plan which does not reflect a search query being executed.
19+
* It contains a command or an action, for example, a DDL query.
20+
*/
21+
public class CommandPlan extends AbstractPlan {
22+
23+
/**
24+
* The query plan ast.
25+
*/
26+
protected final UnresolvedPlan plan;
27+
28+
/**
29+
* Query service.
30+
*/
31+
protected final QueryService queryService;
32+
33+
protected final ResponseListener<ExecutionEngine.QueryResponse> listener;
34+
35+
/** Constructor. */
36+
public CommandPlan(QueryId queryId, UnresolvedPlan plan, QueryService queryService,
37+
ResponseListener<ExecutionEngine.QueryResponse> listener) {
38+
super(queryId);
39+
this.plan = plan;
40+
this.queryService = queryService;
41+
this.listener = listener;
42+
}
43+
44+
@Override
45+
public void execute() {
46+
queryService.execute(plan, listener);
47+
}
48+
49+
@Override
50+
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
51+
throw new UnsupportedOperationException("CommandPlan does not support explain");
52+
}
53+
}

0 commit comments

Comments
 (0)