Skip to content

[5/7]: multi: thread ChannelUpdate through codebase #8254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: elle-g175-thread-interfaces-2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 115 additions & 42 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2403,48 +2403,114 @@ func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
// IsKeepAliveUpdate determines whether this channel update is considered a
// keep-alive update based on the previous channel update processed for the same
// direction.
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
prev *models.ChannelEdgePolicy1) bool {
func IsKeepAliveUpdate(update lnwire.ChannelUpdate,
prevPolicy models.ChannelEdgePolicy) (bool, error) {

// Both updates should be from the same direction.
if update.ChannelFlags&lnwire.ChanUpdateDirection !=
prev.ChannelFlags&lnwire.ChanUpdateDirection {
switch upd := update.(type) {
case *lnwire.ChannelUpdate1:
prev, ok := prevPolicy.(*models.ChannelEdgePolicy1)
if !ok {
return false, fmt.Errorf("expected chan edge policy 1")
}

return false
}
// Both updates should be from the same direction.
if upd.ChannelFlags&lnwire.ChanUpdateDirection !=
prev.ChannelFlags&lnwire.ChanUpdateDirection {

// The timestamp should always increase for a keep-alive update.
timestamp := time.Unix(int64(update.Timestamp), 0)
if !timestamp.After(prev.LastUpdate) {
return false
}
return false, nil
}

// None of the remaining fields should change for a keep-alive update.
if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
return false
}
if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
return false
}
if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
return false
}
if update.TimeLockDelta != prev.TimeLockDelta {
return false
}
if update.HtlcMinimumMsat != prev.MinHTLC {
return false
}
if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
return false
}
if update.HtlcMaximumMsat != prev.MaxHTLC {
return false
}
if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false
// The timestamp should always increase for a keep-alive update.
timestamp := time.Unix(int64(upd.Timestamp), 0)
if !timestamp.After(prev.LastUpdate) {
return false, nil
}

// None of the remaining fields should change for a keep-alive
// update.
if upd.ChannelFlags.IsDisabled() !=
prev.ChannelFlags.IsDisabled() {

return false, nil
}
if lnwire.MilliSatoshi(upd.BaseFee) != prev.FeeBaseMSat {
return false, nil
}
if lnwire.MilliSatoshi(upd.FeeRate) !=
prev.FeeProportionalMillionths {

return false, nil
}
if upd.TimeLockDelta != prev.TimeLockDelta {
return false, nil
}
if upd.HtlcMinimumMsat != prev.MinHTLC {
return false, nil
}
if upd.MessageFlags.HasMaxHtlc() &&
!prev.MessageFlags.HasMaxHtlc() {

return false, nil
}
if upd.HtlcMaximumMsat != prev.MaxHTLC {
return false, nil
}
if !bytes.Equal(upd.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false, nil
}

return true, nil

case *lnwire.ChannelUpdate2:
prev, ok := prevPolicy.(*models.ChannelEdgePolicy2)
if !ok {
return false, fmt.Errorf("expected chan edge policy 2")
}

// Both updates should be from the same direction.
if upd.IsNode1() != prev.IsNode1() {
return false, nil
}

// The block-height should always increase for a keep-alive
// update.
if upd.BlockHeight.Val <= prev.BlockHeight.Val {
return false, nil
}

// None of the remaining fields should change for a keep-alive
// update.
if upd.IsDisabled() != prev.IsDisabled() {
return false, nil
}
fwd := upd.ForwardingPolicy()
prevFwd := upd.ForwardingPolicy()

if fwd.BaseFee != prevFwd.BaseFee {
return false, nil
}
if fwd.FeeRate != prevFwd.FeeRate {
return false, nil
}
if fwd.TimeLockDelta != prevFwd.TimeLockDelta {
return false, nil
}
if fwd.MinHTLC != prevFwd.MinHTLC {
return false, nil
}
if fwd.MaxHTLC != prevFwd.MinHTLC {
return false, nil
}
if !bytes.Equal(upd.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false, nil
}

return true, nil

default:
return false, fmt.Errorf("unhandled implementation of "+
"ChannelUpdate: %T", update)
}
return true
}

// latestHeight returns the gossiper's latest height known of the chain.
Expand Down Expand Up @@ -2968,16 +3034,14 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
graphScid = upd.ShortChannelID
}

if d.cfg.Graph.IsStaleEdgePolicy(
graphScid, timestamp, upd.ChannelFlags,
) {

if d.cfg.Graph.IsStaleEdgePolicy(graphScid, upd) {
log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
"peer=%v, msg=%s, is_remote=%v", shortChanID,
nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
)

nMsg.err <- nil

return nil, true
}

