-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-17541:[2/2] Improve handling of delivery count #20837
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
apoorvmittal10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, I have doubts for the approach. Can you please help me explain.
| int numRecordsRemaining = maxRecordsToAcquire - acquiredCount; | ||
| boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount); | ||
| if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) { | ||
| if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch || inFlightBatch.batchDeliveryCount() >= 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have defined the constant above but have used directly 2 here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also can you help explain the reasoning here for the condition added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made the updates, PTAL.
For batches or records with a deliveryCount >= 3, we consider that bad records exist within the batch. Therefore, depending on the specific situation (such as whether there are pending records to be sent), we will determine the sending behavior for each record.
| * Records whose delivery count exceeds this are deemed abnormal, | ||
| * and the batching of these records should be reduced. | ||
| */ | ||
| private static final int BAD_RECORD_DELIVERY_THRESHOLD = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be 3 as default.
| // If the record has any pending deliveries, return immediately and do not deliver the current bad record. | ||
| if (offsetState.getValue().deliveryCount() >= BAD_RECORD_DELIVERY_THRESHOLD && (hasBeenAcquired > 0 || acquiredCount > 0)) { | ||
| return -acquiredCount; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I didn't understand what we are verifying here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the deliveryCount of the current record is greater than or equal to the threshold, it indicates that the current record is a bad record and may fail to be delivered.
- If there are already pending deliveries, we should immediately send those to avoid being affected by the bad record.
- If there are no pending deliveries, the bad record should be sent individually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide example in your PR description i.e. what exactly we have handled in the PR. The changes seems very confusing hence wanted to come to common understanding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example Scenario
Record offsets and delivery counts:
Offset: 10, 11, 12, 13, 14, 15
Delivery Count: 1, 2, 3, 2, 2, 1
(Assuming BAD_RECORD_DELIVERY_THRESHOLD = 3 - offset 12 is a bad record)
Before Changes:
- Case 1:
fetchStartOffset = 10,maxFetchRecords = 6- Returns records from offset 10-15 (including the bad record at offset 12)
- Case 2:
fetchStartOffset = 12,maxFetchRecords = 6- Returns records from offset 12-15 (starting with the bad record)
After Changes:
- Case 1:
fetchStartOffset = 10,maxFetchRecords = 6- Returns records from offset 10-11 only (stops at the first bad record)
- Case 2:
fetchStartOffset = 12,maxFetchRecords = 6- Returns only offset 12 (the bad record itself, since there are no preceding good records)
adixitconfluent
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Overall I didn't get the concept to use a negative number as acquired count. Maybe comment some examples to show how using a negative number for acquired count is useful.
| // to prevent acquiring any new records afterwards. | ||
| if (acquiredSubsetCount < 0) { | ||
| maxRecordsToAcquire = -1; | ||
| acquiredCount += acquiredSubsetCount == Integer.MIN_VALUE ? 0 : -1 * acquiredSubsetCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a little confusing to me. Maybe take an example and explain what are we trying to achieve here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The negative return values serve as special flags to handle bad records while maintaining data delivery:
Example Scenarios:
- Normal case (no bad records):
- Returns
3→ acquired 3 records normally - Processing continues until
maxRecordsToAcquireis reached
- Returns
- Bad record encountered after some good records:
- Acquired 3 good records, then hits a bad record
- Returns
-3→ signals "deliver the 3 good records AND stop further acquisition due to bad record" - Outer code:
acquiredCount += -(-3) = +3
- First record is bad (edge case):
- First record is bad, zero records acquired
- Returns
Integer.MIN_VALUE→ signals "no records to deliver AND stop due to bad record" - Outer code:
acquiredCount += 0(no increment)
The negative return values act as "circuit breakers" - they tell the outer loop to deliver what we have and stop further processing in that batch.
apoorvmittal10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, can you please add examples in PR description around the fix in the PR.
| // If the record has any pending deliveries, return immediately and do not deliver the current bad record. | ||
| if (offsetState.getValue().deliveryCount() >= BAD_RECORD_DELIVERY_THRESHOLD && (hasBeenAcquired > 0 || acquiredCount > 0)) { | ||
| return -acquiredCount; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide example in your PR description i.e. what exactly we have handled in the PR. The changes seems very confusing hence wanted to come to common understanding.
|
@DL1231 I am not sure if we are on same page with the change. I am trying to write the problem statement and probable solution below: Problem Statement: Solution:
|
|
@apoorvmittal10 Thanks for your patient reply. I'd like to follow up with another question: If offset 30 is the bad record.
In this scenario, records 1-125 would have been delivered 5 times and eventually get archived, while the actual bad record at offset 30 continues to cause failures. So this solution essentially reduces the impact range by about 3/4, but doesn't completely isolate the bad record. Is my understanding correct? This seems to minimize the collateral damage rather than surgically removing the problematic record. |
No, next acquisition will be only offset 1 not 1-499. This is determined by looking at the offset delivery count, offset 1 will also be at limit of delivery count and only last attempt will be pending. Continuing with previous example, this will continue till offset 124 i.e. single record is being acquired as all of them are in final delivery attempt. Then whole 125 - 499 will be acquired as that's not in the final attempt. |
|
@apoorvmittal10 Thanks for the explanation. I get it now. Regarding the scenario with multiple batches:
After the first acquisition fails and we retry by fetching half of the first batch (0-50), if we haven't reached maxFetchRecords yet, should we:
|
I think we should only deal with single batch in these scenarios. |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Just a few comments and I see that the approach is being discussed in the comments.
| * Records whose delivery count exceeds this are deemed abnormal, | ||
| * and the batching of these records should be reduced. | ||
| */ | ||
| private static final int BAD_RECORD_DELIVERY_THRESHOLD = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The configured delivery count limit can range from 2 to 10 inclusive. If the configured value is 2, we need to set the threshold as 2. If the configured value is larger, maybe half of the configured value would be a good threshold. So, for config=5, use 3 (default). For config=10, use 5. And so on.
For records with a delivery count exceeding 2, there is suspicion that
delivery failures stem from underlying issues rather than natural
retry scenarios. The batching of such records should be reduced.
Implementation Approach:
record is delivered.
pending records are delivered, and the current bad record is skipped.
Example Scenario
Record offsets and delivery counts:
(Assuming
BAD_RECORD_DELIVERY_THRESHOLD = 3- offset 12 is a badrecord)
Before Changes:
fetchStartOffset = 10,maxFetchRecords = 6offset 12)
fetchStartOffset = 12,maxFetchRecords = 6After Changes:
fetchStartOffset = 10,maxFetchRecords = 6record)
fetchStartOffset = 12,maxFetchRecords = 6preceding good records)