diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c17927990df7f..e11fa9f58aea9 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -2046,9 +2046,9 @@ private void awaitPendingClose() { } } - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { - - } +// public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { +// +// } /** * Returns the timestamp of the last write in nanoseconds. diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index ad3cea6291eeb..ecd06bf3aa057 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -44,7 +44,9 @@ import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.Store; @@ -301,6 +303,11 @@ public String getHistoryUUID() { return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY); } + @Override + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + + } + @Override public long getWritingBytes() { return 0; diff --git a/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java b/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java index 39f8929fe703c..5ad1d8009bc14 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java @@ -9,6 +9,8 @@ package org.opensearch.index.engine.exec.bridge; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.SafeCommitInfo; @@ -91,4 +93,9 @@ Translog.Snapshot newChangesSnapshot( ) throws IOException; String getHistoryUUID(); + + /** + * Applies changes to input settings. + */ + void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps); } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java index 5c750defc5237..432ea10b72123 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java @@ -10,6 +10,8 @@ import org.apache.lucene.search.ReferenceManager; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.CatalogSnapshotAwareRefreshListener; import org.opensearch.index.engine.Engine; @@ -83,7 +85,7 @@ public CompositeEngine(MapperService mapperService, this.mergeHandler = new ParquetMergeHandler(this, this.engine, this.engine.getDataFormat(), indexSettings.getParquetMergePolicy()); - mergeScheduler = new MergeScheduler(this.mergeHandler, this); + mergeScheduler = new MergeScheduler(this.mergeHandler, this, indexSettings); // Refresh here so that catalog snapshot gets initialized // TODO : any better way to do this ? @@ -386,4 +388,9 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long public String getHistoryUUID() { return ""; } + + @Override + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + mergeScheduler.refreshConfig(); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/merge/MergeScheduler.java b/server/src/main/java/org/opensearch/index/engine/exec/merge/MergeScheduler.java index 80265bf774928..ef00b18260043 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/merge/MergeScheduler.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/merge/MergeScheduler.java @@ -8,7 +8,9 @@ package org.opensearch.index.engine.exec.merge; -import org.apache.lucene.index.MergePolicy; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.exec.coord.CompositeEngine; import org.opensearch.index.MergeSchedulerConfig; import org.apache.logging.log4j.LogManager; @@ -30,39 +32,19 @@ public class MergeScheduler { private final List mergeThreads = new CopyOnWriteArrayList<>(); private final AtomicInteger activeMerges = new AtomicInteger(0); private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final MergeSchedulerConfig mergeSchedulerConfig; + private final Settings indexSettings; private volatile int maxConcurrentMerges; private volatile int maxMergeCount; - public MergeScheduler(MergeHandler mergeHandler, CompositeEngine compositeEngine) { - this(mergeHandler, compositeEngine, Math.max(1, Runtime.getRuntime().availableProcessors() / 4)); - - } - - public MergeScheduler(MergeHandler mergeHandler, CompositeEngine compositeEngine, int maxConcurrentMerges) { + public MergeScheduler(MergeHandler mergeHandler, + CompositeEngine compositeEngine, + IndexSettings indexSettings) { this.mergeHandler = mergeHandler; this.compositeEngine = compositeEngine; - this.maxConcurrentMerges = maxConcurrentMerges; - this.maxMergeCount = maxConcurrentMerges + 5; - } - - //TODO use this function to refresh the config from IndexSettings.MergeSchedulerConfig - /** - * Refreshes merge scheduler configuration from MergeSchedulerConfig. - * Updates max thread count and max merge count dynamically. - */ - public synchronized void refreshConfig(MergeSchedulerConfig config) { - int newMaxThreadCount = config.getMaxThreadCount(); - int newMaxMergeCount = config.getMaxMergeCount(); - - if (newMaxThreadCount == this.maxConcurrentMerges && newMaxMergeCount == this.maxMergeCount) { - return; - } - - logger.info("Updating merge scheduler config: maxThreadCount {} -> {}, maxMergeCount {} -> {}", - this.maxConcurrentMerges, newMaxThreadCount, this.maxMergeCount, newMaxMergeCount); - - this.maxConcurrentMerges = newMaxThreadCount; - this.maxMergeCount = newMaxMergeCount; + this.mergeSchedulerConfig = indexSettings.getMergeSchedulerConfig(); + this.indexSettings = indexSettings.getSettings(); + refreshConfig(); } /** @@ -77,19 +59,13 @@ public void triggerMerges() throws IOException { mergeHandler.updatePendingMerges(); - - // Submit merges up to available capacity - int scheduled = 0; - int availableToSchedule = getAvailableMergeSlots(); - - while(availableToSchedule >= scheduled && mergeHandler.hasPendingMerges()) { + while(activeMerges.get() < maxConcurrentMerges && mergeHandler.hasPendingMerges()) { OneMerge oneMerge = mergeHandler.getNextMerge(); if (oneMerge == null) { return; } try { submitMergeTask(oneMerge); - scheduled++; } catch (Exception e) { mergeHandler.onMergeFailure(oneMerge); } @@ -119,11 +95,39 @@ private int getAvailableMergeSlots() { */ private void submitMergeTask(OneMerge oneMerge) { activeMerges.incrementAndGet(); - MergeThread thread = new MergeThread(oneMerge); + MergeThread thread = getMergeThread(oneMerge); mergeThreads.add(thread); thread.start(); } + /** + * Refreshes merge scheduler configuration from MergeSchedulerConfig. + * Updates max thread count and max merge count dynamically. + */ + public void refreshConfig() { + int newMaxThreadCount = this.mergeSchedulerConfig.getMaxThreadCount(); + int newMaxMergeCount = this.mergeSchedulerConfig.getMaxMergeCount(); + + if (newMaxThreadCount == this.getMaxConcurrentMerges() && newMaxMergeCount == this.getMaxMergeCount()) { + return; + } + + logger.info("Updating merge scheduler config: maxThreadCount {} -> {}, maxMergeCount {} -> {}", + this.getActiveMergeCount(), newMaxThreadCount, this.getMaxMergeCount(), newMaxMergeCount); + + this.maxConcurrentMerges = newMaxThreadCount; + this.maxMergeCount = newMaxMergeCount; + } + + private MergeThread getMergeThread(OneMerge oneMerge) { + final MergeThread mergeThread = new MergeThread(oneMerge); + //TODO update the merge thread name with the below once shardId info is available +// mergeThread.setName(OpenSearchExecutors.threadName(indexSettings, "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName())); + mergeThread.setName(OpenSearchExecutors.threadName(indexSettings, mergeThread.getName())); + mergeThread.setDaemon(true); + return mergeThread; + } + /** * Thread that executes a single merge operation. */ @@ -131,9 +135,7 @@ private class MergeThread extends Thread { private final OneMerge oneMerge; MergeThread(OneMerge oneMerge) { - super("merge-scheduler-" + oneMerge.toString()); this.oneMerge = oneMerge; - setDaemon(true); } @Override @@ -160,6 +162,12 @@ public void run() { } finally { activeMerges.decrementAndGet(); mergeThreads.remove(this); + try { + triggerMerges(); + } catch (IOException e) { + logger.error("ERROR in MERGE : " + e.getMessage()); + e.printStackTrace(); + } } } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 62518dc063465..cf690e873c5ef 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3272,10 +3272,11 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(); - if (engineOrNull != null) { +// Engine engineOrNull = getEngineOrNull(); + Indexer engine = compositeEngine; + if (engine != null) { final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery; - engineOrNull.onSettingsChanged( + engine.onSettingsChanged( disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations()