Skip to content

Commit f15cbbd

Browse files
kh3ra and Aditya Khera authored
Add Metrics For The Merged Segment Warmer Flow (#18929)
* Adding stats for merged segment warmer Signed-off-by: kh3ra <[email protected]> Signed-off-by: Aditya Khera <[email protected]> * Adding unit tests Signed-off-by: kh3ra <[email protected]> Signed-off-by: Aditya Khera <[email protected]> * SpotlessApply Signed-off-by: kh3ra <[email protected]> Signed-off-by: Aditya Khera <[email protected]> * Minor fix Signed-off-by: kh3ra <[email protected]> Signed-off-by: Aditya Khera <[email protected]> * minor fix Signed-off-by: kh3ra <[email protected]> Signed-off-by: Aditya Khera <[email protected]> * addressing review comments - tests to follow Signed-off-by: Aditya Khera <[email protected]> * Addressing review comments + added ITs Signed-off-by: Aditya Khera <[email protected]> * addressing review comments - tests to follow Signed-off-by: Aditya Khera <[email protected]> * Changelog + minor bug fix Signed-off-by: Aditya Khera <[email protected]> * Addressing review comments + added UTs Signed-off-by: Aditya Khera <[email protected]> * Fixing tests Signed-off-by: Aditya Khera <[email protected]> * Restored breaking publicAPIs Signed-off-by: Aditya Khera <[email protected]> * spotlessApply Signed-off-by: Aditya Khera <[email protected]> * Empty commit Signed-off-by: Aditya Khera <[email protected]> * Empty commit to trigger build Signed-off-by: Aditya Khera <[email protected]> * Test changes Signed-off-by: Aditya Khera <[email protected]> * Empty commit Signed-off-by: Aditya Khera <[email protected]> * Empty commit to trigger build Signed-off-by: Aditya Khera <[email protected]> * Fixing tests Signed-off-by: Aditya Khera <[email protected]> * fixes to merged segment warmer + tests Signed-off-by: Aditya Khera <[email protected]> * spotlessApply Signed-off-by: Aditya Khera <[email protected]> * Fixing ITs after rebase Signed-off-by: Aditya Khera <[email protected]> * Empty commit Signed-off-by: Aditya Khera <[email protected]> * Addressing review comments Signed-off-by: Aditya Khera <[email protected]> * rebased with main 
Signed-off-by: Aditya Khera <[email protected]> * Fixing tests Signed-off-by: Aditya Khera <[email protected]> * spotlessApply Signed-off-by: Aditya Khera <[email protected]> * YAML test fix Signed-off-by: Aditya Khera <[email protected]> * yaml test fix Signed-off-by: Aditya Khera <[email protected]> * test fixes Signed-off-by: Aditya Khera <[email protected]> --------- Signed-off-by: kh3ra <[email protected]> Signed-off-by: Aditya Khera <[email protected]> Co-authored-by: Aditya Khera <[email protected]>
1 parent 65cd2c8 commit f15cbbd

33 files changed: +1793 / -139 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
1111
- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
1212

13+
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
14+
1315
### Changed
1416
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
1517
- Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551))

libs/core/src/main/java/org/opensearch/core/action/ActionListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,4 +358,8 @@ static <Response> void completeWith(ActionListener<Response> listener, CheckedSu
358358
throw ex;
359359
}
360360
}
361+
362+
static <T> ActionListener<T> noOp() {
363+
return ActionListener.wrap(response -> {}, exception -> {});
364+
}
361365
}

rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml

Lines changed: 195 additions & 87 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java

Lines changed: 353 additions & 0 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/index/engine/Engine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.opensearch.index.mapper.SourceToParse;
8989
import org.opensearch.index.mapper.Uid;
9090
import org.opensearch.index.merge.MergeStats;
91+
import org.opensearch.index.merge.MergedSegmentTransferTracker;
9192
import org.opensearch.index.seqno.SeqNoStats;
9293
import org.opensearch.index.seqno.SequenceNumbers;
9394
import org.opensearch.index.shard.DocsStats;
@@ -216,6 +217,10 @@ public MergeStats getMergeStats() {
216217
return new MergeStats();
217218
}
218219

