diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java index 0a5a863f998a5..5f9a9e6557c55 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.record.internal.ControlRecordType; import org.apache.kafka.common.record.internal.EndTransactionMarker; import org.apache.kafka.common.record.internal.Record; @@ -154,12 +155,11 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact } private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { - // For transactions v2 idempotent producers, reject non-zero sequences when there is no producer ID state - if (verificationStateEntry != null && verificationStateEntry.supportsEpochBump() && - appendFirstSeq != 0 && currentEntry.isEmpty()) { - throw new OutOfOrderSequenceException("Invalid sequence number for producer " + producerId + " at " + - "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + - " (incoming seq. number). Expected sequence 0 for transactions v2 idempotent producer with no existing state."); + if (updatedEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH && appendFirstSeq != 0 && + (verificationStateEntry == null || verificationStateEntry.supportsEpochBump())) { + throw new UnknownProducerIdException("Found no record of producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. number). Expected sequence 0 for the first batch with no producer state."); } if (verificationStateEntry != null && appendFirstSeq > verificationStateEntry.lowestSequence()) { throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + @@ -183,9 +183,7 @@ else if (producerEpoch == currentEntry.producerEpoch()) else currentLastSeq = RecordBatch.NO_SEQUENCE; - // If there is no current producer epoch (possibly because all producer records have been deleted due to - // retention or the DeleteRecords API) accept writes with any sequence number - if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + if (!inSequence(currentLastSeq, appendFirstSeq)) { throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + " (incoming seq. number), " + currentLastSeq + " (current end sequence number)"); diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java index c1a2d7007baa8..76fcd47b4e754 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.record.internal.ControlRecordType; import org.apache.kafka.common.record.internal.EndTransactionMarker; @@ -555,11 +556,13 @@ public void testRemoveExpiredPidsOnReload() throws IOException { recoveredMapping.truncateAndReload(0L, 1L, 70000); // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence, - // we should accept the append and add the pid back in - appendClientEntry(recoveredMapping, producerId, epoch, 2, 2L, 70001, false); + // the producer must recover by bumping the epoch and resetting the sequence. + assertThrows(UnknownProducerIdException.class, + () -> appendClientEntry(recoveredMapping, producerId, epoch, 2, 2L, 70001, false)); + appendClientEntry(recoveredMapping, producerId, (short) (epoch + 1), 0, 2L, 70001, false); assertEquals(1, recoveredMapping.activeProducers().size()); - assertEquals(2, recoveredMapping.activeProducers().values().iterator().next().lastSeq()); + assertEquals(0, recoveredMapping.activeProducers().values().iterator().next().lastSeq()); assertEquals(3L, recoveredMapping.mapEndOffset()); } @@ -652,15 +655,15 @@ public void testTruncateFullyAndStartAt() throws IOException { @Test public void testReloadSnapshots() throws IOException { - appendClientEntry(stateManager, producerId, epoch, 1, 1L, false); - appendClientEntry(stateManager, producerId, epoch, 2, 2L, false); + appendClientEntry(stateManager, producerId, epoch, 0, 1L, false); + appendClientEntry(stateManager, producerId, epoch, 1, 2L, false); stateManager.takeSnapshot(); Map pathAndDataList = Arrays.stream(Objects.requireNonNull(logDir.listFiles())) .map(File::toPath) .collect(Collectors.toMap(path -> path, path -> assertDoesNotThrow(() -> Files.readAllBytes(path)))); - appendClientEntry(stateManager, producerId, epoch, 3, 3L, false); - appendClientEntry(stateManager, producerId, epoch, 4, 4L, false); + appendClientEntry(stateManager, producerId, epoch, 2, 3L, false); + appendClientEntry(stateManager, producerId, epoch, 3, 4L, false); stateManager.takeSnapshot(); assertEquals(2, Objects.requireNonNull(logDir.listFiles()).length); assertEquals(Set.of(3L, 5L), currentSnapshotOffsets()); @@ -745,9 +748,11 @@ public void testPidExpirationTimeout() { appendClientEntry(stateManager, producerId, epoch, defaultSequence, 1L, false); time.sleep(producerStateManagerConfig.producerIdExpirationMs() + 1); stateManager.removeExpiredProducers(time.milliseconds()); - appendClientEntry(stateManager, producerId, epoch, defaultSequence + 1, 2L, false); + assertThrows(UnknownProducerIdException.class, + () -> appendClientEntry(stateManager, producerId, epoch, defaultSequence + 1, 2L, false)); + appendClientEntry(stateManager, producerId, (short) (epoch + 1), defaultSequence, 2L, false); assertEquals(1, stateManager.activeProducers().size()); - assertEquals(defaultSequence + 1, stateManager.activeProducers().values().iterator().next().lastSeq()); + assertEquals(defaultSequence, stateManager.activeProducers().values().iterator().next().lastSeq()); assertEquals(3L, stateManager.mapEndOffset()); } @@ -1111,91 +1116,68 @@ public void testIdempotentTransactionMarkerRetryTV2() { } @Test - public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() { - // Create a verification state entry that supports epoch bump (transactions v2) - VerificationStateEntry verificationEntry = stateManager.maybeCreateVerificationStateEntry( - producerId, - 0, - epoch, - true - ); - - // Verify this is actually transactions v2 - assertTrue( - verificationEntry.supportsEpochBump(), - "Should be using transactions v2 (supports epoch bump)" - ); - - // Create ProducerAppendInfo with empty producer state + public void testRejectNonZeroSequenceForClientAppendWithEmptyState() { ProducerAppendInfo appendInfo = new ProducerAppendInfo( - partition, - producerId, - ProducerStateEntry.empty(producerId), - AppendOrigin.CLIENT, - verificationEntry + partition, + producerId, + ProducerStateEntry.empty(producerId), + AppendOrigin.CLIENT, + null ); - - // Attempting to append with non-zero sequence number should fail for transactions v2 - OutOfOrderSequenceException exception = assertThrows( - OutOfOrderSequenceException.class, - () -> appendInfo.appendDataBatch( + + UnknownProducerIdException exception = assertThrows(UnknownProducerIdException.class, + () -> appendInfo.appendDataBatch( + epoch, + 5, + 5, + time.milliseconds(), + new LogOffsetMetadata(0L), 0L, false) + ); + + assertTrue(exception.getMessage().contains("Expected sequence 0 for the first batch with no producer state")); + assertTrue(exception.getMessage().contains("5 (incoming seq. number)")); + + assertDoesNotThrow(() -> appendInfo.appendDataBatch( epoch, - 5, - 5, + 0, + 0, time.milliseconds(), - new LogOffsetMetadata(0L), 0L, false - ) + new LogOffsetMetadata(0L), 0L, false) ); - - assertTrue(exception.getMessage().contains("Expected sequence 0 for " + - "transactions v2 idempotent producer" - )); - assertTrue(exception.getMessage().contains("5 (incoming seq. number)")); - - // Attempting to append with sequence 0 should succeed assertDoesNotThrow(() -> appendInfo.appendDataBatch( - epoch, - 0, - 0, - time.milliseconds(), - new LogOffsetMetadata(0L), 0L, false) + epoch, + 1, + 1, + time.milliseconds(), + new LogOffsetMetadata(1L), 1L, false) ); } @Test public void testAllowNonZeroSequenceForTransactionsV1WithEmptyState() { - // Create a verification state entry that does NOT support epoch bump (transactions v1) - // Set lowest sequence to 5 to allow our test sequence to pass the verification check VerificationStateEntry verificationEntry = stateManager.maybeCreateVerificationStateEntry( - producerId + 1, - 5, - epoch, - false - ); - - // Verify this is transactions v1 - assertFalse( - verificationEntry.supportsEpochBump(), - "Should be using transactions v1 (does not support epoch bump)" + producerId + 1, + 5, + epoch, + false ); - - // Create ProducerAppendInfo with empty producer state + + assertFalse(verificationEntry.supportsEpochBump()); + ProducerAppendInfo appendInfo = new ProducerAppendInfo( - partition, - producerId + 1, - ProducerStateEntry.empty(producerId + 1), - AppendOrigin.CLIENT, - verificationEntry + partition, + producerId + 1, + ProducerStateEntry.empty(producerId + 1), + AppendOrigin.CLIENT, + verificationEntry ); - - // Attempting to append with non-zero sequence number should succeed for transactions v1 - // (our validation should not trigger) + assertDoesNotThrow(() -> appendInfo.appendDataBatch( - epoch, - 5, - 5, - time.milliseconds(), - new LogOffsetMetadata(0L), 0L, false) + epoch, + 5, + 5, + time.milliseconds(), + new LogOffsetMetadata(0L), 0L, false) ); } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index 48953d133a5f7..18578ad762580 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.AbortedTxn; import org.apache.kafka.common.message.DescribeProducersResponseData; @@ -5476,7 +5477,7 @@ public void testNonZeroSequenceOnFirstAppendNonZeroEpoch(boolean transactionVeri if (transactionVerificationEnabled) { // TV2 behavior: Create verification state that supports epoch bumps // Should reject non-zero sequences when there's no existing producer state - assertThrows(OutOfOrderSequenceException.class, () -> + assertThrows(UnknownProducerIdException.class, () -> log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), verificationGuard, TransactionVersion.TV_0.featureLevel())); } else {