Skip to content

Support Limit pushdown #3615

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 30, 2025
Merged

Conversation

yuancu
Copy link
Contributor

@yuancu yuancu commented May 12, 2025

Description

Implement LIMIT pushdown.

Related Issues

Resolves #3381

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Yuanchun Shen <[email protected]>
@LantaoJin LantaoJin added the calcite calcite migration releated label May 12, 2025
dai-chen
dai-chen previously approved these changes May 16, 2025
@Override
public void onMatch(RelOptRuleCall call) {
final LogicalSort sort = call.rel(0);
final CalciteLogicalIndexScan scan = call.rel(1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Calcite take care of the case that the child of sort is not index scan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rule only matches LogicalSort -> CalciteLogicalIndexScan, which is specified in OpenSearchLimitIndexScanRule.Config. When the child of sort is not index scan, the query will not be caught and pushed-down via this rule.

Do you mean "is it possible that a limit does not fall into the LogicalSort -> CalciteLogicalIndexScan pattern"?

Copy link
Member

@LantaoJin LantaoJin May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the implementation here may have a bug:

source=t | where a=0 | head 10;
source=t | head 10 | where a=0;

Above two PPLs will both push down the filter and limit to DSL. Please check are the DSLs generated same?
If they are same, the results are incorrect since the two PPL queries should have different behaviors.

Copy link
Contributor Author

@yuancu yuancu May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the following two requests return the same DSL with the latest pushdown:

  • source=opensearch-sql_test_index_account | head 10 | where age = 20
  • source=opensearch-sql_test_index_account | where age = 20 | head 10
OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10,"timeout":"1m","query":{"term":{"age":{"value":20,"boost":1.0}}},"_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]},"sort":[{"_doc":{"order":"asc"}}]}, requestedTotalSize=10, pageSize=null, startFrom=0)])

Besides, the same problem also exists in v2 -- when explaining the two queries, both of them return the same request:

OpenSearchQueryRequest(indexName=country, sourceBuilder={"from":0,"size":10,"timeout":"1m","query":{"term":{"age":{"value":20,"boost":1.0}}},"_source":{"includes":["name","country","state","month","year","age"],"excludes":[]},"sort":[{"_doc":{"order":"asc"}}]}, needClean=true, searchDone=false, pitId=null, cursorKeepAlive=null, searchAfter=null, searchResponse=null)

Copy link
Member

@LantaoJin LantaoJin May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DSL means sourceBuilder={...}. They are same. Please fix it by checking the size status in optimize rule

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LantaoJin Fixed

The size in the request builder is set to a default size at initialization (See OpenSearchIndex.java#L285). Therefore, we can not determine whether if there is already a limit by simply check whether size is set.

I created a flag in PushDownContext to log whether there was a limit pushdown.

dai-chen
dai-chen previously approved these changes May 19, 2025
Copy link
Collaborator

@dai-chen dai-chen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes!

Comment on lines 65 to 66
Predicate.not(OpenSearchIndexScanRule::isLimitPushed)
.and(OpenSearchIndexScanRule::test))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would OpenSearchAggregateIndexScanRule be applied with the same fixing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should. The results are incorrect without it.

// E.g. for `source=t | head 10 | head 5`, we take 5
// This also ensures that the limit won't exceed the initial default value. (set to
// Settings.Key.QUERY_SIZE_LIMIT in OpenSearchIndex)
requestedTotalSize = Math.min(limit, requestedTotalSize);
startFrom = offset;
sourceBuilder.from(offset).size(limit);
Copy link
Member

@LantaoJin LantaoJin May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't be sourceBuilder.from(offset).size(requestedTotalSize)?

Could you add 2 tests for this case in explain IT:
verify the pushdown size = 5

source=t | head 10 | head 5
source=t | head 5 | head 10

verify the pushdown size=10

source=t | head 10 | filter a = 0 | head 5

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it was a mistake. Fixed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not correct to set requestedTotalSize to the min here, I think.

source=t | head 10 from 1 | head 10 from 2 should return 8 rows, but it actually will return 10 rows under the current implementation.

@@ -109,6 +109,36 @@ public void testLimitPushDownExplain() throws Exception {
+ "| fields ageMinus"));
}

@Test
public void testLimitWithFilterPushdownExplain() throws Exception {
// TODO: Fix limit-then-filter pushdown without Calcite
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how's the effort to fix the v2 bug together?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed with minor effort.

* @param sort The LogicalSort to check.
* @return True if the LogicalSort is a LIMIT that can be pushed down, false otherwise.
*/
private static boolean isLogicalSortLimit(LogicalSort sort) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this condition to the RuleConfig, which will be more efficient?

Let's say RuleConfig find a LogicalSort node which is not limit, we should skip the step of identifying the pattern of its children and the above onMatch method immediately, instead of processing continuously until this place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// E.g. for `source=t | head 10 | head 5`, we take 5
// This also ensures that the limit won't exceed the initial default value. (set to
// Settings.Key.QUERY_SIZE_LIMIT in OpenSearchIndex)
requestedTotalSize = Math.min(limit, requestedTotalSize);
startFrom = offset;
Copy link
Collaborator

@qianheng-aws qianheng-aws May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not correct to override with the latest offset directly.

Let's say SQL like select * from (select * from table limit 10 offset 1) limit 5 offset 2, it's not correct to use 2 as the offset. The right offset should be 1 + 2 I think. Please add a test for this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Accumulating them should be the right behavior. Fixed.

if (LOG.isDebugEnabled()) {
LOG.debug("Cannot pushdown the limit {}", limit, e);
} else {
LOG.warn("Cannot pushdown the limit {}, ", limit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot pushdown the limit <Integer> is a meaningless message and warn level is too severe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the error message to LOG.debug("Cannot pushdown limit {} with offset {}. Exception: {}", limit, offset, e);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked with @qianheng-aws offline. Now I think the log won't be generated too many. So please added this else back and change it to info. Can you fix all the these warn logs in this class?

Comment on lines 43 to 47
private static Integer extractLimitValue(RexNode fetch) {
if (fetch instanceof RexLiteral) {
return ((RexLiteral) fetch).getValueAs(Integer.class);
}
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this fetch always a RexLiteral? how about head 1+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LantaoJin
LantaoJin previously approved these changes May 22, 2025
@yuancu yuancu requested a review from qianheng-aws May 23, 2025 02:48
@LantaoJin LantaoJin merged commit df9f5dd into opensearch-project:main May 30, 2025
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
calcite calcite migration releated
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE] Calcite Engine Framework: pushdown limit
4 participants