Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17278: Add FetchRequest compatibility tests for KafkaRaftClient #17801

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,61 @@ public void testFetchSnapshotRequestAsLeader(boolean withKip853Rpc) throws Excep
assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
}

@ParameterizedTest
@CsvSource({"1,0"})
public void testNewFetchSnapshotRequestToOldController(short fetchVersion, short controllerVersion) throws Exception {
int localId = randomReplicaId();
int otherNodeId = localId + 1;
ReplicaKey otherNodeKey = replicaKey(otherNodeId, fetchVersion >= 1);
Set<Integer> voters = Set.of(localId, localId + 1);
OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Collections.singletonList("a"))
.withKip853Rpc(controllerVersion == 0)
.build();

context.becomeLeader();
int epoch = context.currentEpoch();

context.advanceLocalLeaderHighWatermarkToLogEndOffset();

try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
assertEquals(snapshotId, snapshot.snapshotId());
snapshot.append(records);
snapshot.freeze();
}

RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
context.deliverRequest(
fetchSnapshotRequest(
context.clusterId,
otherNodeKey,
context.metadataPartition,
epoch,
snapshotId,
Integer.MAX_VALUE,
0
)
);

context.client.poll();

FetchSnapshotResponseData.PartitionSnapshot response = context
.assertSentFetchSnapshotResponse(context.metadataPartition)
.get();

assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
assertEquals(snapshot.sizeInBytes(), response.size());
assertEquals(0, response.position());
assertEquals(snapshot.sizeInBytes(), response.unalignedRecords().sizeInBytes());

UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes()));

assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
}

@ParameterizedTest
@ValueSource(booleans = { false, true })
public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters(
Expand Down
29 changes: 27 additions & 2 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2116,11 +2116,11 @@ private static Stream<Short> validFetchVersions() {
public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version) throws Exception {
int localId = randomReplicaId();
int otherNodeId = localId + 1;
ReplicaKey otherNodeKey = replicaKey(otherNodeId, false);
ReplicaKey otherNodeKey = replicaKey(otherNodeId, version >= 17);
int epoch = 5;
Set<Integer> voters = Set.of(localId, otherNodeKey.id());

RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch, version);

// First poll has no high watermark advance.
context.client.poll();
Expand All @@ -2132,12 +2132,37 @@ public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short version
FetchRequestData request = new FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
assertEquals((version < 15) ? otherNodeId : -1, fetchRequestData.replicaId());
assertEquals((version < 15) ? -1 : otherNodeId, fetchRequestData.replicaState().replicaId());
assertEquals(version >= 17, otherNodeKey.directoryId().isPresent());
assertEquals((version < 17) ? ReplicaKey.NO_DIRECTORY_ID : otherNodeKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID),
fetchRequestData.topics().get(0).partitions().get(0).replicaDirectoryId());
context.deliverRequest(request, version);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(OptionalLong.of(1L), context.client.highWatermark());
}

@ParameterizedTest
@CsvSource({"17,15", "17,16", "17, 14", "17,13",
"16,15", "16,14", "16,13",
"15,14", "15,13",
"14,13"})
public void testNewFetchRequestToOldController(short fetchVersion, short contextVersion) throws Exception {
int localId = randomReplicaId();
int otherNodeId = localId + 1;
ReplicaKey otherNodeKey = replicaKey(otherNodeId, fetchVersion >= 17);
int epoch = 5;
Set<Integer> voters = Set.of(localId, otherNodeKey.id());

RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch, contextVersion);

FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeKey, 1L, epoch, 0);
FetchRequestData request = new FetchRequest.SimpleBuilder(fetchRequestData).build(fetchVersion).data();
context.deliverRequest(request, fetchVersion);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
assertEquals(OptionalLong.of(1L), context.client.highWatermark());
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFetchRequestClusterIdValidation(boolean withKip853Rpc) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,14 @@ static MemoryRecords buildBatch(
return builder.build();
}

static RaftClientTestContext initializeAsLeader(int localId, Set<Integer> voters, int epoch) throws Exception {
static RaftClientTestContext initializeAsLeader(int localId, Set<Integer> voters, int epoch, short version) throws Exception {
if (epoch <= 0) {
throw new IllegalArgumentException("Cannot become leader in epoch " + epoch);
}

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(epoch - 1)
.withKip853Rpc(version >= 17)
.build();

context.assertUnknownLeader(epoch - 1);
Expand Down
42 changes: 42 additions & 0 deletions raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,24 @@ public void testSingletonFetchRequestForAllVersion(final FetchRequestTestCase te
assertEquals(testCase.expectedJson, json.toString());
}

// Test that the replicaDirectoryId field introduced in version 17 is ignorable for older versions
@ParameterizedTest
@MethodSource("singletonFetchRequestTestCases")
public void testFetchRequestCompatibility(final FetchRequestTestCase testCase) {
FetchRequestData fetchRequestData = RaftUtil.singletonFetchRequest(topicPartition, Uuid.ONE_UUID,
partition -> partition
.setPartitionMaxBytes(10)
.setCurrentLeaderEpoch(5)
.setFetchOffset(333)
.setLastFetchedEpoch(testCase.lastFetchedEpoch)
.setPartition(2)
.setReplicaDirectoryId(Uuid.fromString("AAAAAAAAAAAAAAAAAAAAAQ"))
.setLogStartOffset(0)
);
JsonNode json = FetchRequestDataJsonConverter.write(fetchRequestData, testCase.version);
assertEquals(testCase.expectedJson, json.toString());
}

@ParameterizedTest
@MethodSource("singletonFetchResponseTestCases")
public void testSingletonFetchResponseForAllVersion(final FetchResponseTestCase testCase) {
Expand Down Expand Up @@ -437,6 +455,30 @@ public void testSingletonFetchSnapshotRequestForAllVersion(final short version,
assertEquals(expectedJson, json.toString());
}

// Test that the replicaDirectoryId field introduced in version 1 is ignorable for version 0
@ParameterizedTest
@MethodSource("fetchSnapshotRequestTestCases")
public void testSingletonFetchSnapshotRequestCompatibility(final short version,
final Uuid directoryId,
final String expectedJson) {
int epoch = 1;
int maxBytes = 1000;
int position = 10;

FetchSnapshotRequestData fetchSnapshotRequestData = RaftUtil.singletonFetchSnapshotRequest(
clusterId,
ReplicaKey.of(1, directoryId),
topicPartition,
epoch,
new OffsetAndEpoch(10, epoch),
maxBytes,
position
);
fetchSnapshotRequestData.topics().get(0).partitions().get(0).setReplicaDirectoryId(Uuid.fromString("AAAAAAAAAAAAAAAAAAAAAQ"));
JsonNode json = FetchSnapshotRequestDataJsonConverter.write(fetchSnapshotRequestData, version);
assertEquals(expectedJson, json.toString());
}

@ParameterizedTest
@MethodSource("fetchSnapshotResponseTestCases")
public void testSingletonFetchSnapshotResponseForAllVersion(final short version, final String expectedJson) {
Expand Down