diff --git a/Cargo.toml b/Cargo.toml index d89ad28e2..826d2d0cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,16 +28,16 @@ panic = 'abort' # Abort on panic default = [] [dependencies] -lightning = { version = "0.1.0", features = ["std"] } -lightning-types = { version = "0.2.0" } -lightning-invoice = { version = "0.33.0", features = ["std"] } -lightning-net-tokio = { version = "0.1.0" } -lightning-persister = { version = "0.1.0" } -lightning-background-processor = { version = "0.1.0", features = ["futures"] } -lightning-rapid-gossip-sync = { version = "0.1.0" } -lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "tokio"] } -lightning-transaction-sync = { version = "0.1.0", features = ["esplora-async-https", "time"] } -lightning-liquidity = { version = "0.1.0", features = ["std"] } +#lightning = { version = "0.1.0", features = ["std"] } +#lightning-types = { version = "0.2.0" } +#lightning-invoice = { version = "0.33.0", features = ["std"] } +#lightning-net-tokio = { version = "0.1.0" } +#lightning-persister = { version = "0.1.0" } +#lightning-background-processor = { version = "0.1.0", features = ["futures"] } +#lightning-rapid-gossip-sync = { version = "0.1.0" } +#lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "tokio"] } +#lightning-transaction-sync = { version = "0.1.0", features = ["esplora-async-https", "time"] } +#lightning-liquidity = { version = "0.1.0", features = ["std"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["std"] } #lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } @@ -50,6 +50,18 @@ lightning-liquidity = { version = "0.1.0", features = ["std"] } #lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main", features = ["esplora-async-https", "time"] } #lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", branch = "main" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["futures"] } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["esplora-async-https", "time"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb" } + #lightning = { path = "../rust-lightning/lightning", features = ["std"] } #lightning-types = { path = "../rust-lightning/lightning-types" } #lightning-invoice = { path = "../rust-lightning/lightning-invoice", features = ["std"] } @@ -89,8 +101,9 @@ prost = { version = "0.11.6", default-features = false} winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { version = "0.1.0", features = ["std", "_test_utils"] } +#lightning = { version = "0.1.0", features = ["std", "_test_utils"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "f866e2c0404d495f8bc8b16588c8b8a62bba8deb", features = ["std", "_test_utils"] } #lightning = { path = "../rust-lightning/lightning", features = ["std", "_test_utils"] } electrum-client = { version = "0.21.0", default-features = true } bitcoincore-rpc = { version = "0.19.0", default-features = false } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 632daa7df..7e1210a08 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -58,6 +58,7 @@ interface Builder { void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); + void set_pathfinding_scores_source(string url); void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token); void set_storage_dir_path(string storage_dir_path); void set_filesystem_logger(string? log_file_path, LogLevel? max_log_level); @@ -275,6 +276,7 @@ dictionary NodeStatus { u64? latest_onchain_wallet_sync_timestamp; u64? latest_fee_rate_cache_update_timestamp; u64? latest_rgs_snapshot_timestamp; + u64? latest_pathfinding_scores_sync_timestamp; u64? latest_node_announcement_broadcast_timestamp; u32? latest_channel_monitor_archival_height; }; diff --git a/src/builder.rs b/src/builder.rs index 81deb7ddf..908b4204c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -16,10 +16,12 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; -use crate::io::utils::{read_node_metrics, write_node_metrics}; +use crate::io::utils::{ + read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics, +}; use crate::io::vss_store::VssStore; use crate::liquidity::LiquiditySource; -use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger}; +use crate::logger::{log_error, log_info, log_trace, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::store::PaymentStore; use crate::peer_store::PeerStore; @@ -41,7 +43,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; use lightning::routing::gossip::NodeAlias; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ - ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters, + CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters, + ProbabilisticScoringFeeParameters, }; use lightning::sign::EntropySource; @@ -97,6 +100,11 @@ enum GossipSourceConfig { RapidGossipSync(String), } +#[derive(Debug, Clone)] +struct PathfindingScoresSyncConfig { + url: String, +} + #[derive(Debug, Clone)] struct LiquiditySourceConfig { // LSPS2 service's (address, node_id, token) @@ -211,6 +219,7 @@ pub struct NodeBuilder { gossip_source_config: Option, liquidity_source_config: Option, log_writer_config: Option, + pathfinding_scores_sync_config: Option, } impl NodeBuilder { @@ -227,6 +236,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let pathfinding_scores_sync_config = None; Self { config, entropy_source_config, @@ -234,6 +244,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + pathfinding_scores_sync_config, } } @@ -304,6 +315,14 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its external scores from the given URL. + /// + /// The external scores are merged into the local scoring system to improve routing. + pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self { + self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url }); + self + } + /// Configures the [`Node`] instance to source its inbound liquidity from the given /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) /// service. @@ -529,6 +548,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.pathfinding_scores_sync_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, logger, @@ -551,6 +571,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.pathfinding_scores_sync_config.as_ref(), self.liquidity_source_config.as_ref(), seed_bytes, logger, @@ -643,6 +664,13 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url); } + /// Configures the [`Node`] instance to source its external scores from the given URL. + /// + /// The external scores are merged into the local scoring system to improve routing. + pub fn set_pathfinding_scores_source(&self, url: String) { + self.inner.write().unwrap().set_pathfinding_scores_source(url); + } + /// Configures the [`Node`] instance to source its inbound liquidity from the given /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) /// service. @@ -802,6 +830,7 @@ impl ArcedNodeBuilder { fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, gossip_source_config: Option<&GossipSourceConfig>, + pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64], logger: Arc, kv_store: Arc, ) -> Result { @@ -957,26 +986,38 @@ fn build_with_store_internal( }, }; - let scorer = match io::utils::read_scorer( + let local_scorer = match io::utils::read_scorer( Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger), ) { - Ok(scorer) => Arc::new(Mutex::new(scorer)), + Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { let params = ProbabilisticScoringDecayParameters::default(); - Arc::new(Mutex::new(ProbabilisticScorer::new( - params, - Arc::clone(&network_graph), - Arc::clone(&logger), - ))) + ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger)) } else { return Err(BuildError::ReadFailed); } }, }; + let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); + + // Restore external pathfinding scores from cache if possible. + match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) { + Ok(external_scores) => { + scorer.lock().unwrap().merge(external_scores, cur_time); + log_trace!(logger, "External scores from cache merged successfully"); + }, + Err(e) => { + if e.kind() != std::io::ErrorKind::NotFound { + log_error!(logger, "Error while reading external scores from cache: {}", e); + return Err(BuildError::ReadFailed); + } + }, + } + let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), @@ -1078,8 +1119,8 @@ fn build_with_store_internal( // Give ChannelMonitors to ChainMonitor for (_blockhash, channel_monitor) in channel_monitors.into_iter() { - let funding_outpoint = channel_monitor.get_funding_txo().0; - chain_monitor.watch_channel(funding_outpoint, channel_monitor).map_err(|e| { + let channel_id = channel_monitor.channel_id(); + chain_monitor.watch_channel(channel_id, channel_monitor).map_err(|e| { log_error!(logger, "Failed to watch channel monitor: {:?}", e); BuildError::InvalidChannelMonitor })?; @@ -1282,6 +1323,8 @@ fn build_with_store_internal( let (stop_sender, _) = tokio::sync::watch::channel(()); let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); + let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone()); + Ok(Node { runtime, stop_sender, @@ -1300,6 +1343,7 @@ fn build_with_store_internal( keys_manager, network_graph, gossip_source, + pathfinding_scores_sync_url, liquidity_source, kv_store, logger, diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 59ce78f37..96b889f2d 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -306,7 +306,7 @@ impl ChainSource { if let Some(worst_channel_monitor_block_hash) = chain_monitor .list_monitors() .iter() - .flat_map(|(txo, _)| chain_monitor.get_monitor(*txo)) + .flat_map(|channel_id| chain_monitor.get_monitor(*channel_id)) .map(|m| m.current_best_block()) .min_by_key(|b| b.height) .map(|b| b.block_hash) diff --git a/src/config.rs b/src/config.rs index fc2ac8a78..249e31773 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10); // The time in-between RGS sync attempts. pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); +// The time in-between external scores sync attempts. +pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); + // The time in-between node announcement broadcast attempts. pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60); @@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5; // The timeout after which we abort a RGS sync operation. pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; +// The timeout after which we abort a external scores sync operation. +pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/io/utils.rs b/src/io/utils.rs index b5537ed7d..6e60b4ed2 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -23,7 +23,9 @@ use crate::{Error, EventQueue, NodeMetrics, PaymentDetails}; use lightning::io::Cursor; use lightning::ln::msgs::DecodeError; use lightning::routing::gossip::NetworkGraph; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters}; +use lightning::routing::scoring::{ + ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters, +}; use lightning::util::persist::{ KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, @@ -52,6 +54,8 @@ use std::ops::Deref; use std::path::Path; use std::sync::Arc; +pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache"; + /// Generates a random [BIP 39] mnemonic. /// /// The result may be used to initialize the [`Node`] entropy, i.e., can be given to @@ -166,6 +170,51 @@ where }) } +/// Read previously persisted external pathfinding scores from the cache. +pub(crate) fn read_external_pathfinding_scores_from_cache( + kv_store: Arc, logger: L, +) -> Result +where + L::Target: LdkLogger, +{ + let mut reader = Cursor::new(kv_store.read( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + )?); + ChannelLiquidities::read(&mut reader).map_err(|e| { + log_error!(logger, "Failed to deserialize scorer: {}", e); + std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") + }) +} + +/// Persist external pathfinding scores to the cache. +pub(crate) fn write_external_pathfinding_scores_to_cache( + kv_store: Arc, data: &ChannelLiquidities, logger: L, +) -> Result<(), Error> +where + L::Target: LdkLogger, +{ + kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + &data.encode(), + ) + .map_err(|e| { + log_error!( + logger, + "Writing data to key {}/{}/{} failed due to: {}", + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + e + ); + Error::PersistenceFailed + }) +} + /// Read previously persisted events from the store. pub(crate) fn read_event_queue( kv_store: Arc, logger: L, diff --git a/src/lib.rs b/src/lib.rs index 2b292e3f6..05fae97f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,7 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +mod scoring; mod sweep; mod tx_broadcaster; mod types; @@ -122,8 +123,9 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ - default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, - PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + default_user_config, may_announce_channel, ChannelConfig, Config, + EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS, + NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; @@ -137,6 +139,7 @@ use payment::{ UnifiedQrPayment, }; use peer_store::{PeerInfo, PeerStore}; +use scoring::setup_background_pathfinding_scores_sync; use types::{ Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet, @@ -189,6 +192,7 @@ pub struct Node { keys_manager: Arc, network_graph: Arc, gossip_source: Arc, + pathfinding_scores_sync_url: Option, liquidity_source: Option>>>, kv_store: Arc, logger: Arc, @@ -272,7 +276,6 @@ impl Node { return; } _ = interval.tick() => { - let gossip_sync_logger = Arc::clone(&gossip_sync_logger); let now = Instant::now(); match gossip_source.update_rgs_snapshot().await { Ok(updated_timestamp) => { @@ -304,6 +307,18 @@ impl Node { }); } + if let Some(pathfinding_scores_sync_url) = self.pathfinding_scores_sync_url.as_ref() { + setup_background_pathfinding_scores_sync( + pathfinding_scores_sync_url.clone(), + Arc::clone(&self.scorer), + Arc::clone(&self.node_metrics), + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + &runtime, + self.stop_sender.subscribe(), + ); + } + if let Some(listening_addresses) = &self.config.listening_addresses { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); @@ -725,6 +740,8 @@ impl Node { locked_node_metrics.latest_fee_rate_cache_update_timestamp; let latest_rgs_snapshot_timestamp = locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64); + let latest_pathfinding_scores_sync_timestamp = + locked_node_metrics.latest_pathfinding_scores_sync_timestamp; let latest_node_announcement_broadcast_timestamp = locked_node_metrics.latest_node_announcement_broadcast_timestamp; let latest_channel_monitor_archival_height = @@ -738,6 +755,7 @@ impl Node { latest_onchain_wallet_sync_timestamp, latest_fee_rate_cache_update_timestamp, latest_rgs_snapshot_timestamp, + latest_pathfinding_scores_sync_timestamp, latest_node_announcement_broadcast_timestamp, latest_channel_monitor_archival_height, } @@ -1376,8 +1394,8 @@ impl Node { let mut total_lightning_balance_sats = 0; let mut lightning_balances = Vec::new(); - for (funding_txo, channel_id) in self.chain_monitor.list_monitors() { - match self.chain_monitor.get_monitor(funding_txo) { + for channel_id in self.chain_monitor.list_monitors() { + match self.chain_monitor.get_monitor(channel_id) { Ok(monitor) => { // unwrap safety: `get_counterparty_node_id` will always be `Some` after 0.0.110 and // LDK Node 0.1 depended on 0.0.115 already. @@ -1566,6 +1584,8 @@ pub struct NodeStatus { /// /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet. pub latest_rgs_snapshot_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores. + pub latest_pathfinding_scores_sync_timestamp: Option, /// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node /// announcement. /// @@ -1584,6 +1604,7 @@ pub(crate) struct NodeMetrics { latest_onchain_wallet_sync_timestamp: Option, latest_fee_rate_cache_update_timestamp: Option, latest_rgs_snapshot_timestamp: Option, + latest_pathfinding_scores_sync_timestamp: Option, latest_node_announcement_broadcast_timestamp: Option, latest_channel_monitor_archival_height: Option, } @@ -1595,6 +1616,7 @@ impl Default for NodeMetrics { latest_onchain_wallet_sync_timestamp: None, latest_fee_rate_cache_update_timestamp: None, latest_rgs_snapshot_timestamp: None, + latest_pathfinding_scores_sync_timestamp: None, latest_node_announcement_broadcast_timestamp: None, latest_channel_monitor_archival_height: None, } @@ -1603,6 +1625,7 @@ impl Default for NodeMetrics { impl_writeable_tlv_based!(NodeMetrics, { (0, latest_lightning_wallet_sync_timestamp, option), + (1, latest_pathfinding_scores_sync_timestamp, option), (2, latest_onchain_wallet_sync_timestamp, option), (4, latest_fee_rate_cache_update_timestamp, option), (6, latest_rgs_snapshot_timestamp, option), diff --git a/src/liquidity.rs b/src/liquidity.rs index 9e9450f8f..c9ae6ee65 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -13,10 +13,10 @@ use lightning::ln::channelmanager::MIN_FINAL_CLTV_EXPIRY_DELTA; use lightning::ln::msgs::SocketAddress; use lightning::routing::router::{RouteHint, RouteHintHop}; use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, InvoiceBuilder, RoutingFees}; -use lightning_liquidity::events::Event; -use lightning_liquidity::lsps0::ser::RequestId; +use lightning_liquidity::events::LiquidityEvent; +use lightning_liquidity::lsps0::ser::LSPSRequestId; use lightning_liquidity::lsps2::event::LSPS2ClientEvent; -use lightning_liquidity::lsps2::msgs::OpeningFeeParams; +use lightning_liquidity::lsps2::msgs::LSPS2OpeningFeeParams; use lightning_liquidity::lsps2::utils::compute_opening_fee; use bitcoin::hashes::{sha256, Hash}; @@ -35,8 +35,8 @@ struct LSPS2Service { address: SocketAddress, node_id: PublicKey, token: Option, - pending_fee_requests: Mutex>>, - pending_buy_requests: Mutex>>, + pending_fee_requests: Mutex>>, + pending_buy_requests: Mutex>>, } pub(crate) struct LiquiditySource @@ -87,7 +87,7 @@ where pub(crate) async fn handle_next_event(&self) { match self.liquidity_manager.next_event_async().await { - Event::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady { + LiquidityEvent::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady { request_id, counterparty_node_id, opening_fee_params_menu, @@ -137,7 +137,7 @@ where ); } }, - Event::LSPS2Client(LSPS2ClientEvent::InvoiceParametersReady { + LiquidityEvent::LSPS2Client(LSPS2ClientEvent::InvoiceParametersReady { request_id, counterparty_node_id, intercept_scid, @@ -330,7 +330,7 @@ where } async fn lsps2_send_buy_request( - &self, amount_msat: Option, opening_fee_params: OpeningFeeParams, + &self, amount_msat: Option, opening_fee_params: LSPS2OpeningFeeParams, ) -> Result { let lsps2_service = self.lsps2_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; @@ -430,7 +430,7 @@ where #[derive(Debug, Clone)] pub(crate) struct LSPS2FeeResponse { - opening_fee_params_menu: Vec, + opening_fee_params_menu: Vec, } #[derive(Debug, Clone)] diff --git a/src/scoring.rs b/src/scoring.rs new file mode 100644 index 000000000..13682b17f --- /dev/null +++ b/src/scoring.rs @@ -0,0 +1,106 @@ +use std::{ + io::Cursor, + sync::{Arc, Mutex, RwLock}, + time::{Duration, SystemTime}, +}; + +use crate::io::utils::write_external_pathfinding_scores_to_cache; +use crate::logger::LdkLogger; +use crate::{ + write_node_metrics, DynStore, Logger, NodeMetrics, Scorer, + EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS, +}; +use lightning::{ + log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable, +}; + +/// Start a background task that periodically downloads scores via an external url and merges them into the local +/// pathfinding scores. +pub fn setup_background_pathfinding_scores_sync( + url: String, scorer: Arc>, node_metrics: Arc>, + kv_store: Arc, logger: Arc, runtime: &tokio::runtime::Runtime, + mut stop_receiver: tokio::sync::watch::Receiver<()>, +) { + log_info!(logger, "External scores background syncing enabled from {}", url); + + let logger = Arc::clone(&logger); + + runtime.spawn(async move { + let mut interval = tokio::time::interval(EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL); + loop { + tokio::select! { + _ = stop_receiver.changed() => { + log_trace!( + logger, + "Stopping background syncing external scores.", + ); + return; + } + _ = interval.tick() => { + log_trace!( + logger, + "Background sync of external scores started.", + ); + + sync_external_scores(logger.as_ref(), scorer.as_ref(), node_metrics.as_ref(), Arc::clone(&kv_store), &url).await; + } + } + } + }); +} + +async fn sync_external_scores( + logger: &Logger, scorer: &Mutex, node_metrics: &RwLock, + kv_store: Arc, url: &String, +) -> () { + let response = tokio::time::timeout( + Duration::from_secs(EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS), + reqwest::get(url), + ) + .await; + + if let Err(e) = response { + log_error!(logger, "Retrieving external scores timed out: {}", e); + return; + } + let response = response.unwrap(); + + if let Err(e) = response { + log_error!(logger, "Failed to retrieve external scores update: {}", e); + return; + } + + let body = response.unwrap().bytes().await; + if let Err(e) = body { + log_error!(logger, "Failed to read external scores update: {}", e); + return; + } + + let mut reader = Cursor::new(body.unwrap()); + match ChannelLiquidities::read(&mut reader) { + Ok(liquidities) => { + if let Err(e) = write_external_pathfinding_scores_to_cache( + Arc::clone(&kv_store), + &liquidities, + logger, + ) { + log_error!(logger, "Failed to persist external scores to cache: {}", e); + } + + let duration_since_epoch = + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + scorer.lock().unwrap().merge(liquidities, duration_since_epoch); + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_pathfinding_scores_sync_timestamp = + Some(duration_since_epoch.as_secs()); + write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), logger) + .unwrap_or_else(|e| { + log_error!(logger, "Persisting node metrics failed: {}", e); + }); + log_trace!(logger, "External scores merged successfully"); + }, + Err(e) => { + log_error!(logger, "Failed to parse external scores update: {}", e); + }, + } +} diff --git a/src/types.rs b/src/types.rs index 1c9ab64b9..738203127 100644 --- a/src/types.rs +++ b/src/types.rs @@ -21,7 +21,8 @@ use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::types::ChannelId; use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; -use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; +use lightning::routing::scoring::CombinedScorer; +use lightning::routing::scoring::ProbabilisticScoringFeeParameters; use lightning::sign::InMemorySigner; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, Writeable, Writer}; @@ -88,7 +89,7 @@ pub(crate) type Router = DefaultRouter< ProbabilisticScoringFeeParameters, Scorer, >; -pub(crate) type Scorer = ProbabilisticScorer, Arc>; +pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>;