Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 76 additions & 53 deletions standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,17 @@ func (s *standalone) redirectToPrimary(addr string) error {

// Atomically swap the primary and close the old one
oldPrimary := s.primary.Swap(newPrimary)
oldPrimary.Close()
go func(oldPrimary *singleClient) {
time.Sleep(time.Second * 5)
oldPrimary.Close()
}(oldPrimary)

return nil
}

func (s *standalone) Do(ctx context.Context, cmd Completed) (resp ValkeyResult) {
attempts := 1

if s.enableRedirect {
cmd = cmd.Pin()
}
Expand All @@ -100,17 +104,18 @@ retry:
if s.enableRedirect {
if ret, yes := IsValkeyErr(resp.Error()); yes {
if addr, ok := ret.IsRedirect(); ok {
// Command is already pinned at this point
err := s.redirectCall.Do(ctx, func() error {
return s.redirectToPrimary(addr)
})
// Use retryHandler to handle multiple redirects with context deadline
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error()) {
attempts++
goto retry
}
}
}
if resp.NonValkeyError() == nil {
cmds.PutCompletedForce(cmd)
}
}

return resp
Expand All @@ -119,7 +124,6 @@ retry:
func (s *standalone) DoMulti(ctx context.Context, multi ...Completed) (resp []ValkeyResult) {
attempts := 1

// Pin all commands at the beginning if redirect is enabled
if s.enableRedirect {
for i := range multi {
multi[i] = multi[i].Pin()
Expand All @@ -143,22 +147,24 @@ retry:
// Handle redirects with retry until context deadline
if s.enableRedirect {
for i, result := range resp {
if i < len(multi) {
if ret, yes := IsValkeyErr(result.Error()); yes {
if addr, ok := ret.IsRedirect(); ok {
err := s.redirectCall.Do(ctx, func() error {
return s.redirectToPrimary(addr)
})
// Use retryHandler to handle multiple redirects with context deadline
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, multi[0], result.Error()) {
attempts++
goto retry
}
break // Exit the loop if redirect handling fails
if ret, yes := IsValkeyErr(result.Error()); yes {
if addr, ok := ret.IsRedirect(); ok {
err := s.redirectCall.Do(ctx, func() error {
return s.redirectToPrimary(addr)
})
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, multi[i], result.Error()) {
attempts++
goto retry
}
break // Exit the loop if redirect handling fails
}
}
}
for i, result := range resp {
if result.NonValkeyError() == nil {
cmds.PutCompletedForce(multi[i])
}
}
}

return resp
Expand All @@ -179,44 +185,77 @@ func (s *standalone) Close() {
}

func (s *standalone) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp ValkeyResult) {
return s.primary.Load().DoCache(ctx, cmd, ttl)
attempts := 1

if s.enableRedirect {
cmd = cmd.Pin()
}

retry:
resp = s.primary.Load().DoCache(ctx, cmd, ttl)

if s.enableRedirect {
if ret, yes := IsValkeyErr(resp.Error()); yes {
if addr, ok := ret.IsRedirect(); ok {
err := s.redirectCall.Do(ctx, func() error {
return s.redirectToPrimary(addr)
})
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error()) {
attempts++
goto retry
}
}
}
if resp.NonValkeyError() == nil {
cmds.PutCompletedForce(Completed(cmd))
}
}
return
}

func (s *standalone) DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []ValkeyResult) {
attempts := 1

if s.enableRedirect {
for i := range multi {
multi[i].Cmd = multi[i].Cmd.Pin()
}
}
return s.primary.Load().DoMultiCache(ctx, multi...)
}

func (s *standalone) DoStream(ctx context.Context, cmd Completed) ValkeyResultStream {
retry:
resp = s.primary.Load().DoMultiCache(ctx, multi...)

if s.enableRedirect {
cmd = cmd.Pin()
for i, result := range resp {
if ret, yes := IsValkeyErr(result.Error()); yes {
if addr, ok := ret.IsRedirect(); ok {
err := s.redirectCall.Do(ctx, func() error {
return s.redirectToPrimary(addr)
})
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, Completed(multi[i].Cmd), result.Error()) {
attempts++
goto retry
}
break // Exit the loop if redirect handling fails
}
}
}
for i, result := range resp {
if result.NonValkeyError() == nil {
cmds.PutCompletedForce(Completed(multi[i].Cmd))
}
}
}
return
}

func (s *standalone) DoStream(ctx context.Context, cmd Completed) ValkeyResultStream {
var stream ValkeyResultStream
if s.toReplicas != nil && s.toReplicas(cmd) {
stream = s.replicas[s.pick()].DoStream(ctx, cmd)
} else {
stream = s.primary.Load().DoStream(ctx, cmd)
}

// Handle redirect for stream
if s.enableRedirect && stream.Error() != nil {
if ret, yes := IsValkeyErr(stream.Error()); yes {
if addr, ok := ret.IsRedirect(); ok {
err := s.redirectCall.Do(ctx, func() error {
return s.redirectToPrimary(addr)
})
if err == nil {
// Execute the command on the updated primary
return s.primary.Load().DoStream(ctx, cmd)
}
}
}
}

return stream
}

Expand All @@ -234,22 +273,6 @@ func (s *standalone) DoMultiStream(ctx context.Context, multi ...Completed) Mult
} else {
stream = s.primary.Load().DoMultiStream(ctx, multi...)
}

// Handle redirect for stream
if s.enableRedirect && stream.Error() != nil {
if ret, yes := IsValkeyErr(stream.Error()); yes {
if addr, ok := ret.IsRedirect(); ok {
err := s.redirectCall.Do(ctx, func() error {
return s.redirectToPrimary(addr)
})
if err == nil {
// Execute the command on the updated primary
return s.primary.Load().DoMultiStream(ctx, multi...)
}
}
}
}

return stream
}

Expand Down
Loading
Loading