Skip to content

Commit 5756fd0

Browse files
Refactor and support shard level acks
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 1b6951d commit 5756fd0

27 files changed

+839
-437
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- [Rule Based Auto-tagging] Add rule schema for auto tagging ([#17238](https://github.com/opensearch-project/OpenSearch/pull/17238))
1010
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
1111
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
12-
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
12+
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
1313
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
14+
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
1415

1516
### Changed
1617
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,11 @@ public void testPauseAndResumeIngestion() throws Exception {
172172
// pause ingestion
173173
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
174174
assertTrue(pauseResponse.isAcknowledged());
175+
assertTrue(pauseResponse.isShardsAcknowledged());
175176
waitForState(() -> {
176177
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
177-
return Arrays.stream(ingestionState.getShardStates()).allMatch(state -> state.getPollerState().equalsIgnoreCase("paused"));
178+
return Arrays.stream(ingestionState.getShardStates())
179+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
178180
});
179181

180182
// verify ingestion state is persisted
@@ -191,17 +193,20 @@ public void testPauseAndResumeIngestion() throws Exception {
191193
assertEquals(2, getSearchableDocCount(nodeB));
192194
waitForState(() -> {
193195
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
194-
return Arrays.stream(ingestionState.getShardStates()).allMatch(state -> state.getPollerState().equalsIgnoreCase("paused"));
196+
return Arrays.stream(ingestionState.getShardStates())
197+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
195198
});
196199

197200
// resume ingestion
198201
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
199202
assertTrue(resumeResponse.isAcknowledged());
203+
assertTrue(resumeResponse.isShardsAcknowledged());
200204
waitForState(() -> {
201205
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
202206
return Arrays.stream(ingestionState.getShardStates())
203207
.allMatch(
204-
state -> state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing")
208+
state -> state.isPollerPaused() == false
209+
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
205210
);
206211
});
207212
waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));

server/src/main/java/org/opensearch/action/ActionModule.java

+3
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@
204204
import org.opensearch.action.admin.indices.streamingingestion.resume.TransportResumeIngestionAction;
205205
import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateAction;
206206
import org.opensearch.action.admin.indices.streamingingestion.state.TransportGetIngestionStateAction;
207+
import org.opensearch.action.admin.indices.streamingingestion.state.TransportUpdateIngestionStateAction;
208+
import org.opensearch.action.admin.indices.streamingingestion.state.UpdateIngestionStateAction;
207209
import org.opensearch.action.admin.indices.template.delete.DeleteComponentTemplateAction;
208210
import org.opensearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
209211
import org.opensearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
@@ -819,6 +821,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
819821
actions.register(PauseIngestionAction.INSTANCE, TransportPauseIngestionAction.class);
820822
actions.register(ResumeIngestionAction.INSTANCE, TransportResumeIngestionAction.class);
821823
actions.register(GetIngestionStateAction.INSTANCE, TransportGetIngestionStateAction.class);
824+
actions.register(UpdateIngestionStateAction.INSTANCE, TransportUpdateIngestionStateAction.class);
822825

823826
return unmodifiableMap(actions.getRegistry());
824827
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.core.common.Strings;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.StreamOutput;
15+
import org.opensearch.core.common.io.stream.Writeable;
16+
import org.opensearch.core.xcontent.MediaTypeRegistry;
17+
import org.opensearch.core.xcontent.ToXContentFragment;
18+
import org.opensearch.core.xcontent.XContentBuilder;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
27+
/**
28+
* Indicates ingestion failures at index and shard level.
29+
*
30+
* @opensearch.experimental
31+
*/
32+
@ExperimentalApi
33+
public class IngestionStateShardFailure implements Writeable, ToXContentFragment {
34+
private static final String SHARD = "shard";
35+
private static final String ERROR = "error";
36+
37+
private final String index;
38+
private final int shard;
39+
private String errorMessage;
40+
41+
public IngestionStateShardFailure(String index, int shard, String errorMessage) {
42+
this.index = index;
43+
this.shard = shard;
44+
this.errorMessage = errorMessage;
45+
}
46+
47+
public IngestionStateShardFailure(StreamInput in) throws IOException {
48+
this.index = in.readString();
49+
this.shard = in.readInt();
50+
this.errorMessage = in.readString();
51+
}
52+
53+
public String getIndex() {
54+
return index;
55+
}
56+
57+
public int getShard() {
58+
return shard;
59+
}
60+
61+
@Override
62+
public void writeTo(StreamOutput out) throws IOException {
63+
out.writeString(index);
64+
out.writeInt(shard);
65+
out.writeString(errorMessage);
66+
}
67+
68+
@Override
69+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
70+
builder.startObject();
71+
builder.field(SHARD, shard);
72+
builder.field(ERROR, errorMessage);
73+
return builder.endObject();
74+
}
75+
76+
@Override
77+
public boolean equals(Object o) {
78+
if (this == o) return true;
79+
if (o == null || getClass() != o.getClass()) return false;
80+
IngestionStateShardFailure that = (IngestionStateShardFailure) o;
81+
return Objects.equals(index, that.index) && shard == that.shard && Objects.equals(errorMessage, that.errorMessage);
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
int result = Objects.hashCode(index);
87+
result = 31 * result + Objects.hashCode(shard);
88+
result = 31 * result + Objects.hashCode(errorMessage);
89+
return result;
90+
}
91+
92+
@Override
93+
public String toString() {
94+
return Strings.toString(MediaTypeRegistry.JSON, this);
95+
}
96+
97+
/**
98+
* Groups provided shard ingestion state failures by index name.
99+
*/
100+
public static Map<String, List<IngestionStateShardFailure>> groupShardFailuresByIndex(IngestionStateShardFailure[] shardFailures) {
101+
Map<String, List<IngestionStateShardFailure>> shardFailuresByIndex = new HashMap<>();
102+
103+
for (IngestionStateShardFailure shardFailure : shardFailures) {
104+
shardFailuresByIndex.computeIfAbsent(shardFailure.getIndex(), (index) -> new ArrayList<>());
105+
shardFailuresByIndex.get(shardFailure.getIndex()).add(shardFailure);
106+
}
107+
108+
return shardFailuresByIndex;
109+
}
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.streamingingestion;
10+
11+
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.core.common.Strings;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.common.io.stream.StreamOutput;
16+
import org.opensearch.core.xcontent.MediaTypeRegistry;
17+
import org.opensearch.core.xcontent.XContentBuilder;
18+
19+
import java.io.IOException;
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
/**
24+
* Transport response for pausing ingestion.
25+
*
26+
* @opensearch.experimental
27+
*/
28+
@ExperimentalApi
29+
public class IngestionUpdateStateResponse extends AcknowledgedResponse {
30+
protected static final String SHARD_ACK = "shards_acknowledged";
31+
protected static final String ERROR = "error";
32+
protected static final String FAILURES = "failures";
33+
34+
protected boolean shardsAcknowledged;
35+
protected IngestionStateShardFailure[] shardFailuresList;
36+
protected String errorMessage;
37+
38+
public IngestionUpdateStateResponse(StreamInput in) throws IOException {
39+
super(in);
40+
shardFailuresList = in.readArray(IngestionStateShardFailure::new, IngestionStateShardFailure[]::new);
41+
errorMessage = in.readString();
42+
shardsAcknowledged = in.readBoolean();
43+
}
44+
45+
public IngestionUpdateStateResponse(
46+
final boolean acknowledged,
47+
final boolean shardsAcknowledged,
48+
final IngestionStateShardFailure[] shardFailuresList,
49+
String errorMessage
50+
) {
51+
super(acknowledged);
52+
this.shardFailuresList = shardFailuresList;
53+
this.shardsAcknowledged = shardsAcknowledged;
54+
this.errorMessage = errorMessage;
55+
}
56+
57+
@Override
58+
public void writeTo(StreamOutput out) throws IOException {
59+
super.writeTo(out);
60+
out.writeArray(shardFailuresList);
61+
out.writeString(errorMessage);
62+
out.writeBoolean(shardsAcknowledged);
63+
}
64+
65+
@Override
66+
protected void addCustomFields(final XContentBuilder builder, final Params params) throws IOException {
67+
super.addCustomFields(builder, params);
68+
builder.field(SHARD_ACK, shardsAcknowledged);
69+
builder.field(ERROR, errorMessage);
70+
71+
if (shardFailuresList.length > 0) {
72+
Map<String, List<IngestionStateShardFailure>> shardFailuresByIndex = IngestionStateShardFailure.groupShardFailuresByIndex(
73+
shardFailuresList
74+
);
75+
builder.startObject(FAILURES);
76+
for (Map.Entry<String, List<IngestionStateShardFailure>> indexShardFailures : shardFailuresByIndex.entrySet()) {
77+
builder.startArray(indexShardFailures.getKey());
78+
for (IngestionStateShardFailure shardFailure : indexShardFailures.getValue()) {
79+
shardFailure.toXContent(builder, params);
80+
}
81+
builder.endArray();
82+
}
83+
builder.endObject();
84+
}
85+
}
86+
87+
@Override
88+
public String toString() {
89+
return Strings.toString(MediaTypeRegistry.JSON, this);
90+
}
91+
92+
public boolean isShardsAcknowledged() {
93+
return shardsAcknowledged;
94+
}
95+
96+
public IngestionStateShardFailure[] getShardFailures() {
97+
return shardFailuresList;
98+
}
99+
100+
public String getErrorMessage() {
101+
return errorMessage;
102+
}
103+
}

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/pause/PauseIngestionClusterStateUpdateRequest.java

-34
This file was deleted.

0 commit comments

Comments
 (0)