Skip to content

Commit ddadd7a

Browse files
shiv0408wangdongyu.danny
authored and
wangdongyu.danny
committed
[Remote State] Create interface RemoteEntitiesManager (opensearch-project#14671)
* Create interface RemoteEntitiesManager Signed-off-by: Shivansh Arora <[email protected]>
1 parent 8b3a39a commit ddadd7a

15 files changed

+769
-674
lines changed

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.opensearch.cluster.DiffableUtils;
1616
import org.opensearch.cluster.routing.IndexRoutingTable;
1717
import org.opensearch.cluster.routing.RoutingTable;
18-
import org.opensearch.common.CheckedRunnable;
1918
import org.opensearch.common.blobstore.BlobPath;
2019
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
2120
import org.opensearch.common.remote.RemoteWritableEntityStore;
@@ -102,16 +101,16 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
102101
}
103102

104103
/**
105-
* Create async action for writing one {@code IndexRoutingTable} to remote store
104+
* Async action for writing one {@code IndexRoutingTable} to remote store
105+
*
106106
* @param term current term
107107
* @param version current version
108108
* @param clusterUUID current cluster UUID
109109
* @param indexRouting indexRoutingTable to write to remote store
110110
* @param latchedActionListener listener for handling async action response
111-
* @return returns runnable async action
112111
*/
113112
@Override
114-
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
113+
public void getAsyncIndexRoutingWriteAction(
115114
String clusterUUID,
116115
long term,
117116
long version,
@@ -128,7 +127,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
128127
)
129128
);
130129

131-
return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
130+
remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
132131
}
133132

134133
/**
@@ -156,7 +155,7 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
156155
}
157156

158157
@Override
159-
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
158+
public void getAsyncIndexRoutingReadAction(
160159
String clusterUUID,
161160
String uploadedFilename,
162161
LatchedActionListener<IndexRoutingTable> latchedActionListener
@@ -169,7 +168,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
169168

170169
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);
171170

172-
return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
171+
remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
173172
}
174173

175174
@Override

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.cluster.DiffableUtils;
1313
import org.opensearch.cluster.routing.IndexRoutingTable;
1414
import org.opensearch.cluster.routing.RoutingTable;
15-
import org.opensearch.common.CheckedRunnable;
1615
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
1716
import org.opensearch.gateway.remote.ClusterMetadataManifest;
1817

@@ -39,15 +38,14 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
3938
}
4039

4140
@Override
42-
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
41+
public void getAsyncIndexRoutingWriteAction(
4342
String clusterUUID,
4443
long term,
4544
long version,
4645
IndexRoutingTable indexRouting,
4746
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
4847
) {
4948
// noop
50-
return () -> {};
5149
}
5250

5351
@Override
@@ -61,13 +59,12 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
6159
}
6260

6361
@Override
64-
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
62+
public void getAsyncIndexRoutingReadAction(
6563
String clusterUUID,
6664
String uploadedFilename,
6765
LatchedActionListener<IndexRoutingTable> latchedActionListener
6866
) {
6967
// noop
70-
return () -> {};
7168
}
7269

7370
@Override

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.cluster.DiffableUtils;
1313
import org.opensearch.cluster.routing.IndexRoutingTable;
1414
import org.opensearch.cluster.routing.RoutingTable;
15-
import org.opensearch.common.CheckedRunnable;
1615
import org.opensearch.common.lifecycle.LifecycleComponent;
1716
import org.opensearch.core.common.io.stream.StreamInput;
1817
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -43,7 +42,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {
4342

4443
List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);
4544

46-
CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
45+
void getAsyncIndexRoutingReadAction(
4746
String clusterUUID,
4847
String uploadedFilename,
4948
LatchedActionListener<IndexRoutingTable> latchedActionListener
@@ -59,7 +58,7 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
5958
RoutingTable after
6059
);
6160

62-
CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
61+
void getAsyncIndexRoutingWriteAction(
6362
String clusterUUID,
6463
long term,
6564
long version,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
13+
import org.opensearch.gateway.remote.model.RemoteReadResult;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
/**
19+
* An abstract class that provides a base implementation for managing remote entities in the remote store.
20+
*/
21+
public abstract class AbstractRemoteWritableEntityManager implements RemoteWritableEntityManager {
22+
/**
23+
* A map that stores the remote writable entity stores, keyed by the entity type.
24+
*/
25+
protected final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores = new HashMap<>();
26+
27+
/**
28+
* Retrieves the remote writable entity store for the given entity.
29+
*
30+
* @param entity the entity for which the store is requested
31+
* @return the remote writable entity store for the given entity
32+
* @throws IllegalArgumentException if the entity type is unknown
33+
*/
34+
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
35+
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
36+
if (remoteStore == null) {
37+
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
38+
}
39+
return remoteStore;
40+
}
41+
42+
/**
43+
* Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener.
44+
*
45+
* @param component the component for which the write operation is performed
46+
* @param remoteEntity the remote object to be written
47+
* @param listener the listener to be notified when the write operation completes
48+
* @return an ActionListener for handling the write operation
49+
*/
50+
protected abstract ActionListener<Void> getWrappedWriteListener(
51+
String component,
52+
AbstractRemoteWritableBlobEntity remoteEntity,
53+
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
54+
);
55+
56+
/**
57+
* Returns an ActionListener for handling the read operation for the specified component,
58+
* remote object, and latched action listener.
59+
*
60+
* @param component the component for which the read operation is performed
61+
* @param remoteEntity the remote object to be read
62+
* @param listener the listener to be notified when the read operation completes
63+
* @return an ActionListener for handling the read operation
64+
*/
65+
protected abstract ActionListener<Object> getWrappedReadListener(
66+
String component,
67+
AbstractRemoteWritableBlobEntity remoteEntity,
68+
ActionListener<RemoteReadResult> listener
69+
);
70+
71+
@Override
72+
public void writeAsync(
73+
String component,
74+
AbstractRemoteWritableBlobEntity entity,
75+
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
76+
) {
77+
getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener));
78+
}
79+
80+
@Override
81+
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
82+
getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener));
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
13+
import org.opensearch.gateway.remote.model.RemoteReadResult;
14+
15+
/**
16+
* The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store
17+
*/
18+
public interface RemoteWritableEntityManager {
19+
20+
/**
21+
* Performs an asynchronous read operation for the specified component and entity.
22+
*
23+
* @param component the component for which the read operation is performed
24+
* @param entity the entity to be read
25+
* @param listener the listener to be notified when the read operation completes.
26+
* The listener's {@link ActionListener#onResponse(Object)} method
27+
* is called with a {@link RemoteReadResult} object containing the
28+
* read data on successful read. The
29+
* {@link ActionListener#onFailure(Exception)} method is called with
30+
* an exception if the read operation fails.
31+
*/
32+
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);
33+
34+
/**
35+
* Performs an asynchronous write operation for the specified component and entity.
36+
*
37+
* @param component the component for which the write operation is performed
38+
* @param entity the entity to be written
39+
* @param listener the listener to be notified when the write operation completes.
40+
* The listener's {@link ActionListener#onResponse(Object)} method
41+
* is called with a {@link UploadedMetadata} object containing the
42+
* uploaded metadata on successful write. The
43+
* {@link ActionListener#onFailure(Exception)} method is called with
44+
* an exception if the write operation fails.
45+
*/
46+
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
47+
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java

