Skip to content

Commit ee7fbbd

Browse files
authored
Implemented computation of segment replication stats at shard level (#17055)
* Implemented computation of segment replication stats at shard level The method implemented here computes the segment replication stats at the shard level, instead of relying on the primary shard to compute stats based on reports from its replicas. Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Updated style checks in the test Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Updated changelog Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * fixed style issues Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Fix the failing integration test Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Fix stylecheck Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Fixed the comments for the initial revision Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Updated to use System.nanoTime() for lag calculation Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Fixed the integration test for node stats Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Modified the version in the ReplicationCheckpoint for backward compatibility Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Added precomputation logic for the stats calculation Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Removed unwanted lines Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Clean up the maps when index closed Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Added a null check for the indexshard checkpoint Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * fix style checks Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Updated version and added bwc for RemoteSegmentMetadata Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Updated the javadoc comments Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Address comments PR Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Removed the latestReceivedCheckpoint map from 
SegmentReplicationTargetService Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Added granular locks for the concurrency of stats methods Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Style check fixes Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * Changes to maintain atomicity Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * spotlessApply Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * removed querying the remotestore when replication is in progress Signed-off-by: Vinay Krishna Pudyodu <[email protected]> * spotlessApply Signed-off-by: Vinay Krishna Pudyodu <[email protected]> --------- Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
1 parent 0ffed5e commit ee7fbbd

34 files changed

+706
-101
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1111
- Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/))
1212
- Added ability to retrieve value from DocValues in a flat_object field([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802))
1313
- Improve performance of NumericTermAggregation by avoiding unnecessary sorting([#17252](https://github.com/opensearch-project/OpenSearch/pull/17252))
14+
- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055))
1415
- [Rule Based Auto-tagging] Add in-memory attribute value store ([#17342](https://github.com/opensearch-project/OpenSearch/pull/17342))
1516

1617
### Dependencies

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import java.util.concurrent.TimeUnit;
115115
import java.util.concurrent.atomic.AtomicBoolean;
116116
import java.util.concurrent.atomic.AtomicReference;
117+
import java.util.function.Function;
117118
import java.util.function.Predicate;
118119
import java.util.stream.Stream;
119120

@@ -136,6 +137,7 @@
136137
import static org.hamcrest.Matchers.instanceOf;
137138
import static org.hamcrest.Matchers.lessThanOrEqualTo;
138139
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
140+
import static org.mockito.Mockito.mock;
139141

140142
public class IndexShardIT extends OpenSearchSingleNodeTestCase {
141143

@@ -716,7 +718,8 @@ public static final IndexShard newIndexShard(
716718
null,
717719
DefaultRemoteStoreSettings.INSTANCE,
718720
false,
719-
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting)
721+
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
722+
mock(Function.class)
720723
);
721724
}
722725

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -404,19 +404,17 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {
404404

405405
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
406406
ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats();
407-
// primary node - should hold replication statistics
407+
// primary node - should not hold any replication statistics
408408
if (nodeStats.getNode().getName().equals(primaryNode)) {
409+
assertTrue(replicationStats.getMaxBytesBehind() == 0);
410+
assertTrue(replicationStats.getTotalBytesBehind() == 0);
411+
assertTrue(replicationStats.getMaxReplicationLag() == 0);
412+
}
413+
// replica nodes - should hold replication statistics
414+
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
409415
assertTrue(replicationStats.getMaxBytesBehind() > 0);
410416
assertTrue(replicationStats.getTotalBytesBehind() > 0);
411417
assertTrue(replicationStats.getMaxReplicationLag() > 0);
412-
// 2 replicas so total bytes should be double of max
413-
assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind());
414-
}
415-
// replica nodes - should hold empty replication statistics
416-
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
417-
assertEquals(0, replicationStats.getMaxBytesBehind());
418-
assertEquals(0, replicationStats.getTotalBytesBehind());
419-
assertEquals(0, replicationStats.getMaxReplicationLag());
420418
}
421419
}
422420
// get replication statistics at index level
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.io;
10+
11+
/**
12+
* Interface for factory to provide handler implementation for type {@link T}
13+
* @param <T> The type of content to be read/written to stream
14+
*
15+
* @opensearch.internal
16+
*/
17+
public interface IndexIOStreamHandlerFactory<T> {
18+
19+
/**
20+
* Implements logic to provide handler based on the stream versions
21+
* @param version stream version
22+
* @return Handler for reading/writing content streams to/from - {@link T}
23+
*/
24+
IndexIOStreamHandler<T> getHandler(int version);
25+
}

