diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 1fc43cd2e14f..9ff8e0d38cd0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -794,6 +794,61 @@ public void testFetchSnapshotRequestAsLeader(boolean withKip853Rpc) throws Excep assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); } + @ParameterizedTest + @CsvSource({"1,0"}) + public void testNewFetchSnapshotRequestToOldController(short fetchVersion, short controllerVersion) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + ReplicaKey otherNodeKey = replicaKey(otherNodeId, fetchVersion >= 1); + Set voters = Set.of(localId, localId + 1); + OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1); + List records = Arrays.asList("foo", "bar"); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .appendToLog(snapshotId.epoch(), Collections.singletonList("a")) + .withKip853Rpc(controllerVersion == 0) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + context.advanceLocalLeaderHighWatermarkToLogEndOffset(); + + try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId, 0).get()) { + assertEquals(snapshotId, snapshot.snapshotId()); + snapshot.append(records); + snapshot.freeze(); + } + + RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); + context.deliverRequest( + fetchSnapshotRequest( + context.clusterId, + otherNodeKey, + context.metadataPartition, + epoch, + snapshotId, + Integer.MAX_VALUE, + 0 + ) + ); + + context.client.poll(); + + FetchSnapshotResponseData.PartitionSnapshot response = context + .assertSentFetchSnapshotResponse(context.metadataPartition) + .get(); + + assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); + assertEquals(snapshot.sizeInBytes(), response.size()); + assertEquals(0, response.position()); + assertEquals(snapshot.sizeInBytes(), response.unalignedRecords().sizeInBytes()); + + UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes())); + + assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer()); + } + @ParameterizedTest @ValueSource(booleans = { false, true }) public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters( diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index fd0a2eef89a6..bc865eb2baff 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -2116,11 +2116,11 @@ private static Stream validFetchVersions() { public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception { int localId = randomReplicaId(); int otherNodeId = localId + 1; - ReplicaKey otherNodeKey = replicaKey(otherNodeId, false); + ReplicaKey otherNodeKey = replicaKey(otherNodeId, version >= 17); int epoch = 5; Set voters = Set.of(localId, otherNodeKey.id()); - RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch, version); // First poll has no high watermark advance. context.client.poll(); @@ -2132,12 +2132,37 @@ public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version FetchRequestData request = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data(); assertEquals((version < 15) ? otherNodeId : -1, fetchRequestData.replicaId()); assertEquals((version < 15) ? -1 : otherNodeId, fetchRequestData.replicaState().replicaId()); + assertEquals(version >= 17, otherNodeKey.directoryId().isPresent()); + assertEquals((version < 17) ? ReplicaKey.NO_DIRECTORY_ID : otherNodeKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID), + fetchRequestData.topics().get(0).partitions().get(0).replicaDirectoryId()); context.deliverRequest(request, version); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); assertEquals(OptionalLong.of(1L), context.client.highWatermark()); } + @ParameterizedTest + @CsvSource({"17,15", "17,16", "17, 14", "17,13", + "16,15", "16,14", "16,13", + "15,14", "15,13", + "14,13"}) + public void testNewFetchRequestToOldController(short fetchVersion, short contextVersion) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + ReplicaKey otherNodeKey = replicaKey(otherNodeId, fetchVersion >= 17); + int epoch = 5; + Set voters = Set.of(localId, otherNodeKey.id()); + + RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch, contextVersion); + + FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeKey, 1L, epoch, 0); + FetchRequestData request = new FetchRequest.SimpleBuilder(fetchRequestData).build(fetchVersion).data(); + context.deliverRequest(request, fetchVersion); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); + assertEquals(OptionalLong.of(1L), context.client.highWatermark()); + } + @ParameterizedTest @ValueSource(booleans = { true, false }) public void testFetchRequestClusterIdValidation(boolean withKip853Rpc) throws Exception { diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index dc8e978abfcb..d96aea1c4b9a 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -545,13 +545,14 @@ static MemoryRecords buildBatch( return builder.build(); } - static RaftClientTestContext initializeAsLeader(int localId, Set voters, int epoch) throws Exception { + static RaftClientTestContext initializeAsLeader(int localId, Set voters, int epoch, short version) throws Exception { if (epoch <= 0) { throw new IllegalArgumentException("Cannot become leader in epoch " + epoch); } RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) .withUnknownLeader(epoch - 1) + .withKip853Rpc(version >= 17) .build(); context.assertUnknownLeader(epoch - 1); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index 5e1d234c279a..af68badf71ef 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -335,6 +335,24 @@ public void testSingletonFetchRequestForAllVersion(final FetchRequestTestCase te assertEquals(testCase.expectedJson, json.toString()); } + // Test that the replicaDirectoryId field introduced in version 17 is ignorable for older versions + @ParameterizedTest + @MethodSource("singletonFetchRequestTestCases") + public void testFetchRequestCompatibility(final FetchRequestTestCase testCase) { + FetchRequestData fetchRequestData = RaftUtil.singletonFetchRequest(topicPartition, Uuid.ONE_UUID, + partition -> partition + .setPartitionMaxBytes(10) + .setCurrentLeaderEpoch(5) + .setFetchOffset(333) + .setLastFetchedEpoch(testCase.lastFetchedEpoch) + .setPartition(2) + .setReplicaDirectoryId(Uuid.fromString("AAAAAAAAAAAAAAAAAAAAAQ")) + .setLogStartOffset(0) + ); + JsonNode json = FetchRequestDataJsonConverter.write(fetchRequestData, testCase.version); + assertEquals(testCase.expectedJson, json.toString()); + } + @ParameterizedTest @MethodSource("singletonFetchResponseTestCases") public void testSingletonFetchResponseForAllVersion(final FetchResponseTestCase testCase) { @@ -437,6 +455,30 @@ public void testSingletonFetchSnapshotRequestForAllVersion(final short version, assertEquals(expectedJson, json.toString()); } + // Test that the replicaDirectoryId field introduced in version 1 is ignorable for version 0 + @ParameterizedTest + @MethodSource("fetchSnapshotRequestTestCases") + public void testSingletonFetchSnapshotRequestCompatibility(final short version, + final Uuid directoryId, + final String expectedJson) { + int epoch = 1; + int maxBytes = 1000; + int position = 10; + + FetchSnapshotRequestData fetchSnapshotRequestData = RaftUtil.singletonFetchSnapshotRequest( + clusterId, + ReplicaKey.of(1, directoryId), + topicPartition, + epoch, + new OffsetAndEpoch(10, epoch), + maxBytes, + position + ); + fetchSnapshotRequestData.topics().get(0).partitions().get(0).setReplicaDirectoryId(Uuid.fromString("AAAAAAAAAAAAAAAAAAAAAQ")); + JsonNode json = FetchSnapshotRequestDataJsonConverter.write(fetchSnapshotRequestData, version); + assertEquals(expectedJson, json.toString()); + } + @ParameterizedTest @MethodSource("fetchSnapshotResponseTestCases") public void testSingletonFetchSnapshotResponseForAllVersion(final short version, final String expectedJson) {