Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion utils/req-res-log-validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
"cluster|migrateslots",
}

RESP_TYPE_PREFIXES = {"+", "-", "$", ":", ",", "_", "#", "!", "=", "(", "*", "~", ">", "%", "|"}

class Request(object):
"""
This class represents a Redis request (AKA command, argv)
Expand All @@ -78,11 +80,25 @@ def __init__(self, f, docs, line_counter):
self.argv = []

while True:
pos = f.tell()
line = f.readline()
line_counter[0] += 1
if not line:
break
length = int(line)
stripped = line.strip()
try:
length = int(stripped)
except ValueError:
if not self.argv and stripped and stripped[0] in RESP_TYPE_PREFIXES:
# When blocked clients are redirected after a failover they may
# receive an async MOVED error that is logged without their request.
# These responses start with a RESP type prefix. Rewind the cursor
# to skip it.
Comment on lines +93 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the MOVED errors logged without their request? Isn't that the source of this error? Can we fix that instead when we log the requests and responses?

Copy link
Contributor

@zuiderkwast zuiderkwast Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the 1:1 match between request logging and response logging is broken.

  • reqresAppendRequest() is called in processCommand() in server.c if not reprocessing command
  • reqresAppendResponse() is called in commandProcessed() in networking.c if the client is not blocked

In the PR that introduced this bug, on these lines https://github.com/valkey-io/valkey/pull/2329/files#diff-ddd6be3a7a9289fbf41890c34da16f1ea5b0a8d6d9206ae1ff5303e50bae1970R342 we call clusterRedirectBlockedClientIfNeeded() and directly afterwards unblockClient() and then the command is reprocessed. This means it gets one more reply?

Compare to the standalone case, we send the -REDIRECT error and then call unblockClientOnError(), which sets c->flag.pending_command = 0; to prevent it from being reprocessed.

I think we need to set c->flag.pending_command = 0; also in the cluster case after sending MOVED. Otherwise I suspect we actually have a bug that we will send two replies, which is a real bug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have now verified this. The client gets two MOVED redirects for one request!

I verified by modifying this test case and it still passes:

diff --git a/tests/unit/cluster/replica-redirect.tcl b/tests/unit/cluster/replica-redirect.tcl
index ac9c8bd30..70fba8fe4 100644
--- a/tests/unit/cluster/replica-redirect.tcl
+++ b/tests/unit/cluster/replica-redirect.tcl
@@ -38,6 +38,7 @@ start_cluster 1 1 {tags {external:skip cluster}} {
 
         # Check that the client blocking on the old primary was MOVED to the new primary.
         assert_error "MOVED *" {$rd0 read}
+        assert_error "MOVED *" {$rd0 read}
 
         # Check that the readonly client blocking on the old primary is still blocked.
         assert_equal 1 [s 0 blocked_clients]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @zuiderkwast for looking and explaining this. I will be closing this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @sarthakaggarwal97! It was your comment "they may receive an async MOVED error that is logged without their request" that led me to it.

f.seek(pos)
line_counter[0] -= 1
Response(f, line_counter)
continue
raise
arg = str(f.read(length))
f.read(2) # read \r\n
line_counter[0] += 1
Expand Down
Loading