diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index ef07688274d4b..02b50ce857980 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -201,6 +201,11 @@ enum SharePartitionState { */ private final int maxDeliveryCount; + /** + * Records whose delivery count reaches or exceeds this threshold are deemed abnormal, + * and the batching of these records should be reduced. + */ + private final int badRecordDeliveryThreshold; /** * The group config manager is used to retrieve the values for dynamic group configurations */ @@ -362,6 +367,7 @@ enum SharePartitionState { this.leaderEpoch = leaderEpoch; this.maxInFlightRecords = maxInFlightRecords; this.maxDeliveryCount = maxDeliveryCount; + this.badRecordDeliveryThreshold = (int) Math.ceil((double) maxDeliveryCount / 2); this.cachedState = new ConcurrentSkipListMap<>(); this.lock = new ReentrantReadWriteLock(); this.findNextFetchOffset = false; @@ -834,7 +840,14 @@ public ShareAcquiredRecords acquire( boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire); int numRecordsRemaining = maxRecordsToAcquire - acquiredCount; boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount); - if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) { + boolean deliveryCountExceed = checkForBadRecordsDeliveryCount(inFlightBatch); + // Stop acquiring more records if bad record found after acquiring some data to + // prevent affecting already acquired records + if (deliveryCountExceed && acquiredCount > 0) { + maxRecordsToAcquire = -1; + break; + } + if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch || deliveryCountExceed) { log.trace("Subset or offset tracked batch record found for share partition," + " batch: {} request 
offsets - first: {}, last: {} for the share" + " partition: {}-{}", inFlightBatch, firstBatch.baseOffset(), @@ -859,6 +872,13 @@ public ShareAcquiredRecords acquire( // maxRecordsToAcquire. Hence, pass the remaining number of records that can // be acquired. int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result); + // If a bad record is present, return immediately and set `maxRecordsToAcquire = -1` + // to prevent acquiring any new records afterwards. + if (acquiredSubsetCount < 0) { + maxRecordsToAcquire = -1; + acquiredCount += -1 * acquiredSubsetCount; + break; + } acquiredCount += acquiredSubsetCount; continue; } @@ -1866,6 +1886,8 @@ private int acquireSubsetBatchRecords( ) { lock.writeLock().lock(); int acquiredCount = 0; + long maxFetchRecordsWhileBadRecord = -1; + boolean hasBadRecord = false; try { for (Map.Entry offsetState : inFlightBatch.offsetState().entrySet()) { // For the first batch which might have offsets prior to the request base @@ -1885,7 +1907,21 @@ private int acquireSubsetBatchRecords( continue; } - InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, + int recordDeliveryCount = offsetState.getValue().deliveryCount(); + // On last delivery attempt, submit acquired records, + // bad record will be delivered alone next time + if (recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) { + hasBadRecord = true; + break; + } + + // On repeated delivery failures (>= badRecordDeliveryThreshold), progressively reduce batch size to isolate bad record + if (recordDeliveryCount >= badRecordDeliveryThreshold && maxFetchRecordsWhileBadRecord < 0) { + maxFetchRecordsWhileBadRecord = Math.max(1, (long) inFlightBatch.offsetState().size() >> (recordDeliveryCount - badRecordDeliveryThreshold + 1)); + hasBadRecord = true; + } + + InFlightState updateResult = 
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount, memberId); if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) { log.trace("Unable to acquire records for the offset: {} in batch: {}" @@ -1904,15 +1940,24 @@ private int acquireSubsetBatchRecords( .setLastOffset(offsetState.getKey()) .setDeliveryCount((short) offsetState.getValue().deliveryCount())); acquiredCount++; + + // Delivered alone. + if (offsetState.getValue().deliveryCount() == maxDeliveryCount) { + hasBadRecord = true; + break; + } if (isRecordLimitMode && acquiredCount == maxFetchRecords) { // In record_limit mode, acquire only the requested number of records. break; } + if (hasBadRecord && acquiredCount == maxFetchRecordsWhileBadRecord) { + break; + } } } finally { lock.writeLock().unlock(); } - return acquiredCount; + return hasBadRecord ? -acquiredCount : acquiredCount; } /** @@ -1942,6 +1987,19 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset; } + /** + * Check if the in-flight batch contains bad records based on delivery count. + * + * @param inFlightBatch The in-flight batch to check for bad records. + * @return True if the batch contains bad records (delivery count >= threshold), false otherwise. 
+ */ + private boolean checkForBadRecordsDeliveryCount(InFlightBatch inFlightBatch) { + if (inFlightBatch.offsetState() == null) { + return inFlightBatch.batchDeliveryCount() >= badRecordDeliveryThreshold; + } + return inFlightBatch.offsetState().values().stream().mapToInt(InFlightState::deliveryCount).max().orElse(0) >= badRecordDeliveryThreshold; + } + // Visibility for test static Map fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch) { // Client can either send a single entry in acknowledgeTypes which represents the state diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 047fffa517189..6564a59036997 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -1116,9 +1116,9 @@ public void testMaybeInitializeAndAcquire() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0), new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), - new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 1))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -1183,10 +1183,10 @@ public void testMaybeInitializeAndAcquire() { FETCH_ISOLATION_HWM), 13); - List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(15, 18, 3)); + List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(15, 18, 1)); 
expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 2)); assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); assertEquals(31, sharePartition.nextFetchOffset()); @@ -1239,9 +1239,9 @@ public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0), new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), - new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 1))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -1281,10 +1281,10 @@ public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() { 37); List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(10, 14, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 2)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1)); assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray()); @@ -1320,9 +1320,9 @@ public void testMaybeInitializeAndAcquireWithFetchBatchLastOffsetWithinCachedBat new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0), new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), - new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 1))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -1362,10 +1362,10 @@ public void testMaybeInitializeAndAcquireWithFetchBatchLastOffsetWithinCachedBat 13); List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(12, 14, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4)); + expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2)); assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); assertEquals(28, sharePartition.nextFetchOffset()); @@ -1402,9 +1402,9 @@ public void testMaybeInitializeAndAcquireWithFetchBatchPriorStartOffset() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 0), new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), - new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 1))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -1443,7 +1443,7 @@ public void testMaybeInitializeAndAcquireWithFetchBatchPriorStartOffset() { 10); List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(10, 14, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); @@ -1473,9 +1473,9 @@ public void testMaybeInitializeAndAcquireWithMultipleBatches() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0), new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), - new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 1))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -1581,7 +1581,7 @@ public void testMaybeInitializeAndAcquireWithMultipleBatches() { 10); List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(13, 14, 1)); - 
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); @@ -1621,7 +1621,7 @@ public void testMaybeInitializeAndAcquireWithMultipleBatches() { FETCH_ISOLATION_HWM), 24); - expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 30, 4)); + expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 30, 2)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1)); assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); @@ -1645,9 +1645,9 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesAndLastOffsetWithinC new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0), new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), - new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 1))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -1694,10 +1694,10 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesAndLastOffsetWithinC 18); List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(7, 14, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); 
expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4)); + expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2)); assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); assertEquals(28, sharePartition.nextFetchOffset()); @@ -1734,9 +1734,9 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesPriorStartOffset() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0), new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), - new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 1))))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); @@ -1781,7 +1781,7 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesPriorStartOffset() { 10); List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(10, 14, 1)); - expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 1)); expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); @@ -3293,7 +3293,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBa new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(21L, 30L, 
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(21L, 30L, RecordState.AVAILABLE.id, (short) 1), // There is a gap from 11 to 20 new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) )))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); @@ -3317,11 +3317,11 @@ public void testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBa // The gap from 11 to 20 will be acquired. Since the next batch is AVAILABLE, and we records fetched from replica manager // overlap with the next batch, some records from the next batch will also be acquired List expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 1)); - expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3)); - expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3)); - expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3)); - expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3)); - expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 2)); assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); @@ -3348,7 +3348,7 @@ public void testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOff new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(11L, 20L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 1), new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30 )))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); @@ -3366,7 +3366,7 @@ public void testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOff - // 2 different batches will be acquired this time (11-20 and 21-25). The first batch will have delivery count 3 - // as previous deliveryCount was 2. The second batch will have delivery count 1 as it is acquired for the first time. - List expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 3)); + // 2 different batches will be acquired this time (11-20 and 21-25). The first batch will have delivery count 2 + // as previous deliveryCount was 1. The second batch will have delivery count 1 as it is acquired for the first time. + List expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 2)); expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 1)); assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); @@ -3454,7 +3454,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(21L, 30L, RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(21L, 30L, RecordState.AVAILABLE.id, (short) 1), // There is a gap from 11 to 20 new PersisterStateBatch(41L, 50L, RecordState.ACKNOWLEDGED.id, (short) 1), // There is a gap from 31 to 40 new PersisterStateBatch(61L, 70L, RecordState.ARCHIVED.id, (short) 1) // There is a gap from 51 to 60 )))))); @@ -3473,7 +3473,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() { // 1. 11-20 (gap offsets) // 2. 
21-30 (AVAILABLE batch in cachedState) List expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 1)); - expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 3)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 2)); assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); @@ -3661,7 +3661,7 @@ public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(11L, 20L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(11L, 20L, RecordState.AVAILABLE.id, (short) 1), new PersisterStateBatch(31L, 40L, RecordState.AVAILABLE.id, (short) 1) // There is a gap from 21-30 )))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); @@ -3688,7 +3688,7 @@ public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() { FETCH_ISOLATION_HWM), 10); - assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(), acquiredRecordsList.toArray()); + assertArrayEquals(expectedAcquiredRecord(11, 20, 2).toArray(), acquiredRecordsList.toArray()); assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); assertFalse(sharePartition.cachedState().isEmpty()); @@ -4418,11 +4418,11 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter assertEquals(0, sharePartition.deliveryCompleteCount()); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 1); - assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); - assertEquals(2, sharePartition.cachedState().get(10L).batchDeliveryCount()); - 
assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).offsetState().get(10L).state()); + assertEquals(2, sharePartition.cachedState().get(10L).offsetState().get(10L).deliveryCount()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire to archive the records that reach max delivery count. @@ -4431,13 +4431,13 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 0 && // After the second delivery attempt fails to acknowledge the record correctly, the record should be archived. - sharePartition.cachedState().get(10L).batchState() == RecordState.ARCHIVED && - sharePartition.cachedState().get(10L).batchDeliveryCount() == 2 && - sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null, + sharePartition.cachedState().get(10L).offsetState().get(10L).state() == RecordState.ARCHIVED && + sharePartition.cachedState().get(10L).offsetState().get(10L).deliveryCount() == 2 && + sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask() == null, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, - () -> assertionFailedMessage(sharePartition, Map.of(10L, List.of()))); + () -> assertionFailedMessage(sharePartition, Map.of(10L, List.of(10L, 11L, 12L, 13L, 14L, 15L, 16L, 17L, 18L, 19L)))); // After the acquisition lock expires for the second time, the records should be archived as the max delivery count is reached. 
- assertEquals(10, sharePartition.deliveryCompleteCount()); + assertEquals(1, sharePartition.deliveryCompleteCount()); } @Test @@ -4466,13 +4466,13 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE assertEquals(0, sharePartition.deliveryCompleteCount()); - fetchAcquiredRecords(sharePartition, memoryRecords(5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5), 1); assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); - assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()); - assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask()); - assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(3L).acquisitionLockTimeoutTask()); - assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(4L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(3L).acquisitionLockTimeoutTask()); + assertNull(sharePartition.cachedState().get(0L).offsetState().get(4L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(0L).offsetState().get(5L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(0L).offsetState().get(6L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(0L).offsetState().get(7L).acquisitionLockTimeoutTask()); @@ -4485,17 +4485,17 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE () -> { Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(0L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(1L, new InFlightState(RecordState.ARCHIVED, (short) 2, 
EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(2L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(3L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(4L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(1L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(2L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(3L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(4L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(5L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(6L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(7L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(8L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(9L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); - return sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && + return sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 1 && expectedOffsetStateMap.equals(sharePartition.cachedState().get(0L).offsetState()); }, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, @@ -4512,9 +4512,9 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE assertNull(sharePartition.cachedState().get(0L).offsetState().get(8L).acquisitionLockTimeoutTask()); assertNull(sharePartition.cachedState().get(0L).offsetState().get(9L).acquisitionLockTimeoutTask()); - // Since only first 5 records from the batch are archived, the batch remains in the cachedState, but the + // Since only first 1 
record from the batch are archived, the batch remains in the cachedState, but the // start offset is updated - assertEquals(5, sharePartition.startOffset()); + assertEquals(1, sharePartition.startOffset()); assertEquals(0, sharePartition.deliveryCompleteCount()); } @@ -4526,7 +4526,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(2), 2); assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -4543,9 +4543,15 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() assertEquals(0, sharePartition.deliveryCompleteCount()); - fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(2), 1); - assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); + assertEquals(1, sharePartition.timer().size()); + + // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS); + fetchAcquiredRecords(sharePartition, memoryRecords(2), 1); + assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); // Allowing acquisition lock to expire to archive the records that reach max delivery count. @@ -4555,7 +4561,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() // After the second failed attempt to acknowledge the record batch successfully, the record batch is archived. 
// Since this is the first batch in the share partition, SPSO moves forward and the cachedState is cleared sharePartition.cachedState().isEmpty() && - sharePartition.nextFetchOffset() == 10, + sharePartition.nextFetchOffset() == 2, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of())); assertEquals(0, sharePartition.deliveryCompleteCount()); @@ -4734,9 +4740,9 @@ public void testDeliveryCompleteCountOnLockExpiryAndWriteFailureOnBatchLastDeliv () -> assertionFailedMessage(sharePartition, Map.of(5L, List.of()))); assertEquals(0, sharePartition.deliveryCompleteCount()); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 1); assertEquals(1, sharePartition.timer().size()); - assertNotNull(sharePartition.cachedState().get(15L).batchAcquisitionLockTimeoutTask()); + assertNotNull(sharePartition.cachedState().get(15L).offsetState().get(15L).acquisitionLockTimeoutTask()); assertEquals(0, sharePartition.deliveryCompleteCount()); // Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns false. @@ -4752,13 +4758,13 @@ public void testDeliveryCompleteCountOnLockExpiryAndWriteFailureOnBatchLastDeliv () -> sharePartition.timer().size() == 0 && sharePartition.nextFetchOffset() == 5 && sharePartition.cachedState().size() == 2 && - sharePartition.cachedState().get(15L).batchState() == RecordState.ARCHIVED && - sharePartition.cachedState().get(15L).batchAcquisitionLockTimeoutTask() == null, + sharePartition.cachedState().get(15L).offsetState().get(15L).state() == RecordState.ARCHIVED && + sharePartition.cachedState().get(15L).offsetState().get(15L).acquisitionLockTimeoutTask() == null, DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of(5L, List.of()))); // Even though the write state call failed, the records are still archived and deliveryCompleteCount is updated. 
- assertEquals(10, sharePartition.deliveryCompleteCount()); + assertEquals(1, sharePartition.deliveryCompleteCount()); } @Test @@ -5247,7 +5253,7 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu new ShareAcknowledgementBatch(10, 14, List.of(AcknowledgeType.RELEASE.id)))); assertEquals(0, sharePartition.deliveryCompleteCount()); - fetchAcquiredRecords(sharePartition, records2, 5); + fetchAcquiredRecords(sharePartition, records2, 1); CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertNull(releaseResult.join()); @@ -5255,8 +5261,7 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu assertEquals(0, sharePartition.nextFetchOffset()); assertEquals(2, sharePartition.cachedState().size()); - assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(10L).batchState()); - assertNull(sharePartition.cachedState().get(10L).offsetState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(10L).offsetState().get(10L).state()); assertEquals(0, sharePartition.deliveryCompleteCount()); } @@ -5285,12 +5290,18 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu assertEquals(3, sharePartition.deliveryCompleteCount()); - // Send next batch from offset 13, only 2 records should be acquired. - fetchAcquiredRecords(sharePartition, records1, 2); + // Send next batch from offset 13, only 1 record should be acquired. + fetchAcquiredRecords(sharePartition, records1, 1); + fetchAcquiredRecords(sharePartition, records1, 1); - // Send next batch from offset 15, only 2 records should be acquired. - fetchAcquiredRecords(sharePartition, records2, 2); - fetchAcquiredRecords(sharePartition, records3, 5); + // Send next batch from offset 15, only 1 record should be acquired.
+ fetchAcquiredRecords(sharePartition, records2, 1); + fetchAcquiredRecords(sharePartition, records2, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertNull(releaseResult.join()); @@ -5302,9 +5313,8 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu assertNotNull(sharePartition.cachedState().get(10L).offsetState()); assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(15L).batchState()); assertNotNull(sharePartition.cachedState().get(10L).offsetState()); - assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).batchState()); - assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(20L).batchMemberId()); - assertNull(sharePartition.cachedState().get(20L).offsetState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(20L).offsetState().get(20L).state()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(20L).offsetState().get(20L).memberId()); Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 0, EMPTY_MEMBER_ID)); @@ -5352,11 +5362,17 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetCacheCleared() { // the start offset will be updated to 13. From the remaining offstes in flight, only records (17 -> 19) are in Terminal state. assertEquals(3, sharePartition.deliveryCompleteCount()); - // Send next batch from offset 13, only 2 records should be acquired. - fetchAcquiredRecords(sharePartition, records1, 2); - // Send next batch from offset 15, only 2 records should be acquired. 
- fetchAcquiredRecords(sharePartition, records2, 2); - fetchAcquiredRecords(sharePartition, records3, 5); + // Send next batch from offset 13, only 1 records should be acquired. + fetchAcquiredRecords(sharePartition, records1, 1); + fetchAcquiredRecords(sharePartition, records1, 1); + // Send next batch from offset 15, only 1 records should be acquired. + fetchAcquiredRecords(sharePartition, records2, 1); + fetchAcquiredRecords(sharePartition, records2, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); + fetchAcquiredRecords(sharePartition, records3, 1); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)), @@ -7782,7 +7798,7 @@ public void testMaybeUpdateCachedStateGapAfterLastOffsetAcknowledged() { new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), List.of( - new PersisterStateBatch(11L, 20L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(11L, 20L, RecordState.AVAILABLE.id, (short) 1), new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21 to 30 )))))); Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); @@ -8079,20 +8095,24 @@ public void testMaxDeliveryCountLimitExceededForRecordBatch() { .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records = memoryRecords(5, 10); + MemoryRecords records = memoryRecords(5, 2); + + fetchAcquiredRecords(sharePartition, records, 2); + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(5, 6, List.of(AcknowledgeType.RELEASE.id)))); - fetchAcquiredRecords(sharePartition, records, 10); + 
fetchAcquiredRecords(sharePartition, records, 1); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.RELEASE.id)))); + new ShareAcknowledgementBatch(5, 5, List.of(AcknowledgeType.RELEASE.id)))); - fetchAcquiredRecords(sharePartition, records, 10); + fetchAcquiredRecords(sharePartition, records, 1); sharePartition.acknowledge(MEMBER_ID, List.of( - new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.RELEASE.id)))); + new ShareAcknowledgementBatch(6, 6, List.of(AcknowledgeType.RELEASE.id)))); // All the records in the batch reached the max delivery count, hence they got archived and the cached state cleared. - assertEquals(15, sharePartition.nextFetchOffset()); - assertEquals(15, sharePartition.startOffset()); - assertEquals(15, sharePartition.endOffset()); + assertEquals(7, sharePartition.nextFetchOffset()); + assertEquals(7, sharePartition.startOffset()); + assertEquals(7, sharePartition.endOffset()); assertEquals(0, sharePartition.cachedState().size()); } @@ -8115,14 +8135,22 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubset() { new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)), new ShareAcknowledgementBatch(17, 19, List.of(AcknowledgeType.ACCEPT.id))))); - // Send next batch from offset 13, only 2 records should be acquired. - fetchAcquiredRecords(sharePartition, records1, 2); - // Send next batch from offset 15, only 2 records should be acquired. - fetchAcquiredRecords(sharePartition, records2, 2); + // Send next batch from offset 13, only 1 record should be acquired. + fetchAcquiredRecords(sharePartition, records1, 1); + // Send next batch from offset 15, only 1 record should be acquired. + fetchAcquiredRecords(sharePartition, records2, 1); sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)))); + // Send next batch from offset 14, only 1 record should be acquired.
+ fetchAcquiredRecords(sharePartition, records1, 1); + // Send next batch from offset 16, only 1 record should be acquired. + fetchAcquiredRecords(sharePartition, records2, 1); + + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(13, 16, List.of(AcknowledgeType.RELEASE.id)))); + assertEquals(20, sharePartition.nextFetchOffset()); // Cached state will be empty because after the second release, the acquired records will now have moved to // ARCHIVE state, since their max delivery count exceeded. Also, now since all the records are either in ACKNOWLEDGED or ARCHIVED @@ -8141,19 +8169,19 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetAndCachedStateNotCl fetchAcquiredRecords(sharePartition, records1, 5); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( - new ShareAcknowledgementBatch(0, 1, List.of(AcknowledgeType.RELEASE.id))))); + new ShareAcknowledgementBatch(0, 0, List.of(AcknowledgeType.RELEASE.id))))); - // Send next batch from offset 0, only 2 records should be acquired. - fetchAcquiredRecords(sharePartition, memoryRecords(2), 2); + // Send next batch from offset 0, only 1 record should be acquired.
+ fetchAcquiredRecords(sharePartition, memoryRecords(2), 1); sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(0, 4, List.of(AcknowledgeType.RELEASE.id)))); - assertEquals(2, sharePartition.nextFetchOffset()); + assertEquals(1, sharePartition.nextFetchOffset()); assertEquals(1, sharePartition.cachedState().size()); Map expectedOffsetStateMap = new HashMap<>(); expectedOffsetStateMap.put(0L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); - expectedOffsetStateMap.put(1L, new InFlightState(RecordState.ARCHIVED, (short) 2, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(1L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(2L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(3L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); expectedOffsetStateMap.put(4L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); @@ -9888,8 +9916,16 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep assertEquals(0, sharePartition.deliveryCompleteCount()); // Send the same batches again. 
- fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 1); future1 = new CompletableFuture<>(); future2 = new CompletableFuture<>(); @@ -9908,10 +9944,10 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep () -> sharePartition.cachedState().get(2L).offsetState().get(2L).state() == RecordState.ARCHIVED && sharePartition.cachedState().get(2L).offsetState().get(3L).state() == RecordState.ACKNOWLEDGED && sharePartition.cachedState().get(2L).offsetState().get(3L).acquisitionLockTimeoutTask().hasExpired() && - sharePartition.cachedState().get(7L).batchState() == RecordState.ACKNOWLEDGED && - sharePartition.cachedState().get(7L).batchAcquisitionLockTimeoutTask().hasExpired(), + sharePartition.cachedState().get(7L).offsetState().get(7L).state() == RecordState.ACKNOWLEDGED && + sharePartition.cachedState().get(7L).offsetState().get(7L).acquisitionLockTimeoutTask().hasExpired(), DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, - () -> assertionFailedMessage(sharePartition, Map.of(2L, List.of(3L), 7L, List.of()))); + () -> assertionFailedMessage(sharePartition, Map.of(2L, List.of(2L, 3L, 4L, 5L, 6L), 7L, List.of(7L, 8L, 9L, 10L, 11L)))); // After the acquisition lock timeout task has expired, records 2, 4 -> 6 are archived, and thus 
deliveryCompleteCount // increases by 4. @@ -9924,13 +9960,13 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep assertEquals(12, sharePartition.nextFetchOffset()); assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).offsetState().get(3L).state()); assertEquals(2, sharePartition.cachedState().get(2L).offsetState().get(3L).deliveryCount()); - assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(7L).batchState()); - assertEquals(2, sharePartition.cachedState().get(7L).batchDeliveryCount()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(7L).offsetState().get(7L).state()); + assertEquals(2, sharePartition.cachedState().get(7L).offsetState().get(7L).deliveryCount()); future2.complete(writeShareGroupStateResult); assertEquals(12L, sharePartition.nextFetchOffset()); - assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).batchState()); - assertEquals(2, sharePartition.cachedState().get(7L).batchDeliveryCount()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(7L).state()); + assertEquals(2, sharePartition.cachedState().get(7L).offsetState().get(7L).deliveryCount()); // At this point, the batch 2 -> 6 is removed from the cached state and startOffset is moved to 7. Thus, in flight // contains records 7 -> 11 which are archived. Therefore, deliveryCompleteCount becomes 5. 
assertEquals(5, sharePartition.deliveryCompleteCount()); @@ -11191,6 +11227,144 @@ public void testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any()); } + @Test + public void testSkipBadRecordWhenPendingDeliveriesExist() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(5, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(5, sharePartition.nextFetchOffset()); + + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 7, 13).close(); + memoryRecordsBuilder(buffer, 20, 8).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + // Set max fetch records to 500, records should be acquired till the offset 26 of the fetched batch. 
+ // 16 records should be returned: 7-19, 23-25 + // The record at offset 26 has a delivery count of 3 and is a bad record; it should be skipped. + List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + ShareAcquireMode.BATCH_OPTIMIZED, + BATCH_SIZE, + 500, + 5, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 16); + + List expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(7, 14, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(26, sharePartition.nextFetchOffset()); + assertEquals(23, sharePartition.cachedState().get(23L).firstOffset()); + assertEquals(25, sharePartition.cachedState().get(23L).lastOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertEquals(30L, sharePartition.endOffset()); + assertEquals(3, sharePartition.deliveryCompleteCount()); + } + + @Test + public void testAcquireRecordsHalvesBatchSizeOnEachFailureUntilSingleRecordOnLastAttempt() throws InterruptedException { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 34L, 
RecordState.AVAILABLE.id, (short) 4))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withPersister(persister) + .withMaxDeliveryCount(7) + .withDefaultAcquisitionLockTimeoutMs(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS) + .build(); + + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + CompletableFuture result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(1, sharePartition.cachedState().size()); + assertEquals(5, sharePartition.startOffset()); + assertEquals(34, sharePartition.endOffset()); + assertEquals(5, sharePartition.nextFetchOffset()); + + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 15, 20).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + // The record at offset 15 has a delivery count of 4 and is a bad record + // First acquisition attempt fails: batch size should be halved (20 -> 10) + fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + ShareAcquireMode.BATCH_OPTIMIZED, + BATCH_SIZE, + 500, + 5, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 10); + + // Allowing acquisition lock to expire. 
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 1); + + // Second failure: batch size halved again (now ~1/4 of original, 20 -> 5) + fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + ShareAcquireMode.BATCH_OPTIMIZED, + BATCH_SIZE, + 500, + 5, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 5); + + // Allowing acquisition lock to expire. + mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 1); + + // Final delivery attempt: only the suspected bad record should be acquired + fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + ShareAcquireMode.BATCH_OPTIMIZED, + BATCH_SIZE, + 500, + 5, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 1); + } + /** * This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT). */