2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
- Add pluggable gRPC interceptors with explicit ordering ([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))

- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))

### Changed
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
- Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551))

@@ -358,4 +358,8 @@ static <Response> void completeWith(ActionListener<Response> listener, CheckedSu
throw ex;
}
}

static <T> ActionListener<T> noOp() {
return ActionListener.wrap(response -> {}, exception -> {});
}
}
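
For context, here is a rough standalone sketch of the no-op listener pattern the new helper relies on. It is plain Java, not OpenSearch's ActionListener API; the Listener interface and class name below are illustrative only.

import java.util.function.Consumer;

// Illustrative stand-in for the real ActionListener: two callbacks plus a
// wrap(...) factory, mirroring the shape used by the noOp() helper above.
final class NoOpListenerSketch {
    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);
    }

    // Build a listener from two lambdas, analogous to ActionListener.wrap(...).
    static <T> Listener<T> wrap(Consumer<T> onResponse, Consumer<Exception> onFailure) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                onResponse.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }

    // Equivalent of the new noOp(): both the success and the failure path are ignored.
    static <T> Listener<T> noOp() {
        return wrap(response -> {}, exception -> {});
    }

    public static void main(String[] args) {
        Listener<String> listener = noOp();
        listener.onResponse("ignored");               // silently dropped
        listener.onFailure(new Exception("ignored")); // silently dropped
    }
}

Call sites that must supply a listener but do not care about the outcome can share a single no-op instance instead of repeating empty lambdas.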

Large diffs are not rendered by default.

Large diffs are not rendered by default.

@@ -88,6 +88,7 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.DocsStats;
@@ -216,6 +217,10 @@ public MergeStats getMergeStats() {
return new MergeStats();
}

public MergedSegmentTransferTracker getMergedSegmentTransferTracker() {
return engineConfig.getMergedSegmentTransferTracker();
}

/** returns the history uuid for the engine */
public abstract String getHistoryUUID();

@@ -57,6 +57,7 @@
import org.opensearch.index.codec.CodecSettings;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
@@ -115,6 +116,7 @@ public final class EngineConfig {
private final Comparator<LeafReader> leafSorter;
private final Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
private final ClusterApplierService clusterApplierService;
private final MergedSegmentTransferTracker mergedSegmentTransferTracker;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -306,6 +308,7 @@ private EngineConfig(Builder builder) {
this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier;
this.indexReaderWarmer = builder.indexReaderWarmer;
this.clusterApplierService = builder.clusterApplierService;
this.mergedSegmentTransferTracker = builder.mergedSegmentTransferTracker;
}

/**
@@ -625,6 +628,13 @@ public ClusterApplierService getClusterApplierService() {
return this.clusterApplierService;
}

/**
* Returns the MergedSegmentTransferTracker instance.
*/
public MergedSegmentTransferTracker getMergedSegmentTransferTracker() {
return this.mergedSegmentTransferTracker;
}

/**
* Builder for EngineConfig class
*
@@ -662,6 +672,7 @@ public static class Builder {
Comparator<LeafReader> leafSorter;
private IndexWriter.IndexReaderWarmer indexReaderWarmer;
private ClusterApplierService clusterApplierService;
private MergedSegmentTransferTracker mergedSegmentTransferTracker;

public Builder shardId(ShardId shardId) {
this.shardId = shardId;
@@ -813,6 +824,11 @@ public Builder clusterApplierService(ClusterApplierService clusterApplierService
return this;
}

public Builder mergedSegmentTransferTracker(MergedSegmentTransferTracker mergedSegmentTransferTracker) {
this.mergedSegmentTransferTracker = mergedSegmentTransferTracker;
return this;
}

public EngineConfig build() {
return new EngineConfig(this);
}

@@ -29,6 +29,7 @@
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
@@ -160,7 +161,8 @@ public EngineConfig newEngineConfig(
Comparator<LeafReader> leafSorter,
Supplier<DocumentMapperForType> documentMapperForTypeSupplier,
IndexWriter.IndexReaderWarmer indexReaderWarmer,
ClusterApplierService clusterApplierService
ClusterApplierService clusterApplierService,
MergedSegmentTransferTracker mergedSegmentTransferTracker
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
@@ -197,6 +199,7 @@
.documentMapperForTypeSupplier(documentMapperForTypeSupplier)
.indexReaderWarmer(indexReaderWarmer)
.clusterApplierService(clusterApplierService)
.mergedSegmentTransferTracker(mergedSegmentTransferTracker)
.build();
}

@@ -104,6 +104,7 @@
import org.opensearch.index.mapper.SourceFieldMapper;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.merge.OnGoingMerge;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
@@ -258,7 +259,11 @@ public TranslogManager translogManager() {
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
mergeScheduler = scheduler = new EngineMergeScheduler(
engineConfig.getShardId(),
engineConfig.getIndexSettings(),
getMergedSegmentTransferTracker()
);
throttle = new IndexThrottle();
try {
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
@@ -2475,8 +2480,8 @@ private final class EngineMergeScheduler extends OpenSearchConcurrentMergeSchedu
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();

EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergedSegmentTransferTracker mergedSegmentTransferTracker) {
super(shardId, indexSettings, mergedSegmentTransferTracker);
}

@Override

@@ -16,6 +16,7 @@
import org.apache.lucene.index.SegmentReader;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.transport.TransportService;
@@ -33,7 +34,7 @@ public class MergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
private final RecoverySettings recoverySettings;
private final ClusterService clusterService;
private final IndexShard indexShard;

private final MergedSegmentTransferTracker mergedSegmentTransferTracker;
private final Logger logger;

public MergedSegmentWarmer(
@@ -46,23 +47,29 @@ public MergedSegmentWarmer(
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.indexShard = indexShard;
this.mergedSegmentTransferTracker = indexShard.mergedSegmentTransferTracker();
this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
}

@Override
public void warm(LeafReader leafReader) throws IOException {
try {
if (shouldWarm() == false) {
return;
}
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
assert leafReader instanceof SegmentReader;
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
if (shouldWarm() == false) {
return;
}

long startTime = System.currentTimeMillis();
mergedSegmentTransferTracker.incrementTotalWarmInvocationsCount();
mergedSegmentTransferTracker.incrementOngoingWarms();
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
assert leafReader instanceof SegmentReader;
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
long startTime = System.currentTimeMillis();
long elapsedTime = 0;
try {
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
logger.info(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
indexShard.publishMergedSegment(segmentCommitInfo);
elapsedTime = System.currentTimeMillis() - startTime;
long finalElapsedTime = elapsedTime;
logger.trace(() -> {
long segmentSize = -1;
try {
@@ -72,17 +79,15 @@ public void warm(LeafReader leafReader) throws IOException {
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
segmentCommitInfo.info.name,
segmentSize,
(System.currentTimeMillis() - startTime)
finalElapsedTime
);
});
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"Throw exception during merged segment warmer, skip merged segment {} warmer",
((SegmentReader) leafReader).getSegmentName()
),
e
);
} catch (Throwable t) {
logger.warn(() -> new ParameterizedMessage("Failed to warm segment. Continuing. {}", leafReader), t);
mergedSegmentTransferTracker.incrementTotalWarmFailureCount();
} finally {
mergedSegmentTransferTracker.addTotalWarmTimeMillis(elapsedTime);
mergedSegmentTransferTracker.decrementOngoingWarms();
}
}
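
The counters touched above (total warm invocations, ongoing warms, failures, cumulative warm time) suggest the rough shape of the tracker. The actual MergedSegmentTransferTracker sits in one of the diffs not rendered on this page and will differ in detail; the sketch below, including the Stats holder, is only an inference from the calls visible in this hunk.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of the tracker surface implied by the calls in warm();
// field and type names are guesses, not the real implementation.
class MergedSegmentTransferTrackerSketch {
    private final LongAdder totalWarmInvocations = new LongAdder();
    private final LongAdder totalWarmFailures = new LongAdder();
    private final LongAdder totalWarmTimeMillis = new LongAdder();
    private final AtomicLong ongoingWarms = new AtomicLong();

    void incrementTotalWarmInvocationsCount() {
        totalWarmInvocations.increment();
    }

    void incrementOngoingWarms() {
        ongoingWarms.incrementAndGet();
    }

    void decrementOngoingWarms() {
        ongoingWarms.decrementAndGet();
    }

    void incrementTotalWarmFailureCount() {
        totalWarmFailures.increment();
    }

    void addTotalWarmTimeMillis(long millis) {
        totalWarmTimeMillis.add(millis);
    }

    // Immutable snapshot of the counters; the real stats() return type is not shown in this diff.
    record Stats(long invocations, long failures, long totalTimeMillis, long ongoing) {}

    Stats stats() {
        return new Stats(totalWarmInvocations.sum(), totalWarmFailures.sum(), totalWarmTimeMillis.sum(), ongoingWarms.get());
    }
}

The warm() changes keep these metrics consistent on every path: the ongoing-warm gauge is incremented before the publish and decremented in the finally block, the elapsed time (still 0 if the publish never completed) is added there as well, and a failed publish additionally bumps the failure counter.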

@@ -21,6 +21,7 @@
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
@@ -499,6 +500,13 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
@Override
public void maybePruneDeletes() {}

@Override
public MergeStats getMergeStats() {
MergeStats mergeStats = new MergeStats();
mergeStats.add(engineConfig.getMergedSegmentTransferTracker().stats());
return mergeStats;
}

@Override
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {}

@@ -47,6 +47,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
import org.opensearch.index.merge.OnGoingMerge;

import java.io.IOException;
@@ -78,12 +79,18 @@ class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
private final MergeSchedulerConfig config;
private final MergedSegmentTransferTracker mergedSegmentTransferTracker;

OpenSearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
OpenSearchConcurrentMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
MergedSegmentTransferTracker mergedSegmentTransferTracker
) {
this.config = indexSettings.getMergeSchedulerConfig();
this.shardId = shardId;
this.indexSettings = indexSettings;
this.logger = Loggers.getLogger(getClass(), shardId);
this.mergedSegmentTransferTracker = mergedSegmentTransferTracker;
refreshConfig();
}

@@ -211,7 +218,8 @@ MergeStats stats() {
currentMergesSizeInBytes.count(),
totalMergeStoppedTime.count(),
totalMergeThrottledTime.count(),
config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY
config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY,
mergedSegmentTransferTracker.stats()
);
return mergeStats;
}