KAFKA-20491: Add uncommitted bytes limit#22597
Conversation
| } | ||
| stateConsumer.pollAndUpdate(); | ||
|
|
||
| final long uncommittedLimit = maxUncommittedBytes; |
There was a problem hiding this comment.
I'm going to preempt some feedback here and clarify why we copy maxUncommittedBytes into a local uncommittedLimit variable here.
The reason is that maxUncommittedBytes can be modified by another thread (the main thread) via resizeMaxUncommittedBytes below, when adding/removing StreamThreads to the running application. So we copy its value to a local temporary variable to ensure the value remains consistent across the 3 usages below.
| if (totalMax <= 0) { | ||
| return -1; | ||
| } | ||
| final int divisor = Math.max(numStreamThreads, 0) + (topologyMetadata.hasGlobalTopology() ? 1 : 0); |
There was a problem hiding this comment.
It may seem strange to floor numStreamThreads at 0, since StreamsConfig.NUM_STREAM_THREADS has a minimum of 1, but it's actually possible for there to be 0 stream threads... If you removeStreamThread the last remaining stream thread; or if you replaceStreamThread, there's a narrow window where the old thread has been removed and the new one has not registered yet.
|
@nicktelford there's a compile error |
@bbejeck Sorry about that. Should be good now! |
No worries, thanks! |
Introduces the `statestore.uncommitted.max.bytes` config, which triggers an early commit when the total uncommitted bytes across all transactional state stores on a thread exceeds the limit, regardless of `commit.interval.ms`. This bounds the memory consumed by pending write buffers under high write throughput. The limit is divided equally across StreamThreads and the GlobalStreamThread. Each thread independently enforces its share: StreamThreads trigger an early task commit via maybeCommit(); the GlobalStreamThread flushes at the end of each pollAndUpdate() cycle. StateStore gains a default approximateNumUncommittedBytes() method (returning 0); adapter and wrapper classes delegate to the inner store. Metered key-value stores register an uncommitted-bytes gauge. Segmented and versioned stores aggregate across their underlying segments. The per-thread limit is recomputed whenever stream threads are dynamically added or removed via addStreamThread()/removeStreamThread(), mirroring the existing thread-cache resize logic. The per-thread limit field is volatile so running threads pick up the new value on their next commit check without synchronisation. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
9af58e4 to
8a02381
Compare
|
@bbejeck There was a test failure; I have fixed it and rebased on latest trunk. |
bbejeck
left a comment
There was a problem hiding this comment.
Thanks @nicktelford - LGTM just have a couple of minor nits otherwise ready for merge
| } | ||
| stateConsumer.pollAndUpdate(); | ||
|
|
||
| final long uncommittedLimit = maxUncommittedBytes; |
There was a problem hiding this comment.
I get that but I'm wondering if we used AtomicLong would offer any benefit over the current approach?
There was a problem hiding this comment.
I don't think so. The crucial thing here is that the following few lines use a consistent value, so we would still need to copy it out of the AtomicLong into a temporary variable. maxUncommittedBytes is already marked volatile, so updates to it from other threads should be immediately published to the GlobalStreamThread (same goes for maxUncommittedBytesPerThread in StreamThread).
Introduces the
statestore.uncommitted.max.bytesconfig, which triggersan early commit when the total uncommitted bytes across all
transactional state stores on a thread exceeds the configured limit,
regardless of
commit.interval.ms. This bounds the memory consumed bypending write buffers under high write throughput.
The limit is divided equally across StreamThreads and the
GlobalStreamThread. Each thread independently enforces its share:
StreamThreads trigger an early task commit via
maybeCommit(); theGlobalStreamThread flushes at the end of each
pollAndUpdate()cycle.StateStoregains a defaultapproximateNumUncommittedBytes()method(returning 0); adapter and wrapper classes delegate to the inner store.
Metered key-value stores register an uncommitted-bytes gauge. Segmented
and versioned stores aggregate across their underlying segments.
The per-thread limit is recomputed whenever stream threads are
dynamically added or removed via
addStreamThread()/removeStreamThread(), mirroring the existingthread-cache resize logic. The per-thread limit field is
volatilesorunning threads pick up the new value on their next commit check without
synchronisation.
This is part of
KIP-892.
Reviewers: Bill Bejeck bbejeck@apache.org