Skip to content

Commit 2a315ef

Browse files
Add rest, transport, service layer changes for Tiering
Signed-off-by: Neetika Singhal <[email protected]>
1 parent fbe048f commit 2a315ef

15 files changed

+1940
-1
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

+5
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@
288288
import org.opensearch.action.termvectors.TransportMultiTermVectorsAction;
289289
import org.opensearch.action.termvectors.TransportShardMultiTermsVectorAction;
290290
import org.opensearch.action.termvectors.TransportTermVectorsAction;
291+
import org.opensearch.action.tiering.RestWarmTieringAction;
292+
import org.opensearch.action.tiering.TransportWarmTieringAction;
293+
import org.opensearch.action.tiering.WarmTieringAction;
291294
import org.opensearch.action.update.TransportUpdateAction;
292295
import org.opensearch.action.update.UpdateAction;
293296
import org.opensearch.client.node.NodeClient;
@@ -633,6 +636,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
633636
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
634637
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
635638
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
639+
actions.register(WarmTieringAction.INSTANCE, TransportWarmTieringAction.class);
636640
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
637641

638642
actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
@@ -964,6 +968,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
964968
registerHandler.accept(new RestNodeAttrsAction());
965969
registerHandler.accept(new RestRepositoriesAction());
966970
registerHandler.accept(new RestSnapshotAction());
971+
registerHandler.accept(new RestWarmTieringAction());
967972
registerHandler.accept(new RestTemplatesAction());
968973

