Skip to content

Commit 3f8b3d6

Browse files
neetikasinghalwangdongyu.danny
authored and
wangdongyu.danny
committed
Add rest, transport layer changes for Hot to warm tiering - dedicated setup (opensearch-project#13980)
Signed-off-by: Neetika Singhal <[email protected]>
1 parent 0b811bc commit 3f8b3d6

17 files changed

+1630
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
2828
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
2929
- Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847)))
30+
- Add rest, transport layer changes for hot to warm tiering - dedicated setup (([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))
3031

3132
### Dependencies
3233
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))

server/src/main/java/org/opensearch/action/ActionModule.java

+9
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,9 @@
216216
import org.opensearch.action.admin.indices.template.put.TransportPutComponentTemplateAction;
217217
import org.opensearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
218218
import org.opensearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
219+
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction;
220+
import org.opensearch.action.admin.indices.tiering.RestWarmTieringAction;
221+
import org.opensearch.action.admin.indices.tiering.TransportHotToWarmTieringAction;
219222
import org.opensearch.action.admin.indices.upgrade.get.TransportUpgradeStatusAction;
220223
import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusAction;
221224
import org.opensearch.action.admin.indices.upgrade.post.TransportUpgradeAction;
@@ -634,6 +637,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
634637
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
635638
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
636639
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
640+
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
641+
actions.register(HotToWarmTieringAction.INSTANCE, TransportHotToWarmTieringAction.class);
642+
}
637643
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
638644

639645
actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
@@ -966,6 +972,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
966972
registerHandler.accept(new RestNodeAttrsAction());
967973
registerHandler.accept(new RestRepositoriesAction());
968974
registerHandler.accept(new RestSnapshotAction());
975+
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
976+
registerHandler.accept(new RestWarmTieringAction());
977+
}
969978
registerHandler.accept(new RestTemplatesAction());
970979

