Skip to content

Commit 0057599

Browse files
committed
rfc: client route liveness probing / rtt and nowfunc
1 parent 23de53b commit 0057599

File tree

7 files changed

+44
-25
lines changed

7 files changed

+44
-25
lines changed

client/doublezerod/internal/probing/config.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"errors"
88
"log/slog"
9+
"time"
910

1011
"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
1112
)
@@ -25,10 +26,14 @@ type ProbeFunc func(context.Context, *routing.Route) (ProbeResult, error)
2526
// OK should reflect end-to-end success criteria for a single probe wave.
2627
type ProbeResult struct {
2728
OK bool
28-
Sent int
29-
Received int
29+
Sent int // number of probes sent in the wave
30+
Received int // number of probes received in the wave
31+
RTTMean time.Duration // mean RTT of all probes in the wave
3032
}
3133

34+
// NowFunc returns the current time in UTC.
35+
type NowFunc func() time.Time
36+
3237
// Config provides all dependencies and tunables for the probing system.
3338
// Fields marked “Required” must be set; Validate enforces this and applies
3439
// defaults where appropriate.
@@ -42,6 +47,7 @@ type Config struct {
4247
Scheduler Scheduler // scheduler for route probing
4348
ListenFunc ListenFunc // long-lived listener (with retry/backoff)
4449
ProbeFunc ProbeFunc // per-route probe function
50+
NowFunc NowFunc // function to get the current time
4551
}
4652

4753
// Validate verifies required fields and applies defaults for optional fields.
@@ -72,6 +78,11 @@ func (cfg *Config) Validate() error {
7278
if cfg.ProbeFunc == nil {
7379
return errors.New("probe func is required")
7480
}
81+
if cfg.NowFunc == nil {
82+
cfg.NowFunc = func() time.Time {
83+
return time.Now().UTC()
84+
}
85+
}
7586

7687
return nil
7788
}

client/doublezerod/internal/probing/default.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func DefaultProbeFunc(log *slog.Logger, iface string, timeout time.Duration) Pro
8383
OK: ok,
8484
Sent: stats.PacketsSent,
8585
Received: stats.PacketsRecv,
86+
RTTMean: stats.AvgRtt,
8687
}, nil
8788
}
8889
}

