diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index b5150b2d7f17e..ab108ceab7383 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -526,11 +526,19 @@ private String responseDataToLogString(Set topicPartitions) { */ public boolean handleResponse(FetchResponse response, short version) { if (response.error() != Errors.NONE) { - log.info("Node {} was unable to process the fetch request with {}: {}.", - node, nextMetadata, response.error()); if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) { + // Session does not exist on the broker anymore. Recoverable and self-healing, the client re-sends a full fetch request. + log.debug("Node {} returned a {} error; the fetch session {} was likely evicted from the broker's " + + "fetch session cache. Re-sending a full fetch request to establish a new session.", + node, response.error(), nextMetadata.sessionId()); nextMetadata = FetchMetadata.INITIAL; } else { + // Other fetch-session errors (e.g. INVALID_FETCH_SESSION_EPOCH, FETCH_SESSION_TOPIC_ID_ERROR) are + // also recoverable and self-healing: the existing session is closed and a new one is re-established + // with a full fetch. + log.debug("Node {} was unable to process the fetch request with {}: {}. " + + "Re-sending a full fetch request, which closes the existing session on the broker and establishes a new one.", + node, nextMetadata, response.error()); nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); } return false;