Skip to content

Commit 1763517

Browse files
committed
multi: updates to HasChannelPolicy
1 parent 135888d commit 1763517

File tree

5 files changed

+216
-39
lines changed

5 files changed

+216
-39
lines changed

channeldb/graph.go

Lines changed: 189 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,13 +1157,84 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
11571157
return chanIndex.Put(b.Bytes(), chanKey[:])
11581158
}
11591159

1160-
// HasChannelEdge returns true if the database knows of a channel edge with the
1160+
func (c *ChannelGraph) HasChannelEdge(chanID uint64) (bool, bool, error) {
1161+
var (
1162+
exists bool
1163+
isZombie bool
1164+
)
1165+
1166+
// We'll query the cache with the shared lock held to allow multiple
1167+
// readers to access values in the cache concurrently if they exist.
1168+
c.cacheMu.RLock()
1169+
if entry, ok := c.rejectCache.get(chanID); ok {
1170+
c.cacheMu.RUnlock()
1171+
exists, isZombie = entry.flags.unpack()
1172+
1173+
return exists, isZombie, nil
1174+
}
1175+
c.cacheMu.RUnlock()
1176+
1177+
c.cacheMu.Lock()
1178+
defer c.cacheMu.Unlock()
1179+
1180+
// The item was not found with the shared lock, so we'll acquire the
1181+
// exclusive lock and check the cache again in case another method added
1182+
// the entry to the cache while no lock was held.
1183+
if entry, ok := c.rejectCache.get(chanID); ok && entry.times != nil {
1184+
exists, isZombie = entry.flags.unpack()
1185+
1186+
return exists, isZombie, nil
1187+
}
1188+
1189+
if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
1190+
edges := tx.ReadBucket(edgeBucket)
1191+
if edges == nil {
1192+
return ErrGraphNoEdgesFound
1193+
}
1194+
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1195+
if edgeIndex == nil {
1196+
return ErrGraphNoEdgesFound
1197+
}
1198+
1199+
var channelID [8]byte
1200+
byteOrder.PutUint64(channelID[:], chanID)
1201+
1202+
// If the edge doesn't exist, then we'll also check our zombie
1203+
// index.
1204+
if edgeIndex.Get(channelID[:]) == nil {
1205+
exists = false
1206+
zombieIndex := edges.NestedReadBucket(zombieBucket)
1207+
if zombieIndex != nil {
1208+
isZombie, _, _ = isZombieEdge(
1209+
zombieIndex, chanID,
1210+
)
1211+
}
1212+
1213+
return nil
1214+
}
1215+
1216+
exists = true
1217+
isZombie = false
1218+
1219+
return nil
1220+
}, func() {}); err != nil {
1221+
return exists, isZombie, err
1222+
}
1223+
1224+
c.rejectCache.insert(chanID, rejectCacheEntry{
1225+
flags: packRejectFlags(exists, isZombie),
1226+
})
1227+
1228+
return exists, isZombie, nil
1229+
}
1230+
1231+
// HasChannelEdge1 returns true if the database knows of a channel edge with the
11611232
// passed channel ID, and false otherwise. If an edge with that ID is found
11621233
// within the graph, then two time stamps representing the last time the edge
11631234
// was updated for both directed edges are returned along with the boolean. If
11641235
// it is not found, then the zombie index is checked and its result is returned
11651236
// as the second boolean.
1166-
func (c *ChannelGraph) HasChannelEdge(
1237+
func (c *ChannelGraph) HasChannelEdge1(
11671238
chanID uint64) (time.Time, time.Time, bool, bool, error) {
11681239

11691240
var (
@@ -1279,6 +1350,122 @@ func (c *ChannelGraph) HasChannelEdge(
12791350
return upd1Time, upd2Time, exists, isZombie, nil
12801351
}
12811352

1353+
func (c *ChannelGraph) HasChannelEdge2(
1354+
chanID uint64) (uint32, uint32, bool, bool, error) {
1355+
1356+
var (
1357+
upd1Height uint32
1358+
upd2Height uint32
1359+
exists bool
1360+
isZombie bool
1361+
)
1362+
1363+
// We'll query the cache with the shared lock held to allow multiple
1364+
// readers to access values in the cache concurrently if they exist.
1365+
c.cacheMu.RLock()
1366+
if entry, ok := c.rejectCache.get(chanID); ok && entry.blocks != nil {
1367+
c.cacheMu.RUnlock()
1368+
exists, isZombie = entry.flags.unpack()
1369+
1370+
return entry.blocks.updBlock1, entry.blocks.updBlock2, exists,
1371+
isZombie, nil
1372+
}
1373+
c.cacheMu.RUnlock()
1374+
1375+
c.cacheMu.Lock()
1376+
defer c.cacheMu.Unlock()
1377+
1378+
// The item was not found with the shared lock, so we'll acquire the
1379+
// exclusive lock and check the cache again in case another method added
1380+
// the entry to the cache while no lock was held.
1381+
if entry, ok := c.rejectCache.get(chanID); ok && entry.blocks != nil {
1382+
exists, isZombie = entry.flags.unpack()
1383+
1384+
return entry.blocks.updBlock1, entry.blocks.updBlock2, exists,
1385+
isZombie, nil
1386+
}
1387+
1388+
if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
1389+
edges := tx.ReadBucket(edgeBucket)
1390+
if edges == nil {
1391+
return ErrGraphNoEdgesFound
1392+
}
1393+
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1394+
if edgeIndex == nil {
1395+
return ErrGraphNoEdgesFound
1396+
}
1397+
1398+
var channelID [8]byte
1399+
byteOrder.PutUint64(channelID[:], chanID)
1400+
1401+
// If the edge doesn't exist, then we'll also check our zombie
1402+
// index.
1403+
if edgeIndex.Get(channelID[:]) == nil {
1404+
exists = false
1405+
zombieIndex := edges.NestedReadBucket(zombieBucket)
1406+
if zombieIndex != nil {
1407+
isZombie, _, _ = isZombieEdge(
1408+
zombieIndex, chanID,
1409+
)
1410+
}
1411+
1412+
return nil
1413+
}
1414+
1415+
exists = true
1416+
isZombie = false
1417+
1418+
// If the channel has been found in the graph, then retrieve
1419+
// the edges itself so we can return the last updated
1420+
// timestamps.
1421+
nodes := tx.ReadBucket(nodeBucket)
1422+
if nodes == nil {
1423+
return ErrGraphNodeNotFound
1424+
}
1425+
1426+
edge1, edge2, err := fetchChanEdgePolicies(
1427+
edgeIndex, edges, channelID[:],
1428+
)
1429+
if err != nil {
1430+
return err
1431+
}
1432+
1433+
// As we may have only one of the edges populated, only set the
1434+
// update time if the edge was found in the database.
1435+
if edge1 != nil {
1436+
e1, ok := edge1.(*models.ChannelEdgePolicy2)
1437+
if !ok {
1438+
return fmt.Errorf("expected "+
1439+
"*ChannelEdgePolicy2, got: %T", edge1)
1440+
}
1441+
upd1Height = e1.BlockHeight.Val
1442+
}
1443+
if edge2 != nil {
1444+
e2, ok := edge2.(*models.ChannelEdgePolicy2)
1445+
if !ok {
1446+
return fmt.Errorf("expected "+
1447+
"*ChannelEdgePolicy2, got: %T", edge2)
1448+
}
1449+
1450+
upd2Height = e2.BlockHeight.Val
1451+
}
1452+
1453+
return nil
1454+
}, func() {}); err != nil {
1455+
return 0, 0, exists, isZombie, err
1456+
}
1457+
1458+
c.rejectCache.insert(chanID, rejectCacheEntry{
1459+
blocks: &updateBlocks{
1460+
updBlock1: upd1Height,
1461+
updBlock2: upd2Height,
1462+
},
1463+
flags: packRejectFlags(exists, isZombie),
1464+
})
1465+
1466+
return upd1Height, upd2Height, exists, isZombie, nil
1467+
}
1468+
12821469
// UpdateChannelEdge retrieves and update edge of the graph database. Method
12831470
// only reserved for updating an edge info after its already been created.
12841471
// In order to maintain this constraints, we return an error in the scenario

channeldb/graph_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -517,15 +517,15 @@ func TestDisconnectBlockAtHeight(t *testing.T) {
517517
}
518518

519519
// The two first edges should be removed from the db.
520-
_, _, has, isZombie, err := graph.HasChannelEdge(edgeInfo.ChannelID)
520+
has, isZombie, err := graph.HasChannelEdge(edgeInfo.ChannelID)
521521
require.NoError(t, err, "unable to query for edge")
522522
if has {
523523
t.Fatalf("edge1 was not pruned from the graph")
524524
}
525525
if isZombie {
526526
t.Fatal("reorged edge1 should not be marked as zombie")
527527
}
528-
_, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo2.ChannelID)
528+
has, isZombie, err = graph.HasChannelEdge(edgeInfo2.ChannelID)
529529
require.NoError(t, err, "unable to query for edge")
530530
if has {
531531
t.Fatalf("edge2 was not pruned from the graph")
@@ -535,7 +535,7 @@ func TestDisconnectBlockAtHeight(t *testing.T) {
535535
}
536536

537537
// Edge 3 should not be removed.
538-
_, _, has, isZombie, err = graph.HasChannelEdge(edgeInfo3.ChannelID)
538+
has, isZombie, err = graph.HasChannelEdge(edgeInfo3.ChannelID)
539539
require.NoError(t, err, "unable to query for edge")
540540
if !has {
541541
t.Fatalf("edge3 was pruned from the graph")
@@ -759,7 +759,7 @@ func TestEdgeInfoUpdates(t *testing.T) {
759759

760760
// Check for existence of the edge within the database, it should be
761761
// found.
762-
_, _, found, isZombie, err := graph.HasChannelEdge(chanID)
762+
found, isZombie, err := graph.HasChannelEdge(chanID)
763763
require.NoError(t, err, "unable to query for edge")
764764
if !found {
765765
t.Fatalf("graph should have of inserted edge")
@@ -2283,7 +2283,7 @@ func TestStressTestChannelGraphAPI(t *testing.T) {
22832283
return nil
22842284
}
22852285

2286-
_, _, _, _, err := graph.HasChannelEdge(
2286+
_, _, err := graph.HasChannelEdge(
22872287
channel.id.ToUint64(),
22882288
)
22892289

graph/builder.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,9 +1146,7 @@ func (b *Builder) processUpdate(msg interface{},
11461146

11471147
// Prior to processing the announcement we first check if we
11481148
// already know of this channel, if so, then we can exit early.
1149-
_, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge(
1150-
chanID,
1151-
)
1149+
exists, isZombie, err := b.cfg.Graph.HasChannelEdge(chanID)
11521150
if err != nil &&
11531151
!errors.Is(err, channeldb.ErrGraphNoEdgesFound) {
11541152

@@ -1338,7 +1336,7 @@ func (b *Builder) processUpdate(msg interface{},
13381336
defer b.channelEdgeMtx.Unlock(msg.ChannelID)
13391337

13401338
edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
1341-
b.cfg.Graph.HasChannelEdge(msg.ChannelID)
1339+
b.cfg.Graph.HasChannelEdge1(msg.ChannelID)
13421340
if err != nil && !errors.Is(
13431341
err, channeldb.ErrGraphNoEdgesFound,
13441342
) {
@@ -1683,9 +1681,7 @@ func (b *Builder) IsPublicNode(node route.Vertex) (bool, error) {
16831681
//
16841682
// NOTE: This method is part of the ChannelGraphSource interface.
16851683
func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
1686-
_, _, exists, isZombie, _ := b.cfg.Graph.HasChannelEdge(
1687-
chanID.ToUint64(),
1688-
)
1684+
exists, isZombie, _ := b.cfg.Graph.HasChannelEdge(chanID.ToUint64())
16891685

16901686
return exists || isZombie
16911687
}
@@ -1698,7 +1694,7 @@ func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
16981694
timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
16991695

17001696
edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
1701-
b.cfg.Graph.HasChannelEdge(chanID.ToUint64())
1697+
b.cfg.Graph.HasChannelEdge1(chanID.ToUint64())
17021698
if err != nil {
17031699
log.Debugf("Check stale edge policy got error: %v", err)
17041700
return false

0 commit comments

Comments
 (0)