Skip to content

Conversation

@DL1231
Copy link
Contributor

@DL1231 DL1231 commented Nov 7, 2025

Changes

  1. New Dynamic Configurations
  • group.coordinator.append.max.buffer.size: Largest buffer size allowed by GroupCoordinator
  • share.coordinator.append.max.buffer.size: Largest buffer size allowed by ShareCoordinator

Both configurations default to 1 * 1024 * 1024 + Records.LOG_OVERHEAD with minimum value of 512 * 1024.

  1. Extended CoordinatorRuntime Builder Interface

Added withMaxBufferSize(Supplier maxBufferSizeSupplier) method to allow different coordinator implementations to supply their buffer size configuration.

  1. New Monitoring Metrics
  • coordinator-append-buffer-size-bytes: Current total size in bytes of the append buffers being held in the coordinator's cache
  • coordinator-append-buffer-skip-cache-count: Count of oversized append buffers that were discarded instead of being cached upon release

@github-actions github-actions bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka clients group-coordinator labels Nov 7, 2025
@AndrewJSchofield AndrewJSchofield requested a review from smjn November 7, 2025 10:58
@AndrewJSchofield AndrewJSchofield removed the triage PRs from the community label Nov 7, 2025
Copy link
Contributor

@squah-confluent squah-confluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch! I left a few comments. I haven't reviewed the full PR yet.

/**
* Return total size in bytes of cached buffers.
*/
public abstract long size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size implementations aren't thread safe, but they will be called from a metrics thread.

Copy link
Contributor Author

@DL1231 DL1231 Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing out this issue. As BufferSupplier is designed to be a "simple non-threadsafe interface for caching byte buffers," I've made the following targeted changes to address the specific concerns with the size() method when called from metrics threads:

  • Changed the bufferMap in DefaultSupplier to a ConcurrentHashMap to prevent potential ConcurrentModificationException.
  • Added the volatile keyword to cachedBuffer in GrowableBufferSupplier to resolve visibility issues.

I intentionally avoided adding synchronization to the get(), release(), and size() methods to maintain performance, as comprehensive thread safety isn't a design goal for this interface. These changes specifically address the metrics collection use case while preserving the lightweight. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the PR!

Avoiding locks is the right thing to do but the fixes do not look complete.

  • The ArrayDeques within DefaultSupplier suffer from the same visibility issue as cachedBuffer.
  • The first read of cachedBuffer within GrowableBufferSupplier.size can be non-null while the second read can be null.

Instead I would propose we introduce a size AtomicLong and update them in get and release.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants