Skip to content

Commit b6410bf

Browse files
committed
channeldb: update reject cache
To take block heights too.
1 parent 84abc0a commit b6410bf

File tree

5 files changed

+134
-52
lines changed

5 files changed

+134
-52
lines changed

channeldb/graph.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,10 +1162,10 @@ func (c *ChannelGraph) HasChannelEdge(
11621162
// We'll query the cache with the shared lock held to allow multiple
11631163
// readers to access values in the cache concurrently if they exist.
11641164
c.cacheMu.RLock()
1165-
if entry, ok := c.rejectCache.get(chanID); ok {
1165+
if entry, ok := c.rejectCache.get(chanID); ok && entry.times != nil {
11661166
c.cacheMu.RUnlock()
1167-
upd1Time = time.Unix(entry.upd1Time, 0)
1168-
upd2Time = time.Unix(entry.upd2Time, 0)
1167+
upd1Time = time.Unix(entry.times.upd1Time, 0)
1168+
upd2Time = time.Unix(entry.times.upd2Time, 0)
11691169
exists, isZombie = entry.flags.unpack()
11701170
return upd1Time, upd2Time, exists, isZombie, nil
11711171
}
@@ -1177,9 +1177,9 @@ func (c *ChannelGraph) HasChannelEdge(
11771177
// The item was not found with the shared lock, so we'll acquire the
11781178
// exclusive lock and check the cache again in case another method added
11791179
// the entry to the cache while no lock was held.
1180-
if entry, ok := c.rejectCache.get(chanID); ok {
1181-
upd1Time = time.Unix(entry.upd1Time, 0)
1182-
upd2Time = time.Unix(entry.upd2Time, 0)
1180+
if entry, ok := c.rejectCache.get(chanID); ok && entry.times != nil {
1181+
upd1Time = time.Unix(entry.times.upd1Time, 0)
1182+
upd2Time = time.Unix(entry.times.upd2Time, 0)
11831183
exists, isZombie = entry.flags.unpack()
11841184
return upd1Time, upd2Time, exists, isZombie, nil
11851185
}
@@ -1244,9 +1244,11 @@ func (c *ChannelGraph) HasChannelEdge(
12441244
}
12451245

12461246
c.rejectCache.insert(chanID, rejectCacheEntry{
1247-
upd1Time: upd1Time.Unix(),
1248-
upd2Time: upd2Time.Unix(),
1249-
flags: packRejectFlags(exists, isZombie),
1247+
times: &updateTimes{
1248+
upd1Time: upd1Time.Unix(),
1249+
upd2Time: upd2Time.Unix(),
1250+
},
1251+
flags: packRejectFlags(exists, isZombie),
12501252
})
12511253

12521254
return upd1Time, upd2Time, exists, isZombie, nil
@@ -2817,33 +2819,39 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy1,
28172819
return c.chanScheduler.Execute(r)
28182820
}
28192821

2820-
func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy1,
2822+
func (c *ChannelGraph) updateEdgeCache(e models.ChannelEdgePolicy,
28212823
isUpdate1 bool) {
28222824

28232825
// If an entry for this channel is found in reject cache, we'll modify
28242826
// the entry with the updated timestamp for the direction that was just
28252827
// written. If the edge doesn't exist, we'll load the cache entry lazily
28262828
// during the next query for this edge.
2827-
if entry, ok := c.rejectCache.get(e.ChannelID); ok {
2828-
if isUpdate1 {
2829-
entry.upd1Time = e.LastUpdate.Unix()
2830-
} else {
2831-
entry.upd2Time = e.LastUpdate.Unix()
2832-
}
2833-
c.rejectCache.insert(e.ChannelID, entry)
2829+
chanID := e.SCID().ToUint64()
2830+
if entry, ok := c.rejectCache.get(chanID); ok {
2831+
entry.update(isUpdate1, e)
2832+
c.rejectCache.insert(chanID, entry)
28342833
}
28352834

28362835
// If an entry for this channel is found in channel cache, we'll modify
28372836
// the entry with the updated policy for the direction that was just
28382837
// written. If the edge doesn't exist, we'll defer loading the info and
28392838
// policies and lazily read from disk during the next query.
2840-
if channel, ok := c.chanCache.get(e.ChannelID); ok {
2839+
if channel, ok := c.chanCache.get(chanID); ok {
2840+
edge, ok := e.(*models.ChannelEdgePolicy1)
2841+
if !ok {
2842+
log.Errorf("expected *models.ChannelEdgePolicy1, "+
2843+
"got: %T", e)
2844+
2845+
return
2846+
}
2847+
28412848
if isUpdate1 {
2842-
channel.Policy1 = e
2849+
channel.Policy1 = edge
28432850
} else {
2844-
channel.Policy2 = e
2851+
channel.Policy2 = edge
28452852
}
2846-
c.chanCache.insert(e.ChannelID, channel)
2853+
2854+
c.chanCache.insert(chanID, channel)
28472855
}
28482856
}
28492857

channeldb/graph_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3622,7 +3622,17 @@ func compareNodes(a, b *LightningNode) error {
36223622

36233623
// compareEdgePolicies is used to compare two ChannelEdgePolices using
36243624
// compareNodes, so as to exclude comparisons of the Nodes' Features struct.
3625-
func compareEdgePolicies(a, b *models.ChannelEdgePolicy1) error {
3625+
func compareEdgePolicies(edgeA, edgeB models.ChannelEdgePolicy) error {
3626+
a, ok := edgeA.(*models.ChannelEdgePolicy1)
3627+
if !ok {
3628+
return fmt.Errorf("wanted edge policy 1")
3629+
}
3630+
3631+
b, ok := edgeB.(*models.ChannelEdgePolicy1)
3632+
if !ok {
3633+
return fmt.Errorf("wanted edge policy 1")
3634+
}
3635+
36263636
if a.ChannelID != b.ChannelID {
36273637
return fmt.Errorf("ChannelID doesn't match: expected %v, "+
36283638
"got %v", a.ChannelID, b.ChannelID)

channeldb/reject_cache.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package channeldb
22

3+
import "github.com/lightningnetwork/lnd/channeldb/models"
4+
35
// rejectFlags is a compact representation of various metadata stored by the
46
// reject cache about a particular channel.
57
type rejectFlags uint8
@@ -41,9 +43,47 @@ func (f rejectFlags) unpack() (bool, bool) {
4143
// including the timestamps of its latest edge policies and whether or not the
4244
// channel exists in the graph.
4345
type rejectCacheEntry struct {
46+
times *updateTimes
47+
blocks *updateBlocks
48+
flags rejectFlags
49+
}
50+
51+
type updateTimes struct {
4452
upd1Time int64
4553
upd2Time int64
46-
flags rejectFlags
54+
}
55+
56+
type updateBlocks struct {
57+
updBlock1 uint32
58+
updBlock2 uint32
59+
}
60+
61+
func (e *rejectCacheEntry) update(isUpdate1 bool,
62+
policy models.ChannelEdgePolicy) {
63+
64+
switch pol := policy.(type) {
65+
case *models.ChannelEdgePolicy1:
66+
if e.times == nil {
67+
e.times = &updateTimes{}
68+
}
69+
70+
if isUpdate1 {
71+
e.times.upd1Time = pol.LastUpdate.Unix()
72+
} else {
73+
e.times.upd2Time = pol.LastUpdate.Unix()
74+
}
75+
76+
case *models.ChannelEdgePolicy2:
77+
if e.blocks == nil {
78+
e.blocks = &updateBlocks{}
79+
}
80+
81+
if isUpdate1 {
82+
e.blocks.updBlock1 = pol.BlockHeight.Val
83+
} else {
84+
e.blocks.updBlock2 = pol.BlockHeight.Val
85+
}
86+
}
4787
}
4888

4989
// rejectCache is an in-memory cache used to improve the performance of

channeldb/reject_cache_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,10 @@ func entryForInt(i uint64) rejectCacheEntry {
100100
exists := i%2 == 0
101101
isZombie := i%3 == 0
102102
return rejectCacheEntry{
103-
upd1Time: int64(2 * i),
104-
upd2Time: int64(2*i + 1),
105-
flags: packRejectFlags(exists, isZombie),
103+
times: &updateTimes{
104+
upd1Time: int64(2 * i),
105+
upd2Time: int64(2*i + 1),
106+
},
107+
flags: packRejectFlags(exists, isZombie),
106108
}
107109
}

graph/builder.go

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -474,30 +474,6 @@ func (b *Builder) syncGraphWithChain() error {
474474
return nil
475475
}
476476

477-
// isZombieChannel takes two edge policy updates and determines if the
478-
// corresponding channel should be considered a zombie. The first boolean is
479-
// true if the policy update from node 1 is considered a zombie, the second
480-
// boolean is that of node 2, and the final boolean is true if the channel
481-
// is considered a zombie.
482-
func (b *Builder) isZombieChannel(e1,
483-
e2 *models.ChannelEdgePolicy1) (bool, bool, bool) {
484-
485-
chanExpiry := b.cfg.ChannelPruneExpiry
486-
487-
e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
488-
e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry
489-
490-
var e1Time, e2Time time.Time
491-
if e1 != nil {
492-
e1Time = e1.LastUpdate
493-
}
494-
if e2 != nil {
495-
e2Time = e2.LastUpdate
496-
}
497-
498-
return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time)
499-
}
500-
501477
// IsZombieChannel takes the timestamps of the latest channel updates for a
502478
// channel and returns true if the channel should be considered a zombie based
503479
// on these timestamps.
@@ -512,6 +488,10 @@ func (b *Builder) IsZombieChannel(updateTime1,
512488
e2Zombie := updateTime2.IsZero() ||
513489
time.Since(updateTime2) >= chanExpiry
514490

491+
return b.isZombieChannel(e1Zombie, e2Zombie)
492+
}
493+
494+
func (b *Builder) isZombieChannel(e1Zombie, e2Zombie bool) bool {
515495
// If we're using strict zombie pruning, then a channel is only
516496
// considered live if both edges have a recent update we know of.
517497
if b.cfg.StrictZombiePruning {
@@ -562,7 +542,15 @@ func (b *Builder) pruneZombieChans() error {
562542
return nil
563543
}
564544

565-
e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2)
545+
e1Zombie, err := b.isZombieEdge(e1)
546+
if err != nil {
547+
return err
548+
}
549+
550+
e2Zombie, err := b.isZombieEdge(e2)
551+
if err != nil {
552+
return err
553+
}
566554

567555
if e1Zombie {
568556
log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
@@ -577,7 +565,7 @@ func (b *Builder) pruneZombieChans() error {
577565
// If either edge hasn't been updated for a period of
578566
// chanExpiry, then we'll mark the channel itself as eligible
579567
// for graph pruning.
580-
if !isZombieChan {
568+
if !b.isZombieChannel(e1Zombie, e2Zombie) {
581569
return nil
582570
}
583571

@@ -662,6 +650,40 @@ func (b *Builder) pruneZombieChans() error {
662650
return nil
663651
}
664652

653+
func (b *Builder) isZombieEdge(edge models.ChannelEdgePolicy) (bool,
654+
error) {
655+
656+
if edge == nil {
657+
return true, nil
658+
}
659+
660+
switch e := edge.(type) {
661+
case *models.ChannelEdgePolicy1:
662+
chanExpiry := b.cfg.ChannelPruneExpiry
663+
664+
if e == nil {
665+
return true, nil
666+
}
667+
668+
return time.Since(e.LastUpdate) >= chanExpiry, nil
669+
670+
case *models.ChannelEdgePolicy2:
671+
chanExpiryBlocks := uint32(b.cfg.ChannelPruneExpiry.Hours() * 6)
672+
673+
if e == nil {
674+
return true, nil
675+
}
676+
677+
blockSince := b.SyncedHeight() - e.BlockHeight.Val
678+
679+
return blockSince >= chanExpiryBlocks, nil
680+
681+
default:
682+
return false, fmt.Errorf("unhandled implementation of "+
683+
"models.ChannelEdgePolicy: %T", edge)
684+
}
685+
}
686+
665687
// handleNetworkUpdate is responsible for processing the update message and
666688
// notifies topology changes, if any.
667689
//

0 commit comments

Comments
 (0)