Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,9 @@ managedLedgerDigestType=CRC32C
# Number of threads to be used for managed ledger scheduled tasks
managedLedgerNumSchedulerThreads=

# Number of threads to be use for managed ledger scheduled offload operations.
managedLedgerNumOffloadSchedulerThreads=

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker. By default, uses 1/5th of available direct memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public class ManagedLedgerFactoryConfig {
*/
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();

/**
* Number of threads to use for ML offload operations.
*/
private int numManagedLedgerOffloadSchedulerThreads = Runtime.getRuntime().availableProcessors();

/**
* ManagedCursorInfo compression threshold. If the origin metadata size below configuration.
* compression will not apply.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
@Getter
private final ScheduledExecutorService cacheEvictionExecutor;

// Dedicated thread pool for offload operations to isolate from core services
@Getter
private final OrderedScheduler offloadScheduler;

@Getter
protected final ManagedLedgerFactoryMBeanImpl mbean;

Expand Down Expand Up @@ -234,6 +238,17 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
.build();
cacheEvictionExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));

// Create dedicated scheduler for offload operations to prevent blocking core services
// Use a conservative thread count to minimize resource overhead while ensuring adequate capacity
int offloadThreads = config.getNumManagedLedgerOffloadSchedulerThreads();
offloadScheduler = OrderedScheduler.newSchedulerBuilder()
.numThreads(offloadThreads)
.statsLogger(statsLogger)
.traceTaskExecution(config.isTraceTaskExecution())
.name("bookkeeper-ml-offload-scheduler")
.build();
log.info("Created offload scheduler with {} threads for ML operations isolation", offloadThreads);
this.metadataServiceAvailable = true;
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
Expand Down Expand Up @@ -647,6 +662,11 @@ public CompletableFuture<Void> shutdownAsync() throws ManagedLedgerException {
flushCursorsTask.cancel(true);
cacheEvictionExecutor.shutdownNow();

// Shutdown offload scheduler
if (offloadScheduler != null) {
offloadScheduler.shutdownNow();
}

List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet());
List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size());
int numLedgers = ledgerNames.size();
Expand Down Expand Up @@ -1037,7 +1057,7 @@ public void operationFailed(MetaStoreException e) {
return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig,
OffloadUtils.getOffloadDriverMetadata(ls,
mlConfig.getLedgerOffloader().getOffloadDriverMetadata()),
"Deletion", managedLedgerName, scheduledExecutor);
"Deletion", managedLedgerName, offloadScheduler);
}

return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2771,7 +2771,9 @@ public void maybeOffloadInBackground(CompletableFuture<Position> promise) {
final long offloadThresholdInSeconds =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
executor.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise));
// Use dedicated offload scheduler to avoid any potential blocking of core services
factory.getOffloadScheduler()
.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise));
}
}

Expand All @@ -2791,7 +2793,8 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS
}

if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise),
// Use dedicated offload scheduler to avoid blocking core services
factory.getOffloadScheduler().schedule(() -> maybeOffloadInBackground(finalPromise),
100, TimeUnit.MILLISECONDS);
return;
}
Expand Down Expand Up @@ -3384,7 +3387,7 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
OffloadUtils.cleanupOffloaded(ledgerId, uuid, config,
OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
"Trimming", name, scheduledExecutor);
"Trimming", name, factory.getOffloadScheduler());
}
}

Expand Down Expand Up @@ -3657,7 +3660,7 @@ void offloadLoop(CompletableFuture<Position> promise, Queue<LedgerInfo> ledgersT
TimeUnit.HOURS.toMillis(1)).limit(10),
FAIL_ON_CONFLICT,
() -> completeLedgerInfoForOffloaded(ledgerId, uuid),
scheduledExecutor, name)
factory.getOffloadScheduler(), name)
.whenComplete((ignore2, exception) -> {
if (exception != null) {
Throwable e = FutureUtil.unwrapCompletionException(exception);
Expand All @@ -3678,7 +3681,7 @@ void offloadLoop(CompletableFuture<Position> promise, Queue<LedgerInfo> ledgersT
OffloadUtils.cleanupOffloaded(
ledgerId, uuid, config,
driverMetadata,
"Metastore failure", name, scheduledExecutor);
"Metastore failure", name, factory.getOffloadScheduler());
}
});
})
Expand Down Expand Up @@ -3740,8 +3743,8 @@ private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation tran
CompletableFuture<Void> finalPromise) {
synchronized (this) {
if (!metadataMutex.tryLock()) {
// retry in 100 milliseconds
scheduledExecutor.schedule(
// retry in 100 milliseconds using offload scheduler to avoid blocking core services
factory.getOffloadScheduler().schedule(
() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise), 100,
TimeUnit.MILLISECONDS);
} else { // lock acquired
Expand Down Expand Up @@ -3805,7 +3808,7 @@ private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUI
config.getLedgerOffloader().getOffloadDriverMetadata()),
"Previous failed offload",
name,
scheduledExecutor);
factory.getOffloadScheduler());
}
LedgerInfo.Builder builder = oldInfo.toBuilder();
builder.getOffloadContextBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2313,6 +2313,8 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();

private int managedLedgerNumOffloadSchedulerThreads = Runtime.getRuntime().availableProcessors();

@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of entries to append to a ledger before triggering a rollover.\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
conf.getManagedLedgerInfoCompressionThresholdInBytes());
managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds());
managedLedgerFactoryConfig.setManagedCursorInfoCompressionType(conf.getManagedCursorInfoCompressionType());
managedLedgerFactoryConfig.setNumManagedLedgerOffloadSchedulerThreads(
conf.getManagedLedgerNumOffloadSchedulerThreads());
managedLedgerFactoryConfig.setManagedCursorInfoCompressionThresholdInBytes(
conf.getManagedCursorInfoCompressionThresholdInBytes());

Expand Down
Loading