Skip to content

Commit 231040c

Browse files
lukasz-zimnochfjl
andauthoredJan 21, 2021
event: add ResubscribeErr (#22191)
This adds a way to get the error of the failing subscription for logging/debugging purposes. Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent c4307a9 commit 231040c

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed
 

‎event/subscription.go

+28-4
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,26 @@ func (s *funcSub) Err() <-chan error {
9595
// Resubscribe applies backoff between calls to fn. The time between calls is adapted
9696
// based on the error rate, but will never exceed backoffMax.
9797
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
98+
return ResubscribeErr(backoffMax, func(ctx context.Context, _ error) (Subscription, error) {
99+
return fn(ctx)
100+
})
101+
}
102+
103+
// A ResubscribeFunc attempts to establish a subscription.
104+
type ResubscribeFunc func(context.Context) (Subscription, error)
105+
106+
// ResubscribeErr calls fn repeatedly to keep a subscription established. When the
107+
// subscription is established, ResubscribeErr waits for it to fail and calls fn again. This
108+
// process repeats until Unsubscribe is called or the active subscription ends
109+
// successfully.
110+
//
111+
// The difference between Resubscribe and ResubscribeErr is that with ResubscribeErr,
112+
// the error of the failing subscription is available to the callback for logging
113+
// purposes.
114+
//
115+
// ResubscribeErr applies backoff between calls to fn. The time between calls is adapted
116+
// based on the error rate, but will never exceed backoffMax.
117+
func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscription {
98118
s := &resubscribeSub{
99119
waitTime: backoffMax / 10,
100120
backoffMax: backoffMax,
@@ -106,15 +126,18 @@ func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
106126
return s
107127
}
108128

109-
// A ResubscribeFunc attempts to establish a subscription.
110-
type ResubscribeFunc func(context.Context) (Subscription, error)
129+
// A ResubscribeErrFunc attempts to establish a subscription.
130+
// For every call but the first, the second argument to this function is
131+
// the error that occurred with the previous subscription.
132+
type ResubscribeErrFunc func(context.Context, error) (Subscription, error)
111133

112134
type resubscribeSub struct {
113-
fn ResubscribeFunc
135+
fn ResubscribeErrFunc
114136
err chan error
115137
unsub chan struct{}
116138
unsubOnce sync.Once
117139
lastTry mclock.AbsTime
140+
lastSubErr error
118141
waitTime, backoffMax time.Duration
119142
}
120143

@@ -149,7 +172,7 @@ func (s *resubscribeSub) subscribe() Subscription {
149172
s.lastTry = mclock.Now()
150173
ctx, cancel := context.WithCancel(context.Background())
151174
go func() {
152-
rsub, err := s.fn(ctx)
175+
rsub, err := s.fn(ctx, s.lastSubErr)
153176
sub = rsub
154177
subscribed <- err
155178
}()
@@ -178,6 +201,7 @@ func (s *resubscribeSub) waitForError(sub Subscription) bool {
178201
defer sub.Unsubscribe()
179202
select {
180203
case err := <-sub.Err():
204+
s.lastSubErr = err
181205
return err == nil
182206
case <-s.unsub:
183207
return true

‎event/subscription_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package event
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
23+
"reflect"
2224
"testing"
2325
"time"
2426
)
@@ -118,3 +120,37 @@ func TestResubscribeAbort(t *testing.T) {
118120
t.Fatal(err)
119121
}
120122
}
123+
124+
func TestResubscribeWithErrorHandler(t *testing.T) {
125+
t.Parallel()
126+
127+
var i int
128+
nfails := 6
129+
subErrs := make([]string, 0)
130+
sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
131+
i++
132+
var lastErrVal string
133+
if lastErr != nil {
134+
lastErrVal = lastErr.Error()
135+
}
136+
subErrs = append(subErrs, lastErrVal)
137+
sub := NewSubscription(func(unsubscribed <-chan struct{}) error {
138+
if i < nfails {
139+
return fmt.Errorf("err-%v", i)
140+
} else {
141+
return nil
142+
}
143+
})
144+
return sub, nil
145+
})
146+
147+
<-sub.Err()
148+
if i != nfails {
149+
t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
150+
}
151+
152+
expectedSubErrs := []string{"", "err-1", "err-2", "err-3", "err-4", "err-5"}
153+
if !reflect.DeepEqual(subErrs, expectedSubErrs) {
154+
t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
155+
}
156+
}

0 commit comments

Comments
 (0)
Please sign in to comment.