Skip to content

Commit dcccf22

Browse files
Add ingestion management APIs
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent d9a9274 commit dcccf22

40 files changed

+2076
-16
lines changed

Diff for: plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+22
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
import org.apache.kafka.clients.producer.Producer;
1515
import org.apache.kafka.clients.producer.ProducerRecord;
1616
import org.apache.kafka.common.serialization.StringSerializer;
17+
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
18+
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
19+
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
1720
import org.opensearch.action.search.SearchResponse;
1821
import org.opensearch.cluster.metadata.IndexMetadata;
1922
import org.opensearch.common.settings.Settings;
2023
import org.opensearch.plugins.Plugin;
2124
import org.opensearch.test.OpenSearchIntegTestCase;
25+
import org.opensearch.transport.client.Requests;
2226
import org.junit.After;
2327
import org.junit.Before;
2428

@@ -28,6 +32,7 @@
2832
import java.util.Locale;
2933
import java.util.Properties;
3034
import java.util.concurrent.Callable;
35+
import java.util.concurrent.ExecutionException;
3136
import java.util.concurrent.TimeUnit;
3237

3338
import org.testcontainers.containers.KafkaContainer;
@@ -112,6 +117,11 @@ protected void waitForSearchableDocs(long docCount, List<String> nodes) throws E
112117
}, 1, TimeUnit.MINUTES);
113118
}
114119

