diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index e7d3ab89fc590..590220a85b1ac 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -3298,6 +3298,10 @@ private long pollFollower(long currentTimeMs) { } private boolean shouldSendUpdateVoteRequest(FollowerState state) { + if (!canBecomeVoter) { + return false; + } + var version = partitionState.lastKraftVersion(); /* When the cluster supports reconfiguration, send an updated voter configuration if the * one in the log doesn't match the local configuration. diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index ba2016cae5bb0..d367a3e07aa24 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -2228,6 +2228,7 @@ void testFollowerSendsUpdateVoter() throws Exception { .withBootstrapSnapshot(Optional.of(voters)) .withElectedLeader(epoch, voter1.id()) .withLocalListeners(localListeners) + .withCanBecomeVoter(true) .build(); // waiting for FETCH requests until the UpdateRaftVoter request is sent @@ -2255,6 +2256,42 @@ void testFollowerSendsUpdateVoter() throws Exception { context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); } + @Test + void testFollowerDoesNotSendUpdateVoterWhenItCannotBecomeVoter() throws Exception { + ReplicaKey local = replicaKey(randomReplicaId(), true); + ReplicaKey voter1 = replicaKey(local.id() + 1, true); + ReplicaKey voter2 = replicaKey(local.id() + 2, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, voter1, voter2)); + int epoch = 4; + + HashMap listenersMap = new HashMap<>(2); + listenersMap.put( + VoterSetTest.DEFAULT_LISTENER_NAME, + InetSocketAddress.createUnresolved("localhost", 9990 + local.id()) + ); + listenersMap.put( + ListenerName.normalised("ANOTHER_LISTENER"), + InetSocketAddress.createUnresolved("localhost", 8990 + local.id()) + ); + Endpoints localListeners = Endpoints.fromInetSocketAddresses(listenersMap); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withElectedLeader(epoch, voter1.id()) + .withLocalListeners(localListeners) + .withCanBecomeVoter(false) + .build(); + + // waiting for FETCH requests until the UpdateRaftVoter request would be sent + context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true); + + context.pollUntilRequest(); + RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); + context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); + } + @ParameterizedTest @EnumSource(value = Errors.class, names = {"NONE", "UNSUPPORTED_VERSION"}) void testFollowerSendsUpdateVoterWithKraftVersion0(Errors updateVoterError) throws Exception { @@ -2281,6 +2318,7 @@ void testFollowerSendsUpdateVoterWithKraftVersion0(Errors updateVoterError) thro .withStaticVoters(voters) .withElectedLeader(epoch, voter1.id()) .withLocalListeners(localListeners) + .withCanBecomeVoter(true) .build(); // waiting for FETCH request until the UpdateRaftVoter request is set @@ -2352,6 +2390,7 @@ void testFollowerSendsUpdateVoterAfterElectionWithKraftVersion0(Errors updateVot .withStaticVoters(voters) .withElectedLeader(epoch, voter1.id()) .withLocalListeners(localListeners) + .withCanBecomeVoter(true) .build(); // waiting for FETCH request until the UpdateRaftVoter request is set @@ -2654,6 +2693,7 @@ void testFollowerSendsUpdateVoterWhenDifferent() throws Exception { .withKip853Rpc(true) .withBootstrapSnapshot(Optional.of(voters)) .withElectedLeader(epoch, voter1.id()) + .withCanBecomeVoter(true) .build(); // waiting for FETCH request until the UpdateRaftVoter request is set @@ -2695,6 +2735,7 @@ void testFollowerSendsUpdateVoterIfPendingFetchDuringTimeout() throws Exception .withBootstrapSnapshot(Optional.of(voters)) .withElectedLeader(epoch, voter1.id()) .withLocalListeners(localListeners) + .withCanBecomeVoter(true) .build(); // waiting up to the last FETCH request before the UpdateRaftVoter request is set @@ -2756,6 +2797,7 @@ void testUpdateVoterResponseCausesEpochChange() throws Exception { .withBootstrapSnapshot(Optional.of(voters)) .withElectedLeader(epoch, voter1.id()) .withLocalListeners(localListeners) + .withCanBecomeVoter(true) .build(); // waiting for FETCH request until the UpdateRaftVoter request is set