From 55130475fcbb28dee6827d93ca491744799d7ce9 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sat, 11 Oct 2025 14:10:54 -0700 Subject: [PATCH 1/2] feat: client capa redirect Signed-off-by: Rueian --- standalone.go | 135 +++++++------ standalone_test.go | 463 +++++++++++++++++---------------------------- 2 files changed, 252 insertions(+), 346 deletions(-) diff --git a/standalone.go b/standalone.go index ae0bc31..5f7df95 100644 --- a/standalone.go +++ b/standalone.go @@ -78,13 +78,17 @@ func (s *standalone) redirectToPrimary(addr string) error { // Atomically swap the primary and close the old one oldPrimary := s.primary.Swap(newPrimary) - oldPrimary.Close() + go func(oldPrimary *singleClient) { + time.Sleep(time.Second * 5) + oldPrimary.Close() + }(oldPrimary) return nil } func (s *standalone) Do(ctx context.Context, cmd Completed) (resp ValkeyResult) { attempts := 1 + if s.enableRedirect { cmd = cmd.Pin() } @@ -100,17 +104,18 @@ retry: if s.enableRedirect { if ret, yes := IsValkeyErr(resp.Error()); yes { if addr, ok := ret.IsRedirect(); ok { - // Command is already pinned at this point err := s.redirectCall.Do(ctx, func() error { return s.redirectToPrimary(addr) }) - // Use retryHandler to handle multiple redirects with context deadline if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error()) { attempts++ goto retry } } } + if resp.NonRedisError() == nil { + cmds.PutCompletedForce(cmd) + } } return resp @@ -119,7 +124,6 @@ retry: func (s *standalone) DoMulti(ctx context.Context, multi ...Completed) (resp []ValkeyResult) { attempts := 1 - // Pin all commands at the beginning if redirect is enabled if s.enableRedirect { for i := range multi { multi[i] = multi[i].Pin() @@ -143,22 +147,24 @@ retry: // Handle redirects with retry until context deadline if s.enableRedirect { for i, result := range resp { - if i < len(multi) { - if ret, yes := IsValkeyErr(result.Error()); yes { - if addr, ok := ret.IsRedirect(); ok { - err := s.redirectCall.Do(ctx, func() error { - return s.redirectToPrimary(addr) - }) - // Use retryHandler to handle multiple redirects with context deadline - if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, multi[0], result.Error()) { - attempts++ - goto retry - } - break // Exit the loop if redirect handling fails + if ret, yes := IsRedisErr(result.Error()); yes { + if addr, ok := ret.IsRedirect(); ok { + err := s.redirectCall.Do(ctx, func() error { + return s.redirectToPrimary(addr) + }) + if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, multi[i], result.Error()) { + attempts++ + goto retry } + break // Exit the loop if redirect handling fails } } } + for i, result := range resp { + if result.NonRedisError() == nil { + cmds.PutCompletedForce(multi[i]) + } + } } return resp @@ -178,45 +184,78 @@ func (s *standalone) Close() { } } -func (s *standalone) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp ValkeyResult) { - return s.primary.Load().DoCache(ctx, cmd, ttl) +func (s *standalone) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp RedisResult) { + attempts := 1 + + if s.enableRedirect { + cmd = cmd.Pin() + } + +retry: + resp = s.primary.Load().DoCache(ctx, cmd, ttl) + + if s.enableRedirect { + if ret, yes := IsRedisErr(resp.Error()); yes { + if addr, ok := ret.IsRedirect(); ok { + err := s.redirectCall.Do(ctx, func() error { + return s.redirectToPrimary(addr) + }) + if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error()) { + attempts++ + goto retry + } + } + } + if resp.NonRedisError() == nil { + cmds.PutCompletedForce(Completed(cmd)) + } + } + return } -func (s *standalone) DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []ValkeyResult) { +func (s *standalone) DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []RedisResult) { + attempts := 1 + if s.enableRedirect { for i := range multi { multi[i].Cmd = multi[i].Cmd.Pin() } } - return s.primary.Load().DoMultiCache(ctx, multi...) -} -func (s *standalone) DoStream(ctx context.Context, cmd Completed) ValkeyResultStream { +retry: + resp = s.primary.Load().DoMultiCache(ctx, multi...) + if s.enableRedirect { - cmd = cmd.Pin() + for i, result := range resp { + if ret, yes := IsRedisErr(result.Error()); yes { + if addr, ok := ret.IsRedirect(); ok { + err := s.redirectCall.Do(ctx, func() error { + return s.redirectToPrimary(addr) + }) + if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, Completed(multi[i].Cmd), result.Error()) { + attempts++ + goto retry + } + break // Exit the loop if redirect handling fails + } + } + } + for i, result := range resp { + if result.NonRedisError() == nil { + cmds.PutCompletedForce(Completed(multi[i].Cmd)) + } + } } - var stream ValkeyResultStream + return +} + +func (s *standalone) DoStream(ctx context.Context, cmd Completed) RedisResultStream { + var stream RedisResultStream if s.toReplicas != nil && s.toReplicas(cmd) { stream = s.replicas[s.pick()].DoStream(ctx, cmd) } else { stream = s.primary.Load().DoStream(ctx, cmd) } - - // Handle redirect for stream - if s.enableRedirect && stream.Error() != nil { - if ret, yes := IsValkeyErr(stream.Error()); yes { - if addr, ok := ret.IsRedirect(); ok { - err := s.redirectCall.Do(ctx, func() error { - return s.redirectToPrimary(addr) - }) - if err == nil { - // Execute the command on the updated primary - return s.primary.Load().DoStream(ctx, cmd) - } - } - } - } - return stream } @@ -234,22 +273,6 @@ func (s *standalone) DoMultiStream(ctx context.Context, multi ...Completed) Mult } else { stream = s.primary.Load().DoMultiStream(ctx, multi...) } - - // Handle redirect for stream - if s.enableRedirect && stream.Error() != nil { - if ret, yes := IsValkeyErr(stream.Error()); yes { - if addr, ok := ret.IsRedirect(); ok { - err := s.redirectCall.Do(ctx, func() error { - return s.redirectToPrimary(addr) - }) - if err == nil { - // Execute the command on the updated primary - return s.primary.Load().DoMultiStream(ctx, multi...) - } - } - } - } - return stream } diff --git a/standalone_test.go b/standalone_test.go index 670e149..88daa2f 100644 --- a/standalone_test.go +++ b/standalone_test.go @@ -304,99 +304,29 @@ func TestStandaloneRedirectHandling(t *testing.T) { } } -func TestStandaloneRedirectDisabled(t *testing.T) { +func TestStandaloneDoCacheRedirectHandling(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) // Create a mock redirect response - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) // Mock primary connection that returns redirect primaryConn := &mockConn{ - DoFn: func(cmd Completed) ValkeyResult { + DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult { return newErrResult(&redirectErr) }, } - s, err := newStandaloneClient(&ClientOption{ - InitAddress: []string{"primary"}, - Standalone: StandaloneOption{ - EnableRedirect: false, // Redirect disabled - }, - DisableRetry: true, - }, func(dst string, opt *ClientOption) conn { - return primaryConn - }, newRetryer(defaultRetryDelayFn)) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - defer s.Close() - - ctx := context.Background() - result := s.Do(ctx, s.B().Get().Key("test").Build()) - - // Should return the original redirect error since redirect is disabled - if result.Error() == nil { - t.Error("expected redirect error to be returned when redirect is disabled") - } - - if result.Error().Error() != "REDIRECT 127.0.0.1:6380" { - t.Errorf("expected redirect error, got: %v", result.Error()) - } -} - -func TestNewClientEnableRedirectPriority(t *testing.T) { - defer ShouldNotLeak(SetupLeakDetection()) - - // Test that EnableRedirect creates a standalone client - s, err := newStandaloneClient(&ClientOption{ - InitAddress: []string{"primary"}, - Standalone: StandaloneOption{ - EnableRedirect: true, - }, - }, func(dst string, opt *ClientOption) conn { - return &mockConn{ - DialFn: func() error { return nil }, - } - }, newRetryer(defaultRetryDelayFn)) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - defer s.Close() - - // Verify that we got a standalone client with redirect enabled - if s.Mode() != ClientModeStandalone { - t.Errorf("expected standalone client, got: %v", s.Mode()) - } - - // Verify that EnableRedirect is properly configured - if !s.enableRedirect { - t.Error("expected EnableRedirect to be true") - } -} - -func TestStandaloneDoStreamWithRedirect(t *testing.T) { - defer ShouldNotLeak(SetupLeakDetection()) - - redirectConnUsed := false - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) - primaryConn := &mockConn{ - DialFn: func() error { return nil }, - DoStreamFn: func(cmd Completed) ValkeyResultStream { - return ValkeyResultStream{e: &redirectErr} - }, - } - + // Mock redirect target connection that returns success redirectConn := &mockConn{ - DialFn: func() error { return nil }, - DoStreamFn: func(cmd Completed) ValkeyResultStream { - redirectConnUsed = true - return ValkeyResultStream{e: nil} + DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult { + return RedisResult{val: strmsg('+', "OK")} }, - CloseFn: func() {}, } + // Track which connection is being used + var connUsed string + s, err := newStandaloneClient(&ClientOption{ InitAddress: []string{"primary"}, Standalone: StandaloneOption{ @@ -404,6 +334,7 @@ func TestStandaloneDoStreamWithRedirect(t *testing.T) { }, DisableRetry: true, }, func(dst string, opt *ClientOption) conn { + connUsed = dst if dst == "primary" { return primaryConn } @@ -415,76 +346,40 @@ func TestStandaloneDoStreamWithRedirect(t *testing.T) { } defer s.Close() - // Test DoStream with redirect - stream := s.DoStream(context.Background(), s.B().Set().Key("k").Value("v").Build()) - if stream.Error() != nil { - t.Fatalf("unexpected error: %v", stream.Error()) - } - - if !redirectConnUsed { - t.Error("expected redirect connection to be used") - } -} - -func TestStandaloneDoStreamWithRedirectFailure(t *testing.T) { - defer ShouldNotLeak(SetupLeakDetection()) - - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) - primaryConn := &mockConn{ - DialFn: func() error { return nil }, - DoStreamFn: func(cmd Completed) ValkeyResultStream { - return ValkeyResultStream{e: &redirectErr} - }, - } - - redirectConn := &mockConn{ - DialFn: func() error { return errors.New("connection failed") }, - } - - s, err := newStandaloneClient(&ClientOption{ - InitAddress: []string{"primary"}, - Standalone: StandaloneOption{ - EnableRedirect: true, - }, - DisableRetry: true, - }, func(dst string, opt *ClientOption) conn { - if dst == "primary" { - return primaryConn - } - return redirectConn - }, newRetryer(defaultRetryDelayFn)) + ctx := context.Background() + result := s.DoCache(ctx, s.B().Get().Key("test").Cache(), time.Second) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if result.Error() != nil { + t.Errorf("expected no error after redirect, got: %v", result.Error()) } - defer s.Close() - // Test DoStream with redirect failure - should return original result - stream := s.DoStream(context.Background(), s.B().Set().Key("k").Value("v").Build()) - if stream.Error() == nil { - t.Error("expected original error to be returned") + if str, _ := result.ToString(); str != "OK" { + t.Errorf("expected OK response after redirect, got: %s", str) } - if verr, ok := stream.Error().(*ValkeyError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { - t.Errorf("expected REDIRECT error, got %v", stream.Error()) + // Verify that the redirect target was used + if connUsed != "127.0.0.1:6380" { + t.Errorf("expected redirect to use 127.0.0.1:6380, got: %s", connUsed) } } -func TestStandaloneDoStreamWithoutRedirect(t *testing.T) { +func TestStandaloneRedirectDisabled(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) + // Create a mock redirect response redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + + // Mock primary connection that returns redirect primaryConn := &mockConn{ - DialFn: func() error { return nil }, - DoStreamFn: func(cmd Completed) ValkeyResultStream { - return ValkeyResultStream{e: &redirectErr} + DoFn: func(cmd Completed) ValkeyResult { + return newErrResult(&redirectErr) }, } s, err := newStandaloneClient(&ClientOption{ InitAddress: []string{"primary"}, Standalone: StandaloneOption{ - EnableRedirect: false, + EnableRedirect: false, // Redirect disabled }, DisableRetry: true, }, func(dst string, opt *ClientOption) conn { @@ -496,49 +391,40 @@ func TestStandaloneDoStreamWithoutRedirect(t *testing.T) { } defer s.Close() - // Test DoStream without redirect - should return original result - stream := s.DoStream(context.Background(), s.B().Set().Key("k").Value("v").Build()) - if stream.Error() == nil { - t.Error("expected original error to be returned") + ctx := context.Background() + result := s.Do(ctx, s.B().Get().Key("test").Build()) + + // Should return the original redirect error since redirect is disabled + if result.Error() == nil { + t.Error("expected redirect error to be returned when redirect is disabled") } - if verr, ok := stream.Error().(*ValkeyError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { - t.Errorf("expected REDIRECT error, got %v", stream.Error()) + if result.Error().Error() != "REDIRECT 127.0.0.1:6380" { + t.Errorf("expected redirect error, got: %v", result.Error()) } } -func TestStandaloneDoMultiStreamWithRedirect(t *testing.T) { +func TestStandaloneDoCacheRedirectDisabled(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) - redirectConnUsed := false - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + // Create a mock redirect response + redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + + // Mock primary connection that returns redirect primaryConn := &mockConn{ - DialFn: func() error { return nil }, - DoMultiStreamFn: func(multi ...Completed) MultiValkeyResultStream { - return MultiValkeyResultStream{e: &redirectErr} + DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult { + return newErrResult(&redirectErr) }, } - redirectConn := &mockConn{ - DialFn: func() error { return nil }, - DoMultiStreamFn: func(multi ...Completed) MultiValkeyResultStream { - redirectConnUsed = true - return MultiValkeyResultStream{e: nil} - }, - CloseFn: func() {}, - } - s, err := newStandaloneClient(&ClientOption{ InitAddress: []string{"primary"}, Standalone: StandaloneOption{ - EnableRedirect: true, + EnableRedirect: false, // Redirect disabled }, DisableRetry: true, }, func(dst string, opt *ClientOption) conn { - if dst == "primary" { - return primaryConn - } - return redirectConn + return primaryConn }, newRetryer(defaultRetryDelayFn)) if err != nil { @@ -546,43 +432,32 @@ func TestStandaloneDoMultiStreamWithRedirect(t *testing.T) { } defer s.Close() - // Test DoMultiStream with redirect - stream := s.DoMultiStream(context.Background(), s.B().Set().Key("k").Value("v").Build()) - if stream.Error() != nil { - t.Fatalf("unexpected error: %v", stream.Error()) + ctx := context.Background() + result := s.DoCache(ctx, s.B().Get().Key("test").Cache(), time.Second) + + // Should return the original redirect error since redirect is disabled + if result.Error() == nil { + t.Error("expected redirect error to be returned when redirect is disabled") } - if !redirectConnUsed { - t.Error("expected redirect connection to be used") + if result.Error().Error() != "REDIRECT 127.0.0.1:6380" { + t.Errorf("expected redirect error, got: %v", result.Error()) } } -func TestStandaloneDoMultiStreamWithRedirectFailure(t *testing.T) { +func TestNewClientEnableRedirectPriority(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) - primaryConn := &mockConn{ - DialFn: func() error { return nil }, - DoMultiStreamFn: func(multi ...Completed) MultiValkeyResultStream { - return MultiValkeyResultStream{e: &redirectErr} - }, - } - - redirectConn := &mockConn{ - DialFn: func() error { return errors.New("connection failed") }, - } - + // Test that EnableRedirect creates a standalone client s, err := newStandaloneClient(&ClientOption{ InitAddress: []string{"primary"}, Standalone: StandaloneOption{ EnableRedirect: true, }, - DisableRetry: true, }, func(dst string, opt *ClientOption) conn { - if dst == "primary" { - return primaryConn + return &mockConn{ + DialFn: func() error { return nil }, } - return redirectConn }, newRetryer(defaultRetryDelayFn)) if err != nil { @@ -590,51 +465,14 @@ func TestStandaloneDoMultiStreamWithRedirectFailure(t *testing.T) { } defer s.Close() - // Test DoMultiStream with redirect failure - should return original result - stream := s.DoMultiStream(context.Background(), s.B().Set().Key("k").Value("v").Build()) - if stream.Error() == nil { - t.Error("expected original error to be returned") - } - - if verr, ok := stream.Error().(*ValkeyError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { - t.Errorf("expected REDIRECT error, got %v", stream.Error()) - } -} - -func TestStandaloneDoMultiStreamWithoutRedirect(t *testing.T) { - defer ShouldNotLeak(SetupLeakDetection()) - - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) - primaryConn := &mockConn{ - DialFn: func() error { return nil }, - DoMultiStreamFn: func(multi ...Completed) MultiValkeyResultStream { - return MultiValkeyResultStream{e: &redirectErr} - }, - } - - s, err := newStandaloneClient(&ClientOption{ - InitAddress: []string{"primary"}, - Standalone: StandaloneOption{ - EnableRedirect: false, - }, - DisableRetry: true, - }, func(dst string, opt *ClientOption) conn { - return primaryConn - }, newRetryer(defaultRetryDelayFn)) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - defer s.Close() - - // Test DoMultiStream without redirect - should return original result - stream := s.DoMultiStream(context.Background(), s.B().Set().Key("k").Value("v").Build()) - if stream.Error() == nil { - t.Error("expected original error to be returned") + // Verify that we got a standalone client with redirect enabled + if s.Mode() != ClientModeStandalone { + t.Errorf("expected standalone client, got: %v", s.Mode()) } - if verr, ok := stream.Error().(*ValkeyError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { - t.Errorf("expected REDIRECT error, got %v", stream.Error()) + // Verify that EnableRedirect is properly configured + if !s.enableRedirect { + t.Error("expected EnableRedirect to be true") } } @@ -852,83 +690,119 @@ func TestStandalonePickMultipleReplicas(t *testing.T) { } } -func TestStandaloneDoWithNonRedirectError(t *testing.T) { +func TestStandaloneDoMultiWithRedirectRetry(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) + redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + attempts := 0 + primaryConn := &mockConn{ DialFn: func() error { return nil }, - DoFn: func(cmd Completed) ValkeyResult { - return newErrResult(errors.New("other error")) + DoMultiFn: func(multi ...Completed) *valkeyresults { + attempts++ + // First attempt returns redirect error, second returns success + if attempts == 1 { + return &valkeyresults{s: []ValkeyResult{newErrResult(&redirectErr)}} + } + return &valkeyresults{s: []ValkeyResult{ValkeyResult{val: strmsg('+', "OK")}}} }, } - s, err := newStandaloneClient(&ClientOption{ - InitAddress: []string{"primary"}, - Standalone: StandaloneOption{ - EnableRedirect: true, + redirectConnCalled := false + redirectConn := &mockConn{ + DialFn: func() error { + redirectConnCalled = true + return nil }, - DisableRetry: true, - }, func(dst string, opt *ClientOption) conn { - return primaryConn - }, newRetryer(defaultRetryDelayFn)) - - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - defer s.Close() - - // Test Do with non-redirect error - result := s.Do(context.Background(), s.B().Set().Key("k").Value("v").Build()) - if result.Error() == nil || result.Error().Error() != "other error" { - t.Errorf("expected other error, got %v", result.Error()) - } -} - -func TestStandaloneDoToReplicaWithRedirect(t *testing.T) { - defer ShouldNotLeak(SetupLeakDetection()) - - // This test is simplified to avoid the command building issue - // The coverage for this scenario is already covered by other tests - primaryConn := &mockConn{ - DialFn: func() error { return nil }, + DoMultiFn: func(multi ...Completed) *valkeyresults { + return &valkeyresults{s: []ValkeyResult{ValkeyResult{val: strmsg('+', "OK")}}} + }, + CloseFn: func() {}, } - replicaConn := &mockConn{ - DialFn: func() error { return nil }, + // Mock retry handler that allows one retry + mockRetry := &mockRetryHandler{ + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, cmd Completed, err error) bool { + return attempts < 2 // Allow one retry + }, + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { + return time.Millisecond + }, + WaitForRetryFn: func(ctx context.Context, duration time.Duration) { + time.Sleep(duration) + }, } s, err := newStandaloneClient(&ClientOption{ InitAddress: []string{"primary"}, Standalone: StandaloneOption{ - ReplicaAddress: []string{"replica"}, + EnableRedirect: true, }, - SendToReplicas: func(cmd Completed) bool { return true }, // Always send to replica - DisableRetry: true, + DisableRetry: false, }, func(dst string, opt *ClientOption) conn { if dst == "primary" { return primaryConn } - return replicaConn - }, newRetryer(defaultRetryDelayFn)) + return redirectConn + }, mockRetry) if err != nil { t.Fatalf("unexpected error: %v", err) } defer s.Close() - // Just test that we can create the client successfully - if s.Mode() != ClientModeStandalone { - t.Errorf("expected standalone mode, got %v", s.Mode()) + // Create a simple command using the internal cmds package + cmd := cmds.NewCompleted([]string{"SET", "k", "v"}) + + // Test DoMulti with redirect and retry + results := s.DoMulti(context.Background(), cmd) + if len(results) != 1 { + t.Fatalf("expected 1 result, got %d", len(results)) + } + + if results[0].Error() != nil { + t.Errorf("expected success after retry, got error: %v", results[0].Error()) + } + + // The primary connection should have been called once, then redirected + if attempts != 1 { + t.Errorf("expected 1 attempt on primary before redirect, got %d", attempts) + } + + if !redirectConnCalled { + t.Error("expected redirect connection to be called") } } -func TestStandaloneDoMultiCacheWithRedirect(t *testing.T) { +func TestStandaloneDoMultiWithRedirectRetryFailure(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) + redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + primaryConn := &mockConn{ DialFn: func() error { return nil }, - DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { - return &valkeyresults{s: []ValkeyResult{ValkeyResult{val: strmsg('+', "OK")}}} + DoMultiFn: func(multi ...Completed) *valkeyresults { + return &valkeyresults{s: []ValkeyResult{newErrResult(&redirectErr)}} + }, + } + + // Redirect connection fails to dial + redirectConn := &mockConn{ + DialFn: func() error { + return errors.New("redirect connection failed") + }, + } + + // Mock retry handler that doesn't allow retries after connection failure + mockRetry := &mockRetryHandler{ + WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, cmd Completed, err error) bool { + return false // Don't retry + }, + RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration { + return time.Millisecond + }, + WaitForRetryFn: func(ctx context.Context, duration time.Duration) { + time.Sleep(duration) }, } @@ -937,44 +811,53 @@ func TestStandaloneDoMultiCacheWithRedirect(t *testing.T) { Standalone: StandaloneOption{ EnableRedirect: true, }, - DisableRetry: true, + DisableRetry: false, }, func(dst string, opt *ClientOption) conn { - return primaryConn - }, newRetryer(defaultRetryDelayFn)) + if dst == "primary" { + return primaryConn + } + return redirectConn + }, mockRetry) if err != nil { t.Fatalf("unexpected error: %v", err) } defer s.Close() - // Test DoMultiCache with redirect enabled - this exercises the Pin() code path - // Create a CacheableTTL manually to avoid the build-twice issue - cacheable := Cacheable(cmds.NewCompleted([]string{"GET", "k"})) - results := s.DoMultiCache(context.Background(), CacheableTTL{Cmd: cacheable, TTL: time.Second}) + // Create a simple command using the internal cmds package + cmd := cmds.NewCompleted([]string{"SET", "k", "v"}) + + // Test DoMulti with redirect failure + results := s.DoMulti(context.Background(), cmd) if len(results) != 1 { t.Fatalf("expected 1 result, got %d", len(results)) } - if results[0].Error() != nil { - t.Errorf("unexpected error: %v", results[0].Error()) + // Should return the original redirect error since retry is not allowed + if results[0].Error() == nil { + t.Error("expected error to be returned") + } + + if verr, ok := results[0].Error().(*ValkeyError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { + t.Errorf("expected REDIRECT error, got %v", results[0].Error()) } } -func TestStandaloneDoMultiWithRedirectRetry(t *testing.T) { +func TestStandaloneDoMultiCacheWithRedirectRetry(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) attempts := 0 primaryConn := &mockConn{ DialFn: func() error { return nil }, - DoMultiFn: func(multi ...Completed) *valkeyresults { + DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { attempts++ // First attempt returns redirect error, second returns success if attempts == 1 { - return &valkeyresults{s: []ValkeyResult{newErrResult(&redirectErr)}} + return &redisresults{s: []RedisResult{newErrResult(&redirectErr)}} } - return &valkeyresults{s: []ValkeyResult{ValkeyResult{val: strmsg('+', "OK")}}} + return &redisresults{s: []RedisResult{RedisResult{val: strmsg('+', "OK")}}} }, } @@ -984,8 +867,8 @@ func TestStandaloneDoMultiWithRedirectRetry(t *testing.T) { redirectConnCalled = true return nil }, - DoMultiFn: func(multi ...Completed) *valkeyresults { - return &valkeyresults{s: []ValkeyResult{ValkeyResult{val: strmsg('+', "OK")}}} + DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { + return &redisresults{s: []RedisResult{RedisResult{val: strmsg('+', "OK")}}} }, CloseFn: func() {}, } @@ -1025,7 +908,7 @@ func TestStandaloneDoMultiWithRedirectRetry(t *testing.T) { cmd := cmds.NewCompleted([]string{"SET", "k", "v"}) // Test DoMulti with redirect and retry - results := s.DoMulti(context.Background(), cmd) + results := s.DoMultiCache(context.Background(), CT(Cacheable(cmd), time.Second)) if len(results) != 1 { t.Fatalf("expected 1 result, got %d", len(results)) } @@ -1044,15 +927,15 @@ func TestStandaloneDoMultiWithRedirectRetry(t *testing.T) { } } -func TestStandaloneDoMultiWithRedirectRetryFailure(t *testing.T) { +func TestStandaloneDoMultiCacheWithRedirectRetryFailure(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) - redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) primaryConn := &mockConn{ DialFn: func() error { return nil }, - DoMultiFn: func(multi ...Completed) *valkeyresults { - return &valkeyresults{s: []ValkeyResult{newErrResult(&redirectErr)}} + DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { + return &redisresults{s: []RedisResult{newErrResult(&redirectErr)}} }, } @@ -1098,7 +981,7 @@ func TestStandaloneDoMultiWithRedirectRetryFailure(t *testing.T) { cmd := cmds.NewCompleted([]string{"SET", "k", "v"}) // Test DoMulti with redirect failure - results := s.DoMulti(context.Background(), cmd) + results := s.DoMultiCache(context.Background(), CT(Cacheable(cmd), time.Second)) if len(results) != 1 { t.Fatalf("expected 1 result, got %d", len(results)) } @@ -1108,7 +991,7 @@ func TestStandaloneDoMultiWithRedirectRetryFailure(t *testing.T) { t.Error("expected error to be returned") } - if verr, ok := results[0].Error().(*ValkeyError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { + if verr, ok := results[0].Error().(*RedisError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { t.Errorf("expected REDIRECT error, got %v", results[0].Error()) } } From cd4e577146538add690b09f89196e23ad95d6ff0 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sat, 11 Oct 2025 14:15:25 -0700 Subject: [PATCH 2/2] feat: client capa redirect Signed-off-by: Rueian --- standalone.go | 22 +++++++++++----------- standalone_test.go | 32 ++++++++++++++++---------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/standalone.go b/standalone.go index 5f7df95..b575814 100644 --- a/standalone.go +++ b/standalone.go @@ -113,7 +113,7 @@ retry: } } } - if resp.NonRedisError() == nil { + if resp.NonValkeyError() == nil { cmds.PutCompletedForce(cmd) } } @@ -147,7 +147,7 @@ retry: // Handle redirects with retry until context deadline if s.enableRedirect { for i, result := range resp { - if ret, yes := IsRedisErr(result.Error()); yes { + if ret, yes := IsValkeyErr(result.Error()); yes { if addr, ok := ret.IsRedirect(); ok { err := s.redirectCall.Do(ctx, func() error { return s.redirectToPrimary(addr) @@ -161,7 +161,7 @@ retry: } } for i, result := range resp { - if result.NonRedisError() == nil { + if result.NonValkeyError() == nil { cmds.PutCompletedForce(multi[i]) } } @@ -184,7 +184,7 @@ func (s *standalone) Close() { } } -func (s *standalone) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp RedisResult) { +func (s *standalone) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp ValkeyResult) { attempts := 1 if s.enableRedirect { @@ -195,7 +195,7 @@ retry: resp = s.primary.Load().DoCache(ctx, cmd, ttl) if s.enableRedirect { - if ret, yes := IsRedisErr(resp.Error()); yes { + if ret, yes := IsValkeyErr(resp.Error()); yes { if addr, ok := ret.IsRedirect(); ok { err := s.redirectCall.Do(ctx, func() error { return s.redirectToPrimary(addr) @@ -206,14 +206,14 @@ retry: } } } - if resp.NonRedisError() == nil { + if resp.NonValkeyError() == nil { cmds.PutCompletedForce(Completed(cmd)) } } return } -func (s *standalone) DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []RedisResult) { +func (s *standalone) DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []ValkeyResult) { attempts := 1 if s.enableRedirect { @@ -227,7 +227,7 @@ retry: if s.enableRedirect { for i, result := range resp { - if ret, yes := IsRedisErr(result.Error()); yes { + if ret, yes := IsValkeyErr(result.Error()); yes { if addr, ok := ret.IsRedirect(); ok { err := s.redirectCall.Do(ctx, func() error { return s.redirectToPrimary(addr) @@ -241,7 +241,7 @@ retry: } } for i, result := range resp { - if result.NonRedisError() == nil { + if result.NonValkeyError() == nil { cmds.PutCompletedForce(Completed(multi[i].Cmd)) } } @@ -249,8 +249,8 @@ retry: return } -func (s *standalone) DoStream(ctx context.Context, cmd Completed) RedisResultStream { - var stream RedisResultStream +func (s *standalone) DoStream(ctx context.Context, cmd Completed) ValkeyResultStream { + var stream ValkeyResultStream if s.toReplicas != nil && s.toReplicas(cmd) { stream = s.replicas[s.pick()].DoStream(ctx, cmd) } else { diff --git a/standalone_test.go b/standalone_test.go index 88daa2f..481f15f 100644 --- a/standalone_test.go +++ b/standalone_test.go @@ -308,19 +308,19 @@ func TestStandaloneDoCacheRedirectHandling(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) // Create a mock redirect response - redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) // Mock primary connection that returns redirect primaryConn := &mockConn{ - DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult { + DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newErrResult(&redirectErr) }, } // Mock redirect target connection that returns success redirectConn := &mockConn{ - DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult { - return RedisResult{val: strmsg('+', "OK")} + DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult { + return ValkeyResult{val: strmsg('+', "OK")} }, } @@ -408,11 +408,11 @@ func TestStandaloneDoCacheRedirectDisabled(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) // Create a mock redirect response - redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) // Mock primary connection that returns redirect primaryConn := &mockConn{ - DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult { + DoCacheFn: func(cmd Cacheable, ttl time.Duration) ValkeyResult { return newErrResult(&redirectErr) }, } @@ -846,18 +846,18 @@ func TestStandaloneDoMultiWithRedirectRetryFailure(t *testing.T) { func TestStandaloneDoMultiCacheWithRedirectRetry(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) - redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) attempts := 0 primaryConn := &mockConn{ DialFn: func() error { return nil }, - DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { + DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { attempts++ // First attempt returns redirect error, second returns success if attempts == 1 { - return &redisresults{s: []RedisResult{newErrResult(&redirectErr)}} + return &valkeyresults{s: []ValkeyResult{newErrResult(&redirectErr)}} } - return &redisresults{s: []RedisResult{RedisResult{val: strmsg('+', "OK")}}} + return &valkeyresults{s: []ValkeyResult{ValkeyResult{val: strmsg('+', "OK")}}} }, } @@ -867,8 +867,8 @@ func TestStandaloneDoMultiCacheWithRedirectRetry(t *testing.T) { redirectConnCalled = true return nil }, - DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { - return &redisresults{s: []RedisResult{RedisResult{val: strmsg('+', "OK")}}} + DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { + return &valkeyresults{s: []ValkeyResult{ValkeyResult{val: strmsg('+', "OK")}}} }, CloseFn: func() {}, } @@ -930,12 +930,12 @@ func TestStandaloneDoMultiCacheWithRedirectRetry(t *testing.T) { func TestStandaloneDoMultiCacheWithRedirectRetryFailure(t *testing.T) { defer ShouldNotLeak(SetupLeakDetection()) - redirectErr := RedisError(strmsg('-', "REDIRECT 127.0.0.1:6380")) + redirectErr := ValkeyError(strmsg('-', "REDIRECT 127.0.0.1:6380")) primaryConn := &mockConn{ DialFn: func() error { return nil }, - DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults { - return &redisresults{s: []RedisResult{newErrResult(&redirectErr)}} + DoMultiCacheFn: func(multi ...CacheableTTL) *valkeyresults { + return &valkeyresults{s: []ValkeyResult{newErrResult(&redirectErr)}} }, } @@ -991,7 +991,7 @@ func TestStandaloneDoMultiCacheWithRedirectRetryFailure(t *testing.T) { t.Error("expected error to be returned") } - if verr, ok := results[0].Error().(*RedisError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { + if verr, ok := results[0].Error().(*ValkeyError); !ok || !strings.Contains(verr.Error(), "REDIRECT") { t.Errorf("expected REDIRECT error, got %v", results[0].Error()) } }