Skip to content
Open
59 changes: 59 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/MergeTaskWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.apache.lucene.index;

import java.util.concurrent.Future;

/**
* Package-private wrapper that *is* the Runnable for a single merge.
* (Prototype: minimal fields; will be extended later for metrics / priority.)
*/
final class MergeTaskWrapper implements Runnable {

final SharedMergeScheduler owner;
final long mergeSizeBytes; // size hint (may be used later for prioritization)
Runnable delegate; // calls mergeSource.merge(...)
volatile boolean started = false;
volatile boolean finished = false;
volatile boolean aborted = false;
volatile Future<?> future; // will be set after submission (for possible cancellation later)

MergeTaskWrapper(SharedMergeScheduler owner, Runnable delegate, long mergeSizeBytes) {
this.owner = owner;
this.mergeSizeBytes = mergeSizeBytes;
this.delegate = delegate;
}

public void setRunnable(Runnable delegate) {
this.delegate = delegate;
}

public void setFuture(Future<?> future) {
this.future = future;
}


@Override
public void run() {
started = true;
try {
delegate.run();
} finally {
finished = true;
owner.onTaskFinished(this);
}
}

/** Attempt to cancel if still queued (not started). */
boolean cancelIfNotStarted() {
Future<?> f = future;
return !started && f != null && f.cancel(false);
}

public void markAborted() {
this.aborted = true;
}

public boolean isAborted() {
return aborted;
}
}

113 changes: 113 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/SharedMergeScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package org.apache.lucene.index;

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/**
* SharedMergeScheduler is an experimental MergeScheduler that submits merge tasks to a shared
* thread pool across IndexWriters.
*/
public class SharedMergeScheduler extends MergeScheduler {

// Tracks submitted merges per writer
private final Set<MergeTaskWrapper> mergeTasks = ConcurrentHashMap.newKeySet();

private final AtomicInteger submittedTasks = new AtomicInteger();
private final AtomicInteger completedTasks = new AtomicInteger();
private final Set<String> mergeThreadNames = java.util.Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ConcurrentHashMap<IndexWriter, CopyOnWriteArraySet<MergeTaskWrapper>> writerToMerges = new ConcurrentHashMap<>();

private final ExecutorService executor;

public SharedMergeScheduler(ExecutorService executor) {
this.executor = executor;
}

/**
* Retrieves pending merge tasks from the given {@link MergeSource} and submits them to the shared
* thread pool for execution.
*
* @param mergeSource the source of merge tasks (typically an IndexWriter)
* @param trigger the event that triggered the merge (e.g., SEGMENT_FLUSH, EXPLICIT)
* @throws IOException if merging fails
*/
@Override
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
while (mergeSource.hasPendingMerges()) {
final MergePolicy.OneMerge merge = mergeSource.getNextMerge();

MergeTaskWrapper wrappedTask = new MergeTaskWrapper(this, null, merge.totalBytesSize());

Runnable mergeRunnable = () -> {
mergeThreadNames.add(Thread.currentThread().getName());
try {
if (wrappedTask.isAborted()) {
return;
}
mergeSource.merge(merge);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
// No cleanup required here at the moment.
}
};

wrappedTask.setRunnable(mergeRunnable);

mergeTasks.add(wrappedTask);
submittedTasks.incrementAndGet();

Future<?> f = executor.submit(wrappedTask);
wrappedTask.setFuture(f);

}
}


//@Override
public void close(IndexWriter writer) {
CopyOnWriteArraySet<MergeTaskWrapper> tasks = writerToMerges.remove(writer);
if (tasks != null) {
for (MergeTaskWrapper task : tasks) {
task.markAborted();
}
}
}

void onTaskFinished(MergeTaskWrapper task) {
completedTasks.incrementAndGet();
mergeTasks.remove(task); // Clean up from the global task set
}

/* ===================== PACKAGE-PRIVATE TEST HOOKS ===================== */

int getActiveMergeTaskCount() {
return writerToMerges.values().stream().mapToInt(Set::size).sum();
}

int getSubmittedTaskCount() {
return submittedTasks.get();
}

int getCompletedTaskCount() {
return completedTasks.get();
}

Set<String> getMergeThreadNames() {
return java.util.Collections.unmodifiableSet(mergeThreadNames);
}

boolean hasWriter(IndexWriter w) {
return writerToMerges.containsKey(w);
}

}

Loading