Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ private void onUpdateLeaderHighWatermark(
private void maybeNotifyVoterHandlerOnHWmUpdate(LeaderState<T> state, long highWatermark) {
addVoterHandler.highWatermarkUpdated(state, highWatermark);
removeVoterHandler.highWatermarkUpdated(state, highWatermark);
updateVoterHandler.highWatermarkUpdated(state, highWatermark);
}

private void updateListenersProgress(long highWatermark) {
Expand Down Expand Up @@ -580,15 +581,17 @@ public void initialize(
onBecomeFollower(currentTimeMs);
}

var requestSender = new DefaultRequestSender(
requestManager,
channel,
messageQueue,
logContext
);

// Specialized add voter handler
this.addVoterHandler = new AddVoterHandler(
partitionState,
new DefaultRequestSender(
requestManager,
channel,
messageQueue,
logContext
),
requestSender,
time,
logContext
);
Expand All @@ -606,7 +609,8 @@ public void initialize(
// Specialized update voter handler
this.updateVoterHandler = new UpdateVoterHandler(
partitionState,
channel.listenerName(),
requestSender,
time,
logContext
);
}
Expand Down Expand Up @@ -2318,19 +2322,37 @@ private boolean handleApiVersionsResponse(
return true;
}

var leaderState = quorum.leaderStateOrThrow();

ApiVersionsResponseData response = (ApiVersionsResponseData) responseMetadata.data();

Errors error = Errors.forCode(response.errorCode());
Optional<ApiVersionsResponseData.SupportedFeatureKey> supportedKraftVersions =
Optional.ofNullable(response.supportedFeatures().find(KRaftVersion.FEATURE_NAME));

return addVoterHandler.handleApiVersionsResponse(
quorum.leaderStateOrThrow(),
responseMetadata.source(),
error,
supportedKraftVersions,
currentTimeMs
);
if (leaderState.changeVoterState().addVoterHandlerState().isPresent()) {
return addVoterHandler.handleApiVersionsResponse(
leaderState,
responseMetadata.source(),
error,
supportedKraftVersions,
currentTimeMs
);
} else if (leaderState.changeVoterState().updateVoterHandlerState().isPresent()) {
return updateVoterHandler.handleApiVersionsResponse(
leaderState,
responseMetadata.source(),
error,
supportedKraftVersions,
currentTimeMs
);
} else {
logger.debug(
"Received API_VERSIONS response from {} but no voter change operation is pending",
responseMetadata.source()
);
return true;
}
}

private boolean handleAddVoterResponse(
Expand Down Expand Up @@ -3183,7 +3205,11 @@ private long pollLeader(long currentTimeMs) {

long timeUntilVoterChangeExpires = state
.changeVoterState()
.maybeExpirePendingOperation(currentTimeMs);
.maybeExpirePendingOperation(
quorum.leaderAndEpoch(),
quorum.leaderEndpoints(),
currentTimeMs
);

long timeUntilFlush = maybeAppendBatches(
state,
Expand Down
6 changes: 5 additions & 1 deletion raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,11 @@ public String name() {

@Override
public void close() {
changeVoterState.maybeResetPendingVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER);
changeVoterState.maybeResetPendingVoterHandlerState(
Errors.NOT_LEADER_OR_FOLLOWER,
leaderAndEpoch(),
leaderEndpoints()
);
kafkaRaftMetrics.removeLeaderMetrics();

accumulator.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,13 @@ public CompletionStage<AddRaftVoterResponseData> handleAddVoterRequest(
) {
var changeVoterState = leaderState.changeVoterState();
// Check if there are any pending voter change requests
if (changeVoterState.isOperationPending(currentTimeMs)) {
if (
changeVoterState.isOperationPending(
leaderState.leaderAndEpoch(),
leaderState.leaderEndpoints(),
currentTimeMs
)
) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand Down Expand Up @@ -199,6 +205,16 @@ public CompletionStage<AddRaftVoterResponseData> handleAddVoterRequest(
return state.future();
}

/**
* Handle the API_VERSIONS response for an add voter operation.
*
* @param leaderState the leader state
* @param source the node that sent the response
* @param error the error from the response
* @param supportedKraftVersions the supported kraft version range from the response
* @param currentTimeMs the current time in milliseconds
* @return true if the add voter operation should continue, false if it was aborted
*/
public boolean handleApiVersionsResponse(
LeaderState<?> leaderState,
Node source,
Expand Down Expand Up @@ -329,7 +345,7 @@ public boolean handleApiVersionsResponse(
if (!current.ackWhenCommitted()) {
// complete the future to send response, but do not reset the state,
// since the new voter set is not yet committed
current.future().complete(RaftUtil.addVoterResponse(Errors.NONE, null));
current.completeFuture(RaftUtil.addVoterResponse(Errors.NONE, null));
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public final class AddVoterHandlerState {
private final ReplicaKey voterKey;
Expand Down Expand Up @@ -87,7 +88,16 @@ public OptionalLong lastOffset() {
return lastOffset;
}

public CompletableFuture<AddRaftVoterResponseData> future() {
/**
* Completes the future with the provided response.
*
* @param response the response to complete the future with
*/
public void completeFuture(AddRaftVoterResponseData response) {
future.complete(response);
}

CompletionStage<AddRaftVoterResponseData> future() {
return future;
}
}
Loading
Loading