server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,25 @@
2828
public class VersionedCodecStreamWrapper<T> {
2929
private static final Logger logger = LogManager.getLogger(VersionedCodecStreamWrapper.class);
3030

31-
// TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions
32-
private final IndexIOStreamHandler<T> indexIOStreamHandler;
31+
private final IndexIOStreamHandlerFactory<T> indexIOStreamHandlerFactory;
32+
private final int minVersion;
3333
private final int currentVersion;
3434
private final String codec;
3535

3636
/**
37-
* @param indexIOStreamHandler handler to read/write stream from T
37+
* @param indexIOStreamHandlerFactory factory for providing handler to read/write stream from T
38+
* @param minVersion earliest supported version of the stream
3839
* @param currentVersion latest supported version of the stream
3940
* @param codec: stream codec
4041
*/
41-
public VersionedCodecStreamWrapper(IndexIOStreamHandler<T> indexIOStreamHandler, int currentVersion, String codec) {
42-
this.indexIOStreamHandler = indexIOStreamHandler;
42+
public VersionedCodecStreamWrapper(
43+
IndexIOStreamHandlerFactory<T> indexIOStreamHandlerFactory,
44+
int minVersion,
45+
int currentVersion,
46+
String codec
47+
) {
48+
this.indexIOStreamHandlerFactory = indexIOStreamHandlerFactory;
49+
this.minVersion = minVersion;
4350
this.currentVersion = currentVersion;
4451
this.codec = codec;
4552
}
@@ -87,7 +94,7 @@ public void writeStream(IndexOutput indexOutput, T content) throws IOException {
8794
*/
8895
private int checkHeader(IndexInput indexInput) throws IOException {
8996
// Accept any stream version from minVersion through currentVersion (both inclusive)
90-
return CodecUtil.checkHeader(indexInput, this.codec, this.currentVersion, this.currentVersion);
97+
return CodecUtil.checkHeader(indexInput, this.codec, minVersion, this.currentVersion);
9198
}
9299

93100
/**
@@ -120,8 +127,6 @@ private void writeFooter(IndexOutput indexOutput) throws IOException {
120127
* @param version stream content version
121128
*/
122129
private IndexIOStreamHandler<T> getHandlerForVersion(int version) {
123-
// TODO implement factory and pick relevant handler based on version.
124-
// It should also take into account min and max supported versions
125-
return this.indexIOStreamHandler;
130+
return this.indexIOStreamHandlerFactory.getHandler(version);
126131
}
127132
}

server/src/main/java/org/opensearch/index/IndexModule.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.opensearch.common.util.io.IOUtils;
5858
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
5959
import org.opensearch.core.index.Index;
60+
import org.opensearch.core.index.shard.ShardId;
6061
import org.opensearch.core.indices.breaker.CircuitBreakerService;
6162
import org.opensearch.core.xcontent.NamedXContentRegistry;
6263
import org.opensearch.env.NodeEnvironment;
@@ -652,7 +653,8 @@ public IndexService newIndexService(
652653
clusterDefaultRefreshIntervalSupplier,
653654
recoverySettings,
654655
remoteStoreSettings,
655-
(s) -> {}
656+
(s) -> {},
657+
shardId -> ReplicationStats.empty()
656658
);
657659
}
658660

@@ -678,7 +680,8 @@ public IndexService newIndexService(
678680
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
679681
RecoverySettings recoverySettings,
680682
RemoteStoreSettings remoteStoreSettings,
681-
Consumer<IndexShard> replicator
683+
Consumer<IndexShard> replicator,
684+
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
682685
) throws IOException {
683686
final IndexEventListener eventListener = freeze();
684687
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -740,7 +743,8 @@ public IndexService newIndexService(
740743
remoteStoreSettings,
741744
fileCache,
742745
compositeIndexSettings,
743-
replicator
746+
replicator,
747+
segmentReplicationStatsProvider
744748
);
745749
success = true;
746750
return indexService;

server/src/main/java/org/opensearch/index/IndexService.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
197197
private final FileCache fileCache;
198198
private final CompositeIndexSettings compositeIndexSettings;
199199
private final Consumer<IndexShard> replicator;
200+
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
200201

201202
public IndexService(
202203
IndexSettings indexSettings,
@@ -235,7 +236,8 @@ public IndexService(
235236
RemoteStoreSettings remoteStoreSettings,
236237
FileCache fileCache,
237238
CompositeIndexSettings compositeIndexSettings,
238-
Consumer<IndexShard> replicator
239+
Consumer<IndexShard> replicator,
240+
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
239241
) {
240242
super(indexSettings);
241243
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -322,6 +324,7 @@ public IndexService(
322324
this.compositeIndexSettings = compositeIndexSettings;
323325
this.fileCache = fileCache;
324326
this.replicator = replicator;
327+
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
325328
updateFsyncTaskIfNecessary();
326329
}
327330

@@ -398,7 +401,8 @@ public IndexService(
398401
remoteStoreSettings,
399402
null,
400403
null,
401-
s -> {}
404+
s -> {},
405+
(shardId) -> ReplicationStats.empty()
402406
);
403407
}
404408

@@ -694,7 +698,8 @@ protected void closeInternal() {
694698
recoverySettings,
695699
remoteStoreSettings,
696700
seedRemote,
697-
discoveryNodes
701+
discoveryNodes,
702+
segmentReplicationStatsProvider
698703
);
699704
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
700705
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/ReplicationStats.java

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ public ReplicationStats(StreamInput in) throws IOException {
4242
this.maxReplicationLag = in.readVLong();
4343
}
4444

45+
public static ReplicationStats empty() {
46+
return new ReplicationStats();
47+
}
48+
4549
public ReplicationStats() {
4650

4751
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+8-12
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ Runnable getGlobalCheckpointSyncer() {
361361
*/
362362
private final ShardMigrationState shardMigrationState;
363363
private DiscoveryNodes discoveryNodes;
364+
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
364365

365366
public IndexShard(
366367
final ShardRouting shardRouting,
@@ -391,7 +392,8 @@ public IndexShard(
391392
final RecoverySettings recoverySettings,
392393
final RemoteStoreSettings remoteStoreSettings,
393394
boolean seedRemote,
394-
final DiscoveryNodes discoveryNodes
395+
final DiscoveryNodes discoveryNodes,
396+
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
395397
) throws IOException {
396398
super(shardRouting.shardId(), indexSettings);
397399
assert shardRouting.initializing();
@@ -493,6 +495,7 @@ public boolean shouldCache(Query query) {
493495
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
494496
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
495497
this.discoveryNodes = discoveryNodes;
498+
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
496499
}
497500

498501
public ThreadPool getThreadPool() {
@@ -3233,17 +3236,10 @@ public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas()
32333236
}
32343237

32353238
public ReplicationStats getReplicationStats() {
3236-
if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary()) {
3237-
final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
3238-
long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L);
3239-
long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum();
3240-
long maxReplicationLag = stats.stream()
3241-
.mapToLong(SegmentReplicationShardStats::getCurrentReplicationLagMillis)
3242-
.max()
3243-
.orElse(0L);
3244-
return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag);
3245-
}
3246-
return new ReplicationStats();
3239+
if (indexSettings.isSegRepEnabledOrRemoteNode() && !routingEntry().primary()) {
3240+
return segmentReplicationStatsProvider.apply(shardId);
3241+
}
3242+
return ReplicationStats.empty();
32473243
}
32483244

32493245
/**

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
3939
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
4040
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
41-
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
41+
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandlerFactory;
4242
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
4343
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
4444
import org.opensearch.threadpool.ThreadPool;
@@ -104,7 +104,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
104104
private Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore;
105105

106106
private static final VersionedCodecStreamWrapper<RemoteSegmentMetadata> metadataStreamWrapper = new VersionedCodecStreamWrapper<>(
107-
new RemoteSegmentMetadataHandler(),
107+
new RemoteSegmentMetadataHandlerFactory(),
108+
RemoteSegmentMetadata.VERSION_ONE,
108109
RemoteSegmentMetadata.CURRENT_VERSION,
109110
RemoteSegmentMetadata.METADATA_CODEC
110111
);

server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,15 @@
3030
*/
3131
@PublicApi(since = "2.6.0")
3232
public class RemoteSegmentMetadata {
33+
34+
public static final int VERSION_ONE = 1;
35+
36+
public static final int VERSION_TWO = 2;
37+
3338
/**
3439
* Latest supported version of metadata
3540
*/
36-
public static final int CURRENT_VERSION = 1;
41+
public static final int CURRENT_VERSION = VERSION_TWO;
3742
/**
3843
* Metadata codec
3944
*/
@@ -106,18 +111,30 @@ public static Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> f
106111
);
107112
}
108113

114+
/**
115+
* Write always writes with the latest version of the RemoteSegmentMetadata
116+
* @param out file output stream which will store stream content
117+
* @throws IOException in case there is a problem writing the file
118+
*/
109119
public void write(IndexOutput out) throws IOException {
110120
out.writeMapOfStrings(toMapOfStrings());
111121
writeCheckpointToIndexOutput(replicationCheckpoint, out);
112122
out.writeLong(segmentInfosBytes.length);
113123
out.writeBytes(segmentInfosBytes, segmentInfosBytes.length);
114124
}
115125

116-
public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
126+
/**
127+
* Read can happen in the upgraded version of replica which needs to support all versions of RemoteSegmentMetadata
128+
* @param indexInput file input stream
129+
* @param version version of the RemoteSegmentMetadata
130+
* @return {@code RemoteSegmentMetadata}
131+
* @throws IOException in case there is a problem reading from the file input stream
132+
*/
133+
public static RemoteSegmentMetadata read(IndexInput indexInput, int version) throws IOException {
117134
Map<String, String> metadata = indexInput.readMapOfStrings();
118135
final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = RemoteSegmentMetadata
119136
.fromMapOfStrings(metadata);
120-
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap);
137+
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap, version);
121138
int byteArraySize = (int) indexInput.readLong();
122139
byte[] segmentInfosBytes = new byte[byteArraySize];
123140
indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
@@ -136,11 +153,13 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicatio
136153
out.writeLong(replicationCheckpoint.getSegmentInfosVersion());
137154
out.writeLong(replicationCheckpoint.getLength());
138155
out.writeString(replicationCheckpoint.getCodec());
156+
out.writeLong(replicationCheckpoint.getCreatedTimeStamp());
139157
}
140158

141159
private static ReplicationCheckpoint readCheckpointFromIndexInput(
142160
IndexInput in,
143-
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap
161+
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap,
162+
int version
144163
) throws IOException {
145164
return new ReplicationCheckpoint(
146165
new ShardId(new Index(in.readString(), in.readString()), in.readVInt()),
@@ -149,7 +168,8 @@ private static ReplicationCheckpoint readCheckpointFromIndexInput(
149168
in.readLong(),
150169
in.readLong(),
151170
in.readString(),
152-
toStoreFileMetadata(uploadedSegmentMetadataMap)
171+
toStoreFileMetadata(uploadedSegmentMetadataMap),
172+
version >= VERSION_TWO ? in.readLong() : 0
153173
);
154174
}
155175

server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,21 @@
2020
* @opensearch.internal
2121
*/
2222
public class RemoteSegmentMetadataHandler implements IndexIOStreamHandler<RemoteSegmentMetadata> {
23+
24+
private final int version;
25+
26+
public RemoteSegmentMetadataHandler(int version) {
27+
this.version = version;
28+
}
29+
2330
/**
2431
* Reads metadata content from metadata file input stream and parsed into {@link RemoteSegmentMetadata}
2532
* @param indexInput metadata file input stream with {@link IndexInput#getFilePointer()} pointing to metadata content
2633
* @return {@link RemoteSegmentMetadata}
2734
*/
2835
@Override
2936
public RemoteSegmentMetadata readContent(IndexInput indexInput) throws IOException {
30-
return RemoteSegmentMetadata.read(indexInput);
37+
return RemoteSegmentMetadata.read(indexInput, version);
3138
}
3239

3340
/**

0 commit comments

Comments
 (0)