peer: replace the old cond based msgStream w/ BackpressureQueue[T] #9839

Open · wants to merge 3 commits into backpressure-queue
2 changes: 2 additions & 0 deletions go.mod
@@ -215,4 +215,6 @@ replace google.golang.org/protobuf => github.com/lightninglabs/protobuf-go-hex-d
// well).
go 1.23.6

replace github.com/lightningnetwork/lnd/queue => ./queue

retract v0.0.2
154 changes: 57 additions & 97 deletions peer/brontide.go
@@ -1730,12 +1730,9 @@ type msgStream struct {
startMsg string
stopMsg string

msgCond *sync.Cond
msgs []lnwire.Message

mtx sync.Mutex

producerSema chan struct{}
// queue is the underlying backpressure-aware queue that manages
// messages.
queue *queue.BackpressureQueue[lnwire.Message]

wg sync.WaitGroup
quit chan struct{}
@@ -1744,28 +1741,25 @@ type msgStream struct {
// newMsgStream creates a new instance of a chanMsgStream for a particular
// channel identified by its channel ID. bufSize is the max number of messages
// that should be buffered in the internal queue. Callers should set this to a
// sane value that avoids blocking unnecessarily, but doesn't allow an
// unbounded amount of memory to be allocated to buffer incoming messages.
func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32,
// sane value that avoids blocking unnecessarily, but doesn't allow an unbounded
// amount of memory to be allocated to buffer incoming messages.
func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize int,
apply func(lnwire.Message)) *msgStream {

stream := &msgStream{
peer: p,
apply: apply,
startMsg: startMsg,
stopMsg: stopMsg,
producerSema: make(chan struct{}, bufSize),
quit: make(chan struct{}),
}
stream.msgCond = sync.NewCond(&stream.mtx)

// Before we return the active stream, we'll populate the producer's
// semaphore channel. We'll use this to ensure that the producer won't
// attempt to allocate memory in the queue for an item until it has
// sufficient extra space.
for i := uint32(0); i < bufSize; i++ {
stream.producerSema <- struct{}{}
}
peer: p,
apply: apply,
startMsg: startMsg,
stopMsg: stopMsg,
quit: make(chan struct{}),
}

// Initialize the backpressure queue with a predicate determined by
// build tags.
dropPredicate := getMsgStreamDropPredicate()
stream.queue = queue.NewBackpressureQueue[lnwire.Message](
bufSize, dropPredicate,
)

return stream
}
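
The BackpressureQueue type itself comes from the local queue module (hence the replace directive added to go.mod above) and is not part of this diff. The standalone Go sketch below is only an illustration of the behaviour the call sites rely on, with the signatures assumed from usage: Enqueue either sheds the item via the drop predicate or blocks (backpressure) until space frees up or the context is cancelled, while Dequeue blocks until a message arrives or the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// DropPredicate decides, given the current queue length, whether a new item
// should be shed instead of enqueued. Signature assumed from the call sites
// above; the real queue package may differ in detail.
type DropPredicate[T any] func(queueLen int, item T) bool

// BackpressureQueue is a minimal, channel-backed stand-in for the real type.
type BackpressureQueue[T any] struct {
	items chan T
	drop  DropPredicate[T]
}

// NewBackpressureQueue creates a queue that buffers at most size items.
func NewBackpressureQueue[T any](size int,
	drop DropPredicate[T]) *BackpressureQueue[T] {

	return &BackpressureQueue[T]{
		items: make(chan T, size),
		drop:  drop,
	}
}

// Enqueue returns (dropped, err): the item is either shed by the predicate,
// buffered, or the call blocks until space frees up or ctx is cancelled.
func (q *BackpressureQueue[T]) Enqueue(ctx context.Context,
	item T) (bool, error) {

	if q.drop(len(q.items), item) {
		return true, nil
	}

	select {
	case q.items <- item:
		return false, nil
	case <-ctx.Done():
		return false, ctx.Err()
	}
}

// Dequeue blocks until an item is available or ctx is cancelled.
func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) (T, error) {
	select {
	case item := <-q.items:
		return item, nil
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func main() {
	// Shed anything that arrives while the two-slot buffer is full rather
	// than blocking the producer.
	q := NewBackpressureQueue[string](2, func(queueLen int, _ string) bool {
		return queueLen >= 2
	})

	ctx := context.Background()
	for _, m := range []string{"a", "b", "c"} {
		dropped, err := q.Enqueue(ctx, m)
		fmt.Printf("msg=%s dropped=%v err=%v\n", m, dropped, err)
	}

	msg, _ := q.Dequeue(ctx)
	fmt.Println("dequeued:", msg)
}
```

Compared to the old cond-based stream, shutdown falls out of context cancellation: Dequeue returns an error as soon as the peer's context group is cancelled, which is why Stop no longer needs the repeated Signal-and-sleep loop removed below.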
@@ -1778,100 +1772,58 @@ func (ms *msgStream) Start() {

// Stop stops the chanMsgStream.
func (ms *msgStream) Stop() {
// TODO(roasbeef): signal too?

close(ms.quit)

// Now that we've closed the channel, we'll repeatedly signal the msg
// consumer until we've detected that it has exited.
for atomic.LoadInt32(&ms.streamShutdown) == 0 {
ms.msgCond.Signal()
time.Sleep(time.Millisecond * 100)
}

ms.wg.Wait()
}

// msgConsumer is the main goroutine that streams messages from the peer's
// readHandler directly to the target channel.
func (ms *msgStream) msgConsumer() {
defer ms.wg.Done()
defer peerLog.Tracef(ms.stopMsg)
defer ms.peer.log.Tracef(ms.stopMsg)
defer atomic.StoreInt32(&ms.streamShutdown, 1)

peerLog.Tracef(ms.startMsg)
ms.peer.log.Tracef(ms.startMsg)

ctx, _ := ms.peer.cg.Create(context.Background())

for {
// First, we'll check our condition. If the queue of messages
// is empty, then we'll wait until a new item is added.
ms.msgCond.L.Lock()
for len(ms.msgs) == 0 {
ms.msgCond.Wait()

// If we woke up in order to exit, then we'll do so.
// Otherwise, we'll check the message queue for any new
// items.
select {
case <-ms.peer.cg.Done():
ms.msgCond.L.Unlock()
return
case <-ms.quit:
ms.msgCond.L.Unlock()
return
default:
}
// Dequeue the next message. This will block until a message is
// available or the context is canceled.
msg, err := ms.queue.Dequeue(ctx)
if err != nil {
ms.peer.log.Warnf("unable to dequeue message: %v", err)
return
}

// Grab the message off the front of the queue, shifting the
// slice's reference down one in order to remove the message
// from the queue.
msg := ms.msgs[0]
ms.msgs[0] = nil // Set to nil to prevent GC leak.
ms.msgs = ms.msgs[1:]

ms.msgCond.L.Unlock()

// Apply the dequeued message.
ms.apply(msg)

// We've just successfully processed an item, so we'll signal
// to the producer that a new slot in the buffer. We'll use
// this to bound the size of the buffer to avoid allowing it to
// grow indefinitely.
// As a precaution, we'll check to see if we're already shutting
// down before adding a new message to the queue.
select {
case ms.producerSema <- struct{}{}:
case <-ms.peer.cg.Done():
return
case <-ms.quit:
return
default:
}
}
}

// AddMsg adds a new message to the msgStream. This function is safe for
// concurrent access.
func (ms *msgStream) AddMsg(msg lnwire.Message) {
// First, we'll attempt to receive from the producerSema struct. This
// acts as a semaphore to prevent us from indefinitely buffering
// incoming items from the wire. Either the msg queue isn't full, and
// we'll not block, or the queue is full, and we'll block until either
// we're signalled to quit, or a slot is freed up.
select {
case <-ms.producerSema:
case <-ms.peer.cg.Done():
return
case <-ms.quit:
func (ms *msgStream) AddMsg(ctx context.Context, msg lnwire.Message) {
dropped, err := ms.queue.Enqueue(ctx, msg).Unpack()
if err != nil {
ms.peer.log.Warnf("unable to enqueue message: %v", err)
return
}

// Next, we'll lock the condition, and add the message to the end of
// the message queue.
ms.msgCond.L.Lock()
ms.msgs = append(ms.msgs, msg)
ms.msgCond.L.Unlock()

// With the message added, we signal to the msgConsumer that there are
// additional messages to consume.
ms.msgCond.Signal()
if dropped {
ms.peer.log.Debugf("message %T dropped by predicate", msg)
}
}
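
Note that Enqueue is shown here returning a result wrapper rather than bare values, with .Unpack() flattening it into (dropped bool, err error). The concrete wrapper type lives outside this diff; the hypothetical stand-in below only makes the calling convention used by AddMsg concrete.

```go
// enqueueResult is a hypothetical stand-in for the value returned by
// Enqueue: the message was either accepted (possibly after blocking), shed
// by the drop predicate, or rejected because the context was cancelled.
type enqueueResult struct {
	dropped bool
	err     error
}

// Unpack flattens the result into the (dropped, err) pair that AddMsg
// inspects: a non-nil err aborts, while dropped=true only logs at debug level.
func (r enqueueResult) Unpack() (bool, error) {
	return r.dropped, r.err
}
```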

// waitUntilLinkActive waits until the target link is active and returns a
@@ -2026,6 +1978,8 @@ func (p *Brontide) readHandler() {
// gossiper?
p.initGossipSync()

ctx, _ := p.cg.Create(context.Background())

discStream := newDiscMsgStream(p)
discStream.Start()
defer discStream.Stop()
@@ -2141,11 +2095,15 @@ out:

case *lnwire.Warning:
targetChan = msg.ChanID
isLinkUpdate = p.handleWarningOrError(targetChan, msg)
isLinkUpdate = p.handleWarningOrError(
ctx, targetChan, msg,
)

case *lnwire.Error:
targetChan = msg.ChanID
isLinkUpdate = p.handleWarningOrError(targetChan, msg)
isLinkUpdate = p.handleWarningOrError(
ctx, targetChan, msg,
)

case *lnwire.ChannelReestablish:
targetChan = msg.ChanID
@@ -2193,7 +2151,7 @@ out:
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:

discStream.AddMsg(msg)
discStream.AddMsg(ctx, msg)

case *lnwire.Custom:
err := p.handleCustomMessage(msg)
@@ -2215,7 +2173,7 @@ out:
if isLinkUpdate {
// If this is a channel update, then we need to feed it
// into the channel's in-order message stream.
p.sendLinkUpdateMsg(targetChan, nextMsg)
p.sendLinkUpdateMsg(ctx, targetChan, nextMsg)
}

idleTimer.Reset(idleTimeout)
@@ -2330,8 +2288,8 @@ func (p *Brontide) storeError(err error) {
// an error from a peer with an active channel, we'll store it in memory.
//
// NOTE: This method should only be called from within the readHandler.
func (p *Brontide) handleWarningOrError(chanID lnwire.ChannelID,
msg lnwire.Message) bool {
func (p *Brontide) handleWarningOrError(ctx context.Context,
chanID lnwire.ChannelID, msg lnwire.Message) bool {

if errMsg, ok := msg.(*lnwire.Error); ok {
p.storeError(errMsg)
@@ -2342,7 +2300,7 @@ func (p *Brontide) handleWarningOrError(chanID lnwire.ChannelID,
// with this peer.
case chanID == lnwire.ConnectionWideID:
for _, chanStream := range p.activeMsgStreams {
chanStream.AddMsg(msg)
chanStream.AddMsg(ctx, msg)
}

return false
@@ -5297,7 +5255,9 @@ func (p *Brontide) handleRemovePendingChannel(req *newChannelMsg) {

// sendLinkUpdateMsg sends a message that updates the channel to the
// channel's message stream.
func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {
func (p *Brontide) sendLinkUpdateMsg(ctx context.Context,
cid lnwire.ChannelID, msg lnwire.Message) {

p.log.Tracef("Sending link update msg=%v", msg.MsgType())

chanStream, ok := p.activeMsgStreams[cid]
@@ -5317,7 +5277,7 @@ func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {

// With the stream obtained, add the message to the stream so we can
// continue processing messages.
chanStream.AddMsg(msg)
chanStream.AddMsg(ctx, msg)
}

// scaleTimeout multiplies the argument duration by a constant factor depending
57 changes: 57 additions & 0 deletions peer/drop_predicate.go
@@ -0,0 +1,57 @@
//go:build !integration

package peer

import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
)

const (
// redMinThreshold is the minimum queue length before RED starts dropping
// messages.
redMinThreshold = 10

// redMaxThreshold is the queue length at or above which RED drops all
// messages (that are not protected by type).
redMaxThreshold = 40
)

// isProtectedMsgType checks if a message is of a type that should not be
// dropped by the predicate.
func isProtectedMsgType(msg lnwire.Message) bool {
switch msg.(type) {
// Never drop any messages that are heading to an active channel.
case lnwire.LinkUpdater:
return true

// Make sure to never drop an incoming announcement signatures
// message, as we need this to be able to advertise channels.
//
// TODO(roasbeef): don't drop any gossip if doing IGD?
case *lnwire.AnnounceSignatures1:
return true

default:
return false
}
}

// getMsgStreamDropPredicate returns the drop predicate for the msgStream's
// BackpressureQueue. For non-integration builds, this combines a type-based
// check for critical messages with Random Early Detection (RED).
func getMsgStreamDropPredicate() queue.DropPredicate[lnwire.Message] {
redPred := queue.RandomEarlyDrop[lnwire.Message](
redMinThreshold, redMaxThreshold,
)

// We'll never drop protected messages; for the rest, we'll use the
// RED predicate.
return func(queueLen int, item lnwire.Message) bool {
if isProtectedMsgType(item) {
return false
}

return redPred(queueLen, item)
}
}
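
The diff never shows queue.RandomEarlyDrop itself, but the two thresholds above imply the classic RED shape: no drops below redMinThreshold, certain drops at or above redMaxThreshold, and a drop probability that ramps up in between. The sketch below is a guess at a linear ramp, not the queue package's actual code; it just makes the numbers concrete, e.g. at a queue length of 25 roughly half of all unprotected messages would be shed.

```go
package main

import (
	"fmt"
	"math/rand"
)

// redDropProb models the assumed RED curve: 0 below minThresh, 1 at or
// above maxThresh, and a linear ramp in between.
func redDropProb(queueLen, minThresh, maxThresh int) float64 {
	switch {
	case queueLen < minThresh:
		return 0
	case queueLen >= maxThresh:
		return 1
	default:
		return float64(queueLen-minThresh) / float64(maxThresh-minThresh)
	}
}

func main() {
	// With redMinThreshold=10 and redMaxThreshold=40 as defined above.
	for _, l := range []int{5, 10, 25, 39, 40, 60} {
		p := redDropProb(l, 10, 40)
		fmt.Printf("queueLen=%2d dropProb=%.2f dropped=%v\n",
			l, p, rand.Float64() < p)
	}
}
```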
17 changes: 17 additions & 0 deletions peer/drop_predicate_integration.go
@@ -0,0 +1,17 @@
//go:build integration

package peer

import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
)

// getMsgStreamDropPredicate returns the drop predicate for the msgStream's
// BackpressureQueue. For integration builds, this predicate never drops
// messages.
func getMsgStreamDropPredicate() queue.DropPredicate[lnwire.Message] {
return func(queueLen int, item lnwire.Message) bool {
return false
}
}
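
Because the two predicate files are selected by the integration build tag, the behaviour can be pinned down per build flavour. The hypothetical test below (not part of this PR) would only compile under -tags integration and simply asserts that this variant never sheds a message, no matter how long the queue gets.

```go
//go:build integration

package peer

import (
	"testing"

	"github.com/lightningnetwork/lnd/lnwire"
)

// TestIntegrationPredicateNeverDrops is a hypothetical sanity check: the
// integration-build predicate must never drop, regardless of queue length.
func TestIntegrationPredicateNeverDrops(t *testing.T) {
	pred := getMsgStreamDropPredicate()

	for _, queueLen := range []int{0, 10, 40, 1000} {
		if pred(queueLen, &lnwire.Ping{}) {
			t.Fatalf("predicate dropped message at queue length %d",
				queueLen)
		}
	}
}
```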