Skip to content

Commit 8091586

Browse files
committed
client/daemon: route liveness probing
1 parent dc52b45 commit 8091586

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2717
-180
lines changed

.golangci.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ linters:
7474
- third_party$
7575
- builtin$
7676
- examples$
77+
rules:
78+
- linters: [govet]
79+
text: 'buildtag: \+build line is no longer needed'
7780
formatters:
7881
enable:
7982
- gofmt

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ All notable changes to this project will be documented in this file.
88

99
### Changes
1010

11-
-- Add automated compatibility tests in CI to validate all actual testnet and mainnet state against the current codebase, ensuring backward compatibility across protocol versions.
11+
- Onchain programs
12+
- Add automated compatibility tests in CI to validate all actual testnet and mainnet state against the current codebase, ensuring backward compatibility across protocol versions.
13+
- Client
14+
- Add initial opt-in route liveness probing support to `doublezerod` via `--route-probing-enable` flag (not yet considered stable)
1215

1316
## [v0.6.9](https://github.com/malbeclabs/doublezero/compare/client/v0.6.7...client/v0.6.9) – 2025-10-24
1417

client/doublezerod/cmd/doublezerod/main.go

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build linux
2+
13
package main
24

35
import (
@@ -11,27 +13,49 @@ import (
1113
"os"
1214
"os/signal"
1315
"syscall"
16+
"time"
1417

18+
"github.com/malbeclabs/doublezero/client/doublezerod/internal/api"
19+
"github.com/malbeclabs/doublezero/client/doublezerod/internal/bgp"
20+
"github.com/malbeclabs/doublezero/client/doublezerod/internal/manager"
21+
"github.com/malbeclabs/doublezero/client/doublezerod/internal/pim"
22+
"github.com/malbeclabs/doublezero/client/doublezerod/internal/probing"
23+
"github.com/malbeclabs/doublezero/client/doublezerod/internal/routing"
1524
"github.com/malbeclabs/doublezero/client/doublezerod/internal/runtime"
25+
"github.com/malbeclabs/doublezero/client/doublezerod/internal/services"
1626
"github.com/malbeclabs/doublezero/config"
1727
"github.com/prometheus/client_golang/prometheus"
1828
"github.com/prometheus/client_golang/prometheus/promauto"
1929
"github.com/prometheus/client_golang/prometheus/promhttp"
2030
)
2131

32+
const (
33+
defaultRouteProbingInterval = 1 * time.Second
34+
defaultRouteProbingMaxConcurrency = 1024
35+
defaultRouteProbingProbeTimeout = 1 * time.Second
36+
defaultRouteProbingUpThreshold = 3
37+
defaultRouteProbingDownThreshold = 3
38+
)
39+
2240
var (
23-
sockFile = flag.String("sock-file", "/var/run/doublezerod/doublezerod.sock", "path to doublezerod domain socket")
24-
enableLatencyProbing = flag.Bool("latency-probing", true, "enable latency probing to doublezero nodes")
25-
versionFlag = flag.Bool("version", false, "build version")
26-
env = flag.String("env", config.EnvTestnet, "environment to use")
27-
programId = flag.String("program-id", "", "override smartcontract program id to monitor")
28-
rpcEndpoint = flag.String("solana-rpc-endpoint", "", "override solana rpc endpoint url")
29-
probeInterval = flag.Int("probe-interval", 30, "latency probe interval in seconds")
30-
cacheUpdateInterval = flag.Int("cache-update-interval", 30, "latency cache update interval in seconds")
31-
enableVerboseLogging = flag.Bool("v", false, "enables verbose logging")
32-
enableLatencyMetrics = flag.Bool("enable-latency-metrics", false, "enables latency metrics")
33-
metricsEnable = flag.Bool("metrics-enable", false, "Enable prometheus metrics")
34-
metricsAddr = flag.String("metrics-addr", "localhost:0", "Address to listen on for prometheus metrics")
41+
sockFile = flag.String("sock-file", "/var/run/doublezerod/doublezerod.sock", "path to doublezerod domain socket")
42+
enableLatencyProbing = flag.Bool("latency-probing", true, "enable latency probing to doublezero nodes")
43+
versionFlag = flag.Bool("version", false, "build version")
44+
env = flag.String("env", config.EnvTestnet, "environment to use")
45+
programId = flag.String("program-id", "", "override smartcontract program id to monitor")
46+
rpcEndpoint = flag.String("solana-rpc-endpoint", "", "override solana rpc endpoint url")
47+
probeInterval = flag.Int("probe-interval", 30, "latency probe interval in seconds")
48+
cacheUpdateInterval = flag.Int("cache-update-interval", 30, "latency cache update interval in seconds")
49+
enableVerboseLogging = flag.Bool("v", false, "enables verbose logging")
50+
enableLatencyMetrics = flag.Bool("enable-latency-metrics", false, "enables latency metrics")
51+
metricsEnable = flag.Bool("metrics-enable", false, "Enable prometheus metrics")
52+
metricsAddr = flag.String("metrics-addr", "localhost:0", "Address to listen on for prometheus metrics")
53+
routeProbingEnable = flag.Bool("route-probing-enable", false, "enables route liveness probing")
54+
routeProbingInterval = flag.Duration("route-probing-interval", defaultRouteProbingInterval, "route liveness probing interval as a duration (i.e. 5s, 10s, 30s)")
55+
routeProbingProbeTimeout = flag.Duration("route-probing-probe-timeout", defaultRouteProbingProbeTimeout, "route liveness probing probe timeout as a duration (i.e. 1s, 3s, 5s)")
56+
routeProbingUpThreshold = flag.Uint("route-probing-up-threshold", defaultRouteProbingUpThreshold, "route liveness probing up threshold")
57+
routeProbingDownThreshold = flag.Uint("route-probing-down-threshold", defaultRouteProbingDownThreshold, "route liveness probing down threshold")
58+
routeProbingMaxConcurrency = flag.Uint("route-probing-max-concurrency", defaultRouteProbingMaxConcurrency, "route liveness probing max concurrency")
3559

3660
// set by LDFLAGS
3761
version = "dev"
@@ -114,7 +138,47 @@ func main() {
114138
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
115139
defer stop()
116140

117-
if err := runtime.Run(ctx, *sockFile, *enableLatencyProbing, *enableLatencyMetrics, *programId, *rpcEndpoint, *probeInterval, *cacheUpdateInterval); err != nil {
141+
db, err := manager.NewDb()
142+
if err != nil {
143+
slog.Error("error initializing db", "error", err)
144+
os.Exit(1)
145+
}
146+
147+
nlr := routing.Netlink{}
148+
149+
bgps, err := bgp.NewBgpServer(net.IPv4(1, 1, 1, 1))
150+
if err != nil {
151+
slog.Error("error creating bgp server", "error", err)
152+
os.Exit(1)
153+
}
154+
155+
pim := pim.NewPIMServer()
156+
157+
nlm := manager.NewNetlinkManager(nlr, bgps, db, func(userType api.UserType) (manager.Provisioner, error) {
158+
if userType != api.UserTypeIBRL || !*routeProbingEnable {
159+
return manager.CreatePassthroughService(userType, bgps, nlr, db, pim)
160+
}
161+
162+
return services.NewIBRLService(bgps, nlr, db, func(iface string, src net.IP) (bgp.RouteManager, error) {
163+
if *routeProbingEnable {
164+
return probing.NewRouteManager(&probing.Config{
165+
Logger: logger,
166+
Context: ctx,
167+
Netlink: nlr,
168+
Liveness: probing.NewHysteresisLivenessPolicy(*routeProbingUpThreshold, *routeProbingDownThreshold),
169+
ListenFunc: probing.DefaultListenFunc(logger, iface, src),
170+
ProbeFunc: probing.DefaultProbeFunc(logger, iface, *routeProbingProbeTimeout),
171+
Interval: *routeProbingInterval,
172+
ProbeTimeout: *routeProbingProbeTimeout,
173+
MaxConcurrency: *routeProbingMaxConcurrency,
174+
})
175+
} else {
176+
return manager.NewNetlinkerPassthroughRouteManager(nlr), nil
177+
}
178+
}), nil
179+
})
180+
181+
if err := runtime.Run(ctx, nlm, *sockFile, *enableLatencyProbing, *enableLatencyMetrics, *programId, *rpcEndpoint, *probeInterval, *cacheUpdateInterval); err != nil {
118182
slog.Error("runtime error", "error", err)
119183
os.Exit(1)
120184
}

client/doublezerod/internal/api/requests.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build linux
2+
13
package api
24

35
import (

client/doublezerod/internal/bgp/bgp.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build linux
2+
13
package bgp
24

35
import (
@@ -99,28 +101,27 @@ type PeerConfig struct {
99101
RouteTable int
100102
FlushRoutes bool
101103
NoInstall bool
104+
RouteManager RouteManager
102105
}
103106

104107
type BgpServer struct {
105-
server *corebgp.Server
106-
peerStatusChan chan SessionEvent
107-
peerStatus map[string]Session
108-
peerStatusLock sync.Mutex
109-
routeReaderWriter RouteReaderWriter
108+
server *corebgp.Server
109+
peerStatusChan chan SessionEvent
110+
peerStatus map[string]Session
111+
peerStatusLock sync.Mutex
110112
}
111113

112-
func NewBgpServer(routerID net.IP, r RouteReaderWriter) (*BgpServer, error) {
114+
func NewBgpServer(routerID net.IP) (*BgpServer, error) {
113115
corebgp.SetLogger(log.Print)
114116
srv, err := corebgp.NewServer(netip.MustParseAddr(routerID.String()))
115117
if err != nil {
116118
return nil, fmt.Errorf("error creating bgp server: %v", err)
117119
}
118120
return &BgpServer{
119-
server: srv,
120-
peerStatusChan: make(chan SessionEvent),
121-
peerStatus: make(map[string]Session),
122-
peerStatusLock: sync.Mutex{},
123-
routeReaderWriter: r,
121+
server: srv,
122+
peerStatusChan: make(chan SessionEvent),
123+
peerStatus: make(map[string]Session),
124+
peerStatusLock: sync.Mutex{},
124125
}, nil
125126
}
126127

@@ -142,7 +143,7 @@ func (b *BgpServer) AddPeer(p *PeerConfig, advertised []NLRI) error {
142143
if p.Port != 0 {
143144
peerOpts = append(peerOpts, corebgp.WithPort(p.Port))
144145
}
145-
plugin := NewBgpPlugin(advertised, p.RouteSrc, p.RouteTable, b.peerStatusChan, p.FlushRoutes, p.NoInstall, b.routeReaderWriter)
146+
plugin := NewBgpPlugin(advertised, p.RouteSrc, p.RouteTable, b.peerStatusChan, p.FlushRoutes, p.NoInstall, p.RouteManager)
146147
err := b.server.AddPeer(corebgp.PeerConfig{
147148
RemoteAddress: netip.MustParseAddr(p.RemoteAddress.String()),
148149
LocalAS: p.LocalAs,

client/doublezerod/internal/bgp/bgp_test.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//go:build linux
2+
13
package bgp_test
24

35
import (
@@ -21,28 +23,36 @@ import (
2123
"golang.org/x/sys/unix"
2224
)
2325

24-
type mockRouteReaderWriter struct {
26+
type mockRouteManager struct {
2527
routesAdded []*routing.Route
2628
routesDeleted []*routing.Route
2729
routesFlushed []*routing.Route
2830
mu sync.Mutex
2931
}
3032

31-
func (m *mockRouteReaderWriter) RouteAdd(route *routing.Route) error {
33+
func (m *mockRouteManager) PeerOnEstablished() error {
34+
return nil
35+
}
36+
37+
func (m *mockRouteManager) PeerOnClose() error {
38+
return nil
39+
}
40+
41+
func (m *mockRouteManager) RouteAdd(route *routing.Route) error {
3242
m.mu.Lock()
3343
defer m.mu.Unlock()
3444
m.routesAdded = append(m.routesAdded, route)
3545
return nil
3646
}
3747

38-
func (m *mockRouteReaderWriter) RouteDelete(route *routing.Route) error {
48+
func (m *mockRouteManager) RouteDelete(route *routing.Route) error {
3949
m.mu.Lock()
4050
defer m.mu.Unlock()
4151
m.routesDeleted = append(m.routesDeleted, route)
4252
return nil
4353
}
4454

45-
func (m *mockRouteReaderWriter) RouteByProtocol(int) ([]*routing.Route, error) {
55+
func (m *mockRouteManager) RouteByProtocol(int) ([]*routing.Route, error) {
4656
m.mu.Lock()
4757
defer m.mu.Unlock()
4858
return []*routing.Route{
@@ -58,19 +68,19 @@ func (m *mockRouteReaderWriter) RouteByProtocol(int) ([]*routing.Route, error) {
5868
}, nil
5969
}
6070

61-
func (m *mockRouteReaderWriter) getRoutesAdded() []*routing.Route {
71+
func (m *mockRouteManager) getRoutesAdded() []*routing.Route {
6272
m.mu.Lock()
6373
defer m.mu.Unlock()
6474
return append([]*routing.Route(nil), m.routesAdded...)
6575
}
6676

67-
func (m *mockRouteReaderWriter) getRoutesDeleted() []*routing.Route {
77+
func (m *mockRouteManager) getRoutesDeleted() []*routing.Route {
6878
m.mu.Lock()
6979
defer m.mu.Unlock()
7080
return append([]*routing.Route(nil), m.routesDeleted...)
7181
}
7282

73-
func (m *mockRouteReaderWriter) getRoutesFlushed() []*routing.Route {
83+
func (m *mockRouteManager) getRoutesFlushed() []*routing.Route {
7484
m.mu.Lock()
7585
defer m.mu.Unlock()
7686
return append([]*routing.Route(nil), m.routesFlushed...)
@@ -113,8 +123,8 @@ func (p *dummyPlugin) handleUpdate(peer corebgp.PeerConfig, u []byte) *corebgp.N
113123
}
114124

115125
func TestBgpServer(t *testing.T) {
116-
nlr := &mockRouteReaderWriter{}
117-
b, err := bgp.NewBgpServer(net.IP{1, 1, 1, 1}, nlr)
126+
routeManager := &mockRouteManager{}
127+
b, err := bgp.NewBgpServer(net.IP{1, 1, 1, 1})
118128
if err != nil {
119129
t.Fatalf("error creating bgp server: %v", err)
120130
}
@@ -135,6 +145,7 @@ func TestBgpServer(t *testing.T) {
135145
FlushRoutes: true,
136146
RouteTable: syscall.RT_TABLE_MAIN,
137147
RouteSrc: net.IP{7, 7, 7, 7},
148+
RouteManager: routeManager,
138149
},
139150
[]bgp.NLRI{
140151
{AsPath: []uint32{}, NextHop: "1.1.1.1", Prefix: "10.0.0.0", PrefixLength: 32},
@@ -243,7 +254,7 @@ func TestBgpServer(t *testing.T) {
243254
Table: syscall.RT_TABLE_MAIN,
244255
},
245256
}
246-
if diff := checkRoutes(nlr.getRoutesDeleted(), want); diff != "" {
257+
if diff := checkRoutes(routeManager.getRoutesDeleted(), want); diff != "" {
247258
t.Fatalf("bgp withdraw mismatch: -(got); +(want): %s", diff)
248259
}
249260
})
@@ -261,7 +272,7 @@ func TestBgpServer(t *testing.T) {
261272
Table: syscall.RT_TABLE_MAIN,
262273
},
263274
}
264-
if diff := checkRoutes(nlr.getRoutesAdded(), want); diff != "" {
275+
if diff := checkRoutes(routeManager.getRoutesAdded(), want); diff != "" {
265276
t.Fatalf("bgp add mismatch: -(got); +(want): %s", diff)
266277
}
267278
})
@@ -289,7 +300,7 @@ func TestBgpServer(t *testing.T) {
289300
Table: syscall.RT_TABLE_MAIN,
290301
},
291302
}
292-
if diff := checkRoutes(nlr.getRoutesFlushed(), want); diff != "" {
303+
if diff := checkRoutes(routeManager.getRoutesFlushed(), want); diff != "" {
293304
t.Fatalf("bgp flush mismatch: -(got); +(want): %s", diff)
294305
}
295306
})

0 commit comments

Comments
 (0)