Skip to content

Commit 0fc68e9

Browse files
committed
client/daemon: route liveness probing / tests cleanup
1 parent 19dce48 commit 0fc68e9

File tree

8 files changed

+150
-306
lines changed

8 files changed

+150
-306
lines changed

client/doublezerod/internal/probing/limiter_test.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ func TestProbing_SemaphoreLimiter_BasicAcquireRelease(t *testing.T) {
2626
l, err := NewSemaphoreLimiter(2)
2727
require.NoError(t, err)
2828

29-
// Acquire twice should succeed immediately.
3029
rel1, ok := l.Acquire(context.Background())
3130
require.True(t, ok)
3231
require.NotNil(t, rel1)
@@ -35,7 +34,6 @@ func TestProbing_SemaphoreLimiter_BasicAcquireRelease(t *testing.T) {
3534
require.True(t, ok)
3635
require.NotNil(t, rel2)
3736

38-
// Third acquire should block until a release happens.
3937
acquired := make(chan struct{})
4038
var rel3 func()
4139

@@ -47,25 +45,30 @@ func TestProbing_SemaphoreLimiter_BasicAcquireRelease(t *testing.T) {
4745
}
4846
}()
4947

50-
// Ensure it does NOT acquire within a short window (still blocked).
48+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
49+
defer cancel()
50+
done := make(chan struct{})
51+
go func() {
52+
select {
53+
case <-acquired:
54+
case <-ctx.Done():
55+
}
56+
close(done)
57+
}()
5158
select {
52-
case <-acquired:
53-
t.Fatalf("third Acquire should be blocked until a release")
54-
case <-time.After(50 * time.Millisecond):
55-
// expected: still blocked
59+
case <-done:
60+
require.NotNil(t, ctx.Err())
61+
default:
5662
}
5763

58-
// Release one permit; now the goroutine should acquire shortly.
5964
rel1()
6065

6166
select {
6267
case <-acquired:
63-
// got the third permit
64-
case <-time.After(250 * time.Millisecond):
68+
case <-time.After(500 * time.Millisecond):
6569
t.Fatalf("third Acquire did not succeed after a release")
6670
}
6771

68-
// Cleanup remaining permits
6972
rel2()
7073
if rel3 != nil {
7174
rel3()
@@ -78,33 +81,30 @@ func TestProbing_SemaphoreLimiter_CancelWhileBlocked(t *testing.T) {
7881
l, err := NewSemaphoreLimiter(1)
7982
require.NoError(t, err)
8083

81-
// Fill the single permit.
8284
rel, ok := l.Acquire(context.Background())
8385
require.True(t, ok)
8486
require.NotNil(t, rel)
8587

86-
// Attempt another acquire with a short timeout; it should fail when context expires.
8788
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
8889
defer cancel()
8990

90-
done := make(chan struct{})
91+
got := make(chan struct{})
9192
var gotOK bool
9293
var gotRel func()
9394

9495
go func() {
95-
defer close(done)
96+
defer close(got)
9697
gotRel, gotOK = l.Acquire(ctx)
9798
}()
9899

99100
select {
100-
case <-done:
101-
require.False(t, gotOK, "Acquire should return ok=false when context times out")
101+
case <-got:
102+
require.False(t, gotOK)
102103
require.Nil(t, gotRel)
103104
case <-time.After(200 * time.Millisecond):
104105
t.Fatalf("Acquire did not return after context timeout")
105106
}
106107

107-
// Release original permit to avoid leaks.
108108
rel()
109109
}
110110

@@ -123,21 +123,21 @@ func TestProbing_SemaphoreLimiter_SequentialAcquireReleaseCycles(t *testing.T) {
123123
require.True(t, ok)
124124
require.NotNil(t, relB)
125125

126-
// A third should block; verify it doesn’t acquire within 20ms.
127-
acquired := make(chan struct{})
126+
blockCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
127+
defer cancel()
128+
acqDone := make(chan struct{})
129+
var acqOK bool
128130
go func() {
129-
if r, ok := l.Acquire(context.Background()); ok {
130-
r() // immediately release if we ever acquire (we shouldn't before the test releases)
131-
close(acquired)
132-
}
131+
defer close(acqDone)
132+
_, acqOK = l.Acquire(blockCtx)
133133
}()
134134
select {
135-
case <-acquired:
136-
t.Fatalf("Acquire should not succeed before a release (iteration %d)", i)
137-
case <-time.After(20 * time.Millisecond):
135+
case <-acqDone:
136+
require.False(t, acqOK, "Acquire should not succeed before a release (iteration %d)", i)
137+
case <-time.After(200 * time.Millisecond):
138+
t.Fatalf("Acquire did not return within timeout (iteration %d)", i)
138139
}
139140

140-
// Release both and ensure we can acquire twice again.
141141
relA()
142142
relB()
143143

client/doublezerod/internal/probing/main_test.go

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -342,42 +342,6 @@ func (s *fakeScheduler) Complete(k RouteKey, _ ProbeOutcome) {
342342
s.mu.Unlock()
343343
}
344344

345-
//nolint:unused
346-
func waitForProbe(t *testing.T, ch <-chan struct{}, sched *fakeScheduler, d time.Duration) {
347-
t.Helper()
348-
deadline := time.After(d)
349-
tick := time.NewTicker(5 * time.Millisecond)
350-
defer tick.Stop()
351-
352-
for {
353-
select {
354-
case <-ch:
355-
return
356-
case <-tick.C:
357-
sched.Trigger() // keep nudging the worker
358-
case <-deadline:
359-
t.Fatalf("probe did not start within %v", d)
360-
}
361-
}
362-
}
363-
364-
func startNudger(t *testing.T, sched *fakeScheduler, every time.Duration) (stop func()) {
365-
done := make(chan struct{})
366-
go func() {
367-
tk := time.NewTicker(every)
368-
defer tk.Stop()
369-
for {
370-
select {
371-
case <-tk.C:
372-
sched.Trigger()
373-
case <-done:
374-
return
375-
}
376-
}
377-
}()
378-
return func() { close(done) }
379-
}
380-
381345
func waitEdge(t *testing.T, ch <-chan struct{}, d time.Duration, msg string) {
382346
t.Helper()
383347
select {
@@ -419,24 +383,6 @@ want:
419383
t.Logf("\n==== BEGIN FILTERED GOROUTINES ====\n%s==== END FILTERED GOROUTINES ====\n", out.String())
420384
}
421385

422-
//nolint:unused
423-
func requireEventuallyDump(t *testing.T, cond func() bool, wait, tick time.Duration, why string) {
424-
deadline := time.Now().Add(wait)
425-
for time.Now().Before(deadline) {
426-
if cond() {
427-
return
428-
}
429-
time.Sleep(tick)
430-
}
431-
// last chance
432-
if cond() {
433-
return
434-
}
435-
dumpGoroutines(t)
436-
dumpGoroutinesFiltered(t, "/internal/probing", "probingWorker", "fakeScheduler")
437-
t.Fatalf("eventually failed: %s (wait=%v tick=%v)", why, wait, tick)
438-
}
439-
440386
type memoryNetlinker struct {
441387
routesByDst map[string][]*routing.Route
442388

client/doublezerod/internal/probing/manager_test.go

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ import (
1414
)
1515

1616
func TestProbing_RouteManager_PeerLifecycle_StartsAndStopsWorker_Idempotent(t *testing.T) {
17-
t.Parallel()
18-
1917
cfg := newTestConfig(t, func(c *Config) {
2018
c.Limiter, _ = NewSemaphoreLimiter(4)
2119
c.Scheduler = newFakeScheduler()
@@ -38,8 +36,6 @@ func TestProbing_RouteManager_PeerLifecycle_StartsAndStopsWorker_Idempotent(t *t
3836
}
3937

4038
func TestProbing_RouteManager_RouteAdd_WorkerRunning_StoresButNoKernelAdd(t *testing.T) {
41-
t.Parallel()
42-
4339
var addCalls int64
4440
cfg := newTestConfig(t, func(c *Config) {
4541
c.Limiter, _ = NewSemaphoreLimiter(4)
@@ -61,11 +57,8 @@ func TestProbing_RouteManager_RouteAdd_WorkerRunning_StoresButNoKernelAdd(t *tes
6157
}
6258

6359
func TestProbing_RouteManager_RouteAdd_WorkerStopped_CallsKernelAdd(t *testing.T) {
64-
t.Parallel()
65-
6660
var addCalls int64
6761
cfg := newTestConfig(t, func(c *Config) {
68-
// No scheduler/limiter needed since worker won't be started
6962
c.Netlink = &MockNetlinker{
7063
RouteAddFunc: func(*routing.Route) error { atomic.AddInt64(&addCalls, 1); return nil },
7164
RouteDeleteFunc: func(*routing.Route) error { return nil },
@@ -81,22 +74,18 @@ func TestProbing_RouteManager_RouteAdd_WorkerStopped_CallsKernelAdd(t *testing.T
8174
}
8275

8376
func TestProbing_RouteManager_RouteAdd_InvalidRoute_Err(t *testing.T) {
84-
t.Parallel()
85-
8677
cfg := newTestConfig(t, func(c *Config) {
8778
c.Limiter, _ = NewSemaphoreLimiter(1)
8879
c.Scheduler = newFakeScheduler()
8980
})
9081
m, _ := NewRouteManager(cfg)
9182
_ = m.PeerOnEstablished()
92-
err := m.RouteAdd(&routing.Route{}) // invalid
83+
err := m.RouteAdd(&routing.Route{})
9384
require.Error(t, err)
9485
require.Equal(t, 0, m.store.Len())
9586
}
9687

9788
func TestProbing_RouteManager_RouteDelete_WorkerRunning_RemovesAndKernelDelete(t *testing.T) {
98-
t.Parallel()
99-
10089
var delCalls int64
10190
cfg := newTestConfig(t, func(c *Config) {
10291
c.Limiter, _ = NewSemaphoreLimiter(4)
@@ -118,8 +107,6 @@ func TestProbing_RouteManager_RouteDelete_WorkerRunning_RemovesAndKernelDelete(t
118107
}
119108

120109
func TestProbing_RouteManager_RouteDelete_WorkerRunning_RouteNotFoundInKernel_DoesNotReturnError(t *testing.T) {
121-
t.Parallel()
122-
123110
var delCalls int64
124111
cfg := newTestConfig(t, func(c *Config) {
125112
c.Limiter, _ = NewSemaphoreLimiter(4)
@@ -141,8 +128,6 @@ func TestProbing_RouteManager_RouteDelete_WorkerRunning_RouteNotFoundInKernel_Do
141128
}
142129

143130
func TestProbing_RouteManager_RouteDelete_WorkerStopped_CallsKernelDelete(t *testing.T) {
144-
t.Parallel()
145-
146131
var delCalls int64
147132
cfg := newTestConfig(t, func(c *Config) {
148133
c.Netlink = &MockNetlinker{
@@ -158,8 +143,6 @@ func TestProbing_RouteManager_RouteDelete_WorkerStopped_CallsKernelDelete(t *tes
158143
}
159144

160145
func TestProbing_RouteManager_RouteDelete_InvalidRoute_Err(t *testing.T) {
161-
t.Parallel()
162-
163146
cfg := newTestConfig(t, func(c *Config) {
164147
c.Limiter, _ = NewSemaphoreLimiter(1)
165148
c.Scheduler = newFakeScheduler()
@@ -172,8 +155,6 @@ func TestProbing_RouteManager_RouteDelete_InvalidRoute_Err(t *testing.T) {
172155
}
173156

174157
func TestProbing_RouteManager_RouteByProtocol_Passthrough(t *testing.T) {
175-
t.Parallel()
176-
177158
r := newTestRouteWithDst(net.IPv4(10, 0, 0, 30))
178159

179160
cfg := newTestConfig(t, func(c *Config) {
@@ -193,16 +174,12 @@ func TestProbing_RouteManager_RouteByProtocol_Passthrough(t *testing.T) {
193174
}
194175

195176
func TestProbing_RouteManager_NewRouteManager_ConfigValidateErrorBubbles(t *testing.T) {
196-
t.Parallel()
197-
198177
cfg := newTestConfig(t, func(c *Config) { c.Logger = nil })
199178
_, err := NewRouteManager(cfg)
200179
require.Error(t, err)
201180
}
202181

203-
func TestProbing_RouteManager_PeerOnEstablished_StartsWorkerAndProbes(t *testing.T) {
204-
t.Parallel()
205-
182+
func TestProbing_RouteManager_PeerOnEstablished_StartsAndProbes(t *testing.T) {
206183
probed := make(chan struct{}, 1)
207184
sched := newFakeScheduler()
208185
cfg := newTestConfig(t, func(c *Config) {
@@ -227,17 +204,14 @@ func TestProbing_RouteManager_PeerOnEstablished_StartsWorkerAndProbes(t *testing
227204
r2 := newTestRouteWithDst(net.IPv4(10, 0, 0, 41))
228205
require.NoError(t, m.RouteAdd(r2))
229206

230-
stop := startNudger(t, sched, 5*time.Millisecond)
231-
t.Cleanup(stop)
207+
sched.Trigger()
232208
waitEdge(t, probed, 2*time.Second, "probe did not start")
233209

234210
require.NoError(t, m.PeerOnClose())
235211
require.Equal(t, 0, m.store.Len())
236212
}
237213

238214
func TestProbing_RouteManager_PeerOnEstablished_ClearsStore(t *testing.T) {
239-
t.Parallel()
240-
241215
cfg := newTestConfig(t, func(c *Config) {
242216
c.Limiter, _ = NewSemaphoreLimiter(1)
243217
c.Scheduler = newFakeScheduler()
@@ -251,8 +225,6 @@ func TestProbing_RouteManager_PeerOnEstablished_ClearsStore(t *testing.T) {
251225
}
252226

253227
func TestProbing_RouteManager_PeerOnClose_ClearsStore(t *testing.T) {
254-
t.Parallel()
255-
256228
cfg := newTestConfig(t, func(c *Config) {
257229
c.Limiter, _ = NewSemaphoreLimiter(1)
258230
c.Scheduler = newFakeScheduler()

0 commit comments

Comments
 (0)