120+
protected long getSearchableDocCount(String node) throws Exception {
121+
final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
122+
return response.getHits().getTotalHits().value();
123+
}
124+
115125
protected void waitForState(Callable<Boolean> checkState) throws Exception {
116126
assertBusy(() -> {
117127
if (checkState.call() == false) {
@@ -124,6 +134,18 @@ protected String getSettings(String indexName, String setting) {
124134
return client().admin().indices().prepareGetSettings(indexName).get().getSetting(indexName, setting);
125135
}
126136

137+
protected GetIngestionStateResponse getIngestionState(String indexName) throws ExecutionException, InterruptedException {
138+
return client().admin().indices().getIngestionState(Requests.getIngestionStateRequest(indexName)).get();
139+
}
140+
141+
protected PauseIngestionResponse pauseIngestion(String indexName) throws ExecutionException, InterruptedException {
142+
return client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
143+
}
144+
145+
protected ResumeIngestionResponse resumeIngestion(String indexName) throws ExecutionException, InterruptedException {
146+
return client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(indexName)).get();
147+
}
148+
127149
protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
128150
createIndex(
129151
indexName,

Diff for: plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+53
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
package org.opensearch.plugin.kafka;
1010

1111
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
12+
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
13+
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
14+
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
1215
import org.opensearch.action.search.SearchResponse;
1316
import org.opensearch.cluster.metadata.IndexMetadata;
1417
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
@@ -154,6 +157,56 @@ public void testErrorStrategy() throws Exception {
154157
waitForSearchableDocs(2, Arrays.asList(node));
155158
}
156159

160+
public void testPauseAndResumeIngestion() throws Exception {
161+
// setup nodes and index
162+
produceData("1", "name1", "24");
163+
produceData("2", "name2", "20");
164+
internalCluster().startClusterManagerOnlyNode();
165+
final String nodeA = internalCluster().startDataOnlyNode();
166+
final String nodeB = internalCluster().startDataOnlyNode();
167+
168+
createIndexWithDefaultSettings(1, 1);
169+
ensureGreen(indexName);
170+
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
171+
172+
// pause ingestion
173+
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
174+
assertTrue(pauseResponse.isAcknowledged());
175+
waitForState(() -> {
176+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
177+
return Arrays.stream(ingestionState.getShardStates()).allMatch(state -> state.getPollerState().equalsIgnoreCase("paused"));
178+
});
179+
180+
// verify ingestion state is persisted
181+
produceData("3", "name3", "30");
182+
produceData("4", "name4", "31");
183+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
184+
ensureYellowAndNoInitializingShards(indexName);
185+
assertTrue(nodeB.equals(primaryNodeName(indexName)));
186+
187+
final String nodeC = internalCluster().startDataOnlyNode();
188+
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
189+
ensureGreen(indexName);
190+
assertTrue(nodeC.equals(replicaNodeName(indexName)));
191+
assertEquals(2, getSearchableDocCount(nodeB));
192+
waitForState(() -> {
193+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
194+
return Arrays.stream(ingestionState.getShardStates()).allMatch(state -> state.getPollerState().equalsIgnoreCase("paused"));
195+
});
196+
197+
// resume ingestion
198+
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
199+
assertTrue(resumeResponse.isAcknowledged());
200+
waitForState(() -> {
201+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
202+
return Arrays.stream(ingestionState.getShardStates())
203+
.allMatch(
204+
state -> state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing")
205+
);
206+
});
207+
waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
208+
}
209+
157210
private void verifyRemoteStoreEnabled(String node) {
158211
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
159212
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

Diff for: server/src/main/java/org/opensearch/action/ActionModule.java

+19
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@
198198
import org.opensearch.action.admin.indices.shrink.TransportResizeAction;
199199
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
200200
import org.opensearch.action.admin.indices.stats.TransportIndicesStatsAction;
201+
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionAction;
202+
import org.opensearch.action.admin.indices.streamingingestion.pause.TransportPauseIngestionAction;
203+
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionAction;
204+
import org.opensearch.action.admin.indices.streamingingestion.resume.TransportResumeIngestionAction;
205+
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateAction;
206+
import org.opensearch.action.admin.indices.streamingingestion.state.TransportGetIngestionStateAction;
201207
import org.opensearch.action.admin.indices.template.delete.DeleteComponentTemplateAction;
202208
import org.opensearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
203209
import org.opensearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
@@ -401,6 +407,7 @@
401407
import org.opensearch.rest.action.admin.indices.RestGetFieldMappingAction;
402408
import org.opensearch.rest.action.admin.indices.RestGetIndexTemplateAction;
403409
import org.opensearch.rest.action.admin.indices.RestGetIndicesAction;
410+
import org.opensearch.rest.action.admin.indices.RestGetIngestionStateAction;
404411
import org.opensearch.rest.action.admin.indices.RestGetMappingAction;
405412
import org.opensearch.rest.action.admin.indices.RestGetSettingsAction;
406413
import org.opensearch.rest.action.admin.indices.RestIndexDeleteAliasesAction;
@@ -410,6 +417,7 @@
410417
import org.opensearch.rest.action.admin.indices.RestIndicesShardStoresAction;
411418
import org.opensearch.rest.action.admin.indices.RestIndicesStatsAction;
412419
import org.opensearch.rest.action.admin.indices.RestOpenIndexAction;
420+
import org.opensearch.rest.action.admin.indices.RestPauseIngestionAction;
413421
import org.opensearch.rest.action.admin.indices.RestPutComponentTemplateAction;
414422
import org.opensearch.rest.action.admin.indices.RestPutComposableIndexTemplateAction;
415423
import org.opensearch.rest.action.admin.indices.RestPutIndexTemplateAction;
@@ -418,6 +426,7 @@
418426
import org.opensearch.rest.action.admin.indices.RestRefreshAction;
419427
import org.opensearch.rest.action.admin.indices.RestResizeHandler;
420428
import org.opensearch.rest.action.admin.indices.RestResolveIndexAction;
429+
import org.opensearch.rest.action.admin.indices.RestResumeIngestionAction;
421430
import org.opensearch.rest.action.admin.indices.RestRolloverIndexAction;
422431
import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction;
423432
import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction;
@@ -806,6 +815,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
806815
actions.register(GetSearchPipelineAction.INSTANCE, GetSearchPipelineTransportAction.class);
807816
actions.register(DeleteSearchPipelineAction.INSTANCE, DeleteSearchPipelineTransportAction.class);
808817

818+
// Pull-based ingestion actions
819+
actions.register(PauseIngestionAction.INSTANCE, TransportPauseIngestionAction.class);
820+
actions.register(ResumeIngestionAction.INSTANCE, TransportResumeIngestionAction.class);
821+
actions.register(GetIngestionStateAction.INSTANCE, TransportGetIngestionStateAction.class);
822+
809823
return unmodifiableMap(actions.getRegistry());
810824
}
811825

@@ -1041,6 +1055,11 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
10411055
registerHandler.accept(new RestGetDecommissionStateAction());
10421056
registerHandler.accept(new RestRemoteStoreStatsAction());
10431057
registerHandler.accept(new RestRestoreRemoteStoreAction());
1058+
1059+
// pull-based ingestion API
1060+
registerHandler.accept(new RestPauseIngestionAction());
1061+
registerHandler.accept(new RestResumeIngestionAction());
1062+
registerHandler.accept(new RestGetIngestionStateAction());
10441063
}
10451064

10461065
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** Streaming ingestion transport handlers. */
10+
package org.opensearch.action.admin.indices.streamingingestion;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.pause;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Transport action for pausing ingestion.
15+
*
16+
* @opensearch.experimental
17+
*/
18+
public class PauseIngestionAction extends ActionType<PauseIngestionResponse> {
19+
20+
public static final PauseIngestionAction INSTANCE = new PauseIngestionAction();
21+
public static final String NAME = "indices:admin/ingestion/pause";
22+
23+
private PauseIngestionAction() {
24+
super(NAME, PauseIngestionResponse::new);
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.pause;
10+
11+
import org.opensearch.cluster.ack.IndicesClusterStateUpdateRequest;
12+
13+
/**
14+
* Cluster state update request that allows to pause ingestion.
15+
*
16+
* @opensearch.experimental
17+
*/
18+
public class PauseIngestionClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<PauseIngestionClusterStateUpdateRequest> {
19+
20+
private long taskId;
21+
22+
public PauseIngestionClusterStateUpdateRequest(final long taskId) {
23+
this.taskId = taskId;
24+
}
25+
26+
public long taskId() {
27+
return taskId;
28+
}
29+
30+
public PauseIngestionClusterStateUpdateRequest taskId(final long taskId) {
31+
this.taskId = taskId;
32+
return this;
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.pause;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.IndicesRequest;
13+
import org.opensearch.action.support.IndicesOptions;
14+
import org.opensearch.action.support.clustermanager.AcknowledgedRequest;
15+
import org.opensearch.common.annotation.ExperimentalApi;
16+
import org.opensearch.core.common.io.stream.StreamInput;
17+
import org.opensearch.core.common.io.stream.StreamOutput;
18+
import org.opensearch.core.common.util.CollectionUtils;
19+
20+
import java.io.IOException;
21+
22+
import static org.opensearch.action.ValidateActions.addValidationError;
23+
24+
/**
25+
* A request to pause ingestion.
26+
*
27+
* @opensearch.experimental
28+
*/
29+
@ExperimentalApi
30+
public class PauseIngestionRequest extends AcknowledgedRequest<PauseIngestionRequest> implements IndicesRequest.Replaceable {
31+
32+
private String[] indices;
33+
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
34+
35+
public PauseIngestionRequest(StreamInput in) throws IOException {
36+
super(in);
37+
this.indices = in.readStringArray();
38+
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
39+
}
40+
41+
public PauseIngestionRequest(String[] indices) {
42+
this.indices = indices;
43+
}
44+
45+
@Override
46+
public ActionRequestValidationException validate() {
47+
ActionRequestValidationException validationException = null;
48+
if (CollectionUtils.isEmpty(indices)) {
49+
validationException = addValidationError("index is missing", validationException);
50+
}
51+
return validationException;
52+
}
53+
54+
/**
55+
* Returns the indices to be paused.
56+
*/
57+
@Override
58+
public String[] indices() {
59+
return indices;
60+
}
61+
62+
/**
63+
* Sets the indices to be paused.
64+
*/
65+
@Override
66+
public PauseIngestionRequest indices(String... indices) {
67+
this.indices = indices;
68+
return this;
69+
}
70+
71+
/**
72+
* Specifies what type of requested indices to ignore and how to deal with wildcard expressions.
73+
* For example indices that don't exist.
74+
*
75+
* @return the desired behaviour regarding indices to ignore and wildcard indices expressions
76+
*/
77+
@Override
78+
public IndicesOptions indicesOptions() {
79+
return indicesOptions;
80+
}
81+
82+
/**
83+
* Specifies what type of requested indices to ignore and how to deal wild wildcard expressions.
84+
* For example indices that don't exist.
85+
*
86+
* @param indicesOptions the desired behaviour regarding indices to ignore and wildcard indices expressions
87+
* @return the request itself
88+
*/
89+
public PauseIngestionRequest indicesOptions(IndicesOptions indicesOptions) {
90+
this.indicesOptions = indicesOptions;
91+
return this;
92+
}
93+
94+
@Override
95+
public void writeTo(StreamOutput out) throws IOException {
96+
super.writeTo(out);
97+
out.writeStringArray(indices);
98+
indicesOptions.writeIndicesOptions(out);
99+
}
100+
}

0 commit comments

Comments
 (0)