Skip to content

Commit 060d76c

Browse files
Add unit tests for ingestion management APIs
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 5756fd0 commit 060d76c

16 files changed

+823
-8
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+5
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ protected GetIngestionStateResponse getIngestionState(String indexName) throws E
138138
return client().admin().indices().getIngestionState(Requests.getIngestionStateRequest(indexName)).get();
139139
}
140140

141+
protected GetIngestionStateResponse getIngestionState(String[] indexNames, int[] shards) throws ExecutionException,
142+
InterruptedException {
143+
return client().admin().indices().getIngestionState(Requests.getIngestionStateRequest(indexNames, shards)).get();
144+
}
145+
141146
protected PauseIngestionResponse pauseIngestion(String indexName) throws ExecutionException, InterruptedException {
142147
return client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(indexName)).get();
143148
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+22
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.nio.file.Path;
2525
import java.util.Arrays;
26+
import java.util.concurrent.ExecutionException;
2627

2728
import static org.hamcrest.Matchers.is;
2829

@@ -212,6 +213,27 @@ public void testPauseAndResumeIngestion() throws Exception {
212213
waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
213214
}
214215

216+
public void testGetIngestionState() throws ExecutionException, InterruptedException {
217+
internalCluster().startClusterManagerOnlyNode();
218+
internalCluster().startDataOnlyNode();
219+
internalCluster().startDataOnlyNode();
220+
createIndexWithDefaultSettings(1, 1);
221+
ensureGreen(indexName);
222+
223+
GetIngestionStateResponse ingestionState = getIngestionState(new String[] { indexName }, new int[] { 0 });
224+
assertEquals(0, ingestionState.getFailedShards());
225+
assertEquals(1, ingestionState.getSuccessfulShards());
226+
assertEquals(1, ingestionState.getTotalShards());
227+
assertEquals(1, ingestionState.getShardStates().length);
228+
assertEquals(0, ingestionState.getShardStates()[0].getShardId());
229+
assertEquals("POLLING", ingestionState.getShardStates()[0].getPollerState());
230+
assertEquals("DROP", ingestionState.getShardStates()[0].getErrorPolicy());
231+
assertFalse(ingestionState.getShardStates()[0].isPollerPaused());
232+
233+
GetIngestionStateResponse ingestionStateForInvalidShard = getIngestionState(new String[] { indexName }, new int[] { 1 });
234+
assertEquals(0, ingestionStateForInvalidShard.getTotalShards());
235+
}
236+
215237
private void verifyRemoteStoreEnabled(String node) {
216238
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
217239
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/IngestionUpdateStateResponse.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Map;
2222

2323
/**
24-
* Transport response for pausing ingestion.
24+
* Transport response for ingestion state updates.
2525
*
2626
* @opensearch.experimental
2727
*/

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/resume/ResumeIngestionRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void writeTo(StreamOutput out) throws IOException {
133133
* @opensearch.experimental
134134
*/
135135
@ExperimentalApi
136-
public class ResetSettings implements Writeable {
136+
public static class ResetSettings implements Writeable {
137137
private final int shard;
138138
private final String mode;
139139
private final String value;

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ public ShardIngestionState() {
5252
public ShardIngestionState(StreamInput in) throws IOException {
5353
index = in.readString();
5454
shardId = in.readInt();
55-
pollerState = in.readString();
56-
errorPolicy = in.readString();
55+
pollerState = in.readOptionalString();
56+
errorPolicy = in.readOptionalString();
5757
isPollerPaused = in.readBoolean();
5858
}
5959

@@ -87,12 +87,16 @@ public boolean isPollerPaused() {
8787
return isPollerPaused;
8888
}
8989

90+
public String getErrorPolicy() {
91+
return errorPolicy;
92+
}
93+
9094
@Override
9195
public void writeTo(StreamOutput out) throws IOException {
9296
out.writeString(index);
9397
out.writeInt(shardId);
94-
out.writeString(pollerState);
95-
out.writeString(errorPolicy);
98+
out.writeOptionalString(pollerState);
99+
out.writeOptionalString(errorPolicy);
96100
out.writeBoolean(isPollerPaused);
97101
}
98102

server/src/main/java/org/opensearch/transport/client/Requests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,9 @@ public static GetIngestionStateRequest getIngestionStateRequest(String index) {
636636
/**
637637
* Creates a get ingestion state request given list of indices.
638638
*/
639-
public static GetIngestionStateRequest getIngestionStateRequest(String[] indices) {
640-
return new GetIngestionStateRequest(indices);
639+
public static GetIngestionStateRequest getIngestionStateRequest(String[] indices, int[] shards) {
640+
GetIngestionStateRequest request = new GetIngestionStateRequest(indices);
641+
request.setShards(shards);
642+
return request;
641643
}
642644
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion;
10+
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.io.IOException;
16+
import java.util.List;
17+
import java.util.Map;
18+
19+
public class IngestionUpdateStateResponseTests extends OpenSearchTestCase {
20+
21+
public void testSerialization() throws IOException {
22+
IngestionStateShardFailure[] shardFailures = new IngestionStateShardFailure[] {
23+
new IngestionStateShardFailure("index1", 0, "test failure") };
24+
IngestionUpdateStateResponse response = new IngestionUpdateStateResponse(true, true, shardFailures, "test error");
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
response.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
IngestionUpdateStateResponse deserializedResponse = new IngestionUpdateStateResponse(in);
31+
assertTrue(deserializedResponse.isAcknowledged());
32+
assertTrue(deserializedResponse.isShardsAcknowledged());
33+
assertNotNull(deserializedResponse.getShardFailures());
34+
assertEquals(1, deserializedResponse.getShardFailures().length);
35+
assertEquals("index1", deserializedResponse.getShardFailures()[0].getIndex());
36+
assertEquals(0, deserializedResponse.getShardFailures()[0].getShard());
37+
assertEquals("test error", deserializedResponse.getErrorMessage());
38+
}
39+
}
40+
}
41+
42+
public void testShardFailureGrouping() {
43+
IngestionStateShardFailure[] shardFailures = new IngestionStateShardFailure[] {
44+
new IngestionStateShardFailure("index1", 0, "failure 1"),
45+
new IngestionStateShardFailure("index1", 1, "failure 2"),
46+
new IngestionStateShardFailure("index2", 0, "failure 3") };
47+
IngestionUpdateStateResponse response = new IngestionUpdateStateResponse(true, true, shardFailures, "test error");
48+
49+
Map<String, List<IngestionStateShardFailure>> groupedFailures = IngestionStateShardFailure.groupShardFailuresByIndex(shardFailures);
50+
assertEquals(2, groupedFailures.size());
51+
assertEquals(2, groupedFailures.get("index1").size());
52+
assertEquals(1, groupedFailures.get("index2").size());
53+
assertEquals(0, groupedFailures.get("index1").get(0).getShard());
54+
assertEquals(1, groupedFailures.get("index1").get(1).getShard());
55+
assertEquals(0, groupedFailures.get("index2").get(0).getShard());
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.pause;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.IndicesOptions;
13+
import org.opensearch.common.io.stream.BytesStreamOutput;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.test.OpenSearchTestCase;
16+
17+
import java.io.IOException;
18+
19+
public class PauseIngestionRequestTests extends OpenSearchTestCase {
20+
21+
public void testSerialization() throws IOException {
22+
String[] indices = new String[] { "index1", "index2" };
23+
PauseIngestionRequest request = new PauseIngestionRequest(indices);
24+
request.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
request.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
PauseIngestionRequest deserializedRequest = new PauseIngestionRequest(in);
31+
assertArrayEquals(request.indices(), deserializedRequest.indices());
32+
assertEquals(request.indicesOptions(), deserializedRequest.indicesOptions());
33+
}
34+
}
35+
}
36+
37+
public void testValidation() {
38+
// Test with valid indices
39+
PauseIngestionRequest request = new PauseIngestionRequest(new String[] { "index1", "index2" });
40+
assertNull(request.validate());
41+
42+
// Test with empty indices
43+
PauseIngestionRequest request2 = new PauseIngestionRequest(new String[0]);
44+
ActionRequestValidationException e = request2.validate();
45+
assertNotNull(e);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.resume;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.IndicesOptions;
13+
import org.opensearch.common.io.stream.BytesStreamOutput;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.test.OpenSearchTestCase;
16+
17+
import java.io.IOException;
18+
19+
public class ResumeIngestionRequestTests extends OpenSearchTestCase {
20+
21+
public void testSerialization() throws IOException {
22+
String[] indices = new String[] { "index1", "index2" };
23+
ResumeIngestionRequest request = new ResumeIngestionRequest(indices);
24+
request.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
request.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
ResumeIngestionRequest deserializedRequest = new ResumeIngestionRequest(in);
31+
assertArrayEquals(request.indices(), deserializedRequest.indices());
32+
assertEquals(request.indicesOptions(), deserializedRequest.indicesOptions());
33+
}
34+
}
35+
}
36+
37+
public void testValidation() {
38+
// Test with valid indices
39+
ResumeIngestionRequest request1 = new ResumeIngestionRequest(new String[] { "index1", "index2" });
40+
assertNull(request1.validate());
41+
42+
// Test with empty indices
43+
ResumeIngestionRequest request2 = new ResumeIngestionRequest(new String[0]);
44+
ActionRequestValidationException e = request2.validate();
45+
assertNotNull(e);
46+
}
47+
48+
public void testResetSettingsSerialization() throws IOException {
49+
ResumeIngestionRequest.ResetSettings settings = new ResumeIngestionRequest.ResetSettings(1, "mode", "value");
50+
51+
try (BytesStreamOutput out = new BytesStreamOutput()) {
52+
settings.writeTo(out);
53+
54+
try (StreamInput in = out.bytes().streamInput()) {
55+
ResumeIngestionRequest.ResetSettings deserialized = new ResumeIngestionRequest.ResetSettings(in);
56+
assertEquals(settings.getShard(), deserialized.getShard());
57+
assertEquals(settings.getMode(), deserialized.getMode());
58+
assertEquals(settings.getValue(), deserialized.getValue());
59+
}
60+
}
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.state;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.common.io.stream.BytesStreamOutput;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
import java.io.IOException;
17+
18+
public class GetIngestionStateRequestTests extends OpenSearchTestCase {
19+
20+
public void testSerialization() throws IOException {
21+
String[] indices = new String[] { "index1", "index2" };
22+
int[] shards = new int[] { 0, 1, 2 };
23+
GetIngestionStateRequest request = new GetIngestionStateRequest(indices);
24+
request.setShards(shards);
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
request.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
GetIngestionStateRequest deserializedRequest = new GetIngestionStateRequest(in);
31+
assertArrayEquals(request.getIndex(), deserializedRequest.getIndex());
32+
assertArrayEquals(request.getShards(), deserializedRequest.getShards());
33+
}
34+
}
35+
}
36+
37+
public void testValidation() {
38+
// Test with valid indices
39+
GetIngestionStateRequest request1 = new GetIngestionStateRequest(new String[] { "index1", "index2" });
40+
assertNull(request1.validate());
41+
42+
// Test with null indices
43+
GetIngestionStateRequest request2 = new GetIngestionStateRequest((String[]) null);
44+
ActionRequestValidationException e = request2.validate();
45+
assertNotNull(e);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.state;
10+
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.io.IOException;
16+
import java.util.Collections;
17+
18+
public class GetIngestionStateResponseTests extends OpenSearchTestCase {
19+
20+
public void testSerialization() throws IOException {
21+
ShardIngestionState[] shardStates = new ShardIngestionState[] {
22+
new ShardIngestionState("index1", 0, "POLLING", "DROP", false),
23+
new ShardIngestionState("index1", 1, "PAUSED", "BLOCK", true) };
24+
GetIngestionStateResponse response = new GetIngestionStateResponse(shardStates, 2, 2, 0, Collections.emptyList());
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
response.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
GetIngestionStateResponse deserializedResponse = new GetIngestionStateResponse(in);
31+
assertEquals(response.getShardStates()[0].getShardId(), deserializedResponse.getShardStates()[0].getShardId());
32+
assertEquals(response.getShardStates()[1].getShardId(), deserializedResponse.getShardStates()[1].getShardId());
33+
assertEquals(response.getTotalShards(), deserializedResponse.getTotalShards());
34+
assertEquals(response.getSuccessfulShards(), deserializedResponse.getSuccessfulShards());
35+
assertEquals(response.getFailedShards(), deserializedResponse.getFailedShards());
36+
}
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)