Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pull-based Ingestion] Introduce ingestion management APIs #17631

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

varunbharadwaj
Copy link
Contributor

@varunbharadwaj varunbharadwaj commented Mar 19, 2025

Description

This PR adds ingestion management APIs - mainly Pause, Resume and GetIngestionState APIs. This PR is a first version to introduce the APIs and subsequent PRs will build on this to support pagination and consumer reset options.

Related Issues

Part one of #17442

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link
Contributor

❌ Gradle check result for dcccf22: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

❌ Gradle check result for 77aa63a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Varun Bharadwaj <[email protected]>
Copy link
Contributor

❌ Gradle check result for 7402d31: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

✅ Gradle check result for 5756fd0: SUCCESS

Copy link

codecov bot commented Mar 22, 2025

Codecov Report

Attention: Patch coverage is 54.02098% with 263 lines in your changes missing coverage. Please review.

Project coverage is 72.33%. Comparing base (6d53f9d) to head (060d76c).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...tadata/MetadataStreamingIngestionStateService.java 11.62% 38 Missing ⚠️
...ingestion/pause/TransportPauseIngestionAction.java 17.14% 29 Missing ⚠️
...gestion/resume/TransportResumeIngestionAction.java 17.14% 29 Missing ⚠️
...in/java/org/opensearch/index/shard/IndexShard.java 0.00% 18 Missing ⚠️
...reamingingestion/IngestionUpdateStateResponse.java 52.94% 16 Missing ⚠️
...streamingingestion/IngestionStateShardFailure.java 61.76% 13 Missing ⚠️
...mingingestion/state/GetIngestionStateResponse.java 43.47% 13 Missing ⚠️
.../streamingingestion/state/ShardIngestionState.java 72.50% 11 Missing ⚠️
...a/org/opensearch/index/engine/IngestionEngine.java 0.00% 10 Missing and 1 partial ⚠️
...ion/admin/indices/RestGetIngestionStateAction.java 23.07% 10 Missing ⚠️
... and 17 more
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #17631      +/-   ##
============================================
- Coverage     72.40%   72.33%   -0.07%     
- Complexity    65828    65851      +23     
============================================
  Files          5316     5339      +23     
  Lines        305294   305905     +611     
  Branches      44289    44338      +49     
============================================
+ Hits         221033   221266     +233     
- Misses        66187    66495     +308     
- Partials      18074    18144      +70     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@varunbharadwaj varunbharadwaj changed the title [WIP] [Pull-based Ingestion] Introduce ingestion management APIs [Pull-based Ingestion] Introduce ingestion management APIs Mar 22, 2025
Copy link
Contributor

❌ Gradle check result for 371e634: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

❌ Gradle check result for 7eccc3c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

❕ Gradle check result for 060d76c: UNSTABLE

  • TEST FAILURES:
      1 org.opensearch.snapshots.DedicatedClusterSnapshotRestoreIT.testSnapshotWithStuckNode

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

Copy link
Contributor

@yupeng9 yupeng9 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall lgtm

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SHARD, shard);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We skip index from XContent since shard level failures generally follow the format of indexName: {[<shard failures>]}. So the indexName will be used as a key from the container class holding this failure.

public void onResponse(UpdateIngestionStateResponse updateIngestionStateResponse) {
boolean shardsAcked = updateIngestionStateResponse.isAcknowledged()
&& updateIngestionStateResponse.getTotalShards() > 0
&& updateIngestionStateResponse.getFailedShards() == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are checking if the request was processed successfully on all individual shards in order to set shard level ack to true. If atleast 1 shard failed to explicitly update the state, we set this to false and add the shard in the failure list.

/**
* Holds metadata required for updating ingestion state.
*
* <p> This is for internal use only and will not be exposed to the user. </p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we ensure user cannot access this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Right now, I just create the transport action but do not register it as a REST action in the action module so users cannot call this externally.

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
ingestionPaused = in.readBoolean();
} else {
ingestionPaused = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this one? the default value of ingestionPaused is false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a final variable and hence we have to initialize it explicitly to false. If we don't want to, we can make it non-final.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants