Skip to content

Commit 19dce48

Browse files
committed
client/daemon: route liveness probing / listenfunc with retry
1 parent b535b34 commit 19dce48

File tree

6 files changed

+45
-137
lines changed

6 files changed

+45
-137
lines changed

client/doublezerod/cmd/doublezerod/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func main() {
183183
Liveness: liveness,
184184
Limiter: limiter,
185185
Scheduler: scheduler,
186-
ListenFunc: probing.DefaultListenFunc(logger, iface, src),
186+
ListenFunc: probing.DefaultListenFuncWithRetry(logger, iface, src),
187187
ProbeFunc: probing.DefaultProbeFunc(logger, iface, *routeProbingProbeTimeout),
188188
})
189189
} else {

client/doublezerod/internal/probing/config.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,10 @@ import (
66
"context"
77
"errors"
88
"log/slog"
9-
"time"
109

1110
"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
1211
)
1312

14-
var (
15-
// defaultListenBackoff is used when no ListenBackoff is provided.
16-
defaultListenBackoff = ListenBackoffConfig{
17-
Initial: 1 * time.Second,
18-
Max: 30 * time.Second,
19-
Multiplier: 2,
20-
}
21-
)
22-
2313
// ListenFunc starts a long-lived listener for probe responses or control-plane
2414
// events. It should block until the context is canceled or an unrecoverable
2515
// error occurs, returning nil on clean shutdown (ctx canceled).
@@ -31,14 +21,6 @@ type ListenFunc func(context.Context) error
3121
// the error is due to worker stop (ctx canceled by parent).
3222
type ProbeFunc func(context.Context, *routing.Route) (ProbeResult, error)
3323

34-
// ListenBackoffConfig controls exponential backoff for ListenFunc retries.
35-
// Multiplier is applied to the previous backoff duration, capped at Max.
36-
type ListenBackoffConfig struct {
37-
Initial time.Duration // starting delay before first retry
38-
Max time.Duration // upper bound for backoff delay
39-
Multiplier float64 // growth factor per retry, e.g. 2.0 for doubling
40-
}
41-
4224
// ProbeResult contains probe outcome and basic packet counts.
4325
// OK should reflect end-to-end success criteria for a single probe wave.
4426
type ProbeResult struct {
@@ -60,9 +42,6 @@ type Config struct {
6042
Scheduler Scheduler // scheduler for route probing
6143
ListenFunc ListenFunc // long-lived listener (with retry/backoff)
6244
ProbeFunc ProbeFunc // per-route probe function
63-
64-
// Required scalar fields.
65-
ListenBackoff ListenBackoffConfig // retry policy for ListenFunc errors; defaulted if zero
6645
}
6746

6847
// Validate verifies required fields and applies defaults for optional fields.
@@ -94,10 +73,5 @@ func (cfg *Config) Validate() error {
9473
return errors.New("probe func is required")
9574
}
9675

97-
// Default values.
98-
if cfg.ListenBackoff == (ListenBackoffConfig{}) {
99-
cfg.ListenBackoff = defaultListenBackoff
100-
}
101-
10276
return nil
10377
}

client/doublezerod/internal/probing/default.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,50 @@ import (
99
"net"
1010
"time"
1111

12+
"github.com/cenkalti/backoff/v4"
1213
"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
1314
"github.com/malbeclabs/doublezero/tools/uping/pkg/uping"
1415
promprobing "github.com/prometheus-community/pro-bing"
1516
)
1617

17-
// DefaultListenFunc returns a ListenFunc that starts an ICMP listener
18-
// using the uping package bound to the given interface and source IP.
18+
// DefaultListenFunc returns a ListenFunc that starts an ICMP listener bound to iface/src.
1919
// It blocks until the context is canceled or a fatal error occurs.
2020
func DefaultListenFunc(log *slog.Logger, iface string, src net.IP) ListenFunc {
2121
return func(ctx context.Context) error {
22-
listener, err := uping.NewListener(uping.ListenerConfig{
22+
l, err := uping.NewListener(uping.ListenerConfig{
2323
Logger: log,
2424
Interface: iface,
2525
IP: src,
2626
})
2727
if err != nil {
2828
return err
2929
}
30-
return listener.Listen(ctx)
30+
return l.Listen(ctx)
31+
}
32+
}
33+
func DefaultListenFuncWithRetry(log *slog.Logger, iface string, src net.IP, opts ...backoff.ExponentialBackOffOpts) ListenFunc {
34+
base := DefaultListenFunc(log, iface, src)
35+
36+
opts = append([]backoff.ExponentialBackOffOpts{
37+
backoff.WithInitialInterval(100 * time.Millisecond),
38+
backoff.WithMultiplier(2.0),
39+
backoff.WithMaxInterval(5 * time.Second),
40+
backoff.WithMaxElapsedTime(1 * time.Minute), // stop retrying after a minute…
41+
backoff.WithRandomizationFactor(0), // deterministic (no jitter)
42+
}, opts...)
43+
return func(ctx context.Context) error {
44+
b := backoff.NewExponentialBackOff(opts...)
45+
46+
bo := backoff.WithContext(b, ctx) // …and also stop on ctx cancel/timeout
47+
48+
op := func() error {
49+
err := base(ctx)
50+
// If you detect a non-retryable error, wrap it:
51+
// if errors.Is(err, uping.ErrInvalidInterface) { return backoff.Permanent(err) }
52+
return err
53+
}
54+
55+
return backoff.Retry(op, bo)
3156
}
3257
}
3358

client/doublezerod/internal/probing/main_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,6 @@ func newTestConfig(t *testing.T, mutate func(*Config)) *Config {
9090
time.Sleep(1 * time.Millisecond)
9191
return ProbeResult{OK: true, Sent: 1, Received: 1}, nil
9292
},
93-
ListenBackoff: ListenBackoffConfig{
94-
Initial: 10 * time.Millisecond,
95-
Max: 100 * time.Millisecond,
96-
Multiplier: 2,
97-
},
9893
}
9994
if mutate != nil {
10095
mutate(&cfg)

client/doublezerod/internal/probing/worker.go

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ func newWorker(log *slog.Logger, cfg *Config, store *routeStore) *probingWorker
4141
// the provided context is canceled or Stop is called. Safe to call concurrently
4242
// with IsRunning/Stop.
4343
func (w *probingWorker) Start(ctx context.Context) {
44-
if w.IsRunning() {
44+
if !w.running.CompareAndSwap(false, true) {
4545
return
4646
}
4747
ctx, cancel := context.WithCancel(ctx)
4848
w.cancelMu.Lock()
4949
w.cancel = cancel
5050
w.cancelMu.Unlock()
51-
w.running.Store(true)
51+
5252
w.wg.Add(1)
5353
go func() {
5454
defer w.wg.Done()
@@ -87,11 +87,21 @@ func (w *probingWorker) Run(ctx context.Context) {
8787
"scheduler", w.cfg.Scheduler.String(),
8888
)
8989

90-
// Listener runs in parallel and is retried with backoff on failure.
90+
// Listener runs in parallel; if it returns an error, the error is logged and the worker is canceled.
9191
w.wg.Add(1)
9292
go func() {
9393
defer w.wg.Done()
94-
w.listen(ctx)
94+
err := w.cfg.ListenFunc(ctx)
95+
if err != nil {
96+
w.log.Error("listener error", "error", err)
97+
98+
// Cancel the worker to stop the run loop.
99+
w.cancelMu.Lock()
100+
if w.cancel != nil {
101+
w.cancel()
102+
}
103+
w.cancelMu.Unlock()
104+
}
95105
}()
96106

97107
// Single reusable timer; we re-arm it whenever the earliest due changes.
@@ -222,50 +232,6 @@ func (w *probingWorker) runProbe(parent context.Context, rk RouteKey, mr managed
222232
w.applyProbeResult(&mr, outcome.OK)
223233
}
224234

225-
// listen runs cfg.ListenFunc until it returns nil or ctx is canceled, retrying
226-
// with exponential backoff on error. Backoff sleeps are ctx-cancelable.
227-
// Contract: ListenFunc should return nil on a clean, permanent exit; transient
228-
// failures should be reported as errors to trigger backoff/retry.
229-
func (w *probingWorker) listen(ctx context.Context) {
230-
backoff := w.cfg.ListenBackoff
231-
attempt := 0
232-
for {
233-
if ctx.Err() != nil {
234-
return
235-
}
236-
237-
if err := w.cfg.ListenFunc(ctx); err == nil {
238-
// Listener exited cleanly; we’re done.
239-
return
240-
} else {
241-
w.log.Error("listener error", "error", err)
242-
}
243-
244-
// Calculate the backoff duration.
245-
attempt++
246-
d := backoff.Initial
247-
for i := 1; i < attempt; i++ {
248-
d = time.Duration(float64(d) * backoff.Multiplier)
249-
if d > backoff.Max {
250-
d = backoff.Max
251-
break
252-
}
253-
}
254-
255-
// Cancelable sleep between retries.
256-
t := time.NewTimer(d)
257-
select {
258-
case <-t.C:
259-
// Backoff timer fired; retry.
260-
case <-ctx.Done():
261-
if !t.Stop() {
262-
<-t.C
263-
}
264-
return
265-
}
266-
}
267-
}
268-
269235
// applyProbeResult updates liveness state and reconciles kernel routes:
270236
//
271237
// Up -> Netlink.RouteAdd
@@ -309,8 +275,7 @@ func (w *probingWorker) applyProbeResult(mr *managedRoute, ok bool) {
309275
// No kernel operation required.
310276
}
311277

312-
// Persist the updated snapshot back into the store.
313-
w.store.Set(key, cur)
278+
// No need to store.Set since we mutated the managedRoute in place.
314279
}
315280

316281
// validateRoute enforces IPv4-only Src/Dst/NextHop. This worker currently

client/doublezerod/internal/probing/worker_test.go

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -330,57 +330,6 @@ func TestProbing_Worker_ContextCancelIsNoop(t *testing.T) {
330330
require.True(t, hasRouteLiveness(w, r, LivenessStatusDown, 0, 0))
331331
}
332332

333-
func TestProbing_Worker_ListenRetry_UntilContextDone(t *testing.T) {
334-
t.Parallel()
335-
336-
fourthStarted := make(chan struct{}, 1)
337-
var listenCalls atomic.Int64
338-
339-
sched := newFakeScheduler()
340-
cfg := newTestConfig(t, func(c *Config) {
341-
c.Liveness = seqPolicy([]LivenessTransition{LivenessTransitionNoChange})
342-
c.ProbeFunc = func(context.Context, *routing.Route) (ProbeResult, error) { return ProbeResult{OK: true}, nil }
343-
c.ListenFunc = func(ctx context.Context) error {
344-
n := listenCalls.Add(1)
345-
if n <= 3 {
346-
return errors.New("synthetic listen error")
347-
}
348-
select {
349-
case fourthStarted <- struct{}{}:
350-
default:
351-
}
352-
<-ctx.Done()
353-
return nil
354-
}
355-
c.ListenBackoff = ListenBackoffConfig{Initial: time.Millisecond, Max: time.Millisecond, Multiplier: 1}
356-
c.Limiter, _ = NewSemaphoreLimiter(2)
357-
c.Scheduler = sched
358-
})
359-
360-
w := newWorker(cfg.Logger, cfg, newRouteStore())
361-
r := newTestRouteWithDst(net.IPv4(10, 0, 5, 10))
362-
rk := newRouteKey(r)
363-
w.store.Set(rk, managedRoute{route: r, liveness: cfg.Liveness.NewTracker()})
364-
sched.Add(rk, time.Now())
365-
366-
w.Start(cfg.Context)
367-
t.Cleanup(w.Stop)
368-
369-
requireEventuallyDump(t, func() bool { return w.IsRunning() }, 2*time.Second, 10*time.Millisecond, "worker did not start")
370-
371-
// Wait until the 4th listen attempt has occurred
372-
requireEventuallyDump(t, func() bool {
373-
select {
374-
case <-fourthStarted:
375-
return true
376-
default:
377-
return false
378-
}
379-
}, 5*time.Second, 5*time.Millisecond, "4th listen attempt did not start")
380-
381-
require.True(t, w.IsRunning())
382-
}
383-
384333
func TestProbing_Worker_KernelError_DoesNotBlockStateAdvance(t *testing.T) {
385334
t.Parallel()
386335

0 commit comments

Comments
 (0)