Skip to content

Commit 5e33b66

Browse files
Add unit tests for ingestion management APIs
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 5756fd0 commit 5e33b66

12 files changed

+787
-5
lines changed

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/resume/ResumeIngestionRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void writeTo(StreamOutput out) throws IOException {
133133
* @opensearch.experimental
134134
*/
135135
@ExperimentalApi
136-
public class ResetSettings implements Writeable {
136+
public static class ResetSettings implements Writeable {
137137
private final int shard;
138138
private final String mode;
139139
private final String value;

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ public ShardIngestionState() {
5252
public ShardIngestionState(StreamInput in) throws IOException {
5353
index = in.readString();
5454
shardId = in.readInt();
55-
pollerState = in.readString();
56-
errorPolicy = in.readString();
55+
pollerState = in.readOptionalString();
56+
errorPolicy = in.readOptionalString();
5757
isPollerPaused = in.readBoolean();
5858
}
5959

@@ -91,8 +91,8 @@ public boolean isPollerPaused() {
9191
public void writeTo(StreamOutput out) throws IOException {
9292
out.writeString(index);
9393
out.writeInt(shardId);
94-
out.writeString(pollerState);
95-
out.writeString(errorPolicy);
94+
out.writeOptionalString(pollerState);
95+
out.writeOptionalString(errorPolicy);
9696
out.writeBoolean(isPollerPaused);
9797
}
9898

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion;
10+
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.io.IOException;
16+
import java.util.List;
17+
import java.util.Map;
18+
19+
public class IngestionUpdateStateResponseTests extends OpenSearchTestCase {
20+
21+
public void testSerialization() throws IOException {
22+
IngestionStateShardFailure[] shardFailures = new IngestionStateShardFailure[] {
23+
new IngestionStateShardFailure("index1", 0, "test failure") };
24+
IngestionUpdateStateResponse response = new IngestionUpdateStateResponse(true, true, shardFailures, "test error");
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
response.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
IngestionUpdateStateResponse deserializedResponse = new IngestionUpdateStateResponse(in);
31+
assertTrue(deserializedResponse.isAcknowledged());
32+
assertTrue(deserializedResponse.isShardsAcknowledged());
33+
assertNotNull(deserializedResponse.getShardFailures());
34+
assertEquals(1, deserializedResponse.getShardFailures().length);
35+
assertEquals("index1", deserializedResponse.getShardFailures()[0].getIndex());
36+
assertEquals(0, deserializedResponse.getShardFailures()[0].getShard());
37+
assertEquals("test error", deserializedResponse.getErrorMessage());
38+
}
39+
}
40+
}
41+
42+
public void testShardFailureGrouping() {
43+
IngestionStateShardFailure[] shardFailures = new IngestionStateShardFailure[] {
44+
new IngestionStateShardFailure("index1", 0, "failure 1"),
45+
new IngestionStateShardFailure("index1", 1, "failure 2"),
46+
new IngestionStateShardFailure("index2", 0, "failure 3") };
47+
IngestionUpdateStateResponse response = new IngestionUpdateStateResponse(true, true, shardFailures, "test error");
48+
49+
Map<String, List<IngestionStateShardFailure>> groupedFailures = IngestionStateShardFailure.groupShardFailuresByIndex(shardFailures);
50+
assertEquals(2, groupedFailures.size());
51+
assertEquals(2, groupedFailures.get("index1").size());
52+
assertEquals(1, groupedFailures.get("index2").size());
53+
assertEquals(0, groupedFailures.get("index1").get(0).getShard());
54+
assertEquals(1, groupedFailures.get("index1").get(1).getShard());
55+
assertEquals(0, groupedFailures.get("index2").get(0).getShard());
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.pause;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.IndicesOptions;
13+
import org.opensearch.common.io.stream.BytesStreamOutput;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.test.OpenSearchTestCase;
16+
17+
import java.io.IOException;
18+
19+
public class PauseIngestionRequestTests extends OpenSearchTestCase {
20+
21+
public void testSerialization() throws IOException {
22+
String[] indices = new String[] { "index1", "index2" };
23+
PauseIngestionRequest request = new PauseIngestionRequest(indices);
24+
request.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
request.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
PauseIngestionRequest deserializedRequest = new PauseIngestionRequest(in);
31+
assertArrayEquals(request.indices(), deserializedRequest.indices());
32+
assertEquals(request.indicesOptions(), deserializedRequest.indicesOptions());
33+
}
34+
}
35+
}
36+
37+
public void testValidation() {
38+
// Test with valid indices
39+
PauseIngestionRequest request = new PauseIngestionRequest(new String[] { "index1", "index2" });
40+
assertNull(request.validate());
41+
42+
// Test with empty indices
43+
PauseIngestionRequest request2 = new PauseIngestionRequest(new String[0]);
44+
ActionRequestValidationException e = request2.validate();
45+
assertNotNull(e);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.resume;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.IndicesOptions;
13+
import org.opensearch.common.io.stream.BytesStreamOutput;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.test.OpenSearchTestCase;
16+
17+
import java.io.IOException;
18+
19+
public class ResumeIngestionRequestTests extends OpenSearchTestCase {
20+
21+
public void testSerialization() throws IOException {
22+
String[] indices = new String[] { "index1", "index2" };
23+
ResumeIngestionRequest request = new ResumeIngestionRequest(indices);
24+
request.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
request.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
ResumeIngestionRequest deserializedRequest = new ResumeIngestionRequest(in);
31+
assertArrayEquals(request.indices(), deserializedRequest.indices());
32+
assertEquals(request.indicesOptions(), deserializedRequest.indicesOptions());
33+
}
34+
}
35+
}
36+
37+
public void testValidation() {
38+
// Test with valid indices
39+
ResumeIngestionRequest request1 = new ResumeIngestionRequest(new String[] { "index1", "index2" });
40+
assertNull(request1.validate());
41+
42+
// Test with empty indices
43+
ResumeIngestionRequest request2 = new ResumeIngestionRequest(new String[0]);
44+
ActionRequestValidationException e = request2.validate();
45+
assertNotNull(e);
46+
}
47+
48+
public void testResetSettingsSerialization() throws IOException {
49+
ResumeIngestionRequest.ResetSettings settings = new ResumeIngestionRequest.ResetSettings(1, "mode", "value");
50+
51+
try (BytesStreamOutput out = new BytesStreamOutput()) {
52+
settings.writeTo(out);
53+
54+
try (StreamInput in = out.bytes().streamInput()) {
55+
ResumeIngestionRequest.ResetSettings deserialized = new ResumeIngestionRequest.ResetSettings(in);
56+
assertEquals(settings.getShard(), deserialized.getShard());
57+
assertEquals(settings.getMode(), deserialized.getMode());
58+
assertEquals(settings.getValue(), deserialized.getValue());
59+
}
60+
}
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.state;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.common.io.stream.BytesStreamOutput;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
import java.io.IOException;
17+
18+
public class GetIngestionStateRequestTests extends OpenSearchTestCase {
19+
20+
public void testSerialization() throws IOException {
21+
String[] indices = new String[] { "index1", "index2" };
22+
int[] shards = new int[] { 0, 1, 2 };
23+
GetIngestionStateRequest request = new GetIngestionStateRequest(indices);
24+
request.setShards(shards);
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
request.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
GetIngestionStateRequest deserializedRequest = new GetIngestionStateRequest(in);
31+
assertArrayEquals(request.getIndex(), deserializedRequest.getIndex());
32+
assertArrayEquals(request.getShards(), deserializedRequest.getShards());
33+
}
34+
}
35+
}
36+
37+
public void testValidation() {
38+
// Test with valid indices
39+
GetIngestionStateRequest request1 = new GetIngestionStateRequest(new String[] { "index1", "index2" });
40+
assertNull(request1.validate());
41+
42+
// Test with null indices
43+
GetIngestionStateRequest request2 = new GetIngestionStateRequest((String[]) null);
44+
ActionRequestValidationException e = request2.validate();
45+
assertNotNull(e);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.state;
10+
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.io.IOException;
16+
import java.util.Collections;
17+
18+
public class GetIngestionStateResponseTests extends OpenSearchTestCase {
19+
20+
public void testSerialization() throws IOException {
21+
ShardIngestionState[] shardStates = new ShardIngestionState[] {
22+
new ShardIngestionState("index1", 0, "POLLING", "DROP", false),
23+
new ShardIngestionState("index1", 1, "PAUSED", "BLOCK", true) };
24+
GetIngestionStateResponse response = new GetIngestionStateResponse(shardStates, 2, 2, 0, Collections.emptyList());
25+
26+
try (BytesStreamOutput out = new BytesStreamOutput()) {
27+
response.writeTo(out);
28+
29+
try (StreamInput in = out.bytes().streamInput()) {
30+
GetIngestionStateResponse deserializedResponse = new GetIngestionStateResponse(in);
31+
assertEquals(response.getShardStates()[0].getShardId(), deserializedResponse.getShardStates()[0].getShardId());
32+
assertEquals(response.getShardStates()[1].getShardId(), deserializedResponse.getShardStates()[1].getShardId());
33+
assertEquals(response.getTotalShards(), deserializedResponse.getTotalShards());
34+
assertEquals(response.getSuccessfulShards(), deserializedResponse.getSuccessfulShards());
35+
assertEquals(response.getFailedShards(), deserializedResponse.getFailedShards());
36+
}
37+
}
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion.state;
10+
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import java.io.IOException;
16+
import java.util.List;
17+
import java.util.Map;
18+
19+
public class ShardIngestionStateTests extends OpenSearchTestCase {
20+
21+
public void testSerialization() throws IOException {
22+
ShardIngestionState state = new ShardIngestionState("index1", 0, "POLLING", "DROP", false);
23+
24+
try (BytesStreamOutput out = new BytesStreamOutput()) {
25+
state.writeTo(out);
26+
27+
try (StreamInput in = out.bytes().streamInput()) {
28+
ShardIngestionState deserializedState = new ShardIngestionState(in);
29+
assertEquals(state.getIndex(), deserializedState.getIndex());
30+
assertEquals(state.getShardId(), deserializedState.getShardId());
31+
assertEquals(state.getPollerState(), deserializedState.getPollerState());
32+
assertEquals(state.isPollerPaused(), deserializedState.isPollerPaused());
33+
}
34+
}
35+
}
36+
37+
public void testSerializationWithNullValues() throws IOException {
38+
ShardIngestionState state = new ShardIngestionState("index1", 0, null, null, false);
39+
40+
try (BytesStreamOutput out = new BytesStreamOutput()) {
41+
state.writeTo(out);
42+
43+
try (StreamInput in = out.bytes().streamInput()) {
44+
ShardIngestionState deserializedState = new ShardIngestionState(in);
45+
assertEquals(state.getIndex(), deserializedState.getIndex());
46+
assertEquals(state.getShardId(), deserializedState.getShardId());
47+
assertNull(deserializedState.getPollerState());
48+
assertEquals(state.isPollerPaused(), deserializedState.isPollerPaused());
49+
}
50+
}
51+
}
52+
53+
public void testGroupShardStateByIndex() {
54+
ShardIngestionState[] states = new ShardIngestionState[] {
55+
new ShardIngestionState("index1", 0, "POLLING", "DROP", true),
56+
new ShardIngestionState("index1", 1, "PAUSED", "DROP", false),
57+
new ShardIngestionState("index2", 0, "POLLING", "DROP", true) };
58+
59+
Map<String, List<ShardIngestionState>> groupedStates = ShardIngestionState.groupShardStateByIndex(states);
60+
61+
assertEquals(2, groupedStates.size());
62+
assertEquals(2, groupedStates.get("index1").size());
63+
assertEquals(1, groupedStates.get("index2").size());
64+
65+
// Verify index1 shards
66+
List<ShardIngestionState> indexStates1 = groupedStates.get("index1");
67+
assertEquals(0, indexStates1.get(0).getShardId());
68+
assertEquals(1, indexStates1.get(1).getShardId());
69+
70+
// Verify index2 shards
71+
List<ShardIngestionState> indexStates2 = groupedStates.get("index2");
72+
assertEquals(0, indexStates2.get(0).getShardId());
73+
}
74+
}

0 commit comments

Comments
 (0)