From 3fd1a1a3014d9a412e12d6e75adb42955adf4c82 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Thu, 18 Jun 2026 12:08:46 -0400 Subject: [PATCH 1/4] upd --- .../org/apache/kafka/clients/FetchSessionHandler.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index b5150b2d7f17e..0ba7bf92d7db2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -526,11 +526,15 @@ private String responseDataToLogString(Set topicPartitions) { */ public boolean handleResponse(FetchResponse response, short version) { if (response.error() != Errors.NONE) { - log.info("Node {} was unable to process the fetch request with {}: {}.", - node, nextMetadata, response.error()); if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) { + // This is a recoverable condition logged at DEBUG to avoid noise from routine cache churn. + log.debug("Node {} returned a {} error; the fetch session {} was likely evicted from the broker's " + + "fetch session cache. Re-sending a full fetch request to establish a new session.", + node, response.error(), nextMetadata.sessionId()); nextMetadata = FetchMetadata.INITIAL; } else { + log.info("Node {} was unable to process the fetch request with {}: {}.", + node, nextMetadata, response.error()); nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); } return false; From bc2497b8074ee324bdecfbb025bc84573f382e7d Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Thu, 18 Jun 2026 16:10:45 -0400 Subject: [PATCH 2/4] handle invalid epoch --- .../org/apache/kafka/clients/FetchSessionHandler.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index 0ba7bf92d7db2..0af2e8e267735 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -527,11 +527,17 @@ private String responseDataToLogString(Set topicPartitions) { public boolean handleResponse(FetchResponse response, short version) { if (response.error() != Errors.NONE) { if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) { - // This is a recoverable condition logged at DEBUG to avoid noise from routine cache churn. + // Recoverable and self-healing, so logged at DEBUG to avoid noise from routine cache churn. log.debug("Node {} returned a {} error; the fetch session {} was likely evicted from the broker's " + "fetch session cache. Re-sending a full fetch request to establish a new session.", node, response.error(), nextMetadata.sessionId()); nextMetadata = FetchMetadata.INITIAL; + } else if (response.error() == Errors.INVALID_FETCH_SESSION_EPOCH) { + // Recoverable and self-healing, so logged at DEBUG. + log.debug("Node {} returned a {} error; the fetch session {} epoch is out of sync with the broker. " + + "Re-sending a full fetch request to establish a new session.", + node, response.error(), nextMetadata.sessionId()); + nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); } else { log.info("Node {} was unable to process the fetch request with {}: {}.", node, nextMetadata, response.error()); From fca798728af9bfec1857d7cf0603e421d2fc373e Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Thu, 18 Jun 2026 16:44:28 -0400 Subject: [PATCH 3/4] consolidate --- .../apache/kafka/clients/FetchSessionHandler.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index 0af2e8e267735..1bb56b07f060e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -527,19 +527,16 @@ private String responseDataToLogString(Set topicPartitions) { public boolean handleResponse(FetchResponse response, short version) { if (response.error() != Errors.NONE) { if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) { - // Recoverable and self-healing, so logged at DEBUG to avoid noise from routine cache churn. + // Session does not exist on the broker anymore. Recoverable and self-healing, the client re-sends a full fetch request. log.debug("Node {} returned a {} error; the fetch session {} was likely evicted from the broker's " + "fetch session cache. Re-sending a full fetch request to establish a new session.", node, response.error(), nextMetadata.sessionId()); nextMetadata = FetchMetadata.INITIAL; - } else if (response.error() == Errors.INVALID_FETCH_SESSION_EPOCH) { - // Recoverable and self-healing, so logged at DEBUG. - log.debug("Node {} returned a {} error; the fetch session {} epoch is out of sync with the broker. " + - "Re-sending a full fetch request to establish a new session.", - node, response.error(), nextMetadata.sessionId()); - nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); } else { - log.info("Node {} was unable to process the fetch request with {}: {}.", + // Other fetch-session errors (e.g. INVALID_FETCH_SESSION_EPOCH, FETCH_SESSION_TOPIC_ID_ERROR) are + // also recoverable and self-healing: the existing session is closed and a new one is re-established + // with a full fetch. + log.debug("Node {} was unable to process the fetch request with {}: {}.", node, nextMetadata, response.error()); nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); } From 254fb08e90be07fbf821d2157711fa01bb17b585 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Thu, 18 Jun 2026 16:54:57 -0400 Subject: [PATCH 4/4] msg --- .../java/org/apache/kafka/clients/FetchSessionHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index 1bb56b07f060e..ab108ceab7383 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -536,7 +536,8 @@ public boolean handleResponse(FetchResponse response, short version) { // Other fetch-session errors (e.g. INVALID_FETCH_SESSION_EPOCH, FETCH_SESSION_TOPIC_ID_ERROR) are // also recoverable and self-healing: the existing session is closed and a new one is re-established // with a full fetch. - log.debug("Node {} was unable to process the fetch request with {}: {}.", + log.debug("Node {} was unable to process the fetch request with {}: {}. " + + "Re-sending a full fetch request, which closes the existing session on the broker and establishes a new one.", node, nextMetadata, response.error()); nextMetadata = nextMetadata.nextCloseExistingAttemptNew(); }