BufferSupplier.java
@@ -22,6 +22,7 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
@@ -30,6 +31,7 @@
* iterating over the records in the batch.
*/
public abstract class BufferSupplier implements AutoCloseable {
protected AtomicLong cachedSize = new AtomicLong();

public static final BufferSupplier NO_CACHING = new BufferSupplier() {
@Override
@@ -40,6 +42,11 @@ public ByteBuffer get(int capacity) {
@Override
public void release(ByteBuffer buffer) {}

@Override
public long size() {
return 0;
}

@Override
public void close() {}
};
@@ -58,6 +65,11 @@ public static BufferSupplier create() {
*/
public abstract void release(ByteBuffer buffer);

/**
* Return total size in bytes of cached buffers.
*/
public abstract long size();
Contributor

The size implementations aren't thread safe, but they will be called from a metrics thread.

Collaborator Author @DL1231 (Nov 10, 2025)

Thank you for pointing out this issue. As BufferSupplier is designed to be a "simple non-threadsafe interface for caching byte buffers," I've made the following targeted changes to address the specific concerns with the size() method when called from metrics threads:

  • Changed the bufferMap in DefaultSupplier to a ConcurrentHashMap to prevent potential ConcurrentModificationException.
  • Added the volatile keyword to cachedBuffer in GrowableBufferSupplier to resolve visibility issues.

I intentionally avoided adding synchronization to the get(), release(), and size() methods to maintain performance, as comprehensive thread safety isn't a design goal for this interface. These changes specifically address the metrics collection use case while preserving the lightweight design. WDYT?
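
A minimal sketch of the two field-level changes described above (a hedged reconstruction from this description, not necessarily the final diff):

    // DefaultSupplier: a concurrent map, so iteration from the metrics thread
    // cannot fail with ConcurrentModificationException.
    private final Map<Integer, Deque<ByteBuffer>> bufferMap = new ConcurrentHashMap<>();

    // GrowableBufferSupplier: volatile makes writes visible to the metrics thread.
    private volatile ByteBuffer cachedBuffer;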

Contributor

Thanks for updating the PR!

Avoiding locks is the right thing to do, but the fixes do not look complete.

  • The ArrayDeques within DefaultSupplier suffer from the same visibility issue as cachedBuffer.
  • The first read of cachedBuffer within GrowableBufferSupplier.size can be non-null while the second read can be null.

Instead I would propose we introduce a size AtomicLong and update it in get and release.
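
To make the second bullet concrete, a hypothetical sketch of the racy two-read size() (illustrative only, not code from this PR):

    @Override
    public long size() {
        // Two unsynchronized reads of cachedBuffer: the owning thread can null
        // it out (e.g. in close()) between the null check and the capacity()
        // call, so a caller on the metrics thread can hit a NullPointerException.
        return cachedBuffer != null ? cachedBuffer.capacity() : 0;
    }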


/**
* Release all resources associated with this supplier.
*/
@@ -82,11 +94,18 @@ public void release(ByteBuffer buffer) {
// We currently keep a single buffer in flight, so optimise for that case
Deque<ByteBuffer> bufferQueue = bufferMap.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>(1));
bufferQueue.addLast(buffer);
cachedSize.addAndGet(buffer.capacity());
}

@Override
public long size() {
return cachedSize.get();
}

@Override
public void close() {
bufferMap.clear();
cachedSize.set(0);
}
}
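
// Hedged note, not part of this diff: for the gauge to stay accurate, the
// get() path (not shown here) must decrement cachedSize whenever it hands a
// cached buffer back out, mirroring the addAndGet in release(). Roughly:
//
//     ByteBuffer cached = bufferQueue.pollFirst();
//     if (cached != null)
//         cachedSize.addAndGet(-cached.capacity());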

@@ -113,12 +132,18 @@ public ByteBuffer get(int minCapacity) {
public void release(ByteBuffer buffer) {
buffer.clear();
cachedBuffer = buffer;
cachedSize.set(buffer.capacity());
}

@Override
public long size() {
return cachedSize.get();
}

@Override
public void close() {
cachedBuffer = null;
cachedSize.set(0);
}
}

}
CoordinatorRuntime.java
@@ -65,6 +65,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.lang.Math.min;
@@ -119,6 +120,7 @@ public static class Builder<S extends CoordinatorShard<U>, U> {
private Compression compression;
private OptionalInt appendLingerMs;
private ExecutorService executorService;
private Supplier<Integer> appendMaxBufferSizeSupplier;

public Builder<S, U> withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
@@ -195,6 +197,11 @@ public Builder<S, U> withExecutorService(ExecutorService executorService) {
return this;
}

public Builder<S, U> withMaxBufferSizeSupplier(Supplier<Integer> maxBufferSizeSupplier) {
this.appendMaxBufferSizeSupplier = maxBufferSizeSupplier;
return this;
}
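
// Hedged usage sketch (config.appendMaxBufferSize() is a hypothetical accessor):
// taking a Supplier rather than a fixed int lets the limit follow a dynamically
// updatable config, e.g.
//
//     .withMaxBufferSizeSupplier(() -> config.appendMaxBufferSize())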

@SuppressWarnings("checkstyle:CyclomaticComplexity")
public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
@@ -227,6 +234,8 @@ public CoordinatorRuntime<S, U> build() {
throw new IllegalArgumentException("AppendLinger must be empty or >= 0");
if (executorService == null)
throw new IllegalArgumentException("ExecutorService must be set.");
if (appendMaxBufferSizeSupplier == null)
throw new IllegalArgumentException("Append max buffer size supplier must be set.");

return new CoordinatorRuntime<>(
logPrefix,
@@ -243,7 +252,8 @@
serializer,
compression,
appendLingerMs,
executorService,
appendMaxBufferSizeSupplier
);
}
}
@@ -771,13 +781,15 @@ private void freeCurrentBatch() {
// Cancel the linger timeout.
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);

// Release the buffer only if it is not larger than the max buffer size.
int maxBufferSize = appendMaxBufferSizeSupplier.get();

if (currentBatch.builder.buffer().capacity() <= maxBufferSize) {
bufferSupplier.release(currentBatch.builder.buffer());
} else if (currentBatch.buffer.capacity() <= maxBufferSize) {
bufferSupplier.release(currentBatch.buffer);
} else {
runtimeMetrics.recordAppendBufferDiscarded();
}

currentBatch = null;
@@ -2050,6 +2062,11 @@ public void onHighWatermarkUpdated(
*/
private final ExecutorService executorService;

/**
* The supplier of the maximum buffer size that the coordinator may cache.
*/
private final Supplier<Integer> appendMaxBufferSizeSupplier;

/**
* Atomic boolean indicating whether the runtime is running.
*/
@@ -2078,6 +2095,7 @@ public void onHighWatermarkUpdated(
* @param compression The compression codec.
* @param appendLingerMs The append linger time in ms.
* @param executorService The executor service.
* @param appendMaxBufferSizeSupplier The append max buffer size supplier.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private CoordinatorRuntime(
@@ -2096,6 +2114,7 @@ private CoordinatorRuntime(
Compression compression,
OptionalInt appendLingerMs,
ExecutorService executorService,
Supplier<Integer> appendMaxBufferSizeSupplier
) {
this.logPrefix = logPrefix;
this.log = logContext.logger(CoordinatorRuntime.class);
@@ -2113,6 +2132,10 @@ private CoordinatorRuntime(
this.compression = compression;
this.appendLingerMs = appendLingerMs;
this.executorService = executorService;
this.appendMaxBufferSizeSupplier = appendMaxBufferSizeSupplier;
this.runtimeMetrics.registerAppendBufferSizeGauge(
() -> coordinators.values().stream().mapToLong(c -> c.bufferSupplier.size()).sum()
);
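// Note: this gauge is evaluated on the metrics reporter thread and sums size()
// across all live coordinators; that cross-thread read is why BufferSupplier.size()
// is backed by an AtomicLong, per the review discussion above.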
}

/**
CoordinatorRuntimeMetrics.java
@@ -86,4 +86,16 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
* @param sizeSupplier The size supplier.
*/
void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier);

/**
* Register the coordinator append buffer size gauge.
*
* @param sizeSupplier The size supplier.
*/
void registerAppendBufferSizeGauge(Supplier<Long> sizeSupplier);

/**
* Called when a buffer is discarded upon release instead of being cached.
*/
void recordAppendBufferDiscarded();
}
CoordinatorRuntimeMetricsImpl.java
@@ -69,6 +69,16 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
public static final String BATCH_FLUSH_TIME_METRIC_NAME = "batch-flush-time-ms";

/**
* The coordinator append buffer size metric name.
*/
public static final String APPEND_BUFFER_SIZE_METRIC_NAME = "coordinator-append-buffer-size-bytes";

/**
* The coordinator append buffer skip cache count metric name.
*/
public static final String APPEND_BUFFER_SKIP_CACHE_COUNT_METRIC_NAME = "coordinator-append-buffer-skip-cache-count";

/**
* Metric to count the number of partitions in Loading state.
*/
@@ -92,6 +102,17 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
private final MetricName eventQueueSize;

/**
* Metric to track the total size in bytes of the cached coordinator append buffers.
*/
private final MetricName appendBufferSize;

/**
* Metric to count the number of over-sized append buffers that were discarded.
*/
private final MetricName appendBufferSkipCacheCount;
private final AtomicLong appendBufferSkipCacheCounter = new AtomicLong(0);

/**
* The Kafka metrics registry.
*/
@@ -156,6 +177,17 @@ public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {

this.eventQueueSize = kafkaMetricName("event-queue-size", "The event accumulator queue size.");

this.appendBufferSize = kafkaMetricName(
APPEND_BUFFER_SIZE_METRIC_NAME,
"The current total size in bytes of the append buffers being held in the coordinator's cache."
);

this.appendBufferSkipCacheCount = kafkaMetricName(
APPEND_BUFFER_SKIP_CACHE_COUNT_METRIC_NAME,
"The count of over-sized append buffers that were discarded instead of being cached upon release."
);

metrics.addMetric(appendBufferSkipCacheCount, (Gauge<Long>) (config, now) -> appendBufferSkipCacheCounter.get());
metrics.addMetric(numPartitionsLoading, (Gauge<Long>) (config, now) -> numPartitionsLoadingCounter.get());
metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> numPartitionsActiveCounter.get());
metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> numPartitionsFailedCounter.get());
@@ -252,7 +284,9 @@ public void close() {
numPartitionsLoading,
numPartitionsActive,
numPartitionsFailed,
eventQueueSize,
appendBufferSize,
appendBufferSkipCacheCount
).forEach(metrics::removeMetric);

metrics.removeSensor(partitionLoadSensor.name());
@@ -340,4 +374,14 @@ public void recordThreadIdleTime(double idleTimeMs) {
public void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
metrics.addMetric(eventQueueSize, (Gauge<Long>) (config, now) -> (long) sizeSupplier.get());
}

@Override
public void registerAppendBufferSizeGauge(Supplier<Long> sizeSupplier) {
metrics.addMetric(appendBufferSize, (Gauge<Long>) (config, now) -> sizeSupplier.get());
}

@Override
public void recordAppendBufferDiscarded() {
appendBufferSkipCacheCounter.incrementAndGet();
}
}
CoordinatorRuntimeMetricsImplTest.java
@@ -31,6 +31,8 @@
import java.util.Set;
import java.util.stream.IntStream;

import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.APPEND_BUFFER_SIZE_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.APPEND_BUFFER_SKIP_CACHE_COUNT_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
@@ -56,6 +58,8 @@ public void testMetricNames() {
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"),
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"),
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"),
kafkaMetricName(metrics, APPEND_BUFFER_SIZE_METRIC_NAME),
kafkaMetricName(metrics, APPEND_BUFFER_SKIP_CACHE_COUNT_METRIC_NAME),
kafkaMetricName(metrics, "event-queue-size"),
kafkaMetricName(metrics, "partition-load-time-max"),
kafkaMetricName(metrics, "partition-load-time-avg"),
@@ -90,6 +94,7 @@ public void testMetricNames() {

try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
runtimeMetrics.registerAppendBufferSizeGauge(() -> 0L);
expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName)));
}

@@ -238,6 +243,55 @@ public void testEventQueueSizeMetricsGroupIsolation() {
}
}

@Test
public void testAppendBufferSize() {
Metrics metrics = new Metrics();

try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
runtimeMetrics.registerAppendBufferSizeGauge(() -> 5L);
assertMetricGauge(metrics, kafkaMetricName(metrics, APPEND_BUFFER_SIZE_METRIC_NAME), 5);
}
}

@Test
public void testAppendBufferSizeMetricsGroupIsolation() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);

try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
runtimeMetrics.registerAppendBufferSizeGauge(() -> 5L);
otherRuntimeMetrics.registerAppendBufferSizeGauge(() -> 0L);

assertMetricGauge(metrics, kafkaMetricName(metrics, APPEND_BUFFER_SIZE_METRIC_NAME), 5);
assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, APPEND_BUFFER_SIZE_METRIC_NAME), 0);
}
}

@Test
public void testAppendBufferSkipCacheCount() {
Metrics metrics = new Metrics();

try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
runtimeMetrics.recordAppendBufferDiscarded();
assertMetricGauge(metrics, kafkaMetricName(metrics, APPEND_BUFFER_SKIP_CACHE_COUNT_METRIC_NAME), 1);
}
}

@Test
public void testAppendBufferSkipCacheCountMetricsGroupIsolation() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);

try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
runtimeMetrics.recordAppendBufferDiscarded();

assertMetricGauge(metrics, kafkaMetricName(metrics, APPEND_BUFFER_SKIP_CACHE_COUNT_METRIC_NAME), 1);
assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, APPEND_BUFFER_SKIP_CACHE_COUNT_METRIC_NAME), 0);
}
}

@ParameterizedTest
@ValueSource(strings = {
EVENT_QUEUE_TIME_METRIC_NAME,