Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3298,6 +3298,10 @@ private long pollFollower(long currentTimeMs) {
}

private boolean shouldSendUpdateVoteRequest(FollowerState state) {
if (!canBecomeVoter) {
return false;
}

var version = partitionState.lastKraftVersion();
/* When the cluster supports reconfiguration, send an updated voter configuration if the
* one in the log doesn't match the local configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ void testFollowerSendsUpdateVoter() throws Exception {
.withBootstrapSnapshot(Optional.of(voters))
.withElectedLeader(epoch, voter1.id())
.withLocalListeners(localListeners)
.withCanBecomeVoter(true)
.build();

// waiting for FETCH requests until the UpdateRaftVoter request is sent
Expand Down Expand Up @@ -2255,6 +2256,42 @@ void testFollowerSendsUpdateVoter() throws Exception {
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}

@Test
void testFollowerDoesNotSendUpdateVoterWhenItCannotBecomeVoter() throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
ReplicaKey voter1 = replicaKey(local.id() + 1, true);
ReplicaKey voter2 = replicaKey(local.id() + 2, true);

VoterSet voters = VoterSetTest.voterSet(Stream.of(local, voter1, voter2));
int epoch = 4;

HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2);
listenersMap.put(
VoterSetTest.DEFAULT_LISTENER_NAME,
InetSocketAddress.createUnresolved("localhost", 9990 + local.id())
);
listenersMap.put(
ListenerName.normalised("ANOTHER_LISTENER"),
InetSocketAddress.createUnresolved("localhost", 8990 + local.id())
);
Endpoints localListeners = Endpoints.fromInetSocketAddresses(listenersMap);

RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
.withKip853Rpc(true)
.withBootstrapSnapshot(Optional.of(voters))
.withElectedLeader(epoch, voter1.id())
.withLocalListeners(localListeners)
.withCanBecomeVoter(false)
.build();

// waiting for FETCH requests until the UpdateRaftVoter request would be sent
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);

context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}

@ParameterizedTest
@EnumSource(value = Errors.class, names = {"NONE", "UNSUPPORTED_VERSION"})
void testFollowerSendsUpdateVoterWithKraftVersion0(Errors updateVoterError) throws Exception {
Expand All @@ -2281,6 +2318,7 @@ void testFollowerSendsUpdateVoterWithKraftVersion0(Errors updateVoterError) thro
.withStaticVoters(voters)
.withElectedLeader(epoch, voter1.id())
.withLocalListeners(localListeners)
.withCanBecomeVoter(true)
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
Expand Down Expand Up @@ -2352,6 +2390,7 @@ void testFollowerSendsUpdateVoterAfterElectionWithKraftVersion0(Errors updateVot
.withStaticVoters(voters)
.withElectedLeader(epoch, voter1.id())
.withLocalListeners(localListeners)
.withCanBecomeVoter(true)
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
Expand Down Expand Up @@ -2654,6 +2693,7 @@ void testFollowerSendsUpdateVoterWhenDifferent() throws Exception {
.withKip853Rpc(true)
.withBootstrapSnapshot(Optional.of(voters))
.withElectedLeader(epoch, voter1.id())
.withCanBecomeVoter(true)
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
Expand Down Expand Up @@ -2695,6 +2735,7 @@ void testFollowerSendsUpdateVoterIfPendingFetchDuringTimeout() throws Exception
.withBootstrapSnapshot(Optional.of(voters))
.withElectedLeader(epoch, voter1.id())
.withLocalListeners(localListeners)
.withCanBecomeVoter(true)
.build();

// waiting up to the last FETCH request before the UpdateRaftVoter request is set
Expand Down Expand Up @@ -2756,6 +2797,7 @@ void testUpdateVoterResponseCausesEpochChange() throws Exception {
.withBootstrapSnapshot(Optional.of(voters))
.withElectedLeader(epoch, voter1.id())
.withLocalListeners(localListeners)
.withCanBecomeVoter(true)
.build();

// waiting for FETCH request until the UpdateRaftVoter request is set
Expand Down