Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,11 +526,19 @@ private String responseDataToLogString(Set<TopicPartition> topicPartitions) {
*/
public boolean handleResponse(FetchResponse response, short version) {
if (response.error() != Errors.NONE) {
log.info("Node {} was unable to process the fetch request with {}: {}.",
node, nextMetadata, response.error());
if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to handle INVALID_FETCH_SESSION_EPOCH? I think that this in principle can happen when a network connection is lost and reconnected, and the recovery action is the same.

@lianetm lianetm Jun 18, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

The invalid epoch should not be as common I would imagine but still makes sense to log as in session not found as the handling and impact on the client is the same (none, self-healing). I imagine it's less common because the consumer sends a full fetch request upon errors, so would start a new session even if it carries an old epoch it seems (just sharing because I didn't know myself this on fetch, same approach in the consumer and heartbeats: full request upon errors with the GC). Btw, I ended up consolidating all other errors like the invalid epoch (all are top-level errors with that are handled the same: re-send full fetch with nextCloseExistingAttemptNew)

// Session does not exist on the broker anymore. Recoverable and self-healing, the client re-sends a full fetch request.
log.debug("Node {} returned a {} error; the fetch session {} was likely evicted from the broker's " +
"fetch session cache. Re-sending a full fetch request to establish a new session.",
node, response.error(), nextMetadata.sessionId());
nextMetadata = FetchMetadata.INITIAL;
} else {
// Other fetch-session errors (e.g. INVALID_FETCH_SESSION_EPOCH, FETCH_SESSION_TOPIC_ID_ERROR) are
// also recoverable and self-healing: the existing session is closed and a new one is re-established
// with a full fetch.
log.debug("Node {} was unable to process the fetch request with {}: {}. " +
"Re-sending a full fetch request, which closes the existing session on the broker and establishes a new one.",
node, nextMetadata, response.error());
nextMetadata = nextMetadata.nextCloseExistingAttemptNew();
}
return false;
Expand Down
Loading