
[graph mig 1]: graph/db: migrate graph nodes from kvdb to SQL #10036

Merged 8 commits on Jul 8, 2025
2 changes: 2 additions & 0 deletions docs/release-notes/release-notes-0.20.0.md
@@ -92,6 +92,8 @@ circuit. The indices are only available for forwarding events saved after v0.20.
   * [9](https://github.com/lightningnetwork/lnd/pull/9939)
   * [10](https://github.com/lightningnetwork/lnd/pull/9971)
   * [11](https://github.com/lightningnetwork/lnd/pull/9972)
+* Add graph SQL migration logic:
+  * [1](https://github.com/lightningnetwork/lnd/pull/10036)

 ## RPC Updates
 * Previously the `RoutingPolicy` would return the inbound fee record in its
2 changes: 1 addition & 1 deletion go.mod
@@ -140,7 +140,7 @@ require (
 	github.com/opencontainers/image-spec v1.0.2 // indirect
 	github.com/opencontainers/runc v1.1.12 // indirect
 	github.com/ory/dockertest/v3 v3.10.0 // indirect
-	github.com/pmezard/go-difflib v1.0.0
+	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
 	github.com/prometheus/common v0.26.0 // indirect
 	github.com/prometheus/procfs v0.6.0 // indirect
263 changes: 263 additions & 0 deletions graph/db/sql_migration.go
@@ -0,0 +1,263 @@
package graphdb

import (
	"cmp"
	"context"
	"errors"
	"fmt"
	"net"
	"slices"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/sqldb"
	"github.com/lightningnetwork/lnd/sqldb/sqlc"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via
// tests only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, _ chainhash.Hash) error {

	log.Infof("Starting migration of the graph store from KV to SQL")
	t0 := time.Now()

	// Check if there is a graph to migrate.
	graphExists, err := checkGraphExists(kvBackend)
	if err != nil {
		return fmt.Errorf("failed to check graph existence: %w", err)
	}
	if !graphExists {
		log.Infof("No graph found in KV store, skipping the migration")
		return nil
	}

	// 1) Migrate all the nodes.
	if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// 2) Migrate the source node.
	if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate source node: %w", err)
	}

	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))

	return nil
}
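
For orientation, a minimal sketch of how this entry point might be driven from a test. The fixture helpers openTestKVStore and openTestSQLQueries are placeholders rather than APIs from this PR, and the testify require package is assumed for assertions:

// TestMigrateGraphToSQLSmoke is a hypothetical harness, not part of
// this PR. The two fixture helpers below are placeholders for however
// the real tests open a populated kvdb.Backend and an empty SQL store.
func TestMigrateGraphToSQLSmoke(t *testing.T) {
	ctx := context.Background()

	// A KV backend that already contains a graph.
	kvBackend := openTestKVStore(t)

	// A fresh SQLQueries implementation backed by an empty SQL DB.
	sqlDB := openTestSQLQueries(t)

	// The chain hash parameter is currently unused by the migration.
	err := MigrateGraphToSQL(ctx, kvBackend, sqlDB, chainhash.Hash{})
	require.NoError(t, err)
}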

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
	// Check if there is even a graph to migrate.
	err := db.View(func(tx kvdb.RTx) error {
		// Check for the existence of the node bucket which is a top
		// level bucket that would have been created on the initial
		// creation of the graph store.
		nodes := tx.ReadBucket(nodeBucket)
		if nodes == nil {
			return ErrGraphNotFound
		}

		return nil
	}, func() {})
	if errors.Is(err, ErrGraphNotFound) {
		return false, nil
	} else if err != nil {
		return false, err
	}

	return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	// Keep track of the number of nodes migrated and the number of
	// nodes skipped due to errors.
	var (
		count   uint64
		skipped uint64
	)

	// Loop through each node in the KV store and insert it into the SQL
	// database.
	err := forEachNode(kvBackend, func(_ kvdb.RTx,
		node *models.LightningNode) error {

		pub := node.PubKeyBytes

		// Sanity check to ensure that the node has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a node with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skipped++
			log.Warnf("Skipping migration of node %x with invalid "+
				"extra opaque data: %v", pub,
				node.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra "+
				"opaque data for node %x: %w", pub, err)
		}

		count++

		// TODO(elle): At this point, we should check the loaded node
		// to see if we should extract any DNS addresses from its
		// opaque type addresses. This is expected to be done in:
		// https://github.com/lightningnetwork/lnd/pull/9455.
		// This TODO is being tracked in
		// https://github.com/lightningnetwork/lnd/issues/9795 as this
		// must be addressed before making this code path active in
		// production. (An illustrative sketch of this extraction
		// follows after this function.)
[Review comment from the collaborator author on lines +122 to +130: "fyi @mohamedawnallah: this is where the migration will happen for DNS addrs currently stored as opaque addrs, which is why we don't need that migration to happen in #9455."]

		// Write the node to the SQL database.
		id, err := upsertNode(ctx, sqlDB, node)
		if err != nil {
			return fmt.Errorf("could not persist node(%x): %w", pub,
				err)
		}

		// Fetch it from the SQL store and compare it against the
		// original node object to ensure the migration was successful.
		dbNode, err := sqlDB.GetNodeByPubKey(
			ctx, sqlc.GetNodeByPubKeyParams{
				PubKey:  node.PubKeyBytes[:],
				Version: int16(ProtocolV1),
			},
		)
		if err != nil {
			return fmt.Errorf("could not get node by pubkey (%x) "+
				"after migration: %w", pub, err)
		}

		// Sanity check: ensure the migrated node ID matches the one we
		// just inserted.
		if dbNode.ID != id {
			return fmt.Errorf("node ID mismatch for node (%x) "+
				"after migration: expected %d, got %d",
				pub, id, dbNode.ID)
		}

		migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
		if err != nil {
			return fmt.Errorf("could not build migrated node "+
				"from dbNode(db id: %d, node pub: %x): %w",
				dbNode.ID, pub, err)
		}

		// Make sure that the node addresses are sorted before
		// comparing them to ensure that the order of addresses does
		// not affect the comparison.
		slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
			return cmp.Compare(i.String(), j.String())
		})
		slices.SortFunc(
			migratedNode.Addresses, func(i, j net.Addr) int {
				return cmp.Compare(i.String(), j.String())
			},
		)

		return sqldb.CompareRecords(
			node, migratedNode, fmt.Sprintf("node %x", pub),
		)
	})
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
		"invalid TLV streams)", count, skipped)

	return nil
}
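
To make the deferred TODO above concrete, here is a purely hypothetical sketch of extracting DNS hosts from opaque-address payloads. The use of lnwire.OpaqueAddrs for unknown address types and the assumed type-5 descriptor layout (1-byte hostname length, hostname bytes, 2-byte big-endian port) come from the proposals referenced in the TODO and are assumptions, not part of this PR; encoding/binary and lnwire imports are implied.

// extractDNSHosts is illustrative only. It assumes unknown address
// types were persisted as lnwire.OpaqueAddrs with the raw descriptor
// bytes in Payload, and that DNS addresses use an assumed type-5
// descriptor of the form [type][len][hostname...][port:2].
func extractDNSHosts(addrs []net.Addr) []string {
	var hosts []string
	for _, addr := range addrs {
		opaque, ok := addr.(*lnwire.OpaqueAddrs)
		if !ok {
			continue
		}

		payload := opaque.Payload
		if len(payload) < 2 || payload[0] != 5 {
			continue
		}

		// Assumed layout: 1-byte hostname length, hostname
		// bytes, then a 2-byte big-endian port.
		n := int(payload[1])
		if len(payload) < 2+n+2 {
			continue
		}

		host := string(payload[2 : 2+n])
		port := binary.BigEndian.Uint16(payload[2+n : 2+n+2])
		hosts = append(hosts, fmt.Sprintf("%s:%d", host, port))
	}

	return hosts
}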

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	sourceNode, err := sourceNode(kvBackend)
	if errors.Is(err, ErrSourceNodeNotSet) {
		// If the source node has not been set yet, we can skip this
		// migration step.
		return nil
	} else if err != nil {
		return fmt.Errorf("could not get source node from kv "+
			"store: %w", err)
	}

	pub := sourceNode.PubKeyBytes

	// Get the DB ID of the source node by its public key. This node must
	// already exist in the SQL database, as it should have been migrated
	// in the previous node-migration step.
	id, err := sqlDB.GetNodeIDByPubKey(
		ctx, sqlc.GetNodeIDByPubKeyParams{
			PubKey:  pub[:],
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get source node ID: %w", err)
	}

	// Now we can add the source node to the SQL database.
	err = sqlDB.AddSourceNode(ctx, id)
	if err != nil {
		return fmt.Errorf("could not add source node to SQL store: %w",
			err)
	}

	// Verify that the source node was added correctly by fetching it back
	// from the SQL database and checking that the expected DB ID and
	// pub key are returned. We don't need to do a whole node comparison
	// here, as this was already done in the previous migration step.
	srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
	if err != nil {
		return fmt.Errorf("could not get source nodes from SQL "+
			"store: %w", err)
	}

	// The SQL store has support for multiple source nodes (for future
	// protocol versions) but this migration is purely aimed at the V1
	// store, and so we expect exactly one source node to be present.
	if len(srcNodes) != 1 {
		return fmt.Errorf("expected exactly one source node, "+
			"got %d", len(srcNodes))
	}

	// Check that the source node ID and pub key match the original
	// source node.
	if srcNodes[0].NodeID != id {
		return fmt.Errorf("source node ID mismatch after migration: "+
			"expected %d, got %d", id, srcNodes[0].NodeID)
	}
	err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
	if err != nil {
		return fmt.Errorf("source node pubkey mismatch after "+
			"migration: %w", err)
	}

	log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

	return nil
}
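
The fetch-back checks above all funnel into sqldb.CompareRecords. As a rough mental model, assuming the helper reduces to a deep-equality check (the real implementation may also pretty-print a diff on mismatch), it behaves roughly like this sketch:

// compareRecords sketches the contract relied on above: the freshly
// migrated record must deeply equal the original. This is an assumed
// model of sqldb.CompareRecords, not its actual source.
func compareRecords(original, migrated any, desc string) error {
	if !reflect.DeepEqual(original, migrated) {
		return fmt.Errorf("%s mismatch: got %+v, want %+v",
			desc, migrated, original)
	}

	return nil
}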