Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2046,9 +2046,9 @@ private void awaitPendingClose() {
}
}

public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {

}
// public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
//
// }

/**
* Returns the timestamp of the last write in nanoseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -301,6 +303,11 @@ public String getHistoryUUID() {
return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY);
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {

}

@Override
public long getWritingBytes() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.index.engine.exec.bridge;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.SafeCommitInfo;
Expand Down Expand Up @@ -91,4 +93,9 @@ Translog.Snapshot newChangesSnapshot(
) throws IOException;

String getHistoryUUID();

/**
* Applies changes to input settings.
*/
void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.CatalogSnapshotAwareRefreshListener;
import org.opensearch.index.engine.Engine;
Expand Down Expand Up @@ -83,7 +85,7 @@ public CompositeEngine(MapperService mapperService,

this.mergeHandler = new ParquetMergeHandler(this, this.engine, this.engine.getDataFormat(),
indexSettings.getParquetMergePolicy());
mergeScheduler = new MergeScheduler(this.mergeHandler, this);
mergeScheduler = new MergeScheduler(this.mergeHandler, this, indexSettings);

// Refresh here so that catalog snapshot gets initialized
// TODO : any better way to do this ?
Expand Down Expand Up @@ -386,4 +388,9 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
public String getHistoryUUID() {
return "";
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
mergeScheduler.refreshConfig();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

package org.opensearch.index.engine.exec.merge;

import org.apache.lucene.index.MergePolicy;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.coord.CompositeEngine;
import org.opensearch.index.MergeSchedulerConfig;
import org.apache.logging.log4j.LogManager;
Expand All @@ -30,39 +32,19 @@ public class MergeScheduler {
private final List<MergeThread> mergeThreads = new CopyOnWriteArrayList<>();
private final AtomicInteger activeMerges = new AtomicInteger(0);
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final MergeSchedulerConfig mergeSchedulerConfig;
private final Settings indexSettings;
private volatile int maxConcurrentMerges;
private volatile int maxMergeCount;

public MergeScheduler(MergeHandler mergeHandler, CompositeEngine compositeEngine) {
this(mergeHandler, compositeEngine, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));

}

public MergeScheduler(MergeHandler mergeHandler, CompositeEngine compositeEngine, int maxConcurrentMerges) {
public MergeScheduler(MergeHandler mergeHandler,
CompositeEngine compositeEngine,
IndexSettings indexSettings) {
this.mergeHandler = mergeHandler;
this.compositeEngine = compositeEngine;
this.maxConcurrentMerges = maxConcurrentMerges;
this.maxMergeCount = maxConcurrentMerges + 5;
}

//TODO use this function to refresh the config from IndexSettings.MergeSchedulerConfig
/**
* Refreshes merge scheduler configuration from MergeSchedulerConfig.
* Updates max thread count and max merge count dynamically.
*/
public synchronized void refreshConfig(MergeSchedulerConfig config) {
int newMaxThreadCount = config.getMaxThreadCount();
int newMaxMergeCount = config.getMaxMergeCount();

if (newMaxThreadCount == this.maxConcurrentMerges && newMaxMergeCount == this.maxMergeCount) {
return;
}

logger.info("Updating merge scheduler config: maxThreadCount {} -> {}, maxMergeCount {} -> {}",
this.maxConcurrentMerges, newMaxThreadCount, this.maxMergeCount, newMaxMergeCount);

this.maxConcurrentMerges = newMaxThreadCount;
this.maxMergeCount = newMaxMergeCount;
this.mergeSchedulerConfig = indexSettings.getMergeSchedulerConfig();
this.indexSettings = indexSettings.getSettings();
refreshConfig();
}

/**
Expand All @@ -77,19 +59,13 @@ public void triggerMerges() throws IOException {

mergeHandler.updatePendingMerges();


// Submit merges up to available capacity
int scheduled = 0;
int availableToSchedule = getAvailableMergeSlots();

while(availableToSchedule >= scheduled && mergeHandler.hasPendingMerges()) {
while(activeMerges.get() < maxConcurrentMerges && mergeHandler.hasPendingMerges()) {
OneMerge oneMerge = mergeHandler.getNextMerge();
if (oneMerge == null) {
return;
}
try {
submitMergeTask(oneMerge);
scheduled++;
} catch (Exception e) {
mergeHandler.onMergeFailure(oneMerge);
}
Expand Down Expand Up @@ -119,21 +95,47 @@ private int getAvailableMergeSlots() {
*/
private void submitMergeTask(OneMerge oneMerge) {
activeMerges.incrementAndGet();
MergeThread thread = new MergeThread(oneMerge);
MergeThread thread = getMergeThread(oneMerge);
mergeThreads.add(thread);
thread.start();
}

/**
* Refreshes merge scheduler configuration from MergeSchedulerConfig.
* Updates max thread count and max merge count dynamically.
*/
public void refreshConfig() {
int newMaxThreadCount = this.mergeSchedulerConfig.getMaxThreadCount();
int newMaxMergeCount = this.mergeSchedulerConfig.getMaxMergeCount();

if (newMaxThreadCount == this.getMaxConcurrentMerges() && newMaxMergeCount == this.getMaxMergeCount()) {
return;
}

logger.info("Updating merge scheduler config: maxThreadCount {} -> {}, maxMergeCount {} -> {}",
this.getActiveMergeCount(), newMaxThreadCount, this.getMaxMergeCount(), newMaxMergeCount);

this.maxConcurrentMerges = newMaxThreadCount;
this.maxMergeCount = newMaxMergeCount;
}

private MergeThread getMergeThread(OneMerge oneMerge) {
final MergeThread mergeThread = new MergeThread(oneMerge);
//TODO update the merge thread name with the below once shardId info is available
// mergeThread.setName(OpenSearchExecutors.threadName(indexSettings, "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName()));
mergeThread.setName(OpenSearchExecutors.threadName(indexSettings, mergeThread.getName()));
mergeThread.setDaemon(true);
return mergeThread;
}

/**
* Thread that executes a single merge operation.
*/
private class MergeThread extends Thread {
private final OneMerge oneMerge;

MergeThread(OneMerge oneMerge) {
super("merge-scheduler-" + oneMerge.toString());
this.oneMerge = oneMerge;
setDaemon(true);
}

@Override
Expand All @@ -160,6 +162,12 @@ public void run() {
} finally {
activeMerges.decrementAndGet();
mergeThreads.remove(this);
try {
triggerMerges();
} catch (IOException e) {
logger.error("ERROR in MERGE : " + e.getMessage());
e.printStackTrace();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3272,10 +3272,11 @@ boolean shouldRollTranslogGeneration() {
}

public void onSettingsChanged() {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
// Engine engineOrNull = getEngineOrNull();
Indexer engine = compositeEngine;
if (engine != null) {
final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery;
engineOrNull.onSettingsChanged(
engine.onSettingsChanged(
disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
Expand Down
Loading