Skip to content

Commit 6fa7295

Browse files
committed
client/daemon: route liveness / bfd / run error
1 parent 8c8778c commit 6fa7295

File tree

6 files changed

+212
-12
lines changed

6 files changed

+212
-12
lines changed

client/doublezerod/internal/liveness/manager.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type RouteKey struct {
4747
type ManagerConfig struct {
4848
Logger *slog.Logger
4949
Netlinker RouteReaderWriter
50+
UDP *UDPService
5051

5152
BindIP string // local bind address for the UDP socket (IPv4)
5253
Port int // UDP port to listen/transmit on
@@ -124,6 +125,7 @@ type Manager struct {
124125
ctx context.Context
125126
cancel context.CancelFunc
126127
wg sync.WaitGroup
128+
errCh chan error
127129

128130
log *slog.Logger
129131
cfg *ManagerConfig
@@ -150,9 +152,13 @@ func NewManager(ctx context.Context, cfg *ManagerConfig) (*Manager, error) {
150152
return nil, fmt.Errorf("error validating manager config: %v", err)
151153
}
152154

153-
udp, err := ListenUDP(cfg.BindIP, cfg.Port)
154-
if err != nil {
155-
return nil, fmt.Errorf("error creating UDP connection: %v", err)
155+
udp := cfg.UDP
156+
if udp == nil {
157+
var err error
158+
udp, err = ListenUDP(cfg.BindIP, cfg.Port)
159+
if err != nil {
160+
return nil, fmt.Errorf("error creating UDP connection: %w", err)
161+
}
156162
}
157163

158164
log := cfg.Logger
@@ -162,6 +168,7 @@ func NewManager(ctx context.Context, cfg *ManagerConfig) (*Manager, error) {
162168
m := &Manager{
163169
ctx: ctx,
164170
cancel: cancel,
171+
errCh: make(chan error, 10),
165172

166173
log: log,
167174
cfg: cfg,
@@ -184,22 +191,32 @@ func NewManager(ctx context.Context, cfg *ManagerConfig) (*Manager, error) {
184191
defer m.wg.Done()
185192
err := m.recv.Run(m.ctx)
186193
if err != nil {
187-
// TODO(snormore): What should we do when this returns an error? Reconnect/retry or
188-
// propagate up and exit the daemon?
189194
m.log.Error("liveness: error running receiver", "error", err)
195+
cancel()
196+
m.errCh <- err
190197
}
191198
}()
192199

193200
// Scheduler goroutine: handles periodic TX and detect expirations.
194201
m.wg.Add(1)
195202
go func() {
196203
defer m.wg.Done()
197-
m.sched.Run(m.ctx)
204+
err := m.sched.Run(m.ctx)
205+
if err != nil {
206+
m.log.Error("liveness: error running scheduler", "error", err)
207+
cancel()
208+
m.errCh <- err
209+
}
198210
}()
199211

200212
return m, nil
201213
}
202214

215+
// Err returns a channel that will receive any errors from the manager.
216+
func (m *Manager) Err() chan error {
217+
return m.errCh
218+
}
219+
203220
// RegisterRoute declares interest in monitoring reachability for route r via iface.
204221
// It optionally installs the route immediately in PassiveMode, then creates or
205222
// reuses a liveness Session and schedules immediate TX to begin handshake.

client/doublezerod/internal/liveness/manager_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,71 @@ func TestClient_LivenessManager_PeerKey_IPv4Canonicalization(t *testing.T) {
452452
require.True(t, ok, "peer key should use IPv4 string forms")
453453
}
454454

455+
func TestClient_Liveness_Manager_ReceiverFailure_PropagatesOnErr(t *testing.T) {
456+
t.Parallel()
457+
m, err := newTestManager(t, nil)
458+
require.NoError(t, err)
459+
defer func() { _ = m.Close() }()
460+
461+
errCh := m.Err()
462+
463+
// Close the UDP socket directly to force Receiver.Run to error out.
464+
var udp *UDPService
465+
m.mu.Lock()
466+
udp = m.udp
467+
m.mu.Unlock()
468+
require.NotNil(t, udp)
469+
_ = udp.Close()
470+
471+
// Expect an error to surface on Err().
472+
select {
473+
case e := <-errCh:
474+
require.Error(t, e)
475+
default:
476+
select {
477+
case e := <-errCh:
478+
require.Error(t, e)
479+
case <-time.After(2 * time.Second):
480+
t.Fatalf("timeout waiting for error from manager.Err after UDP close")
481+
}
482+
}
483+
484+
// Close should complete cleanly after the receiver failure.
485+
require.NoError(t, m.Close())
486+
}
487+
488+
func TestClient_Liveness_Manager_Close_NoErrOnErrCh(t *testing.T) {
489+
t.Parallel()
490+
m, err := newTestManager(t, nil)
491+
require.NoError(t, err)
492+
493+
// No spurious errors before close.
494+
func() {
495+
timer := time.NewTimer(200 * time.Millisecond)
496+
defer timer.Stop()
497+
select {
498+
case <-timer.C:
499+
return
500+
case <-m.Err():
501+
t.Fatalf("unexpected error before Close")
502+
}
503+
}()
504+
505+
require.NoError(t, m.Close())
506+
507+
// No spurious errors after close either.
508+
func() {
509+
timer := time.NewTimer(200 * time.Millisecond)
510+
defer timer.Stop()
511+
select {
512+
case <-timer.C:
513+
return
514+
case <-m.Err():
515+
t.Fatalf("unexpected error after Close")
516+
}
517+
}()
518+
}
519+
455520
func newTestManager(t *testing.T, mutate func(*ManagerConfig)) (*Manager, error) {
456521
cfg := &ManagerConfig{
457522
Logger: newTestLogger(t),

client/doublezerod/internal/liveness/scheduler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func NewScheduler(log *slog.Logger, udp *UDPService, onSessionDown func(s *Sessi
136136
// It continuously pops and processes due events, sleeping until the next one if necessary.
137137
// Each TX event sends a control packet and re-schedules the next TX;
138138
// each Detect event checks for timeout and invokes onSessionDown if expired.
139-
func (s *Scheduler) Run(ctx context.Context) {
139+
func (s *Scheduler) Run(ctx context.Context) error {
140140
s.log.Debug("liveness.scheduler: tx loop started")
141141

142142
t := time.NewTimer(time.Hour)
@@ -146,7 +146,7 @@ func (s *Scheduler) Run(ctx context.Context) {
146146
select {
147147
case <-ctx.Done():
148148
s.log.Debug("liveness.scheduler: stopped by context done", "reason", ctx.Err())
149-
return
149+
return nil
150150
default:
151151
}
152152

@@ -167,7 +167,7 @@ func (s *Scheduler) Run(ctx context.Context) {
167167
select {
168168
case <-ctx.Done():
169169
s.log.Debug("liveness.scheduler: stopped by context done", "reason", ctx.Err())
170-
return
170+
return nil
171171
case <-t.C:
172172
continue
173173
}

client/doublezerod/internal/liveness/scheduler_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,9 @@ func TestClient_Liveness_Scheduler_Run_SendsAndReschedules(t *testing.T) {
183183
s := NewScheduler(log, w, func(*Session) {})
184184
ctx, cancel := context.WithCancel(t.Context())
185185
defer cancel()
186-
go s.Run(ctx)
186+
go func() {
187+
require.NoError(t, s.Run(ctx))
188+
}()
187189

188190
sess := &Session{
189191
state: StateInit,

client/doublezerod/internal/runtime/run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,5 +133,7 @@ func Run(ctx context.Context, sockFile string, routeConfigPath string, enableLat
133133
return nil
134134
case err := <-errCh:
135135
return err
136+
case err := <-lm.Err():
137+
return err
136138
}
137139
}

client/doublezerod/internal/runtime/run_test.go

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/malbeclabs/doublezero/client/doublezerod/internal/liveness"
3434
"github.com/malbeclabs/doublezero/client/doublezerod/internal/pim"
3535
"github.com/malbeclabs/doublezero/client/doublezerod/internal/runtime"
36+
"github.com/stretchr/testify/require"
3637
"golang.org/x/net/ipv4"
3738
"golang.org/x/sys/unix"
3839

@@ -1649,12 +1650,11 @@ func TestServiceNoCoExistence(t *testing.T) {
16491650
}()
16501651

16511652
errChan := make(chan error, 1)
1652-
ctx, _ := context.WithCancel(context.Background())
16531653

16541654
sockFile := filepath.Join(rootPath, "doublezerod.sock")
16551655
go func() {
16561656
programId := ""
1657-
err := runtime.Run(ctx, sockFile, "", false, false, programId, "", 30, 30, newTestLivenessManagerConfig())
1657+
err := runtime.Run(t.Context(), sockFile, "", false, false, programId, "", 30, 30, newTestLivenessManagerConfig())
16581658
errChan <- err
16591659
}()
16601660

@@ -2052,6 +2052,120 @@ func TestServiceCoexistence(t *testing.T) {
20522052
})
20532053
}
20542054

2055+
func TestRuntime_Run_ReturnsOnContextCancel(t *testing.T) {
2056+
errChan := make(chan error, 1)
2057+
ctx, cancel := context.WithCancel(t.Context())
2058+
defer cancel()
2059+
2060+
rootPath, err := os.MkdirTemp("", "doublezerod")
2061+
require.NoError(t, err)
2062+
defer os.RemoveAll(rootPath)
2063+
t.Setenv("XDG_STATE_HOME", rootPath)
2064+
2065+
path := filepath.Join(rootPath, "doublezerod")
2066+
if err := os.Mkdir(path, 0766); err != nil {
2067+
t.Fatalf("error creating state dir: %v", err)
2068+
}
2069+
2070+
sockFile := filepath.Join(rootPath, "doublezerod.sock")
2071+
go func() {
2072+
programId := ""
2073+
err := runtime.Run(ctx, sockFile, "", false, false, programId, "", 30, 30, newTestLivenessManagerConfig())
2074+
errChan <- err
2075+
}()
2076+
2077+
// Give the runtime a moment to start, then cancel the context to force exit.
2078+
select {
2079+
case err := <-errChan:
2080+
require.NoError(t, err)
2081+
case <-time.After(300 * time.Millisecond):
2082+
}
2083+
2084+
cancel()
2085+
select {
2086+
case err := <-errChan:
2087+
require.NoError(t, err)
2088+
case <-time.After(5 * time.Second):
2089+
t.Fatalf("timed out waiting for runtime to exit after context cancel")
2090+
}
2091+
}
2092+
2093+
func TestRuntime_Run_PropagatesLivenessStartupError(t *testing.T) {
2094+
errChan := make(chan error, 1)
2095+
ctx, cancel := context.WithCancel(t.Context())
2096+
defer cancel()
2097+
2098+
rootPath, err := os.MkdirTemp("", "doublezerod")
2099+
require.NoError(t, err)
2100+
defer os.RemoveAll(rootPath)
2101+
t.Setenv("XDG_STATE_HOME", rootPath)
2102+
2103+
// Invalid liveness config (port < 0) -> NewManager.Validate() error.
2104+
bad := *newTestLivenessManagerConfig()
2105+
bad.Port = -1
2106+
2107+
sockFile := filepath.Join(rootPath, "doublezerod.sock")
2108+
go func() {
2109+
programId := ""
2110+
err := runtime.Run(ctx, sockFile, "", false, false, programId, "", 30, 30, &bad)
2111+
errChan <- err
2112+
}()
2113+
2114+
select {
2115+
case err := <-errChan:
2116+
require.Error(t, err)
2117+
require.Contains(t, err.Error(), "port must be greater than or equal to 0")
2118+
case <-time.After(5 * time.Second):
2119+
t.Fatalf("expected startup error from runtime.Run with bad liveness config")
2120+
}
2121+
}
2122+
2123+
func TestRuntime_Run_PropagatesLivenessError_FromUDPClosure(t *testing.T) {
2124+
errCh := make(chan error, 1)
2125+
ctx, cancel := context.WithCancel(context.Background())
2126+
defer cancel()
2127+
2128+
// Minimal state dir + socket path
2129+
rootPath, err := os.MkdirTemp("", "doublezerod")
2130+
if err != nil {
2131+
t.Fatalf("mktemp: %v", err)
2132+
}
2133+
defer os.RemoveAll(rootPath)
2134+
t.Setenv("XDG_STATE_HOME", rootPath)
2135+
sockFile := filepath.Join(rootPath, "doublezerod.sock")
2136+
2137+
// Create a real UDPService we can close to induce a receiver error.
2138+
udp, err := liveness.ListenUDP("127.0.0.1", 0)
2139+
if err != nil {
2140+
t.Fatalf("ListenUDP: %v", err)
2141+
}
2142+
2143+
// Build a liveness config that uses our injected UDP service.
2144+
cfg := newTestLivenessManagerConfig()
2145+
cfg.UDP = udp
2146+
cfg.PassiveMode = true
2147+
2148+
// Start the runtime.
2149+
go func() {
2150+
programID := ""
2151+
errCh <- runtime.Run(ctx, sockFile, "", false, false, programID, "", 30, 30, cfg)
2152+
}()
2153+
2154+
// Give the liveness receiver a moment to start, then close the UDP socket.
2155+
time.Sleep(200 * time.Millisecond)
2156+
_ = udp.Close()
2157+
2158+
// The receiver should error, Manager should send on lm.Err(), and Run should return that error.
2159+
select {
2160+
case err := <-errCh:
2161+
if err == nil {
2162+
t.Fatalf("expected non-nil error propagated from liveness manager, got nil")
2163+
}
2164+
case <-time.After(5 * time.Second):
2165+
t.Fatalf("timeout waiting for runtime to return error from liveness manager")
2166+
}
2167+
}
2168+
20552169
func setupTest(t *testing.T) (func(), error) {
20562170
abortIfLinksAreUp(t)
20572171
rootPath, err := os.MkdirTemp("", "doublezerod")

0 commit comments

Comments
 (0)