Skip to content

Commit 11f6648

Browse files
authored
Merge pull request #10036 from ellemouton/graphMig1-nodes
[graph mig 1]: graph/db: migrate graph nodes from kvdb to SQL
2 parents b815109 + cb959bd commit 11f6648

File tree

10 files changed

+818
-60
lines changed

10 files changed

+818
-60
lines changed

docs/release-notes/release-notes-0.20.0.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ circuit. The indices are only available for forwarding events saved after v0.20.
9292
* [9](https://github.com/lightningnetwork/lnd/pull/9939)
9393
* [10](https://github.com/lightningnetwork/lnd/pull/9971)
9494
* [11](https://github.com/lightningnetwork/lnd/pull/9972)
95+
* Add graph SQL migration logic:
96+
* [1](https://github.com/lightningnetwork/lnd/pull/10036)
9597

9698
## RPC Updates
9799
* Previously the `RoutingPolicy` would return the inbound fee record in its

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ require (
140140
github.com/opencontainers/image-spec v1.0.2 // indirect
141141
github.com/opencontainers/runc v1.1.12 // indirect
142142
github.com/ory/dockertest/v3 v3.10.0 // indirect
143-
github.com/pmezard/go-difflib v1.0.0
143+
github.com/pmezard/go-difflib v1.0.0 // indirect
144144
github.com/prometheus/client_model v0.2.0 // indirect
145145
github.com/prometheus/common v0.26.0 // indirect
146146
github.com/prometheus/procfs v0.6.0 // indirect

graph/db/sql_migration.go

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
package graphdb
2+
3+
import (
4+
"cmp"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"net"
9+
"slices"
10+
"time"
11+
12+
"github.com/btcsuite/btcd/chaincfg/chainhash"
13+
"github.com/lightningnetwork/lnd/graph/db/models"
14+
"github.com/lightningnetwork/lnd/kvdb"
15+
"github.com/lightningnetwork/lnd/sqldb"
16+
"github.com/lightningnetwork/lnd/sqldb/sqlc"
17+
)
18+
19+
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
20+
// backend.
21+
//
22+
// NOTE: this is currently not called from any code path. It is called via tests
23+
// only for now and will be called from the main lnd binary once the
24+
// migration is fully implemented and tested.
25+
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
26+
sqlDB SQLQueries, _ chainhash.Hash) error {
27+
28+
log.Infof("Starting migration of the graph store from KV to SQL")
29+
t0 := time.Now()
30+
31+
// Check if there is a graph to migrate.
32+
graphExists, err := checkGraphExists(kvBackend)
33+
if err != nil {
34+
return fmt.Errorf("failed to check graph existence: %w", err)
35+
}
36+
if !graphExists {
37+
log.Infof("No graph found in KV store, skipping the migration")
38+
return nil
39+
}
40+
41+
// 1) Migrate all the nodes.
42+
if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
43+
return fmt.Errorf("could not migrate nodes: %w", err)
44+
}
45+
46+
// 2) Migrate the source node.
47+
if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
48+
return fmt.Errorf("could not migrate source node: %w", err)
49+
}
50+
51+
log.Infof("Finished migration of the graph store from KV to SQL in %v",
52+
time.Since(t0))
53+
54+
return nil
55+
}
56+
57+
// checkGraphExists checks if the graph exists in the KV backend.
58+
func checkGraphExists(db kvdb.Backend) (bool, error) {
59+
// Check if there is even a graph to migrate.
60+
err := db.View(func(tx kvdb.RTx) error {
61+
// Check for the existence of the node bucket which is a top
62+
// level bucket that would have been created on the initial
63+
// creation of the graph store.
64+
nodes := tx.ReadBucket(nodeBucket)
65+
if nodes == nil {
66+
return ErrGraphNotFound
67+
}
68+
69+
return nil
70+
}, func() {})
71+
if errors.Is(err, ErrGraphNotFound) {
72+
return false, nil
73+
} else if err != nil {
74+
return false, err
75+
}
76+
77+
return true, nil
78+
}
79+
80+
// migrateNodes migrates all nodes from the KV backend to the SQL database.
81+
// This includes doing a sanity check after each migration to ensure that the
82+
// migrated node matches the original node.
83+
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
84+
sqlDB SQLQueries) error {
85+
86+
// Keep track of the number of nodes migrated and the number of
87+
// nodes skipped due to errors.
88+
var (
89+
count uint64
90+
skipped uint64
91+
)
92+
93+
// Loop through each node in the KV store and insert it into the SQL
94+
// database.
95+
err := forEachNode(kvBackend, func(_ kvdb.RTx,
96+
node *models.LightningNode) error {
97+
98+
pub := node.PubKeyBytes
99+
100+
// Sanity check to ensure that the node has valid extra opaque
101+
// data. If it does not, we'll skip it. We need to do this
102+
// because previously we would just persist any TLV bytes that
103+
// we received without validating them. Now, however, we
104+
// normalise the storage of extra opaque data, so we need to
105+
// ensure that the data is valid. We don't want to abort the
106+
// migration if we encounter a node with invalid extra opaque
107+
// data, so we'll just skip it and log a warning.
108+
_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
109+
if errors.Is(err, ErrParsingExtraTLVBytes) {
110+
skipped++
111+
log.Warnf("Skipping migration of node %x with invalid "+
112+
"extra opaque data: %v", pub,
113+
node.ExtraOpaqueData)
114+
115+
return nil
116+
} else if err != nil {
117+
return fmt.Errorf("unable to marshal extra "+
118+
"opaque data for node %x: %w", pub, err)
119+
}
120+
121+
count++
122+
123+
// TODO(elle): At this point, we should check the loaded node
124+
// to see if we should extract any DNS addresses from its
125+
// opaque type addresses. This is expected to be done in:
126+
// https://github.com/lightningnetwork/lnd/pull/9455.
127+
// This TODO is being tracked in
128+
// https://github.com/lightningnetwork/lnd/issues/9795 as this
129+
// must be addressed before making this code path active in
130+
// production.
131+
132+
// Write the node to the SQL database.
133+
id, err := upsertNode(ctx, sqlDB, node)
134+
if err != nil {
135+
return fmt.Errorf("could not persist node(%x): %w", pub,
136+
err)
137+
}
138+
139+
// Fetch it from the SQL store and compare it against the
140+
// original node object to ensure the migration was successful.
141+
dbNode, err := sqlDB.GetNodeByPubKey(
142+
ctx, sqlc.GetNodeByPubKeyParams{
143+
PubKey: node.PubKeyBytes[:],
144+
Version: int16(ProtocolV1),
145+
},
146+
)
147+
if err != nil {
148+
return fmt.Errorf("could not get node by pubkey (%x)"+
149+
"after migration: %w", pub, err)
150+
}
151+
152+
// Sanity check: ensure the migrated node ID matches the one we
153+
// just inserted.
154+
if dbNode.ID != id {
155+
return fmt.Errorf("node ID mismatch for node (%x) "+
156+
"after migration: expected %d, got %d",
157+
pub, id, dbNode.ID)
158+
}
159+
160+
migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
161+
if err != nil {
162+
return fmt.Errorf("could not build migrated node "+
163+
"from dbNode(db id: %d, node pub: %x): %w",
164+
dbNode.ID, pub, err)
165+
}
166+
167+
// Make sure that the node addresses are sorted before
168+
// comparing them to ensure that the order of addresses does
169+
// not affect the comparison.
170+
slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
171+
return cmp.Compare(i.String(), j.String())
172+
})
173+
slices.SortFunc(
174+
migratedNode.Addresses, func(i, j net.Addr) int {
175+
return cmp.Compare(i.String(), j.String())
176+
},
177+
)
178+
179+
return sqldb.CompareRecords(
180+
node, migratedNode, fmt.Sprintf("node %x", pub),
181+
)
182+
})
183+
if err != nil {
184+
return fmt.Errorf("could not migrate nodes: %w", err)
185+
}
186+
187+
log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
188+
"invalid TLV streams)", count, skipped)
189+
190+
return nil
191+
}
192+
193+
// migrateSourceNode migrates the source node from the KV backend to the
194+
// SQL database.
195+
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
196+
sqlDB SQLQueries) error {
197+
198+
sourceNode, err := sourceNode(kvdb)
199+
if errors.Is(err, ErrSourceNodeNotSet) {
200+
// If the source node has not been set yet, we can skip this
201+
// migration step.
202+
return nil
203+
} else if err != nil {
204+
return fmt.Errorf("could not get source node from kv "+
205+
"store: %w", err)
206+
}
207+
208+
pub := sourceNode.PubKeyBytes
209+
210+
// Get the DB ID of the source node by its public key. This node must
211+
// already exist in the SQL database, as it should have been migrated
212+
// in the previous node-migration step.
213+
id, err := sqlDB.GetNodeIDByPubKey(
214+
ctx, sqlc.GetNodeIDByPubKeyParams{
215+
PubKey: pub[:],
216+
Version: int16(ProtocolV1),
217+
},
218+
)
219+
if err != nil {
220+
return fmt.Errorf("could not get source node ID: %w", err)
221+
}
222+
223+
// Now we can add the source node to the SQL database.
224+
err = sqlDB.AddSourceNode(ctx, id)
225+
if err != nil {
226+
return fmt.Errorf("could not add source node to SQL store: %w",
227+
err)
228+
}
229+
230+
// Verify that the source node was added correctly by fetching it back
231+
// from the SQL database and checking that the expected DB ID and
232+
// pub key are returned. We don't need to do a whole node comparison
233+
// here, as this was already done in the previous migration step.
234+
srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
235+
if err != nil {
236+
return fmt.Errorf("could not get source nodes from SQL "+
237+
"store: %w", err)
238+
}
239+
240+
// The SQL store has support for multiple source nodes (for future
241+
// protocol versions) but this migration is purely aimed at the V1
242+
// store, and so we expect exactly one source node to be present.
243+
if len(srcNodes) != 1 {
244+
return fmt.Errorf("expected exactly one source node, "+
245+
"got %d", len(srcNodes))
246+
}
247+
248+
// Check that the source node ID and pub key match the original
249+
// source node.
250+
if srcNodes[0].NodeID != id {
251+
return fmt.Errorf("source node ID mismatch after migration: "+
252+
"expected %d, got %d", id, srcNodes[0].NodeID)
253+
}
254+
err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
255+
if err != nil {
256+
return fmt.Errorf("source node pubkey mismatch after "+
257+
"migration: %w", err)
258+
}
259+
260+
log.Infof("Migrated source node with pubkey %x to SQL", pub[:])
261+
262+
return nil
263+
}

0 commit comments

Comments
 (0)