Skip to content

Commit 4ceceda

Browse files
committed
peernotifier: Add peer notifier package for peer online/offline events
This commit adds a peer notifier package which provides clients with a subscription to peer online and offline events.
1 parent 8c9c4b5 commit 4ceceda

File tree

4 files changed

+150
-2
lines changed

4 files changed

+150
-2
lines changed

log.go

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/lightningnetwork/lnd/lnwallet"
3232
"github.com/lightningnetwork/lnd/monitoring"
3333
"github.com/lightningnetwork/lnd/netann"
34+
"github.com/lightningnetwork/lnd/peernotifier"
3435
"github.com/lightningnetwork/lnd/routing"
3536
"github.com/lightningnetwork/lnd/signal"
3637
"github.com/lightningnetwork/lnd/sweep"
@@ -90,6 +91,7 @@ var (
9091
chbuLog = build.NewSubLogger("CHBU", backendLog.Logger)
9192
promLog = build.NewSubLogger("PROM", backendLog.Logger)
9293
wtclLog = build.NewSubLogger("WTCL", backendLog.Logger)
94+
prnfLog = build.NewSubLogger("PRNF", backendLog.Logger)
9395
)
9496

9597
// Initialize package-global logger variables.
@@ -119,6 +121,7 @@ func init() {
119121
chanbackup.UseLogger(chbuLog)
120122
monitoring.UseLogger(promLog)
121123
wtclient.UseLogger(wtclLog)
124+
peernotifier.UseLogger(prnfLog)
122125

123126
addSubLogger(routerrpc.Subsystem, routerrpc.UseLogger)
124127
addSubLogger(wtclientrpc.Subsystem, wtclientrpc.UseLogger)
@@ -165,6 +168,7 @@ var subsystemLoggers = map[string]btclog.Logger{
165168
"CHBU": chbuLog,
166169
"PROM": promLog,
167170
"WTCL": wtclLog,
171+
"PRNF": prnfLog,
168172
}
169173

170174
// initLogRotator initializes the logging rotator to write logs to logFile and

peernotifier/log.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package peernotifier
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// log is a logger that is initialized with no output filters. This
9+
// means the package will not perform any logging by default until the caller
10+
// requests it.
11+
var log btclog.Logger
12+
13+
// The default amount of logging is none.
14+
func init() {
15+
UseLogger(build.NewSubLogger("PRNF", nil))
16+
}
17+
18+
// DisableLog disables all library log output. Logging output is disabled
19+
// by default until UseLogger is called.
20+
func DisableLog() {
21+
UseLogger(btclog.Disabled)
22+
}
23+
24+
// UseLogger uses a specified Logger to output package logging info.
25+
// This should be used in preference to SetLogWriter if the caller is also
26+
// using btclog.
27+
func UseLogger(logger btclog.Logger) {
28+
log = logger
29+
}

peernotifier/peernotifier.go

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package peernotifier
2+
3+
import (
4+
"sync"
5+
6+
"github.com/lightningnetwork/lnd/subscribe"
7+
)
8+
9+
// PeerNotifier is a subsystem which observes peer offline and online events.
10+
// It takes subscriptions for its events, and whenever it observes a new event
11+
// it notifies its subscribers over the proper channel.
12+
type PeerNotifier struct {
13+
started sync.Once
14+
stopped sync.Once
15+
16+
ntfnServer *subscribe.Server
17+
}
18+
19+
// PeerOnlineEvent represents a new event where a peer comes online.
20+
type PeerOnlineEvent struct {
21+
// PubKey is the peer's compressed public key.
22+
PubKey [33]byte
23+
}
24+
25+
// PeerOfflineEvent represents a new event where a peer goes offline.
26+
type PeerOfflineEvent struct {
27+
// PubKey is the peer's compressed public key.
28+
PubKey [33]byte
29+
}
30+
31+
// New creates a new peer notifier which notifies clients of peer online
32+
// and offline events.
33+
func New() *PeerNotifier {
34+
return &PeerNotifier{
35+
ntfnServer: subscribe.NewServer(),
36+
}
37+
}
38+
39+
// Start starts the PeerNotifier's subscription server.
40+
func (p *PeerNotifier) Start() error {
41+
var err error
42+
43+
p.started.Do(func() {
44+
log.Info("PeerNotifier starting")
45+
err = p.ntfnServer.Start()
46+
})
47+
48+
return err
49+
}
50+
51+
// Stop signals the notifier for a graceful shutdown.
52+
func (p *PeerNotifier) Stop() {
53+
p.stopped.Do(func() {
54+
log.Info("Stopping PeerNotifier")
55+
p.ntfnServer.Stop()
56+
})
57+
}
58+
59+
// SubscribePeerEvents returns a subscribe.Client that will receive updates
60+
// any time the Server is informed of a peer event.
61+
func (p *PeerNotifier) SubscribePeerEvents() (*subscribe.Client, error) {
62+
return p.ntfnServer.Subscribe()
63+
}
64+
65+
// NotifyPeerOnline sends a peer online event to all clients subscribed to the
66+
// peer notifier.
67+
func (p *PeerNotifier) NotifyPeerOnline(pubKey [33]byte) {
68+
event := PeerOnlineEvent{PubKey: pubKey}
69+
70+
log.Debugf("PeerNotifier notifying peer: %x online", pubKey)
71+
72+
if err := p.ntfnServer.SendUpdate(event); err != nil {
73+
log.Warnf("Unable to send peer online update: %v", err)
74+
}
75+
}
76+
77+
// NotifyPeerOffline sends a peer offline event to all the clients subscribed
78+
// to the peer notifier.
79+
func (p *PeerNotifier) NotifyPeerOffline(pubKey [33]byte) {
80+
event := PeerOfflineEvent{PubKey: pubKey}
81+
82+
log.Debugf("PeerNotifier notifying peer: %x offline", pubKey)
83+
84+
if err := p.ntfnServer.SendUpdate(event); err != nil {
85+
log.Warnf("Unable to send peer offline update: %v", err)
86+
}
87+
}

server.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/lightningnetwork/lnd/lnwire"
4545
"github.com/lightningnetwork/lnd/nat"
4646
"github.com/lightningnetwork/lnd/netann"
47+
"github.com/lightningnetwork/lnd/peernotifier"
4748
"github.com/lightningnetwork/lnd/pool"
4849
"github.com/lightningnetwork/lnd/routing"
4950
"github.com/lightningnetwork/lnd/routing/route"
@@ -188,6 +189,8 @@ type server struct {
188189

189190
channelNotifier *channelnotifier.ChannelNotifier
190191

192+
peerNotifier *peernotifier.PeerNotifier
193+
191194
witnessBeacon contractcourt.WitnessBeacon
192195

193196
breachArbiter *breachArbiter
@@ -1081,6 +1084,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
10811084
return nil, err
10821085
}
10831086

1087+
// Assemble a peer notifier which will provide clients with subscriptions
1088+
// to peer online and offline events.
1089+
s.peerNotifier = peernotifier.New()
1090+
10841091
if cfg.WtClient.Active {
10851092
policy := wtpolicy.DefaultPolicy()
10861093

@@ -1183,6 +1190,10 @@ func (s *server) Start() error {
11831190
startErr = err
11841191
return
11851192
}
1193+
if err := s.peerNotifier.Start(); err != nil {
1194+
startErr = err
1195+
return
1196+
}
11861197
if err := s.sphinx.Start(); err != nil {
11871198
startErr = err
11881199
return
@@ -1341,6 +1352,7 @@ func (s *server) Stop() error {
13411352
s.chainArb.Stop()
13421353
s.sweeper.Stop()
13431354
s.channelNotifier.Stop()
1355+
s.peerNotifier.Stop()
13441356
s.cc.wallet.Shutdown()
13451357
s.cc.chainView.Stop()
13461358
s.connMgr.Stop()
@@ -2713,7 +2725,8 @@ func (s *server) addPeer(p *peer) {
27132725
// TODO(roasbeef): pipe all requests through to the
27142726
// queryHandler/peerManager
27152727

2716-
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
2728+
pubSer := p.addr.IdentityKey.SerializeCompressed()
2729+
pubStr := string(pubSer)
27172730

27182731
s.peersByPub[pubStr] = p
27192732

@@ -2722,6 +2735,13 @@ func (s *server) addPeer(p *peer) {
27222735
} else {
27232736
s.outboundPeers[pubStr] = p
27242737
}
2738+
2739+
// Inform the peer notifier of a peer online event so that it can be reported
2740+
// to clients listening for peer events.
2741+
var pubKey [33]byte
2742+
copy(pubKey[:], pubSer)
2743+
2744+
s.peerNotifier.NotifyPeerOnline(pubKey)
27252745
}
27262746

27272747
// peerInitializer asynchronously starts a newly connected peer after it has
@@ -2963,7 +2983,8 @@ func (s *server) removePeer(p *peer) {
29632983
return
29642984
}
29652985

2966-
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
2986+
pubSer := p.addr.IdentityKey.SerializeCompressed()
2987+
pubStr := string(pubSer)
29672988

29682989
delete(s.peersByPub, pubStr)
29692990

@@ -2972,6 +2993,13 @@ func (s *server) removePeer(p *peer) {
29722993
} else {
29732994
delete(s.outboundPeers, pubStr)
29742995
}
2996+
2997+
// Inform the peer notifier of a peer offline event so that it can be
2998+
// reported to clients listening for peer events.
2999+
var pubKey [33]byte
3000+
copy(pubKey[:], pubSer)
3001+
3002+
s.peerNotifier.NotifyPeerOffline(pubKey)
29753003
}
29763004

29773005
// openChanReq is a message sent to the server in order to request the

0 commit comments

Comments
 (0)