969974
// Point in time API
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.opensearch.client.node.NodeClient;
12+
import org.opensearch.rest.BaseRestHandler;
13+
import org.opensearch.rest.RestHandler;
14+
import org.opensearch.rest.RestRequest;
15+
import org.opensearch.rest.action.RestToXContentListener;
16+
17+
import java.util.List;
18+
19+
import static java.util.Collections.singletonList;
20+
import static org.opensearch.rest.RestRequest.Method.POST;
21+
22+
/**
23+
* Rest Tiering API class to move index from hot to warm
24+
*
25+
* @opensearch.experimental
26+
*/
27+
public class RestWarmTieringAction extends BaseRestHandler {
28+
29+
@Override
30+
public List<RestHandler.Route> routes() {
31+
return singletonList(new RestHandler.Route(POST, "/{index}/_tier/_warm"));
32+
}
33+
34+
@Override
35+
public String getName() {
36+
return "tiering_warm";
37+
}
38+
39+
@Override
40+
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
41+
final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest(request.param("index"));
42+
tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout()));
43+
tieringIndexRequest.clusterManagerNodeTimeout(
44+
request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout())
45+
);
46+
tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
47+
return channel -> client.admin().cluster().execute(WarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel));
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.IndicesRequest;
13+
import org.opensearch.action.support.IndicesOptions;
14+
import org.opensearch.action.support.master.AcknowledgedRequest;
15+
import org.opensearch.common.annotation.ExperimentalApi;
16+
import org.opensearch.core.common.io.stream.StreamInput;
17+
import org.opensearch.core.common.io.stream.StreamOutput;
18+
19+
import java.io.IOException;
20+
import java.util.Arrays;
21+
import java.util.Objects;
22+
23+
import static org.opensearch.action.ValidateActions.addValidationError;
24+
25+
/**
26+
* Represents the tiering request for indices
27+
* to move to a different tier
28+
*
29+
* @opensearch.experimental
30+
*/
31+
@ExperimentalApi
32+
public class TieringIndexRequest extends AcknowledgedRequest<TieringIndexRequest> implements IndicesRequest.Replaceable {
33+
34+
private String[] indices;
35+
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, true);
36+
private boolean waitForCompletion;
37+
public TieringIndexRequest() {
38+
}
39+
40+
@Override
41+
public ActionRequestValidationException validate() {
42+
ActionRequestValidationException validationException = null;
43+
if (indices == null || indices.length == 0) {
44+
validationException = addValidationError("Mandatory parameter - indices is missing from the request", validationException);
45+
}
46+
return validationException;
47+
}
48+
49+
public TieringIndexRequest(String... indices) {
50+
this.indices = indices;
51+
}
52+
53+
public TieringIndexRequest(StreamInput in) throws IOException {
54+
super(in);
55+
indices = in.readStringArray();
56+
indicesOptions = IndicesOptions.readIndicesOptions(in);
57+
waitForCompletion = in.readBoolean();
58+
}
59+
60+
@Override
61+
public void writeTo(StreamOutput out) throws IOException {
62+
super.writeTo(out);
63+
out.writeStringArray(indices);
64+
indicesOptions.writeIndicesOptions(out);
65+
out.writeBoolean(waitForCompletion);
66+
}
67+
68+
@Override
69+
public String[] indices() {
70+
return indices;
71+
}
72+
73+
@Override
74+
public IndicesOptions indicesOptions() {
75+
return indicesOptions;
76+
}
77+
78+
@Override
79+
public TieringIndexRequest indices(String... indices) {
80+
this.indices = indices;
81+
return this;
82+
}
83+
84+
public TieringIndexRequest indicesOptions(IndicesOptions indicesOptions) {
85+
this.indicesOptions = indicesOptions;
86+
return this;
87+
}
88+
89+
/**
90+
* If this parameter is set to true the operation will wait for completion of tiering process before returning.
91+
*
92+
* @param waitForCompletion if true the operation will wait for completion
93+
* @return this request
94+
*/
95+
public TieringIndexRequest waitForCompletion(boolean waitForCompletion) {
96+
this.waitForCompletion = waitForCompletion;
97+
return this;
98+
}
99+
100+
/**
101+
* Returns wait for completion setting
102+
*
103+
* @return true if the operation will wait for completion
104+
*/
105+
public boolean waitForCompletion() {
106+
return waitForCompletion;
107+
}
108+
109+
@Override
110+
public boolean equals(Object o) {
111+
if (this == o) {
112+
return true;
113+
}
114+
if (o == null || getClass() != o.getClass()) {
115+
return false;
116+
}
117+
TieringIndexRequest that = (TieringIndexRequest) o;
118+
return clusterManagerNodeTimeout.equals(that.clusterManagerNodeTimeout)
119+
&& timeout.equals(that.timeout)
120+
&& Objects.equals(indicesOptions, that.indicesOptions)
121+
&& Arrays.equals(indices, that.indices)
122+
&& waitForCompletion == that.waitForCompletion;
123+
}
124+
125+
@Override
126+
public int hashCode() {
127+
return Objects.hash(clusterManagerNodeTimeout, timeout, indicesOptions, waitForCompletion, Arrays.hashCode(indices));
128+
}
129+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
14+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
15+
import org.opensearch.action.support.ActionFilters;
16+
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
17+
import org.opensearch.action.support.master.AcknowledgedResponse;
18+
import org.opensearch.client.Client;
19+
import org.opensearch.cluster.ClusterState;
20+
import org.opensearch.cluster.block.ClusterBlockException;
21+
import org.opensearch.cluster.block.ClusterBlockLevel;
22+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.opensearch.cluster.service.ClusterService;
24+
import org.opensearch.common.inject.Inject;
25+
import org.opensearch.core.action.ActionListener;
26+
import org.opensearch.core.common.io.stream.StreamInput;
27+
import org.opensearch.threadpool.ThreadPool;
28+
import org.opensearch.tiering.TieringClusterStateListener;
29+
import org.opensearch.tiering.TieringService;
30+
import org.opensearch.transport.TransportService;
31+
32+
import java.io.IOException;
33+
34+
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
35+
36+
/**
37+
* Transport Tiering API class to move index from hot to warm
38+
*
39+
* @opensearch.experimental
40+
*/
41+
public class TransportWarmTieringAction extends TransportClusterManagerNodeAction<TieringIndexRequest, AcknowledgedResponse> {
42+
43+
private static final Logger logger = LogManager.getLogger(TransportWarmTieringAction.class);
44+
private final TieringService tieringService;
45+
private final Client client;
46+
@Inject
47+
public TransportWarmTieringAction(TransportService transportService, ClusterService clusterService,
48+
ThreadPool threadPool, TieringService tieringService,
49+
ActionFilters actionFilters,
50+
IndexNameExpressionResolver indexNameExpressionResolver, Client client) {
51+
super(WarmTieringAction.NAME, transportService, clusterService, threadPool, actionFilters,
52+
TieringIndexRequest::new, indexNameExpressionResolver);
53+
this.client = client;
54+
this.tieringService = tieringService;
55+
}
56+
57+
@Override
58+
protected String executor() {
59+
return ThreadPool.Names.SAME;
60+
}
61+
62+
@Override
63+
protected AcknowledgedResponse read(StreamInput in) throws IOException {
64+
return new AcknowledgedResponse(in);
65+
}
66+
67+
@Override
68+
protected ClusterBlockException checkBlock(TieringIndexRequest request, ClusterState state) {
69+
ClusterBlockException blockException =
70+
state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
71+
if (blockException == null) {
72+
// Check indices level block
73+
blockException = state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
74+
indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request));
75+
}
76+
return blockException;
77+
}
78+
79+
@Override
80+
protected void clusterManagerOperation(TieringIndexRequest request, ClusterState state,
81+
ActionListener<AcknowledgedResponse> listener) throws Exception {
82+
// Collect node stats to get node level filesystem info. The response will be used for performing some
83+
// validations.
84+
client.admin().cluster().prepareNodesStats().clear().addMetric(FS.metricName()).execute(
85+
new ActionListener<NodesStatsResponse>() {
86+
@Override
87+
public void onResponse(NodesStatsResponse nodesStatsResponse) {
88+
// Collect index level stats. This response is also used for validations.
89+
client.admin().indices().prepareStats().clear().setStore(true).setIndices(request.indices()).execute(
90+
new ActionListener<IndicesStatsResponse>() {
91+
@Override
92+
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
93+
tieringService.tier(request, nodesStatsResponse, indicesStatsResponse,
94+
ActionListener.delegateFailure(listener, (delegatedListener, acknowledgedResponse) -> {
95+
if (request.waitForCompletion()) {
96+
TieringClusterStateListener.createAndRegisterListener(
97+
clusterService,
98+
new AcknowledgedResponse(acknowledgedResponse.isAcknowledged()),
99+
delegatedListener
100+
);
101+
} else {
102+
delegatedListener.onResponse(new AcknowledgedResponse(true));
103+
}
104+
}));
105+
}
106+
@Override
107+
public void onFailure(Exception e) {
108+
logger.debug("Indices stats call failed with exception", e);
109+
listener.onFailure(e);
110+
}
111+
});
112+
}
113+
@Override
114+
public void onFailure(Exception e) {
115+
logger.debug("Node stats call failed with exception", e);
116+
listener.onFailure(e);
117+
}
118+
});
119+
}
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.opensearch.action.ActionType;
12+
import org.opensearch.action.support.master.AcknowledgedResponse;
13+
14+
/**
15+
* Tiering action class to move index from hot to warm
16+
*
17+
* @opensearch.experimental
18+
*/
19+
public class WarmTieringAction extends ActionType<AcknowledgedResponse> {
20+
21+
public static final WarmTieringAction INSTANCE = new WarmTieringAction();
22+
public static final String NAME = "indices:admin/tiering/warm";
23+
24+
public WarmTieringAction() {
25+
super(NAME, AcknowledgedResponse::new);
26+
}
27+
}

