From acf914352e8cea1f872f4eb127e286939a0f0e05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Mon, 16 Mar 2026 14:54:49 -0400 Subject: [PATCH] KAFKA-20661; Implement checking for voter default endpoint --- .../apache/kafka/raft/KafkaRaftClient.java | 56 ++- .../org/apache/kafka/raft/LeaderState.java | 6 +- .../kafka/raft/internals/AddVoterHandler.java | 20 +- .../raft/internals/AddVoterHandlerState.java | 12 +- .../internals/ChangeVoterHandlerState.java | 199 +++++++--- .../raft/internals/RemoveVoterHandler.java | 8 +- .../internals/RemoveVoterHandlerState.java | 14 +- .../raft/internals/UpdateVoterHandler.java | 347 +++++++++++++----- .../internals/UpdateVoterHandlerState.java | 155 ++++++++ .../raft/KafkaRaftClientReconfigTest.java | 225 +++++++++++- .../kafka/raft/RaftEventSimulationTest.java | 44 ++- .../internals/AddVoterHandlerStateTest.java | 133 +++++++ .../ChangeVoterHandlerStateTest.java | 275 ++++++++++++-- .../RemoveVoterHandlerStateTest.java | 64 ++++ .../UpdateVoterHandlerStateTest.java | 146 ++++++++ 15 files changed, 1524 insertions(+), 180 deletions(-) create mode 100644 raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java create mode 100644 raft/src/test/java/org/apache/kafka/raft/internals/AddVoterHandlerStateTest.java create mode 100644 raft/src/test/java/org/apache/kafka/raft/internals/RemoveVoterHandlerStateTest.java create mode 100644 raft/src/test/java/org/apache/kafka/raft/internals/UpdateVoterHandlerStateTest.java diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index f7091cac1ecd7..db035d28f53ef 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -397,6 +397,7 @@ private void onUpdateLeaderHighWatermark( private void maybeNotifyVoterHandlerOnHWmUpdate(LeaderState state, long 
highWatermark) { addVoterHandler.highWatermarkUpdated(state, highWatermark); removeVoterHandler.highWatermarkUpdated(state, highWatermark); + updateVoterHandler.highWatermarkUpdated(state, highWatermark); } private void updateListenersProgress(long highWatermark) { @@ -580,15 +581,17 @@ public void initialize( onBecomeFollower(currentTimeMs); } + var requestSender = new DefaultRequestSender( + requestManager, + channel, + messageQueue, + logContext + ); + // Specialized add voter handler this.addVoterHandler = new AddVoterHandler( partitionState, - new DefaultRequestSender( - requestManager, - channel, - messageQueue, - logContext - ), + requestSender, time, logContext ); @@ -606,7 +609,8 @@ public void initialize( // Specialized update voter handler this.updateVoterHandler = new UpdateVoterHandler( partitionState, - channel.listenerName(), + requestSender, + time, logContext ); } @@ -2318,19 +2322,37 @@ private boolean handleApiVersionsResponse( return true; } + var leaderState = quorum.leaderStateOrThrow(); + ApiVersionsResponseData response = (ApiVersionsResponseData) responseMetadata.data(); Errors error = Errors.forCode(response.errorCode()); Optional supportedKraftVersions = Optional.ofNullable(response.supportedFeatures().find(KRaftVersion.FEATURE_NAME)); - return addVoterHandler.handleApiVersionsResponse( - quorum.leaderStateOrThrow(), - responseMetadata.source(), - error, - supportedKraftVersions, - currentTimeMs - ); + if (leaderState.changeVoterState().addVoterHandlerState().isPresent()) { + return addVoterHandler.handleApiVersionsResponse( + leaderState, + responseMetadata.source(), + error, + supportedKraftVersions, + currentTimeMs + ); + } else if (leaderState.changeVoterState().updateVoterHandlerState().isPresent()) { + return updateVoterHandler.handleApiVersionsResponse( + leaderState, + responseMetadata.source(), + error, + supportedKraftVersions, + currentTimeMs + ); + } else { + logger.debug( + "Received API_VERSIONS response from {} but no voter 
change operation is pending", + responseMetadata.source() + ); + return true; + } } private boolean handleAddVoterResponse( @@ -3183,7 +3205,11 @@ private long pollLeader(long currentTimeMs) { long timeUntilVoterChangeExpires = state .changeVoterState() - .maybeExpirePendingOperation(currentTimeMs); + .maybeExpirePendingOperation( + quorum.leaderAndEpoch(), + quorum.leaderEndpoints(), + currentTimeMs + ); long timeUntilFlush = maybeAppendBatches( state, diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 0891e952024a3..d7c0e8b5adee8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -1077,7 +1077,11 @@ public String name() { @Override public void close() { - changeVoterState.maybeResetPendingVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER); + changeVoterState.maybeResetPendingVoterHandlerState( + Errors.NOT_LEADER_OR_FOLLOWER, + leaderAndEpoch(), + leaderEndpoints() + ); kafkaRaftMetrics.removeLeaderMetrics(); accumulator.close(); diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java index 3dac2d5046d86..67ded29fb5602 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java @@ -93,7 +93,13 @@ public CompletionStage handleAddVoterRequest( ) { var changeVoterState = leaderState.changeVoterState(); // Check if there are any pending voter change requests - if (changeVoterState.isOperationPending(currentTimeMs)) { + if ( + changeVoterState.isOperationPending( + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + currentTimeMs + ) + ) { return CompletableFuture.completedFuture( RaftUtil.addVoterResponse( Errors.REQUEST_TIMED_OUT, @@ -199,6 +205,16 @@ public CompletionStage 
handleAddVoterRequest( return state.future(); } + /** + * Handle the API_VERSIONS response for an add voter operation. + * + * @param leaderState the leader state + * @param source the node that sent the response + * @param error the error from the response + * @param supportedKraftVersions the supported kraft version range from the response + * @param currentTimeMs the current time in milliseconds + * @return true if the add voter operation should continue, false if it was aborted + */ public boolean handleApiVersionsResponse( LeaderState leaderState, Node source, @@ -329,7 +345,7 @@ public boolean handleApiVersionsResponse( if (!current.ackWhenCommitted()) { // complete the future to send response, but do not reset the state, // since the new voter set is not yet committed - current.future().complete(RaftUtil.addVoterResponse(Errors.NONE, null)); + current.completeFuture(RaftUtil.addVoterResponse(Errors.NONE, null)); } return true; } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java index 5cd9dec47d54a..d7cbd4fa27f29 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java @@ -24,6 +24,7 @@ import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; public final class AddVoterHandlerState { private final ReplicaKey voterKey; @@ -87,7 +88,16 @@ public OptionalLong lastOffset() { return lastOffset; } - public CompletableFuture future() { + /** + * Completes the future with the provided response. 
+ * + * @param response the response to complete the future with + */ + public void completeFuture(AddRaftVoterResponseData response) { + future.complete(response); + } + + CompletionStage future() { return future; } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/ChangeVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/ChangeVoterHandlerState.java index 7325cbf8d70ad..863f0d4fb2074 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/ChangeVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/ChangeVoterHandlerState.java @@ -17,6 +17,8 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.RaftUtil; import java.util.Optional; @@ -35,13 +37,10 @@ public final class ChangeVoterHandlerState { private Optional addVoterHandlerState = Optional.empty(); private Optional removeVoterHandlerState = Optional.empty(); + private Optional updateVoterHandlerState = Optional.empty(); + private final KafkaRaftMetrics kafkaRaftMetrics; - /** - * Constructs a new ChangeVoterHandlerState. - * - * @param kafkaRaftMetrics the metrics instance to update when voter change state changes - */ public ChangeVoterHandlerState(KafkaRaftMetrics kafkaRaftMetrics) { this.kafkaRaftMetrics = kafkaRaftMetrics; } @@ -61,29 +60,23 @@ public Optional addVoterHandlerState() { *

* If an add voter handler state already exists, its future will be completed with the * provided error and message before being replaced. If the new state is non-empty and a - * remove voter handler state is currently present, this method throws an IllegalStateException - * to enforce mutual exclusivity. + * remove voter or update voter handler state is currently present, this method throws an + * IllegalStateException to enforce mutual exclusivity. * * @param error the error to complete any existing add voter operation with * @param message the error message to include in the response, or null for no message * @param state the new add voter handler state, or empty to clear the state * @throws IllegalStateException if attempting to set a non-empty add voter state while a - * remove voter state is already present + * remove voter or update voter state is already present */ public void resetAddVoterHandlerState( Errors error, String message, Optional state ) { - if (state.isPresent() && removeVoterHandlerState.isPresent()) { - throw new IllegalStateException( - "Cannot set add voter handler state when remove voter handler state is already present" - ); - } + validateMutualExclusivity(state, removeVoterHandlerState, updateVoterHandlerState); addVoterHandlerState.ifPresent( - handlerState -> handlerState - .future() - .complete(RaftUtil.addVoterResponse(error, message)) + handlerState -> handlerState.completeFuture(RaftUtil.addVoterResponse(error, message)) ); addVoterHandlerState = state; updateUncommittedVoterChangeMetric(); @@ -104,37 +97,112 @@ public Optional removeVoterHandlerState() { *

* If a remove voter handler state already exists, its future will be completed with the * provided error and message before being replaced. If the new state is non-empty and an - * add voter handler state is currently present, this method throws an IllegalStateException - * to enforce mutual exclusivity. + * add voter or update voter handler state is currently present, this method throws an + * IllegalStateException to enforce mutual exclusivity. * * @param error the error to complete any existing remove voter operation with * @param message the error message to include in the response, or null for no message * @param state the new remove voter handler state, or empty to clear the state * @throws IllegalStateException if attempting to set a non-empty remove voter state while an - * add voter state is already present + * add voter or update voter state is already present */ public void resetRemoveVoterHandlerState( Errors error, String message, Optional state ) { - if (state.isPresent() && addVoterHandlerState.isPresent()) { - throw new IllegalStateException( - "Cannot set remove voter handler state when add voter handler state is already present" - ); - } + validateMutualExclusivity(addVoterHandlerState, state, updateVoterHandlerState); removeVoterHandlerState.ifPresent( - handlerState -> handlerState - .future() - .complete(RaftUtil.removeVoterResponse(error, message)) + handlerState -> handlerState.completeFuture(RaftUtil.removeVoterResponse(error, message)) ); removeVoterHandlerState = state; updateUncommittedVoterChangeMetric(); } + /** + * Returns the current update voter handler state, if one exists. + * + * @return an Optional containing the update voter handler state, or empty if no update voter + * operation is pending + */ + public Optional updateVoterHandlerState() { + return updateVoterHandlerState; + } + + /** + * Resets the update voter handler state to the specified state. + *

+ * If an update voter handler state already exists, its future will be completed with the + * provided error before being replaced. If the new state is non-empty and an add voter or + * remove voter handler state is currently present, this method throws an IllegalStateException + * to enforce mutual exclusivity. + * + * @param error the error to complete any existing update voter operation with + * @param leaderAndEpoch the current leader and epoch information + * @param leaderEndpoints the current leader endpoints + * @param state the new update voter handler state, or empty to clear the state + * @throws IllegalStateException if attempting to set a non-empty update voter state while an + * add voter or remove voter state is already present + */ + public void resetUpdateVoterHandlerState( + Errors error, + LeaderAndEpoch leaderAndEpoch, + Endpoints leaderEndpoints, + Optional state + ) { + validateMutualExclusivity(addVoterHandlerState, removeVoterHandlerState, state); + updateVoterHandlerState.ifPresent( + handlerState -> handlerState.completeFuture( + RaftUtil.updateVoterResponse( + error, + handlerState.requestListenerName(), + leaderAndEpoch, + leaderEndpoints + ) + ) + ); + updateVoterHandlerState = state; + updateUncommittedVoterChangeMetric(); + } + + /** + * Validates that at most one voter change operation is active. + *

+ * This enforces mutual exclusivity between add, remove, and update voter operations. + * + * @param newAdd the new add voter state being set (if any) + * @param newRemove the new remove voter state being set (if any) + * @param newUpdate the new update voter state being set (if any) + * @throws IllegalStateException if more than one operation would be active + */ + private void validateMutualExclusivity( + Optional newAdd, + Optional newRemove, + Optional newUpdate + ) { + int activeCount = 0; + if (newAdd.isPresent()) activeCount++; + if (newRemove.isPresent()) activeCount++; + if (newUpdate.isPresent()) activeCount++; + + if (activeCount > 1) { + throw new IllegalStateException( + String.format( + "Cannot have multiple voter change operations active simultaneously: " + + "add=%s, remove=%s, update=%s", + newAdd.isPresent(), + newRemove.isPresent(), + newUpdate.isPresent() + ) + ); + } + } + private void updateUncommittedVoterChangeMetric() { kafkaRaftMetrics.updateUncommittedVoterChange( - addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent() + addVoterHandlerState.isPresent() || + removeVoterHandlerState.isPresent() || + updateVoterHandlerState.isPresent() ); } @@ -150,7 +218,11 @@ private void updateUncommittedVoterChangeMetric() { * @return the time in milliseconds until the next operation expires, or Long.MAX_VALUE if * no operations are pending */ - public long maybeExpirePendingOperation(long currentTimeMs) { + public long maybeExpirePendingOperation( + LeaderAndEpoch leaderAndEpoch, + Endpoints leaderEndpoints, + long currentTimeMs + ) { // First abort any expired operations long timeUntilAddVoterExpiration = addVoterHandlerState() .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) @@ -168,41 +240,82 @@ public long maybeExpirePendingOperation(long currentTimeMs) { resetRemoveVoterHandlerState(Errors.REQUEST_TIMED_OUT, null, Optional.empty()); } + long timeUntilUpdateVoterExpiration = updateVoterHandlerState() + .map(state 
-> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE); + + if (timeUntilUpdateVoterExpiration == 0) { + resetUpdateVoterHandlerState( + Errors.REQUEST_TIMED_OUT, + leaderAndEpoch, + leaderEndpoints, + Optional.empty() + ); + } + // Reread the timeouts and return the smaller of them return Math.min( addVoterHandlerState() .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) .orElse(Long.MAX_VALUE), - removeVoterHandlerState() - .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) - .orElse(Long.MAX_VALUE) + Math.min( + removeVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE), + updateVoterHandlerState() + .map(state -> state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE) + ) ); } /** - * Resets all pending voter handler states, completing their futures with the specified error. + * Resets all pending voter handler states with the given error. *

- * This method clears both add voter and remove voter handler states if they exist. Each - * pending operation's future is completed with the provided error. + * This method completes the futures of any pending add voter, remove voter, and update voter + * operations with the provided error. * - * @param error the error to complete any pending operations with + * @param error the error to complete any existing operations with + * @param leaderAndEpoch the current leader and epoch information + * @param leaderEndpoints the current leader endpoints */ - public void maybeResetPendingVoterHandlerState(Errors error) { + public void maybeResetPendingVoterHandlerState( + Errors error, + LeaderAndEpoch leaderAndEpoch, + Endpoints leaderEndpoints + ) { resetAddVoterHandlerState(error, null, Optional.empty()); resetRemoveVoterHandlerState(error, null, Optional.empty()); + resetUpdateVoterHandlerState(error, leaderAndEpoch, leaderEndpoints, Optional.empty()); } /** * Checks whether any voter change operation is currently pending. *

- * This method first expires any operations that have timed out at the given timestamp, - * then returns true if either an add voter or remove voter operation remains pending. + * This method first expires any operations that have timed out, then checks if any + * add voter, remove voter, or update voter operations remain active. * + * @param leaderAndEpoch the current leader and epoch information + * @param leaderEndpoints the current leader endpoints * @param currentTimeMs the current time in milliseconds - * @return true if a voter change operation is pending, false otherwise + * @return true if any voter change operation is pending, false otherwise */ - public boolean isOperationPending(long currentTimeMs) { - maybeExpirePendingOperation(currentTimeMs); - return addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent(); + public boolean isOperationPending( + LeaderAndEpoch leaderAndEpoch, + Endpoints leaderEndpoints, + long currentTimeMs + ) { + maybeExpirePendingOperation(leaderAndEpoch, leaderEndpoints, currentTimeMs); + return addVoterHandlerState.isPresent() || removeVoterHandlerState.isPresent() || updateVoterHandlerState.isPresent(); + } + + @Override + public String toString() { + return String.format( + "ChangeVoterHandlerState(addVoterHandlerState=%s, removeVoterHandlerState=%s, updateVoterHandlerState=%s)", + addVoterHandlerState, + removeVoterHandlerState, + updateVoterHandlerState + ); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java index 713221ceeeacd..baded5ce9952a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java @@ -82,7 +82,13 @@ public CompletionStage handleRemoveVoterRequest( ) { var changeVoterState = leaderState.changeVoterState(); // Check if there are any pending voter change requests - 
if (changeVoterState.isOperationPending(currentTimeMs)) { + if ( + changeVoterState.isOperationPending( + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + currentTimeMs + ) + ) { return CompletableFuture.completedFuture( RaftUtil.removeVoterResponse( Errors.REQUEST_TIMED_OUT, diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java index bb9ef4cb2cc89..51683a4f93586 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Timer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; public final class RemoveVoterHandlerState { private final long lastOffset; @@ -36,11 +37,20 @@ public long timeUntilOperationExpiration(long currentTimeMs) { return timeout.remainingMs(); } - public CompletableFuture future() { - return future; + /** + * Completes the future with the provided response. 
+ * + * @param response the response to complete the future with + */ + public void completeFuture(RemoveRaftVoterResponseData response) { + future.complete(response); } public long lastOffset() { return lastOffset; } + + CompletionStage future() { + return future; + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java index 962e35c6cb4b6..bc3cd5c459847 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java @@ -16,11 +16,16 @@ */ package org.apache.kafka.raft.internals; +import org.apache.kafka.common.Node; import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.ApiVersionsRequestData; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.UpdateRaftVoterRequestData; import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiVersionsRequest; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.internals.LogContext; import org.apache.kafka.raft.Endpoints; import org.apache.kafka.raft.LeaderState; @@ -55,17 +60,20 @@ */ public final class UpdateVoterHandler { private final KRaftControlRecordStateMachine partitionState; - private final ListenerName defaultListenerName; - private final Logger log; + private final RequestSender requestSender; + private final Time time; + private final Logger logger; public UpdateVoterHandler( KRaftControlRecordStateMachine partitionState, - ListenerName defaultListenerName, + RequestSender requestSender, + Time time, LogContext logContext ) { this.partitionState = partitionState; - this.defaultListenerName = defaultListenerName; - this.log 
= logContext.logger(getClass()); + this.requestSender = requestSender; + this.time = time; + this.logger = logContext.logger(getClass()); } public CompletionStage handleUpdateVoterRequest( @@ -76,8 +84,15 @@ public CompletionStage handleUpdateVoterRequest( UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions, long currentTimeMs ) { + var changeVoterState = leaderState.changeVoterState(); // Check if there are any pending voter change requests - if (leaderState.changeVoterState().isOperationPending(currentTimeMs)) { + if ( + changeVoterState.isOperationPending( + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + currentTimeMs + ) + ) { return CompletableFuture.completedFuture( RaftUtil.updateVoterResponse( Errors.REQUEST_TIMED_OUT, @@ -101,50 +116,20 @@ public CompletionStage handleUpdateVoterRequest( ); } - // Read the voter set from the log or leader state - KRaftVersion kraftVersion = partitionState.lastKraftVersion(); - final Optional inMemoryVoters; - final Optional voters; - if (kraftVersion.isReconfigSupported()) { - inMemoryVoters = Optional.empty(); - - // Check that there are no uncommitted VotersRecord - Optional> votersEntry = partitionState.lastVoterSetEntry(); - if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) { - voters = Optional.empty(); - } else { - voters = votersEntry.map(LogHistory.Entry::value); - } - } else { - inMemoryVoters = leaderState.volatileVoters(); - if (inMemoryVoters.isEmpty()) { - /* This can happen if the remote voter sends an update voter request before the - * updated kraft version has been written to the log - */ - return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, - requestListenerName, - leaderState.leaderAndEpoch(), - leaderState.leaderEndpoints() - ) - ); - } - voters = inMemoryVoters.map(KRaftVersionUpgrade.Voters::voters); - } - if (voters.isEmpty()) { - log.info("Unable to read the current voter set with 
kraft version {}", kraftVersion); + // Check that the supported version range is valid + if (!validVersionRange(partitionState.lastKraftVersion(), supportedKraftVersions)) { return CompletableFuture.completedFuture( RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, + Errors.INVALID_REQUEST, requestListenerName, leaderState.leaderAndEpoch(), leaderState.leaderEndpoints() ) ); } - // Check that the supported version range is valid - if (!validVersionRange(kraftVersion, supportedKraftVersions)) { + + // Check that endpoints includes the default listener + if (voterEndpoints.address(requestSender.listenerName()).isEmpty()) { return CompletableFuture.completedFuture( RaftUtil.updateVoterResponse( Errors.INVALID_REQUEST, @@ -155,11 +140,27 @@ public CompletionStage handleUpdateVoterRequest( ); } - // Check that endpoints includes the default listener - if (voterEndpoints.address(defaultListenerName).isEmpty()) { + // Send API_VERSIONS request to new voter to test new default endpoint + var timeout = requestSender.send( + voterEndpoints + .address(requestSender.listenerName()) + .map(address -> new Node(voterKey.id(), address.getHostName(), address.getPort())) + .orElseThrow( + () -> new IllegalStateException( + String.format( + "Provided listeners %s do not contain a listener for %s", + voterEndpoints, + requestSender.listenerName() + ) + ) + ), + this::buildApiVersionsRequest, + currentTimeMs + ); + if (timeout.isEmpty()) { return CompletableFuture.completedFuture( RaftUtil.updateVoterResponse( - Errors.INVALID_REQUEST, + Errors.REQUEST_TIMED_OUT, requestListenerName, leaderState.leaderAndEpoch(), leaderState.leaderEndpoints() @@ -167,38 +168,165 @@ public CompletionStage handleUpdateVoterRequest( ); } + var state = new UpdateVoterHandlerState( + voterKey, + voterEndpoints, + requestListenerName, + new SupportedVersionRange( + supportedKraftVersions.minSupportedVersion(), + supportedKraftVersions.maxSupportedVersion() + ), + time.timer(timeout.getAsLong()) + ); 
+ changeVoterState.resetUpdateVoterHandlerState( + Errors.UNKNOWN_SERVER_ERROR, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.of(state) + ); + + return state.future(); + } + + /** + * Handle the API_VERSIONS response for an update voter operation. + * + * @param leaderState the leader state + * @param source the node that sent the response + * @param error the error from the response + * @param supportedKraftVersions the supported kraft version range from the response + * @param currentTimeMs the current time in milliseconds + * @return true if the update voter operation should continue, false if it was aborted + */ + public boolean handleApiVersionsResponse( + LeaderState leaderState, + Node source, + Errors error, + Optional supportedKraftVersions, + long currentTimeMs + ) { + var changeVoterState = leaderState.changeVoterState(); + var handlerState = changeVoterState.updateVoterHandlerState(); + if (handlerState.isEmpty()) { + // There is no pending update operation; just ignore the API response + return true; + } + + // Check that the API_VERSIONS response matches the id of the voter getting updated + var current = handlerState.get(); + if (!current.expectingApiResponse(source.id())) { + logger.info( + "API_VERSIONS response is not expected from {}: voterKey is {}, lastOffset is {}", + source, + current.voterKey(), + current.lastOffset() + ); + + return true; + } else if (error != Errors.NONE) { + // Abort operation if the API_VERSIONS returned an error + logger.info( + "Aborting update voter operation for {} at {} since API_VERSIONS returned an error {}", + current.voterKey(), + current.voterEndpoints(), + error + ); + + changeVoterState.resetUpdateVoterHandlerState( + Errors.REQUEST_TIMED_OUT, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.empty() + ); + + return false; + } else if ( + !Optional.of(current.supportedKraftVersions()) + .equals(supportedKraftVersions.map(this::convertToVersionRange)) + ) { + 
// Check that the supported version from the ApiVersions response matches the supported + // version from the UpdateVoter request + logger.error( + "The supported kraft version from UpdateVoters {} doesn't match the supported " + + "kraft version from ApiVersions {}", + current.supportedKraftVersions(), + supportedKraftVersions + ); + changeVoterState.resetUpdateVoterHandlerState( + Errors.INVALID_REQUEST, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.empty() + ); + return true; + } + + // Check that the leader has established a HWM and committed the current epoch + Optional highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); + if (highWatermark.isEmpty()) { + // This cannot happen because the update voter request handler already validated that + // the HWM is known + throw new IllegalStateException("Expected the high-watermark to be known"); + } + + // Read the voter set from the log or leader state + KRaftVersion kraftVersion = partitionState.lastKraftVersion(); + final Optional inMemoryVoters; + final Optional voters; + if (kraftVersion.isReconfigSupported()) { + inMemoryVoters = Optional.empty(); + + // Check that there are no uncommitted VotersRecord + Optional> votersEntry = partitionState.lastVoterSetEntry(); + if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) { + voters = Optional.empty(); + } else { + voters = votersEntry.map(LogHistory.Entry::value); + } + } else { + inMemoryVoters = leaderState.volatileVoters(); + voters = inMemoryVoters.map(KRaftVersionUpgrade.Voters::voters); + } + if (voters.isEmpty()) { + logger.info("Unable to read the current voter set with kraft version {}", kraftVersion); + changeVoterState.resetUpdateVoterHandlerState( + Errors.REQUEST_TIMED_OUT, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.empty() + ); + return true; + } + // Update the voter Optional updatedVoters = updateVoters( voters.get(), kraftVersion,
VoterSet.VoterNode.of( - voterKey, - voterEndpoints, - new SupportedVersionRange( - supportedKraftVersions.minSupportedVersion(), - supportedKraftVersions.maxSupportedVersion() - ) + current.voterKey(), + current.voterEndpoints(), + current.supportedKraftVersions() ) ); if (updatedVoters.isEmpty()) { - return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.VOTER_NOT_FOUND, - requestListenerName, - leaderState.leaderAndEpoch(), - leaderState.leaderEndpoints() - ) + changeVoterState.resetUpdateVoterHandlerState( + Errors.VOTER_NOT_FOUND, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.empty() ); + + return true; } - return storeUpdatedVoters( + storeUpdatedVoters( leaderState, - voterKey, + current, inMemoryVoters, updatedVoters.get(), - requestListenerName, currentTimeMs ); + return true; } private boolean validVersionRange( @@ -219,17 +347,30 @@ private Optional updateVoters( voters.updateVoterIgnoringDirectoryId(updatedVoter); } - private CompletionStage storeUpdatedVoters( + private void storeUpdatedVoters( LeaderState leaderState, - ReplicaKey voterKey, + UpdateVoterHandlerState current, Optional inMemoryVoters, VoterSet newVoters, - ListenerName requestListenerName, long currentTimeMs ) { + var changeVoterState = leaderState.changeVoterState(); + if (inMemoryVoters.isEmpty()) { - // Since the partition support reconfig then just write the update voter set directly to the log - leaderState.appendVotersRecord(newVoters, currentTimeMs); + /* Since the partition supports reconfig, just write the updated voter set directly to the log. + * + * Complete the RPC but don't reset the handler state. This allows the follower to send a FETCH + * request and help to commit the voter set change.
+ */ + current.setLastOffset(leaderState.appendVotersRecord(newVoters, currentTimeMs)); + current.completeFuture( + RaftUtil.updateVoterResponse( + Errors.NONE, + current.requestListenerName(), + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints() + ) + ); } else { // Store the new voters set in the leader state since it cannot be written to the log var successful = leaderState.compareAndSetVolatileVoters( @@ -237,38 +378,76 @@ private CompletionStage storeUpdatedVoters( new KRaftVersionUpgrade.Voters(newVoters) ); if (successful) { - log.info( + logger.info( "Updated in-memory voters from {} to {}", inMemoryVoters.get().voters(), newVoters ); + + // Reset the check quorum state since the leader received a successful request + leaderState.updateCheckQuorumForFollowingVoter(current.voterKey(), currentTimeMs); + + changeVoterState.resetUpdateVoterHandlerState( + Errors.NONE, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.empty() + ); } else { - log.info( + logger.info( "Unable to update in-memory voters from {} to {}", inMemoryVoters.get().voters(), newVoters ); - return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.REQUEST_TIMED_OUT, - requestListenerName, - leaderState.leaderAndEpoch(), - leaderState.leaderEndpoints() - ) + + // Fail the pending future if present + changeVoterState.resetUpdateVoterHandlerState( + Errors.REQUEST_TIMED_OUT, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.empty() ); } } + } - // Reset the check quorum state since the leader received a successful request - leaderState.updateCheckQuorumForFollowingVoter(voterKey, currentTimeMs); + private ApiVersionsRequestData buildApiVersionsRequest() { + return new ApiVersionsRequest.Builder().build().data(); + } - return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( - Errors.NONE, - requestListenerName, - leaderState.leaderAndEpoch(), - leaderState.leaderEndpoints() - ) + private 
SupportedVersionRange convertToVersionRange( + ApiVersionsResponseData.SupportedFeatureKey supportedKraftVersions + ) { + return new SupportedVersionRange( + supportedKraftVersions.minVersion(), + supportedKraftVersions.maxVersion() ); } + + /** + * Called when the high watermark is updated to check if any pending update voter operations + * can be completed. + * + * @param leaderState the leader state + * @param highWatermark the new high watermark offset + */ + public void highWatermarkUpdated(LeaderState leaderState, long highWatermark) { + var changeVoterState = leaderState.changeVoterState(); + + changeVoterState + .updateVoterHandlerState() + .ifPresent(current -> { + current.lastOffset().ifPresent(lastOffset -> { + if (highWatermark > lastOffset) { + // VotersRecord with the updated voter was committed; complete the RPC + changeVoterState.resetUpdateVoterHandlerState( + Errors.NONE, + leaderState.leaderAndEpoch(), + leaderState.leaderEndpoints(), + Optional.empty() + ); + } + }); + }); + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java new file mode 100644 index 0000000000000..8bf810bcbf23a --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.ReplicaKey; + +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public final class UpdateVoterHandlerState { + private final ReplicaKey voterKey; + private final Endpoints voterEndpoints; + private final ListenerName requestListenerName; + private final SupportedVersionRange supportedKraftVersions; + private final Timer timeout; + private final CompletableFuture future = new CompletableFuture<>(); + + private OptionalLong lastOffset = OptionalLong.empty(); + + UpdateVoterHandlerState( + ReplicaKey voterKey, + Endpoints voterEndpoints, + ListenerName requestListenerName, + SupportedVersionRange supportedKraftVersions, + Timer timeout + ) { + this.voterKey = voterKey; + this.voterEndpoints = voterEndpoints; + this.requestListenerName = requestListenerName; + this.supportedKraftVersions = supportedKraftVersions; + this.timeout = timeout; + } + + /** + * Returns the time in milliseconds until this operation expires. 
+ * + * @param currentTimeMs the current time in milliseconds + * @return the remaining time in milliseconds until expiration + */ + public long timeUntilOperationExpiration(long currentTimeMs) { + timeout.update(currentTimeMs); + return timeout.remainingMs(); + } + + /** + * Checks whether this handler state is expecting an API_VERSIONS response from the given replica. + * + * @param replicaId the replica id to check + * @return true if expecting a response from this replica, false otherwise + */ + public boolean expectingApiResponse(int replicaId) { + return lastOffset.isEmpty() && replicaId == voterKey.id(); + } + + /** + * Sets the last offset for this update voter operation. + * + * @param lastOffset the offset of the VotersRecord that was appended to the log + * @throws IllegalStateException if the last offset has already been set + */ + public void setLastOffset(long lastOffset) { + if (this.lastOffset.isPresent()) { + throw new IllegalStateException( + String.format( + "Cannot override last offset to %s for adding voter %s because it is " + + "already set to %s", + lastOffset, + voterKey, + this.lastOffset + ) + ); + } + + this.lastOffset = OptionalLong.of(lastOffset); + } + + /** + * Returns the voter key for the voter being updated. + * + * @return the voter key + */ + public ReplicaKey voterKey() { + return voterKey; + } + + /** + * Returns the endpoints for the voter being updated. + * + * @return the voter endpoints + */ + public Endpoints voterEndpoints() { + return voterEndpoints; + } + + /** + * Returns the listener name from the update voter request. + * + * @return the listener name + */ + public ListenerName requestListenerName() { + return requestListenerName; + } + + /** + * Returns the kraft version range supported by the voter being updated. 
+ * + * @return the supported kraft version range + */ + public SupportedVersionRange supportedKraftVersions() { + return supportedKraftVersions; + } + + /** + * Returns the offset of the VotersRecord if it has been appended to the log. + * + * @return the last offset, or empty if not yet appended + */ + public OptionalLong lastOffset() { + return lastOffset; + } + + /** + * Completes the future with the provided response. + * + * @param response the response to complete the future with + */ + public void completeFuture(UpdateRaftVoterResponseData response) { + future.complete(response); + } + + CompletionStage future() { + return future; + } +} diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 08f3d63833b68..0a3812cab445e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -350,7 +350,7 @@ public void testAddVoter() throws Exception { // Attempt to add new voter to the quorum context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); - completeApiVersionsForAddVoter(context, newVoter, newAddress); + completeApiVersions(context, newVoter, newAddress); // Handle the API_VERSIONS response context.poll(); @@ -402,7 +402,7 @@ void testAddVoterCompletesEarlyWithAckWhenCommittedFalse() throws Exception { ).setAckWhenCommitted(false) ); - completeApiVersionsForAddVoter(context, newVoter, newAddress); + completeApiVersions(context, newVoter, newAddress); // Handle the API_VERSIONS response context.poll(); @@ -442,16 +442,16 @@ private void prepareLeaderToReceiveAddVoter( checkLeaderMetricValues(2, 1, 0, context); } - private void completeApiVersionsForAddVoter( + private void completeApiVersions( RaftClientTestContext context, - ReplicaKey newVoter, - InetSocketAddress newAddress + ReplicaKey remoteVoter, + 
InetSocketAddress remoteAddress ) throws Exception { - // Leader should send an API_VERSIONS request to the new voter's endpoint + // Leader should send an API_VERSIONS request to the remote voter's endpoint context.pollUntilRequest(); RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest(); assertEquals( - new Node(newVoter.id(), newAddress.getHostString(), newAddress.getPort()), + new Node(remoteVoter.id(), remoteAddress.getHostString(), remoteAddress.getPort()), apiVersionRequest.destination() ); @@ -1684,6 +1684,8 @@ void testUpdateVoter() throws Exception { ) ); + completeApiVersions(context, follower, defaultAddress); + // Expect reply for UpdateVoter request without committing the record context.pollUntilResponse(); context.assertSentUpdateVoterResponse( @@ -1696,6 +1698,172 @@ void testUpdateVoter() throws Exception { assertTrue(context.client.quorum().isVoter(follower)); } + @Test + void testUpdateVoterWithUnreachableListener() throws Exception { + ReplicaKey local = replicaKey(randomReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withRaftProtocol(RaftProtocol.KIP_1186_PROTOCOL) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.unattachedToLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update voter with new listeners + context.deliverRequest( + context.updateVoterRequest( + follower, + Feature.KRAFT_VERSION.supportedVersionRange(), + voters.listeners(follower.id()) + ) + ); + + // Leader sends 
API_VERSIONS to verify reachability + context.pollUntilRequest(); + context.assertSentApiVersionsRequest(); + + // Simulate unreachable voter by not responding and letting it timeout + context.time.sleep(context.requestTimeoutMs()); + context.pollUntilResponse(); + + // UpdateVoter should fail since voter is unreachable + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); + + // Voter set should not be updated + assertEquals(voters, context.listener.lastCommittedVoterSet().get()); + } + + @Test + void testUpdateVoterWithBrokerNotAvailable() throws Exception { + ReplicaKey local = replicaKey(randomReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved("localhost", 9990 + follower.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withRaftProtocol(RaftProtocol.KIP_1186_PROTOCOL) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.unattachedToLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update voter with new listeners + context.deliverRequest( + context.updateVoterRequest( + follower, + Feature.KRAFT_VERSION.supportedVersionRange(), + voters.listeners(follower.id()) + ) + ); + + // Leader sends API_VERSIONS to verify reachability + context.pollUntilRequest(); + RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest(); + assertEquals( + new Node(follower.id(), defaultAddress.getHostString(), defaultAddress.getPort()), + 
apiVersionRequest.destination() + ); + + // Reply with API_VERSIONS response indicating broker not available + context.deliverResponse( + apiVersionRequest.correlationId(), + apiVersionRequest.destination(), + apiVersionsResponse(Errors.BROKER_NOT_AVAILABLE) + ); + context.pollUntilResponse(); + + // UpdateVoter should fail since voter is not available + context.assertSentUpdateVoterResponse( + Errors.REQUEST_TIMED_OUT, + OptionalInt.of(local.id()), + epoch + ); + + // Voter set should not be updated + assertEquals(voters, context.listener.lastCommittedVoterSet().get()); + } + + @Test + void testUpdateVoterWithUnsupportedKraftVersion() throws Exception { + ReplicaKey local = replicaKey(randomReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved("localhost", 9990 + follower.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withRaftProtocol(RaftProtocol.KIP_1186_PROTOCOL) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.unattachedToLeader(); + int epoch = context.currentEpoch(); + + // Establish a HWM + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to update voter claiming support for kraft.version 1 + context.deliverRequest( + context.updateVoterRequest( + follower, + new SupportedVersionRange((short) 0, (short) 1), + voters.listeners(follower.id()) + ) + ); + + // Leader sends API_VERSIONS to verify the voter's capabilities + context.pollUntilRequest(); + RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest(); + assertEquals( + new Node(follower.id(), 
defaultAddress.getHostString(), defaultAddress.getPort()), + apiVersionRequest.destination() + ); + + // Reply with API_VERSIONS showing only kraft.version 0 support + context.deliverResponse( + apiVersionRequest.correlationId(), + apiVersionRequest.destination(), + apiVersionsResponse(Errors.NONE, new SupportedVersionRange((short) 0)) + ); + context.pollUntilResponse(); + context.assertSentUpdateVoterResponse( + Errors.INVALID_REQUEST, + OptionalInt.of(local.id()), + epoch + ); + } + @Test void testLeaderUpdatesVoter() throws Exception { ReplicaKey local = replicaKey(randomReplicaId(), true); @@ -2023,6 +2191,9 @@ void testUpdateVoterWithKraftVersion0() throws Exception { newListeners ) ); + + completeApiVersions(context, follower, defaultAddress); + context.pollUntilResponse(); context.assertSentUpdateVoterResponse( Errors.NONE, @@ -2067,13 +2238,17 @@ void testUpdateVoterWithNoneVoter() throws Exception { listenersMap.put(context.channel.listenerName(), defaultAddress); listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + var notVoter = replicaKey(follower.id(), true); context.deliverRequest( context.updateVoterRequest( - replicaKey(follower.id(), true), + notVoter, Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); + + completeApiVersions(context, notVoter, defaultAddress); + context.pollUntilResponse(); context.assertSentUpdateVoterResponse( Errors.VOTER_NOT_FOUND, @@ -2118,13 +2293,17 @@ void testUpdateVoterWithNoneVoterId() throws Exception { listenersMap.put(context.channel.listenerName(), defaultAddress); listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress); Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap); + var notVoter = ReplicaKey.of(follower.id() + 1, follower.directoryId().get()); context.deliverRequest( context.updateVoterRequest( - ReplicaKey.of(follower.id() + 1, follower.directoryId().get()), 
+ notVoter, Feature.KRAFT_VERSION.supportedVersionRange(), newListeners ) ); + + completeApiVersions(context, notVoter, defaultAddress); + context.pollUntilResponse(); context.assertSentUpdateVoterResponse( Errors.VOTER_NOT_FOUND, @@ -2442,6 +2621,13 @@ void testKRaftUpgradeVersion() throws Exception { startingVoters.listeners(voter.id()) ) ); + + completeApiVersions( + context, + voter, + startingVoters.listeners(voter.id()).address(context.channel.listenerName()).get() + ); + context.pollUntilResponse(); context.assertSentUpdateVoterResponse( Errors.NONE, @@ -2513,6 +2699,13 @@ void testUpdateVoterAfterKRaftVersionUpgrade() throws Exception { startingVoters.listeners(voter.id()) ) ); + + completeApiVersions( + context, + voter, + startingVoters.listeners(voter.id()).address(context.channel.listenerName()).get() + ); + context.pollUntilResponse(); context.assertSentUpdateVoterResponse( Errors.NONE, @@ -2559,6 +2752,13 @@ void testUpdateVoterAfterKRaftVersionUpgrade() throws Exception { newVoter1Listeners ) ); + + completeApiVersions( + context, + voter1, + newVoter1Listeners.address(context.channel.listenerName()).get() + ); + context.pollUntilResponse(); context.assertSentUpdateVoterResponse( Errors.NONE, @@ -2626,6 +2826,13 @@ void testInvalidKRaftUpgradeVersion() throws Exception { startingVoters.listeners(voter1.id()) ) ); + + completeApiVersions( + context, + voter1, + startingVoters.listeners(voter1.id()).address(context.channel.listenerName()).get() + ); + context.pollUntilResponse(); context.assertSentUpdateVoterResponse( Errors.NONE, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 7b79ec9e2dee7..4ad4dd7e618b8 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -20,14 +20,19 @@ import org.apache.kafka.common.TopicPartition; import 
org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.feature.Features; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.internals.BufferSupplier; @@ -36,6 +41,7 @@ import org.apache.kafka.raft.MockLog.LogEntry; import org.apache.kafka.raft.internals.BatchMemoryPool; import org.apache.kafka.server.common.Feature; +import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; @@ -63,6 +69,8 @@ import java.util.PriorityQueue; import java.util.Random; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -975,6 +983,40 @@ void poll() { } } + CompletionStage handle(RaftRequest.Inbound request) { + var apiKey = ApiKeys.forId(request.data().apiKey()); + switch (apiKey) { + case API_VERSIONS: + // KRaft doesn't handle ApiVersions requests. These are handled by the "node".
+ // This creates a simple implementation that simply replies with the + // supported kraft versions. + var apiVersionsResponse = new ApiVersionsResponse.Builder() + .setSupportedFeatures( + Features.supportedFeatures( + Map.of( + KRaftVersion.FEATURE_NAME, + new SupportedVersionRange( + KRaftVersion.KRAFT_VERSION_0.featureLevel(), + KRaftVersion.LATEST_PRODUCTION.featureLevel() + ) + ) + ) + ) + .setApiVersions(new ApiVersionsResponseData.ApiVersionCollection()) + .setFinalizedFeatures(Map.of()) + .build() + .data(); + var apiVersions = new RaftResponse.Outbound( + request.correlationId(), + apiVersionsResponse + ); + return CompletableFuture.completedFuture(apiVersions); + + default: + return client.handle(request); + } + } + long highWatermark() { return client.quorum().highWatermark() .map(LogOffsetMetadata::offset) @@ -1405,7 +1447,7 @@ void deliver(int senderId, RaftRequest.Outbound outbound) { cluster.nodeIfRunning(destination.id()).ifPresent(node -> { inflight.put(correlationId, new InflightRequest(senderId, destination)); - node.client.handle(inbound).whenComplete((response, exception) -> { + node.handle(inbound).whenComplete((response, exception) -> { if (response != null && filters.get(destination.id()).acceptOutbound(response)) { deliver(response); } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/AddVoterHandlerStateTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/AddVoterHandlerStateTest.java new file mode 100644 index 0000000000000..ddfc6c2139fb6 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/internals/AddVoterHandlerStateTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.ReplicaKey; + +import org.junit.jupiter.api.Test; + +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AddVoterHandlerStateTest { + + @Test + public void testSetLastOffsetOnce() { + var time = new MockTime(); + var state = new AddVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + true, + time.timer(1000) + ); + + assertTrue(state.lastOffset().isEmpty()); + state.setLastOffset(100L); + assertEquals(OptionalLong.of(100L), state.lastOffset()); + } + + @Test + public void testCannotOverrideLastOffset() { + var time = new MockTime(); + var state = new AddVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + true, + time.timer(1000) + ); + + state.setLastOffset(100L); + assertThrows(IllegalStateException.class, () -> state.setLastOffset(200L)); + } + + @Test + public void testExpectingApiResponseBeforeLastOffset() { + var time = new MockTime(); + var state = 
new AddVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + true, + time.timer(1000) + ); + + assertTrue(state.expectingApiResponse(1)); + assertFalse(state.expectingApiResponse(2)); + + state.setLastOffset(100L); + assertFalse(state.expectingApiResponse(1)); + } + + @Test + public void testTimeUntilExpiration() { + var time = new MockTime(); + var state = new AddVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + true, + time.timer(1000) + ); + + assertEquals(1000, state.timeUntilOperationExpiration(time.milliseconds())); + time.sleep(500); + assertEquals(500, state.timeUntilOperationExpiration(time.milliseconds())); + time.sleep(500); + assertEquals(0, state.timeUntilOperationExpiration(time.milliseconds())); + } + + @Test + public void testCompleteFuture() { + var time = new MockTime(); + var state = new AddVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + true, + time.timer(1000) + ); + + var future = state.future().toCompletableFuture(); + assertFalse(future.isDone()); + + var response = new AddRaftVoterResponseData().setErrorCode((short) 0); + state.completeFuture(response); + + assertTrue(future.isDone()); + assertEquals(response, future.join()); + } + + @Test + public void testGetters() { + var time = new MockTime(); + var voterKey = ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID); + var endpoints = Endpoints.empty(); + + var state = new AddVoterHandlerState(voterKey, endpoints, true, time.timer(1000)); + + assertEquals(voterKey, state.voterKey()); + assertEquals(endpoints, state.voterEndpoints()); + assertTrue(state.ackWhenCommitted()); + + var stateNoAck = new AddVoterHandlerState(voterKey, endpoints, false, time.timer(1000)); + assertFalse(stateNoAck.ackWhenCommitted()); + } +} diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/ChangeVoterHandlerStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/ChangeVoterHandlerStateTest.java index 850f6a8b714a3..1b03a88aac119 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/ChangeVoterHandlerStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/ChangeVoterHandlerStateTest.java @@ -17,13 +17,17 @@ package org.apache.kafka.raft.internals; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.AddRaftVoterResponseData; import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.ReplicaKey; import org.junit.jupiter.api.Test; @@ -62,7 +66,7 @@ public void testAddVoterLifecycle() { false, time.timer(1000) ); - CompletableFuture future = addVoterState.future(); + CompletableFuture future = addVoterState.future().toCompletableFuture(); state.resetAddVoterHandlerState(Errors.NONE, null, Optional.of(addVoterState)); assertTrue(state.addVoterHandlerState().isPresent()); @@ -100,7 +104,7 @@ public void testRemoveVoterLifecycle() { 100L, time.timer(1000) ); - CompletableFuture future = removeVoterState.future(); + CompletableFuture future = removeVoterState.future().toCompletableFuture(); state.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.of(removeVoterState)); assertTrue(state.removeVoterHandlerState().isPresent()); @@ -131,7 +135,7 @@ public void testMaybeExpirePendingAddVoter() { try { ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); - assertEquals(Long.MAX_VALUE, 
state.maybeExpirePendingOperation(time.milliseconds())); + assertEquals(Long.MAX_VALUE, state.maybeExpirePendingOperation(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); AddVoterHandlerState addVoterState = new AddVoterHandlerState( ReplicaKey.of(1, Uuid.randomUuid()), @@ -142,16 +146,17 @@ public void testMaybeExpirePendingAddVoter() { state.resetAddVoterHandlerState(Errors.NONE, null, Optional.of(addVoterState)); time.sleep(500); - assertEquals(500, state.maybeExpirePendingOperation(time.milliseconds())); + assertEquals(500, state.maybeExpirePendingOperation(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertTrue(state.addVoterHandlerState().isPresent()); assertEquals(1, getMetric(metrics, "uncommitted-voter-change").metricValue()); time.sleep(500); - assertEquals(Long.MAX_VALUE, state.maybeExpirePendingOperation(time.milliseconds())); + assertEquals(Long.MAX_VALUE, state.maybeExpirePendingOperation(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertFalse(state.addVoterHandlerState().isPresent()); assertEquals(0, getMetric(metrics, "uncommitted-voter-change").metricValue()); - assertTrue(addVoterState.future().isDone()); - assertEquals(Errors.REQUEST_TIMED_OUT, Errors.forCode(addVoterState.future().join().errorCode())); + var future = addVoterState.future().toCompletableFuture(); + assertTrue(future.isDone()); + assertEquals(Errors.REQUEST_TIMED_OUT, Errors.forCode(future.join().errorCode())); } finally { raftMetrics.close(); metrics.close(); @@ -175,16 +180,17 @@ public void testMaybeExpirePendingRemoveVoter() { state.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.of(removeVoterState)); time.sleep(500); - assertEquals(500, state.maybeExpirePendingOperation(time.milliseconds())); + assertEquals(500, state.maybeExpirePendingOperation(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertTrue(state.removeVoterHandlerState().isPresent()); assertEquals(1, getMetric(metrics, 
"uncommitted-voter-change").metricValue()); time.sleep(500); - assertEquals(Long.MAX_VALUE, state.maybeExpirePendingOperation(time.milliseconds())); + assertEquals(Long.MAX_VALUE, state.maybeExpirePendingOperation(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertFalse(state.removeVoterHandlerState().isPresent()); assertEquals(0, getMetric(metrics, "uncommitted-voter-change").metricValue()); - assertTrue(removeVoterState.future().isDone()); - assertEquals(Errors.REQUEST_TIMED_OUT, Errors.forCode(removeVoterState.future().join().errorCode())); + var future = removeVoterState.future().toCompletableFuture(); + assertTrue(future.isDone()); + assertEquals(Errors.REQUEST_TIMED_OUT, Errors.forCode(future.join().errorCode())); } finally { raftMetrics.close(); metrics.close(); @@ -201,7 +207,7 @@ public void testIsOperationPendingWithAddVoter() { try { ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); - assertFalse(state.isOperationPending(time.milliseconds())); + assertFalse(state.isOperationPending(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertEquals(0, getMetric(metrics, "uncommitted-voter-change").metricValue()); AddVoterHandlerState addVoterState = new AddVoterHandlerState( @@ -212,11 +218,11 @@ public void testIsOperationPendingWithAddVoter() { ); state.resetAddVoterHandlerState(Errors.NONE, null, Optional.of(addVoterState)); - assertTrue(state.isOperationPending(time.milliseconds())); + assertTrue(state.isOperationPending(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertEquals(1, getMetric(metrics, "uncommitted-voter-change").metricValue()); time.sleep(1000); - assertFalse(state.isOperationPending(time.milliseconds())); + assertFalse(state.isOperationPending(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertFalse(state.addVoterHandlerState().isPresent()); assertEquals(0, getMetric(metrics, "uncommitted-voter-change").metricValue()); } finally { @@ 
-235,7 +241,7 @@ public void testIsOperationPendingWithRemoveVoter() { try { ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); - assertFalse(state.isOperationPending(time.milliseconds())); + assertFalse(state.isOperationPending(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); RemoveVoterHandlerState removeVoterState = new RemoveVoterHandlerState( 100L, @@ -243,11 +249,11 @@ public void testIsOperationPendingWithRemoveVoter() { ); state.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.of(removeVoterState)); - assertTrue(state.isOperationPending(time.milliseconds())); + assertTrue(state.isOperationPending(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertEquals(1, getMetric(metrics, "uncommitted-voter-change").metricValue()); time.sleep(1000); - assertFalse(state.isOperationPending(time.milliseconds())); + assertFalse(state.isOperationPending(LeaderAndEpoch.UNKNOWN, Endpoints.empty(), time.milliseconds())); assertFalse(state.removeVoterHandlerState().isPresent()); assertEquals(0, getMetric(metrics, "uncommitted-voter-change").metricValue()); } finally { @@ -274,12 +280,16 @@ public void testMaybeResetPendingVoterHandlerState() { ); state.resetAddVoterHandlerState(Errors.NONE, null, Optional.of(addVoterState)); - CompletableFuture addFuture = addVoterState.future(); + CompletableFuture addFuture = addVoterState.future().toCompletableFuture(); assertFalse(addFuture.isDone()); assertEquals(1, getMetric(metrics, "uncommitted-voter-change").metricValue()); - state.maybeResetPendingVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER); + state.maybeResetPendingVoterHandlerState( + Errors.NOT_LEADER_OR_FOLLOWER, + LeaderAndEpoch.UNKNOWN, + Endpoints.empty() + ); assertFalse(state.addVoterHandlerState().isPresent()); assertEquals(0, getMetric(metrics, "uncommitted-voter-change").metricValue()); @@ -379,7 +389,7 @@ public void testCanReplaceAddVoterWithAnotherAddVoter() { false, time.timer(1000) ); - 
CompletableFuture firstFuture = firstAddVoter.future(); + CompletableFuture firstFuture = firstAddVoter.future().toCompletableFuture(); state.resetAddVoterHandlerState(Errors.NONE, null, Optional.of(firstAddVoter)); AddVoterHandlerState secondAddVoter = new AddVoterHandlerState( @@ -415,7 +425,7 @@ public void testCanReplaceRemoveVoterWithAnotherRemoveVoter() { 100L, time.timer(1000) ); - CompletableFuture firstFuture = firstRemoveVoter.future(); + CompletableFuture firstFuture = firstRemoveVoter.future().toCompletableFuture(); state.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.of(firstRemoveVoter)); RemoveVoterHandlerState secondRemoveVoter = new RemoveVoterHandlerState( @@ -434,4 +444,227 @@ public void testCanReplaceRemoveVoterWithAnotherRemoveVoter() { metrics.close(); } } + + @Test + public void testCannotSetAddVoterWhenUpdateVoterPresent() { + MockTime time = new MockTime(); + Metrics metrics = new Metrics(time); + KafkaRaftMetrics raftMetrics = new KafkaRaftMetrics(metrics, "raft"); + raftMetrics.addLeaderMetrics(); + + try { + ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); + + UpdateVoterHandlerState updateVoterState = new UpdateVoterHandlerState( + ReplicaKey.of(1, Uuid.randomUuid()), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + state.resetUpdateVoterHandlerState( + Errors.NONE, + LeaderAndEpoch.UNKNOWN, + Endpoints.empty(), + Optional.of(updateVoterState) + ); + assertTrue(state.updateVoterHandlerState().isPresent()); + + AddVoterHandlerState addVoterState = new AddVoterHandlerState( + ReplicaKey.of(2, Uuid.randomUuid()), + Endpoints.empty(), + false, + time.timer(1000) + ); + + assertThrows(IllegalStateException.class, () -> { + state.resetAddVoterHandlerState(Errors.NONE, null, Optional.of(addVoterState)); + }); + + assertTrue(state.updateVoterHandlerState().isPresent()); + assertFalse(state.addVoterHandlerState().isPresent()); + 
} finally { + raftMetrics.close(); + metrics.close(); + } + } + + @Test + public void testCannotSetRemoveVoterWhenUpdateVoterPresent() { + MockTime time = new MockTime(); + Metrics metrics = new Metrics(time); + KafkaRaftMetrics raftMetrics = new KafkaRaftMetrics(metrics, "raft"); + raftMetrics.addLeaderMetrics(); + + try { + ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); + + UpdateVoterHandlerState updateVoterState = new UpdateVoterHandlerState( + ReplicaKey.of(1, Uuid.randomUuid()), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + state.resetUpdateVoterHandlerState( + Errors.NONE, + LeaderAndEpoch.UNKNOWN, + Endpoints.empty(), + Optional.of(updateVoterState) + ); + assertTrue(state.updateVoterHandlerState().isPresent()); + + RemoveVoterHandlerState removeVoterState = new RemoveVoterHandlerState( + 100L, + time.timer(1000) + ); + + assertThrows(IllegalStateException.class, () -> { + state.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.of(removeVoterState)); + }); + + assertTrue(state.updateVoterHandlerState().isPresent()); + assertFalse(state.removeVoterHandlerState().isPresent()); + } finally { + raftMetrics.close(); + metrics.close(); + } + } + + @Test + public void testCannotSetUpdateVoterWhenAddVoterPresent() { + MockTime time = new MockTime(); + Metrics metrics = new Metrics(time); + KafkaRaftMetrics raftMetrics = new KafkaRaftMetrics(metrics, "raft"); + raftMetrics.addLeaderMetrics(); + + try { + ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); + + AddVoterHandlerState addVoterState = new AddVoterHandlerState( + ReplicaKey.of(1, Uuid.randomUuid()), + Endpoints.empty(), + false, + time.timer(1000) + ); + state.resetAddVoterHandlerState(Errors.NONE, null, Optional.of(addVoterState)); + assertTrue(state.addVoterHandlerState().isPresent()); + + UpdateVoterHandlerState updateVoterState = new UpdateVoterHandlerState( + 
ReplicaKey.of(2, Uuid.randomUuid()), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + + assertThrows(IllegalStateException.class, () -> { + state.resetUpdateVoterHandlerState( + Errors.NONE, + LeaderAndEpoch.UNKNOWN, + Endpoints.empty(), + Optional.of(updateVoterState) + ); + }); + + assertTrue(state.addVoterHandlerState().isPresent()); + assertFalse(state.updateVoterHandlerState().isPresent()); + } finally { + raftMetrics.close(); + metrics.close(); + } + } + + @Test + public void testCannotSetUpdateVoterWhenRemoveVoterPresent() { + MockTime time = new MockTime(); + Metrics metrics = new Metrics(time); + KafkaRaftMetrics raftMetrics = new KafkaRaftMetrics(metrics, "raft"); + raftMetrics.addLeaderMetrics(); + + try { + ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); + + RemoveVoterHandlerState removeVoterState = new RemoveVoterHandlerState( + 100L, + time.timer(1000) + ); + state.resetRemoveVoterHandlerState(Errors.NONE, null, Optional.of(removeVoterState)); + assertTrue(state.removeVoterHandlerState().isPresent()); + + UpdateVoterHandlerState updateVoterState = new UpdateVoterHandlerState( + ReplicaKey.of(1, Uuid.randomUuid()), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + + assertThrows(IllegalStateException.class, () -> { + state.resetUpdateVoterHandlerState( + Errors.NONE, + LeaderAndEpoch.UNKNOWN, + Endpoints.empty(), + Optional.of(updateVoterState) + ); + }); + + assertTrue(state.removeVoterHandlerState().isPresent()); + assertFalse(state.updateVoterHandlerState().isPresent()); + } finally { + raftMetrics.close(); + metrics.close(); + } + } + + @Test + public void testCanReplaceUpdateVoterWithAnotherUpdateVoter() { + MockTime time = new MockTime(); + Metrics metrics = new Metrics(time); + KafkaRaftMetrics raftMetrics = new KafkaRaftMetrics(metrics, "raft"); + 
raftMetrics.addLeaderMetrics(); + + try { + ChangeVoterHandlerState state = new ChangeVoterHandlerState(raftMetrics); + + UpdateVoterHandlerState firstUpdateVoter = new UpdateVoterHandlerState( + ReplicaKey.of(1, Uuid.randomUuid()), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + CompletableFuture firstFuture = firstUpdateVoter.future().toCompletableFuture(); + state.resetUpdateVoterHandlerState( + Errors.NONE, + LeaderAndEpoch.UNKNOWN, + Endpoints.empty(), + Optional.of(firstUpdateVoter) + ); + + UpdateVoterHandlerState secondUpdateVoter = new UpdateVoterHandlerState( + ReplicaKey.of(2, Uuid.randomUuid()), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + state.resetUpdateVoterHandlerState( + Errors.OPERATION_NOT_ATTEMPTED, + LeaderAndEpoch.UNKNOWN, + Endpoints.empty(), + Optional.of(secondUpdateVoter) + ); + + assertTrue(state.updateVoterHandlerState().isPresent()); + assertEquals(secondUpdateVoter, state.updateVoterHandlerState().get()); + assertFalse(state.addVoterHandlerState().isPresent()); + assertFalse(state.removeVoterHandlerState().isPresent()); + assertTrue(firstFuture.isDone()); + assertEquals(Errors.OPERATION_NOT_ATTEMPTED, Errors.forCode(firstFuture.join().errorCode())); + } finally { + raftMetrics.close(); + metrics.close(); + } + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RemoveVoterHandlerStateTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RemoveVoterHandlerStateTest.java new file mode 100644 index 0000000000000..7ff5baa90cbd2 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RemoveVoterHandlerStateTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class RemoveVoterHandlerStateTest { + + @Test + public void testLastOffset() { + var time = new MockTime(); + var state = new RemoveVoterHandlerState(100L, time.timer(1000)); + + assertEquals(100L, state.lastOffset()); + } + + @Test + public void testTimeUntilExpiration() { + var time = new MockTime(); + var state = new RemoveVoterHandlerState(100L, time.timer(1000)); + + assertEquals(1000, state.timeUntilOperationExpiration(time.milliseconds())); + time.sleep(500); + assertEquals(500, state.timeUntilOperationExpiration(time.milliseconds())); + time.sleep(500); + assertEquals(0, state.timeUntilOperationExpiration(time.milliseconds())); + } + + @Test + public void testCompleteFuture() { + var time = new MockTime(); + var state = new RemoveVoterHandlerState(100L, time.timer(1000)); + + var future = state.future().toCompletableFuture(); + assertFalse(future.isDone()); + + var response = new 
RemoveRaftVoterResponseData().setErrorCode((short) 0); + state.completeFuture(response); + + assertTrue(future.isDone()); + assertEquals(response, future.join()); + } +} diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/UpdateVoterHandlerStateTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/UpdateVoterHandlerStateTest.java new file mode 100644 index 0000000000000..8b3e2dddba392 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/internals/UpdateVoterHandlerStateTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.ReplicaKey; + +import org.junit.jupiter.api.Test; + +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class UpdateVoterHandlerStateTest { + + @Test + public void testSetLastOffsetOnce() { + var time = new MockTime(); + var state = new UpdateVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + + assertTrue(state.lastOffset().isEmpty()); + state.setLastOffset(100L); + assertEquals(OptionalLong.of(100L), state.lastOffset()); + } + + @Test + public void testCannotOverrideLastOffset() { + var time = new MockTime(); + var state = new UpdateVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + + state.setLastOffset(100L); + assertThrows(IllegalStateException.class, () -> state.setLastOffset(200L)); + } + + @Test + public void testExpectingApiResponseBeforeLastOffset() { + var time = new MockTime(); + var state = new UpdateVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + + assertTrue(state.expectingApiResponse(1)); + assertFalse(state.expectingApiResponse(2)); + + 
state.setLastOffset(100L); + assertFalse(state.expectingApiResponse(1)); + } + + @Test + public void testTimeUntilExpiration() { + var time = new MockTime(); + var state = new UpdateVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + + assertEquals(1000, state.timeUntilOperationExpiration(time.milliseconds())); + time.sleep(500); + assertEquals(500, state.timeUntilOperationExpiration(time.milliseconds())); + time.sleep(500); + assertEquals(0, state.timeUntilOperationExpiration(time.milliseconds())); + } + + @Test + public void testCompleteFuture() { + var time = new MockTime(); + var state = new UpdateVoterHandlerState( + ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), + Endpoints.empty(), + new ListenerName("PLAINTEXT"), + new SupportedVersionRange((short) 0, (short) 1), + time.timer(1000) + ); + + var future = state.future().toCompletableFuture(); + assertFalse(future.isDone()); + + var response = new UpdateRaftVoterResponseData().setErrorCode((short) 0); + state.completeFuture(response); + + assertTrue(future.isDone()); + assertEquals(response, future.join()); + } + + @Test + public void testGetters() { + var time = new MockTime(); + var voterKey = ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID); + var endpoints = Endpoints.empty(); + var listenerName = new ListenerName("PLAINTEXT"); + var versions = new SupportedVersionRange((short) 0, (short) 1); + + var state = new UpdateVoterHandlerState( + voterKey, + endpoints, + listenerName, + versions, + time.timer(1000) + ); + + assertEquals(voterKey, state.voterKey()); + assertEquals(endpoints, state.voterEndpoints()); + assertEquals(listenerName, state.requestListenerName()); + assertEquals(versions, state.supportedKraftVersions()); + } +}