Skip to content

Modify the mechanism to pause indexing #128405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128405.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128405
summary: Modify the mechanism to pause indexing
area: Distributed
type: bug
issues: []
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test025SyncPluginsUsingProxy
issue: https://github.com/elastic/elasticsearch/issues/127138
- class: org.elasticsearch.indices.stats.IndexStatsIT
method: testThrottleStats
issue: https://github.com/elastic/elasticsearch/issues/126359
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityRestIT
method: testTaskCancellation
issue: https://github.com/elastic/elasticsearch/issues/128009
Expand Down
122 changes: 61 additions & 61 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -455,33 +454,66 @@ protected static final class IndexThrottle {
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
private volatile long startOfThrottleNS;
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
private final PauseLock throttlingLock;
private final ReleasableLock lockReference;
// This lock throttles indexing to 1 thread (per shard)
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
// This lock pauses indexing completely (on a per shard basis)
private final Lock pauseIndexingLock = new ReentrantLock();
private final Condition pauseCondition = pauseIndexingLock.newCondition();
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
private volatile AtomicBoolean suspendThrottling = new AtomicBoolean();
private final boolean pauseWhenThrottled; // Should throttling pause indexing ?
private volatile ReleasableLock lock = NOOP_LOCK;

public IndexThrottle(boolean pause) {
throttlingLock = new PauseLock(pause ? 0 : 1);
lockReference = new ReleasableLock(throttlingLock);
pauseWhenThrottled = pause;
}

public Releasable acquireThrottle() {
return lock.acquire();
var lockCopy = this.lock;
if (lockCopy == pauseLockReference) {
try (var ignored = pauseLockReference.acquire()) {
// If pause throttling is activated and not temporarily suspended
while ((lock == pauseLockReference) && (suspendThrottling.getAcquire() == false)) {
logger.trace("Waiting on pause indexing lock");
pauseCondition.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
logger.trace("Acquired pause indexing lock");
}
return (() -> {});
} else {
return lockCopy.acquire();
}
}

/** Activate throttling, which switches the lock to be a real lock */
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";

startOfThrottleNS = System.nanoTime();
throttlingLock.throttle();
lock = lockReference;
if (pauseWhenThrottled) {
lock = pauseLockReference;
logger.trace("Activated index throttling pause");
} else {
lock = lockReference;
}
}

/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
public void deactivate() {
assert lock != NOOP_LOCK : "throttling deactivated but not active";

throttlingLock.unthrottle();
lock = NOOP_LOCK;
if (pauseWhenThrottled) {
// Signal the threads that are waiting on pauseCondition
try (Releasable releasableLock = pauseLockReference.acquire()) {
pauseCondition.signalAll();
}
logger.trace("Deactivated index throttling pause");
}

assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
long throttleTimeNS = System.nanoTime() - startOfThrottleNS;
Expand All @@ -508,6 +540,26 @@ boolean isThrottled() {
return lock != NOOP_LOCK;
}

/**
 * Suspend throttling to allow another task, such as relocation, to acquire all indexing permits.
 * <p>
 * Does nothing unless this throttle was created in pause mode. Otherwise the suspend flag is set
 * and every thread waiting on the pause condition is signalled, both while holding the pause
 * indexing lock.
 */
public void suspendThrottle() {
    if (pauseWhenThrottled == false) {
        return;
    }
    try (var ignored = pauseLockReference.acquire()) {
        suspendThrottling.setRelease(true);
        pauseCondition.signalAll();
    }
}

/**
 * Reverse what was done in {@link #suspendThrottle()}.
 * <p>
 * Does nothing unless this throttle was created in pause mode. Otherwise the suspend flag is
 * cleared and every thread waiting on the pause condition is signalled, both while holding the
 * pause indexing lock.
 */
public void resumeThrottle() {
    if (pauseWhenThrottled == false) {
        return;
    }
    try (var ignored = pauseLockReference.acquire()) {
        suspendThrottling.setRelease(false);
        pauseCondition.signalAll();
    }
}

boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
if (isThrottled()) {
return lock.isHeldByCurrentThread();
Expand Down Expand Up @@ -570,58 +622,6 @@ public Condition newCondition() {
}
}

/**
 * A {@link Lock} implementation that allows us to control how many threads can take the lock.
 * In particular, this is used to set the number of allowed threads to 1 or 0
 * when index throttling is activated.
 * <p>
 * The lock is backed by a {@link Semaphore} that starts with {@link Integer#MAX_VALUE} permits.
 * {@link #throttle()} takes all but {@code allowThreads} permits, and {@link #unthrottle()} gives
 * them back. Conditions are not supported.
 */
protected static final class PauseLock implements Lock {
    private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
    private final int allowThreads;

    /**
     * @param allowThreads number of threads that may hold the lock concurrently while throttled
     * @throws IllegalArgumentException if {@code allowThreads} is negative
     */
    public PauseLock(int allowThreads) {
        if (allowThreads < 0) {
            // A negative value would overflow Integer.MAX_VALUE - allowThreads in throttle()/unthrottle()
            throw new IllegalArgumentException("allowThreads must be >= 0 but was [" + allowThreads + "]");
        }
        this.allowThreads = allowThreads;
    }

    /** Takes one permit, blocking uninterruptibly until one is available. */
    @Override
    public void lock() {
        semaphore.acquireUninterruptibly();
    }

    /** Takes one permit, blocking until one is available or the thread is interrupted. */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        semaphore.acquire();
    }

    /** Returns one permit to the semaphore. */
    @Override
    public void unlock() {
        semaphore.release();
    }

    /** Takes one permit only if one is immediately available. */
    @Override
    public boolean tryLock() {
        return semaphore.tryAcquire();
    }

    /** Takes one permit if one becomes available within the given waiting time. */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return semaphore.tryAcquire(time, unit);
    }

    /** Conditions are not supported by this lock. */
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    /** Takes all but {@code allowThreads} permits, blocking uninterruptibly until they are available. */
    public void throttle() {
        assert semaphore.availablePermits() == Integer.MAX_VALUE;
        semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads);
    }

    /** Returns the permits taken by {@link #throttle()}. */
    public void unthrottle() {
        assert semaphore.availablePermits() <= allowThreads;
        semaphore.release(Integer.MAX_VALUE - allowThreads);
    }
}