server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.cluster.metadata.IndexMetadata;
1212
import org.opensearch.cluster.node.DiscoveryNode;
1313
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
14+
import org.opensearch.index.IndexModule;
1415

1516
/**
1617
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
@@ -58,6 +59,8 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
5859
* @return {@link RoutingPool} for the given index.
5960
*/
6061
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
61-
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
62+
return indexMetadata.isRemoteSnapshot() ||
63+
IndexModule.DataLocalityType.PARTIAL.name()
64+
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())) ? REMOTE_CAPABLE : LOCAL_ONLY;
6265
}
6366
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+2
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@
165165
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
166166
import org.opensearch.telemetry.TelemetrySettings;
167167
import org.opensearch.threadpool.ThreadPool;
168+
import org.opensearch.tiering.HotToWarmTieringService;
168169
import org.opensearch.transport.ProxyConnectionStrategy;
169170
import org.opensearch.transport.RemoteClusterService;
170171
import org.opensearch.transport.RemoteConnectionStrategy;
@@ -278,6 +279,7 @@ public void apply(Settings value, Settings current, Settings previous) {
278279
FsRepository.REPOSITORIES_CHUNK_SIZE_SETTING,
279280
FsRepository.REPOSITORIES_COMPRESS_SETTING,
280281
FsRepository.REPOSITORIES_LOCATION_SETTING,
282+
HotToWarmTieringService.WARM_TIERING_MAX_SHARD_SIZE,
281283
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
282284
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
283285
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,

0 commit comments

Comments
 (0)