Skip to content

Commit 66d73e7

Browse files
authored
Standalone redirect (#84)
* feat: client capa redirect Signed-off-by: Rueian <[email protected]> * feat: client capa redirect Signed-off-by: Rueian <[email protected]> --------- Signed-off-by: Rueian <[email protected]>
1 parent ee0ba76 commit 66d73e7

File tree

2 files changed

+240
-334
lines changed

2 files changed

+240
-334
lines changed

standalone.go

Lines changed: 76 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,17 @@ func (s *standalone) redirectToPrimary(addr string) error {
7878

7979
// Atomically swap the primary and close the old one
8080
oldPrimary := s.primary.Swap(newPrimary)
81-
oldPrimary.Close()
81+
go func(oldPrimary *singleClient) {
82+
time.Sleep(time.Second * 5)
83+
oldPrimary.Close()
84+
}(oldPrimary)
8285

8386
return nil
8487
}
8588

8689
func (s *standalone) Do(ctx context.Context, cmd Completed) (resp ValkeyResult) {
8790
attempts := 1
91+
8892
if s.enableRedirect {
8993
cmd = cmd.Pin()
9094
}
@@ -100,17 +104,18 @@ retry:
100104
if s.enableRedirect {
101105
if ret, yes := IsValkeyErr(resp.Error()); yes {
102106
if addr, ok := ret.IsRedirect(); ok {
103-
// Command is already pinned at this point
104107
err := s.redirectCall.Do(ctx, func() error {
105108
return s.redirectToPrimary(addr)
106109
})
107-
// Use retryHandler to handle multiple redirects with context deadline
108110
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error()) {
109111
attempts++
110112
goto retry
111113
}
112114
}
113115
}
116+
if resp.NonValkeyError() == nil {
117+
cmds.PutCompletedForce(cmd)
118+
}
114119
}
115120

116121
return resp
@@ -119,7 +124,6 @@ retry:
119124
func (s *standalone) DoMulti(ctx context.Context, multi ...Completed) (resp []ValkeyResult) {
120125
attempts := 1
121126

122-
// Pin all commands at the beginning if redirect is enabled
123127
if s.enableRedirect {
124128
for i := range multi {
125129
multi[i] = multi[i].Pin()
@@ -143,22 +147,24 @@ retry:
143147
// Handle redirects with retry until context deadline
144148
if s.enableRedirect {
145149
for i, result := range resp {
146-
if i < len(multi) {
147-
if ret, yes := IsValkeyErr(result.Error()); yes {
148-
if addr, ok := ret.IsRedirect(); ok {
149-
err := s.redirectCall.Do(ctx, func() error {
150-
return s.redirectToPrimary(addr)
151-
})
152-
// Use retryHandler to handle multiple redirects with context deadline
153-
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, multi[0], result.Error()) {
154-
attempts++
155-
goto retry
156-
}
157-
break // Exit the loop if redirect handling fails
150+
if ret, yes := IsValkeyErr(result.Error()); yes {
151+
if addr, ok := ret.IsRedirect(); ok {
152+
err := s.redirectCall.Do(ctx, func() error {
153+
return s.redirectToPrimary(addr)
154+
})
155+
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, multi[i], result.Error()) {
156+
attempts++
157+
goto retry
158158
}
159+
break // Exit the loop if redirect handling fails
159160
}
160161
}
161162
}
163+
for i, result := range resp {
164+
if result.NonValkeyError() == nil {
165+
cmds.PutCompletedForce(multi[i])
166+
}
167+
}
162168
}
163169

164170
return resp
@@ -179,44 +185,77 @@ func (s *standalone) Close() {
179185
}
180186

181187
func (s *standalone) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp ValkeyResult) {
182-
return s.primary.Load().DoCache(ctx, cmd, ttl)
188+
attempts := 1
189+
190+
if s.enableRedirect {
191+
cmd = cmd.Pin()
192+
}
193+
194+
retry:
195+
resp = s.primary.Load().DoCache(ctx, cmd, ttl)
196+
197+
if s.enableRedirect {
198+
if ret, yes := IsValkeyErr(resp.Error()); yes {
199+
if addr, ok := ret.IsRedirect(); ok {
200+
err := s.redirectCall.Do(ctx, func() error {
201+
return s.redirectToPrimary(addr)
202+
})
203+
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error()) {
204+
attempts++
205+
goto retry
206+
}
207+
}
208+
}
209+
if resp.NonValkeyError() == nil {
210+
cmds.PutCompletedForce(Completed(cmd))
211+
}
212+
}
213+
return
183214
}
184215

185216
func (s *standalone) DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []ValkeyResult) {
217+
attempts := 1
218+
186219
if s.enableRedirect {
187220
for i := range multi {
188221
multi[i].Cmd = multi[i].Cmd.Pin()
189222
}
190223
}
191-
return s.primary.Load().DoMultiCache(ctx, multi...)
192-
}
193224

194-
func (s *standalone) DoStream(ctx context.Context, cmd Completed) ValkeyResultStream {
225+
retry:
226+
resp = s.primary.Load().DoMultiCache(ctx, multi...)
227+
195228
if s.enableRedirect {
196-
cmd = cmd.Pin()
229+
for i, result := range resp {
230+
if ret, yes := IsValkeyErr(result.Error()); yes {
231+
if addr, ok := ret.IsRedirect(); ok {
232+
err := s.redirectCall.Do(ctx, func() error {
233+
return s.redirectToPrimary(addr)
234+
})
235+
if err == nil || s.retryer.WaitOrSkipRetry(ctx, attempts, Completed(multi[i].Cmd), result.Error()) {
236+
attempts++
237+
goto retry
238+
}
239+
break // Exit the loop if redirect handling fails
240+
}
241+
}
242+
}
243+
for i, result := range resp {
244+
if result.NonValkeyError() == nil {
245+
cmds.PutCompletedForce(Completed(multi[i].Cmd))
246+
}
247+
}
197248
}
249+
return
250+
}
251+
252+
func (s *standalone) DoStream(ctx context.Context, cmd Completed) ValkeyResultStream {
198253
var stream ValkeyResultStream
199254
if s.toReplicas != nil && s.toReplicas(cmd) {
200255
stream = s.replicas[s.pick()].DoStream(ctx, cmd)
201256
} else {
202257
stream = s.primary.Load().DoStream(ctx, cmd)
203258
}
204-
205-
// Handle redirect for stream
206-
if s.enableRedirect && stream.Error() != nil {
207-
if ret, yes := IsValkeyErr(stream.Error()); yes {
208-
if addr, ok := ret.IsRedirect(); ok {
209-
err := s.redirectCall.Do(ctx, func() error {
210-
return s.redirectToPrimary(addr)
211-
})
212-
if err == nil {
213-
// Execute the command on the updated primary
214-
return s.primary.Load().DoStream(ctx, cmd)
215-
}
216-
}
217-
}
218-
}
219-
220259
return stream
221260
}
222261

@@ -234,22 +273,6 @@ func (s *standalone) DoMultiStream(ctx context.Context, multi ...Completed) Mult
234273
} else {
235274
stream = s.primary.Load().DoMultiStream(ctx, multi...)
236275
}
237-
238-
// Handle redirect for stream
239-
if s.enableRedirect && stream.Error() != nil {
240-
if ret, yes := IsValkeyErr(stream.Error()); yes {
241-
if addr, ok := ret.IsRedirect(); ok {
242-
err := s.redirectCall.Do(ctx, func() error {
243-
return s.redirectToPrimary(addr)
244-
})
245-
if err == nil {
246-
// Execute the command on the updated primary
247-
return s.primary.Load().DoMultiStream(ctx, multi...)
248-
}
249-
}
250-
}
251-
}
252-
253276
return stream
254277
}
255278

0 commit comments

Comments
 (0)