package graphdb

import (
+	"cmp"
	"context"
	"errors"
	"fmt"
+	"net"
+	"slices"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
+	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/sqldb"
+	"github.com/lightningnetwork/lnd/sqldb/sqlc"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
@@ -16,8 +22,8 @@ import (
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
-func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend,
-	_ SQLQueries, _ chainhash.Hash) error {
+func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
+	sqlDB SQLQueries, _ chainhash.Hash) error {

	log.Infof("Starting migration of the graph store from KV to SQL")
	t0 := time.Now()
@@ -32,6 +38,11 @@ func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend,
		return nil
	}

+	// 1) Migrate all the nodes.
+	if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
+		return fmt.Errorf("could not migrate nodes: %w", err)
+	}
+
	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))

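As the NOTE above says, this entry point is currently driven from tests only. The following is a minimal sketch of what a test-side invocation could look like; newTestKVBackend and newTestSQLQueries are hypothetical fixtures standing in for whatever helpers the package's test suite actually uses to build the two stores.

// Sketch only: newTestKVBackend and newTestSQLQueries are hypothetical
// placeholders, not helpers that exist in the package.
func TestMigrateGraphToSQLSketch(t *testing.T) {
	ctx := context.Background()

	kvBackend := newTestKVBackend(t) // hypothetical: populated KV graph store
	sqlDB := newTestSQLQueries(t)    // hypothetical: empty SQL store

	// The chain hash parameter is currently ignored by the migration.
	var chain chainhash.Hash

	require.NoError(t, MigrateGraphToSQL(ctx, kvBackend, sqlDB, chain))
}
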
@@ -60,3 +71,116 @@ func checkGraphExists(db kvdb.Backend) (bool, error) {

	return true, nil
}
+
+// migrateNodes migrates all nodes from the KV backend to the SQL database.
+// This includes doing a sanity check after each migration to ensure that the
+// migrated node matches the original node.
+func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
+	sqlDB SQLQueries) error {
+
+	// Keep track of the number of nodes migrated and the number of
+	// nodes skipped due to errors.
+	var (
+		count   uint64
+		skipped uint64
+	)
+
+	// Loop through each node in the KV store and insert it into the SQL
+	// database.
+	err := forEachNode(kvBackend, func(_ kvdb.RTx,
+		node *models.LightningNode) error {
+
+		pub := node.PubKeyBytes
+
+		// Sanity check to ensure that the node has valid extra opaque
+		// data. If it does not, we'll skip it. We need to do this
+		// because previously we would just persist any TLV bytes that
+		// we received without validating them. Now, however, we
+		// normalise the storage of extra opaque data, so we need to
+		// ensure that the data is valid. We don't want to abort the
+		// migration if we encounter a node with invalid extra opaque
+		// data, so we'll just skip it and log a warning.
+		_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
+		if errors.Is(err, ErrParsingExtraTLVBytes) {
+			skipped++
+			log.Warnf("Skipping migration of node %x with invalid "+
+				"extra opaque data: %v", pub,
+				node.ExtraOpaqueData)
+
+			return nil
+		} else if err != nil {
+			return fmt.Errorf("unable to marshal extra "+
+				"opaque data for node %x: %w", pub, err)
+		}
+
+		count++
+
+		// TODO(elle): At this point, we should check the loaded node
+		// to see if we should extract any DNS addresses from its
+		// opaque type addresses. This is expected to be done in:
+		// https://github.com/lightningnetwork/lnd/pull/9455.
+		// This TODO is being tracked in
+		// https://github.com/lightningnetwork/lnd/issues/9795 as this
+		// must be addressed before making this code path active in
+		// production.
+
+		// Write the node to the SQL database.
+		id, err := upsertNode(ctx, sqlDB, node)
+		if err != nil {
+			return fmt.Errorf("could not persist node(%x): %w", pub,
+				err)
+		}
+
+		// Fetch it from the SQL store and compare it against the
+		// original node object to ensure the migration was successful.
+		dbNode, err := sqlDB.GetNodeByPubKey(
+			ctx, sqlc.GetNodeByPubKeyParams{
+				PubKey:  node.PubKeyBytes[:],
+				Version: int16(ProtocolV1),
+			},
+		)
+		if err != nil {
+			return fmt.Errorf("could not get node by pubkey (%x) "+
+				"after migration: %w", pub, err)
+		}
+
+		// Sanity check: ensure the migrated node ID matches the one we
+		// just inserted.
+		if dbNode.ID != id {
+			return fmt.Errorf("node ID mismatch for node (%x) "+
+				"after migration: expected %d, got %d",
+				pub, id, dbNode.ID)
+		}
+
+		migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
+		if err != nil {
+			return fmt.Errorf("could not build migrated node "+
+				"from dbNode(db id: %d, node pub: %x): %w",
+				dbNode.ID, pub, err)
+		}
+
+		// Make sure that the node addresses are sorted before
+		// comparing them to ensure that the order of addresses does
+		// not affect the comparison.
+		slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
+			return cmp.Compare(i.String(), j.String())
+		})
+		slices.SortFunc(
+			migratedNode.Addresses, func(i, j net.Addr) int {
+				return cmp.Compare(i.String(), j.String())
+			},
+		)
+
+		return sqldb.CompareRecords(
+			node, migratedNode, fmt.Sprintf("node %x", pub),
+		)
+	})
+	if err != nil {
+		return fmt.Errorf("could not migrate nodes: %w", err)
+	}
+
+	log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
+		"invalid TLV streams)", count, skipped)
+
+	return nil
+}
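
Because the KV and SQL stores may return a node's addresses in different orders, the code above sorts both slices before handing them to sqldb.CompareRecords. A small self-contained illustration of that sort-then-compare idiom, with reflect.DeepEqual standing in for CompareRecords:

package main

import (
	"cmp"
	"fmt"
	"net"
	"reflect"
	"slices"
)

// sortAddrs normalises the ordering of an address slice so that two slices
// holding the same addresses always compare as equal, regardless of the order
// in which the backing store returned them.
func sortAddrs(addrs []net.Addr) {
	slices.SortFunc(addrs, func(i, j net.Addr) int {
		return cmp.Compare(i.String(), j.String())
	})
}

func main() {
	kvAddrs := []net.Addr{
		&net.TCPAddr{IP: net.ParseIP("10.0.0.2"), Port: 9735},
		&net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 9735},
	}
	sqlAddrs := []net.Addr{
		&net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 9735},
		&net.TCPAddr{IP: net.ParseIP("10.0.0.2"), Port: 9735},
	}

	sortAddrs(kvAddrs)
	sortAddrs(sqlAddrs)

	// reflect.DeepEqual is used here purely as a stand-in for the
	// sqldb.CompareRecords call in the migration above.
	fmt.Println(reflect.DeepEqual(kvAddrs, sqlAddrs)) // true
}

Sorting by the String() form is enough here because the goal is only a stable, deterministic order for the comparison, not a semantically meaningful ranking of addresses.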