client/doublezerod/internal/probing/main_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func newTestConfig(t *testing.T, mutate func(*Config)) *Config {
9090
time.Sleep(1 * time.Millisecond)
9191
return ProbeResult{OK: true, Sent: 1, Received: 1}, nil
9292
},
93+
NowFunc: now,
9394
}
9495
if mutate != nil {
9596
mutate(&cfg)
@@ -293,7 +294,7 @@ func (s *fakeScheduler) Peek() (time.Time, bool) {
293294
p := s.wavePending
294295
s.mu.Unlock()
295296
if p {
296-
return time.Now(), true
297+
return now(), true
297298
}
298299
return time.Time{}, false
299300
}
@@ -569,3 +570,7 @@ func (m *mockLivenessTracker) ConsecutiveOK() uint {
569570
func (m *mockLivenessTracker) ConsecutiveFail() uint {
570571
return m.ConsecutiveFailFunc()
571572
}
573+
574+
func now() time.Time {
575+
return time.Now().UTC()
576+
}

client/doublezerod/internal/probing/manager.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"errors"
77
"fmt"
88
"log/slog"
9-
"time"
109

1110
"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
1211
)
@@ -99,7 +98,7 @@ func (m *RouteManager) handleRouteAdd(route *routing.Route) error {
9998

10099
// Add the route to managed route store.
101100
key := newRouteKey(route)
102-
now := time.Now()
101+
now := m.cfg.NowFunc()
103102
m.cfg.Scheduler.Add(key, now)
104103
m.store.Set(key, managedRoute{
105104
route: route,

client/doublezerod/internal/probing/scheduler.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type IntervalConfig struct {
3939
Interval time.Duration // base interval between probes
4040
Jitter time.Duration // max absolute jitter (+/-) applied to the interval/phase anchor
4141
Phase bool // whether to phase routes deterministically
42+
NowFunc NowFunc // function to get the current time
4243
}
4344

4445
// Validate ensures the configuration is usable.
@@ -49,6 +50,11 @@ func (cfg *IntervalConfig) Validate() error {
4950
if cfg.Jitter < 0 {
5051
return errors.New("jitter must be >= 0")
5152
}
53+
if cfg.NowFunc == nil {
54+
cfg.NowFunc = func() time.Time {
55+
return time.Now().UTC()
56+
}
57+
}
5258
return nil
5359
}
5460

@@ -105,7 +111,7 @@ func (s *IntervalScheduler) Add(k RouteKey, base time.Time) {
105111
return
106112
}
107113
seed := hash64(k)
108-
due := firstDue(base, s.cfg.Interval, s.cfg.Jitter, s.cfg.Phase, k, seed)
114+
due := firstDue(base, s.cfg.Interval, s.cfg.Jitter, s.cfg.Phase, k, seed, s.cfg.NowFunc)
109115
s.routes[k] = &schedulerRouteState{seed: seed, nextDue: due}
110116
s.maybeSignalLocked()
111117
}
@@ -250,9 +256,9 @@ func phaseOffset(iv time.Duration, k RouteKey) time.Duration {
250256

251257
// firstDue computes the initial due time for a route given the config.
252258
// If Phase is enabled, routes are staggered deterministically by key.
253-
func firstDue(base time.Time, iv time.Duration, jitter time.Duration, phase bool, k RouteKey, seed uint64) time.Time {
259+
func firstDue(base time.Time, iv time.Duration, jitter time.Duration, phase bool, k RouteKey, seed uint64, nowFunc NowFunc) time.Time {
254260
if base.IsZero() {
255-
base = time.Now()
261+
base = nowFunc()
256262
}
257263
if phase {
258264
d := base.Truncate(iv).Add(phaseOffset(iv, k))

client/doublezerod/internal/probing/worker.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ func (w *probingWorker) Run(ctx context.Context) {
143143
for {
144144
// Handle “due now” immediately to avoid Reset(0) races.
145145
if next, ok := w.cfg.Scheduler.Peek(); ok {
146-
if !next.After(time.Now()) {
147-
launchDue(time.Now())
146+
if !next.After(w.cfg.NowFunc()) {
147+
launchDue(w.cfg.NowFunc())
148148
// After processing, loop to re-peek and re-arm.
149149
continue
150150
}
@@ -164,7 +164,7 @@ func (w *probingWorker) Run(ctx context.Context) {
164164
wakeCh = w.cfg.Scheduler.Wake()
165165

166166
case <-tc:
167-
launchDue(time.Now())
167+
launchDue(w.cfg.NowFunc())
168168
}
169169
}
170170
}
@@ -180,7 +180,7 @@ func (w *probingWorker) runProbe(parent context.Context, rk RouteKey, mr managed
180180
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
181181
defer cancel()
182182

183-
outcome := ProbeOutcome{When: time.Now()}
183+
outcome := ProbeOutcome{When: w.cfg.NowFunc()}
184184
defer func() {
185185
if r := recover(); r != nil {
186186
outcome.OK = false
@@ -197,14 +197,11 @@ func (w *probingWorker) runProbe(parent context.Context, rk RouteKey, mr managed
197197
}
198198
defer rel()
199199

200-
start := time.Now()
201200
res, err := w.cfg.ProbeFunc(ctx, mr.route)
202-
outcome.When = time.Now()
201+
outcome.When = w.cfg.NowFunc()
203202
outcome.OK = (err == nil && res.OK)
204203
outcome.Err = err
205-
if err == nil {
206-
outcome.RTT = outcome.When.Sub(start)
207-
}
204+
outcome.RTT = res.RTTMean
208205

209206
if ctx.Err() != nil {
210207
return

client/doublezerod/internal/probing/worker_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ func TestProbing_Worker_SuccessThenFailure_TransitionsAndKernel(t *testing.T) {
5757
k1, k2 := newRouteKey(r1), newRouteKey(r2)
5858
w.store.Set(k1, managedRoute{route: r1, liveness: cfg.Liveness.NewTracker()})
5959
w.store.Set(k2, managedRoute{route: r2, liveness: cfg.Liveness.NewTracker()})
60-
sched.Add(k1, time.Now())
61-
sched.Add(k2, time.Now())
60+
sched.Add(k1, now())
61+
sched.Add(k2, now())
6262

6363
w.Start(cfg.Context)
6464
t.Cleanup(w.Stop)
@@ -110,8 +110,8 @@ func TestProbing_Worker_ErrorCountsAsFailure(t *testing.T) {
110110
r2 := newTestRouteWithDst(net.IPv4(10, 0, 1, 2))
111111
w.store.Set(newRouteKey(r1), managedRoute{route: r1, liveness: cfg.Liveness.NewTracker()})
112112
w.store.Set(newRouteKey(r2), managedRoute{route: r2, liveness: cfg.Liveness.NewTracker()})
113-
sched.Add(newRouteKey(r1), time.Now())
114-
sched.Add(newRouteKey(r2), time.Now())
113+
sched.Add(newRouteKey(r1), now())
114+
sched.Add(newRouteKey(r2), now())
115115

116116
w.Start(cfg.Context)
117117
t.Cleanup(w.Stop)
@@ -182,7 +182,7 @@ func TestProbing_Worker_RespectsLimiterConcurrency(t *testing.T) {
182182
r := newTestRouteWithDst(net.IPv4(10, 0, 2, byte(i+1)))
183183
rk := newRouteKey(r)
184184
w.store.Set(rk, managedRoute{route: r, liveness: cfg.Liveness.NewTracker()})
185-
sched.Add(rk, time.Now())
185+
sched.Add(rk, now())
186186
}
187187

188188
w.Start(cfg.Context)
@@ -239,7 +239,7 @@ func TestProbing_Worker_IgnoresResultIfRouteRemoved(t *testing.T) {
239239
r := newTestRouteWithDst(net.IPv4(10, 0, 3, 1))
240240
key := newRouteKey(r)
241241
w.store.Set(key, managedRoute{route: r, liveness: cfg.Liveness.NewTracker()})
242-
sched.Add(key, time.Now())
242+
sched.Add(key, now())
243243

244244
w.Start(cfg.Context)
245245
t.Cleanup(w.Stop)
@@ -284,7 +284,7 @@ func TestProbing_Worker_ContextCancelIsNoop(t *testing.T) {
284284
r := newTestRouteWithDst(net.IPv4(10, 0, 4, 1))
285285
key := newRouteKey(r)
286286
w.store.Set(key, managedRoute{route: r, liveness: cfg.Liveness.NewTracker()})
287-
sched.Add(key, time.Now())
287+
sched.Add(key, now())
288288

289289
ctx, cancel := context.WithCancel(cfg.Context)
290290
cfg.Context = ctx
@@ -318,7 +318,7 @@ func TestProbing_Worker_KernelError_DoesNotBlockStateAdvance(t *testing.T) {
318318
r := newTestRouteWithDst(net.IPv4(10, 0, 6, 1))
319319
key := newRouteKey(r)
320320
w.store.Set(key, managedRoute{route: r, liveness: cfg.Liveness.NewTracker()})
321-
sched.Add(key, time.Now())
321+
sched.Add(key, now())
322322

323323
w.Start(cfg.Context)
324324
defer w.Stop()

0 commit comments

Comments
 (0)