Expand Down Expand Up @@ -3159,7 +3223,16 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// heuristic of sending keep-alive updates after the same
// duration (see retransmitStaleAnns).
timeSinceLastUpdate := timestamp.Sub(edge.LastUpdate)
if IsKeepAliveUpdate(upd, edge) {
isKeepAlive, err := IsKeepAliveUpdate(upd, edge)
if err != nil {
log.Errorf("Could not determine if update is "+
"keepalive: %v", err)
nMsg.err <- err

return nil, false
}

if isKeepAlive {
if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
log.Debugf("Ignoring keep alive update not "+
"within %v period for channel %v",
Expand Down
14 changes: 10 additions & 4 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,18 @@ func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
// IsStaleEdgePolicy returns true if the graph source has a channel edge for
// the passed channel ID (and flags) that have a more recent timestamp.
func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
policy lnwire.ChannelUpdate) bool {

r.mu.Lock()
defer r.mu.Unlock()

pol, ok := policy.(*lnwire.ChannelUpdate1)
if !ok {
panic("expected chan update 1")
}

timestamp := time.Unix(int64(pol.Timestamp), 0)

chanIDInt := chanID.ToUint64()
edges, ok := r.edges[chanIDInt]
if !ok {
Expand All @@ -373,23 +380,22 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
if !isZombie {
return false
}

// Since it exists within our zombie index, we'll check that it
// respects the router's live edge horizon to determine whether
// it is stale or not.
return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
}

switch {
case flags&lnwire.ChanUpdateDirection == 0 && edges[0] != nil:
case policy.IsNode1() && edges[0] != nil:
switch edge := edges[0].(type) {
case *models.ChannelEdgePolicy1:
return !timestamp.After(edge.LastUpdate)
default:
panic(fmt.Sprintf("unhandled: %T", edges[0]))
}

case flags&lnwire.ChanUpdateDirection == 1 && edges[1] != nil:
case !policy.IsNode1() && edges[1] != nil:
switch edge := edges[1].(type) {
case *models.ChannelEdgePolicy1:
return !timestamp.After(edge.LastUpdate)
Expand Down
135 changes: 95 additions & 40 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,53 +1761,108 @@ func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
update lnwire.ChannelUpdate) bool {

edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge1(chanID.ToUint64())
if err != nil {
log.Debugf("Check stale edge policy got error: %v", err)
return false
}
var (
disabled = update.IsDisabled()
isNode1 = update.IsNode1()
)

switch upd := update.(type) {
case *lnwire.ChannelUpdate1:
timestamp := time.Unix(int64(upd.Timestamp), 0)

edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge1(chanID.ToUint64())
if err != nil {
log.Debugf("Check stale edge policy got error: %v", err)

return false
}

// If we know of the edge as a zombie, then we'll make some additional
// checks to determine if the new policy is fresh.
if isZombie {
// When running with AssumeChannelValid, we also prune channels
// if both of their edges are disabled. We'll mark the new
// policy as stale if it remains disabled.
if b.cfg.AssumeChannelValid {
isDisabled := flags&lnwire.ChanUpdateDisabled ==
lnwire.ChanUpdateDisabled
if isDisabled {
return true
// If we know of the edge as a zombie, then we'll make some
// additional checks to determine if the new policy is fresh.
if isZombie {
// When running with AssumeChannelValid, we also prune
// channels if both of their edges are disabled. We'll
// mark the new policy as stale if it remains disabled.
if b.cfg.AssumeChannelValid {
if disabled {
return true
}
}

// Otherwise, we'll fall back to our usual
// ChannelPruneExpiry.
return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
}

// Otherwise, we'll fall back to our usual ChannelPruneExpiry.
return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
}
// If we don't know of the edge, then it means it's fresh (thus
// not stale).
if !exists {
return false
}

// If we don't know of the edge, then it means it's fresh (thus not
// stale).
if !exists {
return false
}
// As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore we first
// check if we already have the most up to date information for
// that edge. If so, then we can exit early.
switch {
case isNode1:
return !edge1Timestamp.Before(timestamp)

// As edges are directional edge node has a unique policy for the
// direction of the edge they control. Therefore, we first check if we
// already have the most up-to-date information for that edge. If so,
// then we can exit early.
switch {
// A flag set of 0 indicates this is an announcement for the "first"
// node in the channel.
case flags&lnwire.ChanUpdateDirection == 0:
return !edge1Timestamp.Before(timestamp)

// Similarly, a flag set of 1 indicates this is an announcement for the
// "second" node in the channel.
case flags&lnwire.ChanUpdateDirection == 1:
return !edge2Timestamp.Before(timestamp)
case !isNode1:
return !edge2Timestamp.Before(timestamp)
}

case *lnwire.ChannelUpdate2:
height := upd.BlockHeight

edge1Height, edge2Height, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge2(chanID.ToUint64())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we can come up with an abstraction over the return values here (height vs timestamp), then we can condense this function a bit and eliminate some of the duplication.

if err != nil {
log.Debugf("Check stale edge policy got error: %v", err)

return false
}

// If we know of the edge as a zombie, then we'll make some
// additional checks to determine if the new policy is fresh.
if isZombie {
// When running with AssumeChannelValid, we also prune
// channels if both of their edges are disabled. We'll
// mark the new policy as stale if it remains disabled.
if b.cfg.AssumeChannelValid {
if disabled {
return true
}
}

// Otherwise, we'll fall back to our usual
// ChannelPruneExpiry.
blocksSince := b.SyncedHeight() - height.Val

return blocksSince >
uint32(b.cfg.ChannelPruneExpiry.Hours()*6)
}

// If we don't know of the edge, then it means it's fresh (thus
// not stale).
if !exists {
return false
}

// As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore we first
// check if we already have the most up to date information for
// that edge. If so, then we can exit early.
switch {
case isNode1:
return edge1Height >= height.Val

case !isNode1:
return edge2Height >= height.Val
}
}

return false
Expand Down
Loading