Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ enum SharePartitionState {
*/
private final int maxDeliveryCount;

/**
* Records whose delivery count exceeds this are deemed abnormal,
* and the batching of these records should be reduced.
*/
private final int badRecordDeliveryThreshold;
/**
* The group config manager is used to retrieve the values for dynamic group configurations
*/
Expand Down Expand Up @@ -362,6 +367,7 @@ enum SharePartitionState {
this.leaderEpoch = leaderEpoch;
this.maxInFlightRecords = maxInFlightRecords;
this.maxDeliveryCount = maxDeliveryCount;
this.badRecordDeliveryThreshold = (int) Math.ceil((double) maxDeliveryCount / 2);
this.cachedState = new ConcurrentSkipListMap<>();
this.lock = new ReentrantReadWriteLock();
this.findNextFetchOffset = false;
Expand Down Expand Up @@ -834,7 +840,14 @@ public ShareAcquiredRecords acquire(
boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
boolean deliveryCountExceed = checkForBadRecordsDeliveryCount(inFlightBatch);
// Stop acquiring more records if bad record found after acquiring some data to
// prevent affecting already acquired records
if (deliveryCountExceed && acquiredCount > 0) {
maxRecordsToAcquire = -1;
break;
}
if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch || deliveryCountExceed) {
log.trace("Subset or offset tracked batch record found for share partition,"
+ " batch: {} request offsets - first: {}, last: {} for the share"
+ " partition: {}-{}", inFlightBatch, firstBatch.baseOffset(),
Expand All @@ -859,6 +872,13 @@ public ShareAcquiredRecords acquire(
// maxRecordsToAcquire. Hence, pass the remaining number of records that can
// be acquired.
int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
// If a bad record is present, return immediately and set `maxRecordsToAcquire = -1`
// to prevent acquiring any new records afterwards.
if (acquiredSubsetCount < 0) {
maxRecordsToAcquire = -1;
acquiredCount += -1 * acquiredSubsetCount;
break;
}
acquiredCount += acquiredSubsetCount;
continue;
}
Expand Down Expand Up @@ -1866,6 +1886,8 @@ private int acquireSubsetBatchRecords(
) {
lock.writeLock().lock();
int acquiredCount = 0;
long maxFetchRecordsWhileBadRecord = -1;
boolean hasBadRecord = false;
try {
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {
// For the first batch which might have offsets prior to the request base
Expand All @@ -1885,7 +1907,21 @@ private int acquireSubsetBatchRecords(
continue;
}

InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
int recordDeliveryCount = offsetState.getValue().deliveryCount();
// On last delivery attempt, submit acquired records,
// bad record will be delivered alone next time
if (recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
hasBadRecord = true;
break;
}

// On repeated delivery failures (>= badRecordDeliveryThreshold), progressively reduce batch size to isolate bad record
if (recordDeliveryCount >= badRecordDeliveryThreshold && maxFetchRecordsWhileBadRecord < 0) {
maxFetchRecordsWhileBadRecord = Math.max(1, (long) inFlightBatch.offsetState().size() >> (recordDeliveryCount - badRecordDeliveryThreshold + 1));
hasBadRecord = true;
}

InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
maxDeliveryCount, memberId);
if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) {
log.trace("Unable to acquire records for the offset: {} in batch: {}"
Expand All @@ -1904,15 +1940,24 @@ private int acquireSubsetBatchRecords(
.setLastOffset(offsetState.getKey())
.setDeliveryCount((short) offsetState.getValue().deliveryCount()));
acquiredCount++;

// Delivered alone.
if (offsetState.getValue().deliveryCount() == maxDeliveryCount) {
hasBadRecord = true;
break;
}
if (isRecordLimitMode && acquiredCount == maxFetchRecords) {
// In record_limit mode, acquire only the requested number of records.
break;
}
if (hasBadRecord && acquiredCount == maxFetchRecordsWhileBadRecord) {
break;
}
}
} finally {
lock.writeLock().unlock();
}
return acquiredCount;
return hasBadRecord ? -acquiredCount : acquiredCount;
}

/**
Expand Down Expand Up @@ -1942,6 +1987,19 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
}

/**
* Check if the in-flight batch contains bad records based on delivery count.
*
* @param inFlightBatch The in-flight batch to check for bad records.
* @return True if the batch contains bad records (delivery count >= threshold), false otherwise.
*/
private boolean checkForBadRecordsDeliveryCount(InFlightBatch inFlightBatch) {
if (inFlightBatch.offsetState() == null) {
return inFlightBatch.batchDeliveryCount() >= badRecordDeliveryThreshold;
}
return inFlightBatch.offsetState().values().stream().mapToInt(InFlightState::deliveryCount).max().orElse(0) >= badRecordDeliveryThreshold;
}

// Visibility for test
static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch) {
// Client can either send a single entry in acknowledgeTypes which represents the state
Expand Down
Loading
Loading