Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
totalCacheSize = totalCacheSize(applicationConfigs);
final int numStreamThreads = topologyMetadata.numStreamThreads(applicationConfigs);
final long cacheSizePerThread = cacheSizePerThread(numStreamThreads);
final long maxUncommittedBytesPerThread = maxUncommittedBytesPerThread(numStreamThreads);

GlobalStreamThread.State globalThreadState = null;
if (hasGlobalTopology) {
Expand All @@ -1022,6 +1023,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
maxUncommittedBytesPerThread,
streamsMetrics,
time,
globalThreadId,
Expand All @@ -1043,7 +1045,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
globalStateStoreProvider,
applicationConfigs::defaultInteractiveQueryIsolationLevel);
for (int i = 1; i <= numStreamThreads; i++) {
createAndAddStreamThread(cacheSizePerThread, i);
createAndAddStreamThread(cacheSizePerThread, maxUncommittedBytesPerThread, i);
}

stateDirCleaner = setupStateDirCleaner();
Expand All @@ -1066,7 +1068,9 @@ private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol() {
}
}

private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
private StreamThread createAndAddStreamThread(final long cacheSizePerThread,
final long maxUncommittedBytesPerThread,
final int threadIdx) {
final StreamThread streamThread = StreamThread.create(
topologyMetadata,
applicationConfigs,
Expand All @@ -1078,6 +1082,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
time,
streamsMetadataState,
cacheSizePerThread,
maxUncommittedBytesPerThread,
stateDirectory,
delegatingStateRestoreListener,
delegatingStandbyUpdateListener,
Expand Down Expand Up @@ -1124,12 +1129,14 @@ public Optional<String> addStreamThread() {
final int threadIdx = nextThreadIndex();
final int numLiveThreads = numLiveStreamThreads();
final long cacheSizePerThread = cacheSizePerThread(numLiveThreads + 1);
final long maxUncommittedBytesPerThread = maxUncommittedBytesPerThread(numLiveThreads + 1);
log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
threadIdx, numLiveThreads + 1, cacheSizePerThread);
resizeThreadCache(cacheSizePerThread);
resizeMaxUncommittedBytes(maxUncommittedBytesPerThread);
// Creating thread should hold the lock in order to avoid duplicate thread index.
// If the duplicate index happen, the metadata of thread may be duplicate too.
streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
streamThread = createAndAddStreamThread(cacheSizePerThread, maxUncommittedBytesPerThread, threadIdx);
}

synchronized (stateLock) {
Expand All @@ -1143,6 +1150,7 @@ public Optional<String> addStreamThread() {
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
resizeThreadCache(cacheSizePerThread);
resizeMaxUncommittedBytes(maxUncommittedBytesPerThread(numLiveStreamThreads()));
return Optional.empty();
}
}
Expand Down Expand Up @@ -1223,6 +1231,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
resizeThreadCache(cacheSizePerThread);
resizeMaxUncommittedBytes(maxUncommittedBytesPerThread(numLiveStreamThreads()));
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
if (remainingTimeMs <= 0) {
throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
Expand Down Expand Up @@ -1324,13 +1333,29 @@ private long cacheSizePerThread(final int numStreamThreads) {
return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
}

private long maxUncommittedBytesPerThread(final int numStreamThreads) {
final long totalMax = applicationConfigs.getLong(StreamsConfig.STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG);
if (totalMax <= 0) {
return -1;
}
final int divisor = Math.max(numStreamThreads, 0) + (topologyMetadata.hasGlobalTopology() ? 1 : 0);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may seem strange to floor numStreamThreads at 0, since StreamsConfig.NUM_STREAM_THREADS has a minimum of 1, but it's actually possible for there to be 0 stream threads... If you removeStreamThread the last remaining stream thread; or if you replaceStreamThread, there's a narrow window where the old thread has been removed and the new one has not registered yet.

return divisor == 0 ? totalMax : totalMax / divisor;
}

/**
 * Pushes a new per-thread cache size to each thread visited by {@code processStreamThread}
 * and, if a global stream thread exists, to that thread as well.
 *
 * @param cacheSizePerThread the cache size, in bytes, to hand to each thread
 */
private void resizeThreadCache(final long cacheSizePerThread) {
    processStreamThread(streamThread -> streamThread.resizeCache(cacheSizePerThread));
    if (globalStreamThread == null) {
        return;
    }
    globalStreamThread.resize(cacheSizePerThread);
}

/**
 * Pushes a new per-thread uncommitted-bytes limit to each thread visited by
 * {@code processStreamThread} and, if a global stream thread exists, to that thread as well.
 *
 * @param maxUncommittedBytesPerThread the limit, in bytes, to hand to each thread
 */
private void resizeMaxUncommittedBytes(final long maxUncommittedBytesPerThread) {
    processStreamThread(streamThread -> streamThread.resizeMaxUncommittedBytes(maxUncommittedBytesPerThread));
    if (globalStreamThread == null) {
        return;
    }
    globalStreamThread.resizeMaxUncommittedBytes(maxUncommittedBytesPerThread);
}

private ScheduledExecutorService setupStateDirCleaner() {
return Executors.newSingleThreadScheduledExecutor(r -> {
final Thread thread = new Thread(r, clientId + "-CleanupThread");
Expand Down
15 changes: 15 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,15 @@ public class StreamsConfig extends AbstractConfig {
@Deprecated
public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";

/** {@code statestore.uncommitted.max.bytes} */
public static final String STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG = "statestore.uncommitted.max.bytes";
// Note the trailing space before each concatenation: without it the rendered documentation
// runs words together (previously "thread,if the topology").
private static final String STATESTORE_UNCOMMITTED_MAX_BYTES_DOC =
    "The maximum number of uncommitted bytes across all transactional state stores on this " +
    "application instance before an early commit is triggered, regardless of commit.interval.ms. " +
    "The limit is divided equally across the configured number of stream threads, and the global state thread, " +
    "if the topology has any global stores. Set to -1 to disable. Default is 67108864 (64 MB).";
// 64 MB expressed in bytes.
private static final long STATESTORE_UNCOMMITTED_MAX_BYTES_DEFAULT = 67_108_864L;

/** {@code task.assignor.class} */
@SuppressWarnings("WeakerAccess")
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
Expand Down Expand Up @@ -959,6 +968,12 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
STATESTORE_CACHE_MAX_BYTES_DOC)
.define(STATESTORE_UNCOMMITTED_MAX_BYTES_CONFIG,
Type.LONG,
STATESTORE_UNCOMMITTED_MAX_BYTES_DEFAULT,
atLeast(-1),
Importance.LOW,
STATESTORE_UNCOMMITTED_MAX_BYTES_DOC)
.define(CLIENT_ID_CONFIG,
Type.STRING,
"",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ default <R> QueryResult<R> query(
return QueryResult.forUnknownQueryType(query, this);
}

/**
 * Reports roughly how many bytes are currently held, uncommitted, in this store's
 * transaction buffer. The value is used to decide whether an early commit should be
 * triggered because the buffer has grown too large.
 * <p>
 * The default implementation reports {@code 0}, which is the answer for a store that is
 * not transactional.
 *
 * @return the approximate number of uncommitted bytes, or {@code 0} by default
 */
default long approximateNumUncommittedBytes() {
    return 0L;
}

/**
* Returns the position the state store is at with respect to the input topic/partitions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ interface GlobalStateMaintainer {
void update(ConsumerRecord<byte[], byte[]> record);

void maybeCheckpoint();

/**
 * Returns an approximation of the number of uncommitted bytes held by the global state
 * this maintainer is responsible for.
 *
 * @return the approximate number of uncommitted bytes
 */
long approximateNumUncommittedBytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public interface GlobalStateManager extends StateManager {
* @throws StreamsException if the store's change log does not contain the partition
*/
Set<String> initialize();

/**
 * Returns an approximation of the number of uncommitted bytes held by the global stores
 * managed by this state manager.
 *
 * @return the approximate number of uncommitted bytes
 */
long approximateNumUncommittedBytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,17 @@ private long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs) {
return currentDeadlineMs;
}

/**
 * Adds up the approximate uncommitted byte counts reported by every global store that is
 * currently present; empty entries contribute nothing.
 */
@Override
public long approximateNumUncommittedBytes() {
    long uncommittedBytes = 0;
    for (final Optional<StateStore> maybeStore : globalStores.values()) {
        if (!maybeStore.isPresent()) {
            continue;
        }
        uncommittedBytes += maybeStore.get().approximateNumUncommittedBytes();
    }
    return uncommittedBytes;
}

@Override
public void commit() {
log.debug("Committing all global globalStores registered in the state manager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ private void initTopology() {
}
}

/**
 * Reports the approximate number of uncommitted bytes as computed by the state manager.
 */
@Override
public long approximateNumUncommittedBytes() {
    final long uncommittedBytes = stateMgr.approximateNumUncommittedBytes();
    return uncommittedBytes;
}

@Override
public void maybeCheckpoint() {
final long now = time.milliseconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class GlobalStreamThread extends Thread {
private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private volatile long maxUncommittedBytes;

/**
* The states that the global stream thread can be in
Expand Down Expand Up @@ -202,6 +203,7 @@ public GlobalStreamThread(final ProcessorTopology topology,
final Consumer<byte[], byte[]> globalConsumer,
final StateDirectory stateDirectory,
final long cacheSizeBytes,
final long maxUncommittedBytes,
final StreamsMetricsImpl streamsMetrics,
final Time time,
final String threadClientId,
Expand All @@ -221,6 +223,7 @@ public GlobalStreamThread(final ProcessorTopology topology,
this.stateRestoreListener = stateRestoreListener;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheSize = new AtomicLong(-1L);
this.maxUncommittedBytes = maxUncommittedBytes;
}

static class StateConsumer {
Expand Down Expand Up @@ -259,6 +262,14 @@ void pollAndUpdate() {
stateMaintainer.maybeCheckpoint();
}

/**
 * Asks the state maintainer to flush its state. The thread's run loop calls this when the
 * approximate number of uncommitted bytes exceeds the configured limit.
 */
void flushState() {
    stateMaintainer.flushState();
}

/**
 * Reports the approximate number of uncommitted bytes as seen by the state maintainer.
 */
long approximateNumUncommittedBytes() {
    final long uncommittedBytes = stateMaintainer.approximateNumUncommittedBytes();
    return uncommittedBytes;
}

public void close(final boolean wipeStateStore) throws IOException {
try {
globalConsumer.close();
Expand Down Expand Up @@ -301,6 +312,13 @@ public void run() {
}
stateConsumer.pollAndUpdate();

final long uncommittedLimit = maxUncommittedBytes;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to preempt some feedback here and clarify why we copy maxUncommittedBytes into a local uncommittedLimit variable here.

The reason is that maxUncommittedBytes can be modified by another thread (the main thread) via resizeMaxUncommittedBytes below, when adding/removing StreamThreads to the running application. So we copy its value to a local temporary variable to ensure the value remains consistent across the 3 usages below.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that, but I'm wondering whether using an AtomicLong would offer any benefit over the current approach?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The crucial thing here is that the following few lines use a consistent value, so we would still need to copy it out of the AtomicLong into a temporary variable. maxUncommittedBytes is already marked volatile, so updates to it from other threads should be immediately published to the GlobalStreamThread (same goes for maxUncommittedBytesPerThread in StreamThread).

if (uncommittedLimit > 0
&& stateConsumer.approximateNumUncommittedBytes() > uncommittedLimit) {
log.debug("Committing global state: uncommitted bytes exceeded {}", uncommittedLimit);
stateConsumer.flushState();
}

if (fetchDeadlineClientInstanceId != -1) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
Expand Down Expand Up @@ -372,6 +390,10 @@ public void resize(final long cacheSize) {
this.cacheSize.set(cacheSize);
}

/**
 * Replaces the uncommitted-bytes limit used by this thread.
 * <p>
 * The backing field is {@code volatile}; the run loop copies it into a local variable
 * before comparing against it, so a value written here is picked up on a later iteration.
 *
 * @param maxUncommittedBytes the new limit in bytes; the run loop only enforces values
 *                            greater than 0
 */
public void resizeMaxUncommittedBytes(final long maxUncommittedBytes) {
    this.maxUncommittedBytes = maxUncommittedBytes;
}

private StateConsumer initialize() {
StateConsumer stateConsumer = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,14 @@ public void flushCache() {
}
}

/**
 * Sums the approximate uncommitted byte counts reported by every registered state store.
 *
 * @return the combined approximate number of uncommitted bytes
 */
long approximateNumUncommittedBytes() {
    long uncommittedBytes = 0;
    for (final StateStoreMetadata storeMetadata : stores.values()) {
        final long storeBytes = storeMetadata.stateStore.approximateNumUncommittedBytes();
        uncommittedBytes += storeBytes;
    }
    return uncommittedBytes;
}

/**
* {@link StateStore#close() Close} all stores (even in case of failure).
* Log all exceptions and re-throw the first exception that occurred at the end.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,9 @@ public Optional<Long> timeCurrentIdlingStarted() {
public ProcessorStateManager stateManager() {
throw new UnsupportedOperationException("This task is read-only");
}

/**
 * Not supported on a read-only task.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public long approximateNumUncommittedBytes() {
    throw new UnsupportedOperationException("This task is read-only");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,11 @@ public String toString(final String indent) {
return sb.toString();
}

/**
 * Reports the approximate number of uncommitted bytes as computed by this task's state manager.
 */
@Override
public long approximateNumUncommittedBytes() {
    final long uncommittedBytes = stateMgr.approximateNumUncommittedBytes();
    return uncommittedBytes;
}

@Override
public boolean commitNeeded() {
// we need to do an extra check if the flag was false, that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ public boolean isStartingRunningOrPartitionAssigned() {
private final Sensor commitRatioSensor;
private final Sensor failedStreamThreadSensor;

private volatile long maxUncommittedBytesPerThread;
private final long logSummaryIntervalMs; // the count summary log output time interval
private long lastLogSummaryMs = -1L;
private long totalRecordsProcessedSinceLastSummary = 0L;
Expand Down Expand Up @@ -406,6 +407,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
final Time time,
final StreamsMetadataState streamsMetadataState,
final long cacheSizeBytes,
final long maxUncommittedBytesPerThread,
final StateDirectory stateDirectory,
final StateRestoreListener userStateRestoreListener,
final StandbyUpdateListener userStandbyUpdateListener,
Expand Down Expand Up @@ -542,7 +544,8 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
cache::resize,
mainConsumerSetup.streamsRebalanceData,
streamsMetadataState,
metricsReporter
metricsReporter,
maxUncommittedBytesPerThread
);

return streamThread.updateThreadMetadata(adminClientId(clientId));
Expand Down Expand Up @@ -790,7 +793,8 @@ public StreamThread(final Time time,
final java.util.function.Consumer<Long> cacheResizer,
final Optional<StreamsRebalanceData> streamsRebalanceData,
final StreamsMetadataState streamsMetadataState,
final StreamsThreadMetricsDelegatingReporter metricsReporter
final StreamsThreadMetricsDelegatingReporter metricsReporter,
final long maxUncommittedBytesPerThread
) {
super(threadId);
this.stateLock = new Object();
Expand Down Expand Up @@ -885,6 +889,7 @@ public StreamThread(final Time time,
this.eosEnabled = eosEnabled(config);
this.processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals());
this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
this.maxUncommittedBytesPerThread = maxUncommittedBytesPerThread;

this.streamsRebalanceData = streamsRebalanceData;
this.streamsMetadataState = streamsMetadataState;
Expand Down Expand Up @@ -1816,7 +1821,7 @@ public void signalResume() {
// visible for testing
int maybeCommit() {
final int committed;
if (now - lastCommitMs > commitTimeMs) {
if (now - lastCommitMs > commitTimeMs || shouldCommitDueToUncommittedBytes()) {
if (log.isDebugEnabled()) {
log.debug("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",
taskManager.activeRunningTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
Expand Down Expand Up @@ -1849,6 +1854,16 @@ int maybeCommit() {
return committed;
}

/**
 * Decides whether a commit should be forced because the tasks of this thread have buffered
 * more uncommitted bytes than the per-thread limit allows.
 *
 * @return {@code true} only if the limit is positive and the task manager's total exceeds it
 */
private boolean shouldCommitDueToUncommittedBytes() {
    // Read the volatile field once so the enabled-check and the comparison see the same value.
    final long currentLimit = maxUncommittedBytesPerThread;
    return currentLimit > 0 && taskManager.totalUncommittedBytes() > currentLimit;
}

/**
 * Replaces the per-thread uncommitted-bytes limit.
 * <p>
 * The backing field is {@code volatile} and is read when deciding whether to commit early;
 * values less than or equal to 0 disable that check.
 *
 * @param maxUncommittedBytesPerThread the new limit in bytes
 */
public void resizeMaxUncommittedBytes(final long maxUncommittedBytesPerThread) {
    this.maxUncommittedBytesPerThread = maxUncommittedBytesPerThread;
}

/**
* Compute the latency based on the current marked timestamp, and update the marked timestamp
* with the current system timestamp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ default boolean needsInitializationOrRestoration() {

boolean commitNeeded();

/**
 * Reports roughly how many uncommitted bytes this task is holding.
 * The default implementation reports {@code 0}.
 *
 * @return the approximate number of uncommitted bytes
 */
default long approximateNumUncommittedBytes() {
    return 0L;
}

default boolean commitRequested() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,14 @@ Map<TaskId, Task> allRunningTasks() {
return tasks.allInitializedTasksPerId();
}

/**
 * Sums the approximate uncommitted byte counts of all tasks returned by
 * {@link #allRunningTasks()}.
 *
 * @return the combined approximate number of uncommitted bytes
 */
long totalUncommittedBytes() {
    long uncommittedBytes = 0;
    for (final Task runningTask : allRunningTasks().values()) {
        final long taskBytes = runningTask.approximateNumUncommittedBytes();
        uncommittedBytes += taskBytes;
    }
    return uncommittedBytes;
}

Set<Task> readOnlyAllTasks() {
// not bothering with an unmodifiable map, since the tasks themselves are mutable, but
// if any outside code modifies the map or the tasks, it would be a severe transgression.
Expand Down
Loading
Loading