971980
// Point in time API
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.tiering;
10+
11+
import org.opensearch.action.ActionType;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
14+
/**
15+
* Tiering action to move indices from hot to warm
16+
*
17+
* @opensearch.experimental
18+
*/
19+
@ExperimentalApi
20+
public class HotToWarmTieringAction extends ActionType<HotToWarmTieringResponse> {
21+
22+
public static final HotToWarmTieringAction INSTANCE = new HotToWarmTieringAction();
23+
public static final String NAME = "indices:admin/tier/hot_to_warm";
24+
25+
private HotToWarmTieringAction() {
26+
super(NAME, HotToWarmTieringResponse::new);
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.tiering;
10+
11+
import org.opensearch.action.support.master.AcknowledgedResponse;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.core.common.Strings;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.common.io.stream.StreamOutput;
16+
import org.opensearch.core.common.io.stream.Writeable;
17+
import org.opensearch.core.xcontent.MediaTypeRegistry;
18+
import org.opensearch.core.xcontent.ToXContentFragment;
19+
import org.opensearch.core.xcontent.XContentBuilder;
20+
21+
import java.io.IOException;
22+
import java.util.Collections;
23+
import java.util.Comparator;
24+
import java.util.List;
25+
import java.util.Objects;
26+
import java.util.stream.Collectors;
27+
28+
/**
29+
* Response object for an {@link TieringIndexRequest} which is sent to client after the initial verification of the request
30+
* by the backend service. The format of the response object will be as below:
31+
*
32+
* {
33+
* "acknowledged": true/false,
34+
* "failed_indices": [
35+
* {
36+
* "index": "index1",
37+
* "error": "Low disk threshold watermark breached"
38+
* },
39+
* {
40+
* "index": "index2",
41+
* "error": "Index is not a remote store backed index"
42+
* }
43+
* ]
44+
* }
45+
*
46+
* @opensearch.experimental
47+
*/
48+
@ExperimentalApi
49+
public class HotToWarmTieringResponse extends AcknowledgedResponse {
50+
51+
private final List<IndexResult> failedIndices;
52+
53+
public HotToWarmTieringResponse(boolean acknowledged) {
54+
super(acknowledged);
55+
this.failedIndices = Collections.emptyList();
56+
}
57+
58+
public HotToWarmTieringResponse(boolean acknowledged, List<IndexResult> indicesResults) {
59+
super(acknowledged);
60+
this.failedIndices = (indicesResults == null)
61+
? Collections.emptyList()
62+
: indicesResults.stream().sorted(Comparator.comparing(IndexResult::getIndex)).collect(Collectors.toList());
63+
}
64+
65+
public HotToWarmTieringResponse(StreamInput in) throws IOException {
66+
super(in);
67+
failedIndices = Collections.unmodifiableList(in.readList(IndexResult::new));
68+
}
69+
70+
public List<IndexResult> getFailedIndices() {
71+
return this.failedIndices;
72+
}
73+
74+
@Override
75+
public void writeTo(StreamOutput out) throws IOException {
76+
super.writeTo(out);
77+
out.writeList(this.failedIndices);
78+
}
79+
80+
@Override
81+
protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
82+
super.addCustomFields(builder, params);
83+
builder.startArray("failed_indices");
84+
85+
for (IndexResult failedIndex : failedIndices) {
86+
failedIndex.toXContent(builder, params);
87+
}
88+
builder.endArray();
89+
}
90+
91+
@Override
92+
public String toString() {
93+
return Strings.toString(MediaTypeRegistry.JSON, this);
94+
}
95+
96+
/**
97+
* Inner class to represent the result of a failed index for tiering.
98+
* @opensearch.experimental
99+
*/
100+
@ExperimentalApi
101+
public static class IndexResult implements Writeable, ToXContentFragment {
102+
private final String index;
103+
private final String failureReason;
104+
105+
public IndexResult(String index, String failureReason) {
106+
this.index = index;
107+
this.failureReason = failureReason;
108+
}
109+
110+
IndexResult(StreamInput in) throws IOException {
111+
this.index = in.readString();
112+
this.failureReason = in.readString();
113+
}
114+
115+
public String getIndex() {
116+
return index;
117+
}
118+
119+
public String getFailureReason() {
120+
return failureReason;
121+
}
122+
123+
@Override
124+
public void writeTo(StreamOutput out) throws IOException {
125+
out.writeString(index);
126+
out.writeString(failureReason);
127+
}
128+
129+
@Override
130+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
131+
builder.startObject();
132+
builder.field("index", index);
133+
builder.field("error", failureReason);
134+
return builder.endObject();
135+
}
136+
137+
@Override
138+
public boolean equals(Object o) {
139+
if (this == o) return true;
140+
if (o == null || getClass() != o.getClass()) return false;
141+
IndexResult that = (IndexResult) o;
142+
return Objects.equals(index, that.index) && Objects.equals(failureReason, that.failureReason);
143+
}
144+
145+
@Override
146+
public int hashCode() {
147+
int result = Objects.hashCode(index);
148+
result = 31 * result + Objects.hashCode(failureReason);
149+
return result;
150+
}
151+
152+
@Override
153+
public String toString() {
154+
return Strings.toString(MediaTypeRegistry.JSON, this);
155+
}
156+
}
157+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.tiering;
10+
11+
import org.opensearch.action.support.IndicesOptions;
12+
import org.opensearch.client.node.NodeClient;
13+
import org.opensearch.common.annotation.ExperimentalApi;
14+
import org.opensearch.rest.BaseRestHandler;
15+
import org.opensearch.rest.RestHandler;
16+
import org.opensearch.rest.RestRequest;
17+
import org.opensearch.rest.action.RestToXContentListener;
18+
19+
import java.util.List;
20+
21+
import static java.util.Collections.singletonList;
22+
import static org.opensearch.core.common.Strings.splitStringByCommaToArray;
23+
import static org.opensearch.rest.RestRequest.Method.POST;
24+
25+
/**
26+
* Rest Tiering API action to move indices to warm tier
27+
*
28+
* @opensearch.experimental
29+
*/
30+
@ExperimentalApi
31+
public class RestWarmTieringAction extends BaseRestHandler {
32+
33+
private static final String TARGET_TIER = "warm";
34+
35+
@Override
36+
public List<RestHandler.Route> routes() {
37+
return singletonList(new RestHandler.Route(POST, "/{index}/_tier/" + TARGET_TIER));
38+
}
39+
40+
@Override
41+
public String getName() {
42+
return "warm_tiering_action";
43+
}
44+
45+
@Override
46+
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
47+
final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest(
48+
TARGET_TIER,
49+
splitStringByCommaToArray(request.param("index"))
50+
);
51+
tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout()));
52+
tieringIndexRequest.clusterManagerNodeTimeout(
53+
request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout())
54+
);
55+
tieringIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, tieringIndexRequest.indicesOptions()));
56+
tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", tieringIndexRequest.waitForCompletion()));
57+
return channel -> client.admin()
58+
.cluster()
59+
.execute(HotToWarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel));
60+
}
61+
}

0 commit comments

Comments
 (0)