/**
* Perform document index operation on the engine
* @param index operation to perform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2828,19 +2828,27 @@ public void accept(ElasticsearchDirectoryReader reader, ElasticsearchDirectoryRe

@Override
public void activateThrottling() {
int count = throttleRequestCount.incrementAndGet();
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
if (count == 1) {
throttle.activate();
// Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling
// atomic w.r.t each other
synchronized (throttleRequestCount) {
int count = throttleRequestCount.incrementAndGet();
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
if (count == 1) {
throttle.activate();
}
}
}

@Override
public void deactivateThrottling() {
int count = throttleRequestCount.decrementAndGet();
assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
if (count == 0) {
throttle.deactivate();
// Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling
// atomic w.r.t each other
synchronized (throttleRequestCount) {
int count = throttleRequestCount.decrementAndGet();
assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
if (count == 0) {
throttle.deactivate();
}
}
}

Expand Down Expand Up @@ -2972,7 +2980,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
@Override
public synchronized void afterMerge(OnGoingMerge merge) {
int maxNumMerges = getMaxMergeCount();
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (numMergesInFlight.decrementAndGet() <= maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
deactivateThrottling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogOperationsUtils;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
Expand Down Expand Up @@ -170,6 +171,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntSupplier;
Expand Down Expand Up @@ -7035,6 +7037,50 @@ public void testIndexThrottling() throws Exception {
verify(indexWithoutThrottlingCheck, atLeastOnce()).startTime();
}

/* Test that indexing is paused during throttling when the PAUSE_INDEXING_ON_THROTTLE setting is on.
 * The test tries to index a document into a shard for which indexing is throttled. It is unable to
 * do so until throttling is deactivated.
 * Indexing proceeds as usual once the shard throttle is deactivated.
 */
public void testIndexThrottlingWithPause() throws Exception {
    Settings.Builder settings = Settings.builder()
        .put(defaultSettings.getSettings())
        .put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true);
    final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
    final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
    try (
        Store store = createStore();
        InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))
    ) {
        final Engine.Index indexWithThrottlingCheck = indexForDoc(createParsedDoc("1", null));
        final Engine.Index indexWithoutThrottlingCheck = indexForDoc(createParsedDoc("2", null));
        final List<DocIdSeqNoAndSource> prevDocs = getDocIds(engine, true);
        assertThat(prevDocs.size(), equalTo(0));
        Thread indexWithThrottle = new Thread(() -> {
            try {
                engine.index(indexWithThrottlingCheck);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

        // Activate throttling (this will pause indexing)
        engine.activateThrottling();
        try {
            assertTrue(engine.isThrottled());
            indexWithThrottle.start();
            // Give the thread time to run; it must not complete because of the pause
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
            assertTrue("indexing thread should still be paused", indexWithThrottle.isAlive());
            assertThat(getDocIds(engine, true).size(), equalTo(0));
        } finally {
            // Always deactivate, even when an assertion above fails, so the indexing thread is
            // never left parked on the pause condition
            engine.deactivateThrottling();
        }
        // Throttling is off now, so the indexing thread can proceed
        indexWithThrottle.join();
        assertThat(getDocIds(engine, true).size(), equalTo(1));
        engine.index(indexWithoutThrottlingCheck);
        assertThat(getDocIds(engine, true).size(), equalTo(2));
    }
}

public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
MapperService mapperService = createMapperService();
final AtomicInteger refreshCount = new AtomicInteger();
Expand Down
Loading