+15-40
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@
88

99
package org.opensearch.gateway.remote;
1010

11-
import org.opensearch.action.LatchedActionListener;
1211
import org.opensearch.cluster.ClusterState;
1312
import org.opensearch.cluster.DiffableUtils;
1413
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
15-
import org.opensearch.common.CheckedRunnable;
1614
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
17-
import org.opensearch.common.remote.RemoteWritableEntityStore;
15+
import org.opensearch.common.remote.AbstractRemoteWritableEntityManager;
1816
import org.opensearch.core.action.ActionListener;
1917
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
2018
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
@@ -26,23 +24,19 @@
2624
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2725
import org.opensearch.threadpool.ThreadPool;
2826

29-
import java.io.IOException;
3027
import java.util.Collections;
31-
import java.util.HashMap;
3228
import java.util.Map;
3329

3430
/**
3531
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
3632
*
3733
* @opensearch.internal
3834
*/
39-
public class RemoteClusterStateAttributesManager {
35+
public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableEntityManager {
4036
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
4137
public static final String DISCOVERY_NODES = "nodes";
4238
public static final String CLUSTER_BLOCKS = "blocks";
4339
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
44-
private final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores;
45-
private final NamedWriteableRegistry namedWriteableRegistry;
4640

4741
RemoteClusterStateAttributesManager(
4842
String clusterName,
@@ -51,8 +45,6 @@ public class RemoteClusterStateAttributesManager {
5145
NamedWriteableRegistry namedWriteableRegistry,
5246
ThreadPool threadpool
5347
) {
54-
this.namedWriteableRegistry = namedWriteableRegistry;
55-
this.remoteWritableEntityStores = new HashMap<>();
5648
this.remoteWritableEntityStores.put(
5749
RemoteDiscoveryNodes.DISCOVERY_NODES,
5850
new RemoteClusterStateBlobStore<>(
@@ -85,46 +77,28 @@ public class RemoteClusterStateAttributesManager {
8577
);
8678
}
8779

88-
/**
89-
* Allows async upload of Cluster State Attribute components to remote
90-
*/
91-
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
80+
@Override
81+
protected ActionListener<Void> getWrappedWriteListener(
9282
String component,
93-
AbstractRemoteWritableBlobEntity blobEntity,
94-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
95-
) {
96-
return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
97-
}
98-
99-
private ActionListener<Void> getActionListener(
100-
String component,
101-
AbstractRemoteWritableBlobEntity remoteObject,
102-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
83+
AbstractRemoteWritableBlobEntity remoteEntity,
84+
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
10385
) {
10486
return ActionListener.wrap(
105-
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
106-
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
87+
resp -> listener.onResponse(remoteEntity.getUploadedMetadata()),
88+
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex))
10789
);
10890
}
10991

110-
private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
111-
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
112-
if (remoteStore == null) {
113-
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
114-
}
115-
return remoteStore;
116-
}
117-
118-
public CheckedRunnable<IOException> getAsyncMetadataReadAction(
92+
@Override
93+
protected ActionListener<Object> getWrappedReadListener(
11994
String component,
120-
AbstractRemoteWritableBlobEntity blobEntity,
121-
LatchedActionListener<RemoteReadResult> listener
95+
AbstractRemoteWritableBlobEntity remoteEntity,
96+
ActionListener<RemoteReadResult> listener
12297
) {
123-
final ActionListener actionListener = ActionListener.wrap(
98+
return ActionListener.wrap(
12499
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
125-
listener::onFailure
100+
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
126101
);
127-
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
128102
}
129103

130104
public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
@@ -158,4 +132,5 @@ public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterSta
158132
NonDiffableValueSerializer.getAbstractInstance()
159133
);
160134
}
135+
161136
}

0 commit comments

Comments
 (0)