220+
public MergedSegmentTransferTracker getMergedSegmentTransferTracker() {
221+
return engineConfig.getMergedSegmentTransferTracker();
222+
}
223+
219224
/** returns the history uuid for the engine */
220225
public abstract String getHistoryUUID();
221226

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.opensearch.index.codec.CodecSettings;
5858
import org.opensearch.index.mapper.DocumentMapperForType;
5959
import org.opensearch.index.mapper.ParsedDocument;
60+
import org.opensearch.index.merge.MergedSegmentTransferTracker;
6061
import org.opensearch.index.seqno.RetentionLeases;
6162
import org.opensearch.index.store.Store;
6263
import org.opensearch.index.translog.InternalTranslogFactory;
@@ -115,6 +116,7 @@ public final class EngineConfig {
115116
private final Comparator<LeafReader> leafSorter;
116117
private final Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
117118
private final ClusterApplierService clusterApplierService;
119+
private final MergedSegmentTransferTracker mergedSegmentTransferTracker;
118120

119121
/**
120122
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -306,6 +308,7 @@ private EngineConfig(Builder builder) {
306308
this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier;
307309
this.indexReaderWarmer = builder.indexReaderWarmer;
308310
this.clusterApplierService = builder.clusterApplierService;
311+
this.mergedSegmentTransferTracker = builder.mergedSegmentTransferTracker;
309312
}
310313

311314
/**
@@ -625,6 +628,13 @@ public ClusterApplierService getClusterApplierService() {
625628
return this.clusterApplierService;
626629
}
627630

631+
/**
632+
* Returns the MergedSegmentTransferTracker instance.
633+
*/
634+
public MergedSegmentTransferTracker getMergedSegmentTransferTracker() {
635+
return this.mergedSegmentTransferTracker;
636+
}
637+
628638
/**
629639
* Builder for EngineConfig class
630640
*
@@ -662,6 +672,7 @@ public static class Builder {
662672
Comparator<LeafReader> leafSorter;
663673
private IndexWriter.IndexReaderWarmer indexReaderWarmer;
664674
private ClusterApplierService clusterApplierService;
675+
private MergedSegmentTransferTracker mergedSegmentTransferTracker;
665676

666677
public Builder shardId(ShardId shardId) {
667678
this.shardId = shardId;
@@ -813,6 +824,11 @@ public Builder clusterApplierService(ClusterApplierService clusterApplierService
813824
return this;
814825
}
815826

827+
public Builder mergedSegmentTransferTracker(MergedSegmentTransferTracker mergedSegmentTransferTracker) {
828+
this.mergedSegmentTransferTracker = mergedSegmentTransferTracker;
829+
return this;
830+
}
831+
816832
public EngineConfig build() {
817833
return new EngineConfig(this);
818834
}

server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.index.codec.CodecServiceFactory;
3030
import org.opensearch.index.mapper.DocumentMapperForType;
3131
import org.opensearch.index.mapper.MapperService;
32+
import org.opensearch.index.merge.MergedSegmentTransferTracker;
3233
import org.opensearch.index.seqno.RetentionLeases;
3334
import org.opensearch.index.store.Store;
3435
import org.opensearch.index.translog.TranslogConfig;
@@ -160,7 +161,8 @@ public EngineConfig newEngineConfig(
160161
Comparator<LeafReader> leafSorter,
161162
Supplier<DocumentMapperForType> documentMapperForTypeSupplier,
162163
IndexWriter.IndexReaderWarmer indexReaderWarmer,
163-
ClusterApplierService clusterApplierService
164+
ClusterApplierService clusterApplierService,
165+
MergedSegmentTransferTracker mergedSegmentTransferTracker
164166
) {
165167
CodecService codecServiceToUse = codecService;
166168
if (codecService == null && this.codecServiceFactory != null) {
@@ -197,6 +199,7 @@ public EngineConfig newEngineConfig(
197199
.documentMapperForTypeSupplier(documentMapperForTypeSupplier)
198200
.indexReaderWarmer(indexReaderWarmer)
199201
.clusterApplierService(clusterApplierService)
202+
.mergedSegmentTransferTracker(mergedSegmentTransferTracker)
200203
.build();
201204
}
202205

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.opensearch.index.mapper.SourceFieldMapper;
105105
import org.opensearch.index.mapper.Uid;
106106
import org.opensearch.index.merge.MergeStats;
107+
import org.opensearch.index.merge.MergedSegmentTransferTracker;
107108
import org.opensearch.index.merge.OnGoingMerge;
108109
import org.opensearch.index.seqno.LocalCheckpointTracker;
109110
import org.opensearch.index.seqno.SeqNoStats;
@@ -258,7 +259,11 @@ public TranslogManager translogManager() {
258259
boolean success = false;
259260
try {
260261
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
261-
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
262+
mergeScheduler = scheduler = new EngineMergeScheduler(
263+
engineConfig.getShardId(),
264+
engineConfig.getIndexSettings(),
265+
getMergedSegmentTransferTracker()
266+
);
262267
throttle = new IndexThrottle();
263268
try {
264269
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
@@ -2475,8 +2480,8 @@ private final class EngineMergeScheduler extends OpenSearchConcurrentMergeSchedu
24752480
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
24762481
private final AtomicBoolean isThrottling = new AtomicBoolean();
24772482

2478-
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
2479-
super(shardId, indexSettings);
2483+
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergedSegmentTransferTracker mergedSegmentTransferTracker) {
2484+
super(shardId, indexSettings, mergedSegmentTransferTracker);
24802485
}
24812486

24822487
@Override

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.lucene.index.SegmentReader;
1717
import org.opensearch.cluster.service.ClusterService;
1818
import org.opensearch.common.logging.Loggers;
19+
import org.opensearch.index.merge.MergedSegmentTransferTracker;
1920
import org.opensearch.index.shard.IndexShard;
2021
import org.opensearch.indices.recovery.RecoverySettings;
2122
import org.opensearch.transport.TransportService;
@@ -33,7 +34,7 @@ public class MergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
3334
private final RecoverySettings recoverySettings;
3435
private final ClusterService clusterService;
3536
private final IndexShard indexShard;
36-
37+
private final MergedSegmentTransferTracker mergedSegmentTransferTracker;
3738
private final Logger logger;
3839

3940
public MergedSegmentWarmer(
@@ -46,23 +47,29 @@ public MergedSegmentWarmer(
4647
this.recoverySettings = recoverySettings;
4748
this.clusterService = clusterService;
4849
this.indexShard = indexShard;
50+
this.mergedSegmentTransferTracker = indexShard.mergedSegmentTransferTracker();
4951
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
5052
}
5153

5254
@Override
5355
public void warm(LeafReader leafReader) throws IOException {
54-
try {
55-
if (shouldWarm() == false) {
56-
return;
57-
}
58-
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
59-
assert leafReader instanceof SegmentReader;
60-
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
56+
if (shouldWarm() == false) {
57+
return;
58+
}
6159

62-
long startTime = System.currentTimeMillis();
60+
mergedSegmentTransferTracker.incrementTotalWarmInvocationsCount();
61+
mergedSegmentTransferTracker.incrementOngoingWarms();
62+
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
63+
assert leafReader instanceof SegmentReader;
64+
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
65+
long startTime = System.currentTimeMillis();
66+
long elapsedTime = 0;
67+
try {
6368
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
64-
logger.info(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
69+
logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
6570
indexShard.publishMergedSegment(segmentCommitInfo);
71+
elapsedTime = System.currentTimeMillis() - startTime;
72+
long finalElapsedTime = elapsedTime;
6673
logger.trace(() -> {
6774
long segmentSize = -1;
6875
try {
@@ -72,17 +79,15 @@ public void warm(LeafReader leafReader) throws IOException {
7279
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
7380
segmentCommitInfo.info.name,
7481
segmentSize,
75-
(System.currentTimeMillis() - startTime)
82+
finalElapsedTime
7683
);
7784
});
78-
} catch (Exception e) {
79-
logger.warn(
80-
() -> new ParameterizedMessage(
81-
"Throw exception during merged segment warmer, skip merged segment {} warmer",
82-
((SegmentReader) leafReader).getSegmentName()
83-
),
84-
e
85-
);
85+
} catch (Throwable t) {
86+
logger.warn(() -> new ParameterizedMessage("Failed to warm segment. Continuing. {}", leafReader), t);
87+
mergedSegmentTransferTracker.incrementTotalWarmFailureCount();
88+
} finally {
89+
mergedSegmentTransferTracker.addTotalWarmTimeMillis(elapsedTime);
90+
mergedSegmentTransferTracker.decrementOngoingWarms();
8691
}
8792
}
8893

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.common.util.concurrent.ReleasableLock;
2222
import org.opensearch.common.util.io.IOUtils;
2323
import org.opensearch.core.common.unit.ByteSizeValue;
24+
import org.opensearch.index.merge.MergeStats;
2425
import org.opensearch.index.seqno.LocalCheckpointTracker;
2526
import org.opensearch.index.seqno.SeqNoStats;
2627
import org.opensearch.index.seqno.SequenceNumbers;
@@ -499,6 +500,13 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
499500
@Override
500501
public void maybePruneDeletes() {}
501502

503+
@Override
504+
public MergeStats getMergeStats() {
505+
MergeStats mergeStats = new MergeStats();
506+
mergeStats.add(engineConfig.getMergedSegmentTransferTracker().stats());
507+
return mergeStats;
508+
}
509+
502510
@Override
503511
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {}
504512

0 commit comments

Comments
 (0)