Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.EndTransactionMarker;
import org.apache.kafka.common.record.internal.Record;
Expand Down Expand Up @@ -154,12 +155,11 @@ private void checkProducerEpoch(short producerEpoch, long offset, short transact
}

private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) {
// For transactions v2 idempotent producers, reject non-zero sequences when there is no producer ID state
if (verificationStateEntry != null && verificationStateEntry.supportsEpochBump() &&
appendFirstSeq != 0 && currentEntry.isEmpty()) {
throw new OutOfOrderSequenceException("Invalid sequence number for producer " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
" (incoming seq. number). Expected sequence 0 for transactions v2 idempotent producer with no existing state.");
if (updatedEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH && appendFirstSeq != 0 &&
(verificationStateEntry == null || verificationStateEntry.supportsEpochBump())) {
throw new UnknownProducerIdException("Found no record of producer " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
" (incoming seq. number). Expected sequence 0 for the first batch with no producer state.");
}
if (verificationStateEntry != null && appendFirstSeq > verificationStateEntry.lowestSequence()) {
throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
Expand All @@ -183,9 +183,7 @@ else if (producerEpoch == currentEntry.producerEpoch())
else
currentLastSeq = RecordBatch.NO_SEQUENCE;

// If there is no current producer epoch (possibly because all producer records have been deleted due to
// retention or the DeleteRecords API) accept writes with any sequence number
if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
if (!inSequence(currentLastSeq, appendFirstSeq)) {
throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
" (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.EndTransactionMarker;
Expand Down Expand Up @@ -555,11 +556,13 @@ public void testRemoveExpiredPidsOnReload() throws IOException {
recoveredMapping.truncateAndReload(0L, 1L, 70000);

// entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence,
// we should accept the append and add the pid back in
appendClientEntry(recoveredMapping, producerId, epoch, 2, 2L, 70001, false);
// the producer must recover by bumping the epoch and resetting the sequence.
assertThrows(UnknownProducerIdException.class,
() -> appendClientEntry(recoveredMapping, producerId, epoch, 2, 2L, 70001, false));
appendClientEntry(recoveredMapping, producerId, (short) (epoch + 1), 0, 2L, 70001, false);

assertEquals(1, recoveredMapping.activeProducers().size());
assertEquals(2, recoveredMapping.activeProducers().values().iterator().next().lastSeq());
assertEquals(0, recoveredMapping.activeProducers().values().iterator().next().lastSeq());
assertEquals(3L, recoveredMapping.mapEndOffset());
}

Expand Down Expand Up @@ -652,15 +655,15 @@ public void testTruncateFullyAndStartAt() throws IOException {

@Test
public void testReloadSnapshots() throws IOException {
appendClientEntry(stateManager, producerId, epoch, 1, 1L, false);
appendClientEntry(stateManager, producerId, epoch, 2, 2L, false);
appendClientEntry(stateManager, producerId, epoch, 0, 1L, false);
appendClientEntry(stateManager, producerId, epoch, 1, 2L, false);
stateManager.takeSnapshot();
Map<Path, byte[]> pathAndDataList = Arrays.stream(Objects.requireNonNull(logDir.listFiles()))
.map(File::toPath)
.collect(Collectors.toMap(path -> path, path -> assertDoesNotThrow(() -> Files.readAllBytes(path))));

appendClientEntry(stateManager, producerId, epoch, 3, 3L, false);
appendClientEntry(stateManager, producerId, epoch, 4, 4L, false);
appendClientEntry(stateManager, producerId, epoch, 2, 3L, false);
appendClientEntry(stateManager, producerId, epoch, 3, 4L, false);
stateManager.takeSnapshot();
assertEquals(2, Objects.requireNonNull(logDir.listFiles()).length);
assertEquals(Set.of(3L, 5L), currentSnapshotOffsets());
Expand Down Expand Up @@ -745,9 +748,11 @@ public void testPidExpirationTimeout() {
appendClientEntry(stateManager, producerId, epoch, defaultSequence, 1L, false);
time.sleep(producerStateManagerConfig.producerIdExpirationMs() + 1);
stateManager.removeExpiredProducers(time.milliseconds());
appendClientEntry(stateManager, producerId, epoch, defaultSequence + 1, 2L, false);
assertThrows(UnknownProducerIdException.class,
() -> appendClientEntry(stateManager, producerId, epoch, defaultSequence + 1, 2L, false));
appendClientEntry(stateManager, producerId, (short) (epoch + 1), defaultSequence, 2L, false);
assertEquals(1, stateManager.activeProducers().size());
assertEquals(defaultSequence + 1, stateManager.activeProducers().values().iterator().next().lastSeq());
assertEquals(defaultSequence, stateManager.activeProducers().values().iterator().next().lastSeq());
assertEquals(3L, stateManager.mapEndOffset());
}

Expand Down Expand Up @@ -1111,91 +1116,68 @@ public void testIdempotentTransactionMarkerRetryTV2() {
}

@Test
public void testRejectNonZeroSequenceForTransactionsV2WithEmptyState() {
// Create a verification state entry that supports epoch bump (transactions v2)
VerificationStateEntry verificationEntry = stateManager.maybeCreateVerificationStateEntry(
producerId,
0,
epoch,
true
);

// Verify this is actually transactions v2
assertTrue(
verificationEntry.supportsEpochBump(),
"Should be using transactions v2 (supports epoch bump)"
);

// Create ProducerAppendInfo with empty producer state
public void testRejectNonZeroSequenceForClientAppendWithEmptyState() {
ProducerAppendInfo appendInfo = new ProducerAppendInfo(
partition,
producerId,
ProducerStateEntry.empty(producerId),
AppendOrigin.CLIENT,
verificationEntry
partition,
producerId,
ProducerStateEntry.empty(producerId),
AppendOrigin.CLIENT,
null
);

// Attempting to append with non-zero sequence number should fail for transactions v2
OutOfOrderSequenceException exception = assertThrows(
OutOfOrderSequenceException.class,
() -> appendInfo.appendDataBatch(

UnknownProducerIdException exception = assertThrows(UnknownProducerIdException.class,
() -> appendInfo.appendDataBatch(
epoch,
5,
5,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false)
);

assertTrue(exception.getMessage().contains("Expected sequence 0 for the first batch with no producer state"));
assertTrue(exception.getMessage().contains("5 (incoming seq. number)"));

assertDoesNotThrow(() -> appendInfo.appendDataBatch(
epoch,
5,
5,
0,
0,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false
)
new LogOffsetMetadata(0L), 0L, false)
);

assertTrue(exception.getMessage().contains("Expected sequence 0 for " +
"transactions v2 idempotent producer"
));
assertTrue(exception.getMessage().contains("5 (incoming seq. number)"));

// Attempting to append with sequence 0 should succeed
assertDoesNotThrow(() -> appendInfo.appendDataBatch(
epoch,
0,
0,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false)
epoch,
1,
1,
time.milliseconds(),
new LogOffsetMetadata(1L), 1L, false)
);
}

@Test
public void testAllowNonZeroSequenceForTransactionsV1WithEmptyState() {
// Create a verification state entry that does NOT support epoch bump (transactions v1)
// Set lowest sequence to 5 to allow our test sequence to pass the verification check
VerificationStateEntry verificationEntry = stateManager.maybeCreateVerificationStateEntry(
producerId + 1,
5,
epoch,
false
);

// Verify this is transactions v1
assertFalse(
verificationEntry.supportsEpochBump(),
"Should be using transactions v1 (does not support epoch bump)"
producerId + 1,
5,
epoch,
false
);

// Create ProducerAppendInfo with empty producer state

assertFalse(verificationEntry.supportsEpochBump());

ProducerAppendInfo appendInfo = new ProducerAppendInfo(
partition,
producerId + 1,
ProducerStateEntry.empty(producerId + 1),
AppendOrigin.CLIENT,
verificationEntry
partition,
producerId + 1,
ProducerStateEntry.empty(producerId + 1),
AppendOrigin.CLIENT,
verificationEntry
);

// Attempting to append with non-zero sequence number should succeed for transactions v1
// (our validation should not trigger)

assertDoesNotThrow(() -> appendInfo.appendDataBatch(
epoch,
5,
5,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false)
epoch,
5,
5,
time.milliseconds(),
new LogOffsetMetadata(0L), 0L, false)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.DescribeProducersResponseData;
Expand Down Expand Up @@ -5476,7 +5477,7 @@ public void testNonZeroSequenceOnFirstAppendNonZeroEpoch(boolean transactionVeri
if (transactionVerificationEnabled) {
// TV2 behavior: Create verification state that supports epoch bumps
// Should reject non-zero sequences when there's no existing producer state
assertThrows(OutOfOrderSequenceException.class, () ->
assertThrows(UnknownProducerIdException.class, () ->
log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(),
verificationGuard, TransactionVersion.TV_0.featureLevel()));
} else {
Expand Down