Skip to content

Commit 14167eb

Browse files
committed
add functionality to periodically update routing scores from an external http source
1 parent c64d8c5 commit 14167eb

File tree

6 files changed

+176
-13
lines changed

6 files changed

+176
-13
lines changed

bindings/ldk_node.udl

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ interface Builder {
5858
void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
5959
void set_gossip_source_p2p();
6060
void set_gossip_source_rgs(string rgs_server_url);
61+
void set_pathfinding_scores_source(string url);
6162
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
6263
void set_storage_dir_path(string storage_dir_path);
6364
void set_filesystem_logger(string? log_file_path, LogLevel? log_level);
@@ -273,6 +274,7 @@ dictionary NodeStatus {
273274
u64? latest_onchain_wallet_sync_timestamp;
274275
u64? latest_fee_rate_cache_update_timestamp;
275276
u64? latest_rgs_snapshot_timestamp;
277+
u64? latest_pathfinding_scores_sync_timestamp;
276278
u64? latest_node_announcement_broadcast_timestamp;
277279
u32? latest_channel_monitor_archival_height;
278280
};

src/builder.rs

+29-9
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use crate::types::{
3030
};
3131
use crate::wallet::persist::KVStoreWalletPersister;
3232
use crate::wallet::Wallet;
33-
use crate::Node;
3433
use crate::{io, NodeMetrics};
34+
use crate::{Node, PathfindingScoresSyncConfig};
3535

3636
use lightning::chain::{chainmonitor, BestBlock, Watch};
3737
use lightning::io::Cursor;
@@ -41,7 +41,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
4141
use lightning::routing::gossip::NodeAlias;
4242
use lightning::routing::router::DefaultRouter;
4343
use lightning::routing::scoring::{
44-
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
44+
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
45+
ProbabilisticScoringFeeParameters,
4546
};
4647
use lightning::sign::EntropySource;
4748

@@ -229,6 +230,7 @@ pub struct NodeBuilder {
229230
gossip_source_config: Option<GossipSourceConfig>,
230231
liquidity_source_config: Option<LiquiditySourceConfig>,
231232
log_writer_config: Option<LogWriterConfig>,
233+
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
232234
}
233235

234236
impl NodeBuilder {
@@ -245,13 +247,15 @@ impl NodeBuilder {
245247
let gossip_source_config = None;
246248
let liquidity_source_config = None;
247249
let log_writer_config = None;
250+
let scoring_source_config = None;
248251
Self {
249252
config,
250253
entropy_source_config,
251254
chain_data_source_config,
252255
gossip_source_config,
253256
liquidity_source_config,
254257
log_writer_config,
258+
pathfinding_scores_sync_config: scoring_source_config,
255259
}
256260
}
257261

@@ -322,6 +326,14 @@ impl NodeBuilder {
322326
self
323327
}
324328

329+
/// Configures the [`Node`] instance to source its external scores from the given URL.
330+
///
331+
/// The external scores are merged into the local scoring system to improve routing.
332+
pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self {
333+
self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url });
334+
self
335+
}
336+
325337
/// Configures the [`Node`] instance to source its inbound liquidity from the given
326338
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
327339
/// service.
@@ -540,6 +552,7 @@ impl NodeBuilder {
540552
config,
541553
self.chain_data_source_config.as_ref(),
542554
self.gossip_source_config.as_ref(),
555+
self.pathfinding_scores_sync_config.as_ref(),
543556
self.liquidity_source_config.as_ref(),
544557
seed_bytes,
545558
logger,
@@ -562,6 +575,7 @@ impl NodeBuilder {
562575
config,
563576
self.chain_data_source_config.as_ref(),
564577
self.gossip_source_config.as_ref(),
578+
self.pathfinding_scores_sync_config.as_ref(),
565579
self.liquidity_source_config.as_ref(),
566580
seed_bytes,
567581
logger,
@@ -654,6 +668,12 @@ impl ArcedNodeBuilder {
654668
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
655669
}
656670

671+
/// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to
672+
/// augment the internal pathfinding scoring system to improve routing.
673+
pub fn set_pathfinding_scores_source(&self, url: String) {
674+
self.inner.write().unwrap().set_pathfinding_scores_source(url);
675+
}
676+
657677
/// Configures the [`Node`] instance to source its inbound liquidity from the given
658678
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
659679
/// service.
@@ -806,6 +826,7 @@ impl ArcedNodeBuilder {
806826
fn build_with_store_internal(
807827
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
808828
gossip_source_config: Option<&GossipSourceConfig>,
829+
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
809830
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
810831
logger: Arc<Logger>, kv_store: Arc<DynStore>,
811832
) -> Result<Node, BuildError> {
@@ -950,26 +971,24 @@ fn build_with_store_internal(
950971
},
951972
};
952973

953-
let scorer = match io::utils::read_scorer(
974+
let local_scorer = match io::utils::read_scorer(
954975
Arc::clone(&kv_store),
955976
Arc::clone(&network_graph),
956977
Arc::clone(&logger),
957978
) {
958-
Ok(scorer) => Arc::new(Mutex::new(scorer)),
979+
Ok(scorer) => scorer,
959980
Err(e) => {
960981
if e.kind() == std::io::ErrorKind::NotFound {
961982
let params = ProbabilisticScoringDecayParameters::default();
962-
Arc::new(Mutex::new(ProbabilisticScorer::new(
963-
params,
964-
Arc::clone(&network_graph),
965-
Arc::clone(&logger),
966-
)))
983+
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
967984
} else {
968985
return Err(BuildError::ReadFailed);
969986
}
970987
},
971988
};
972989

990+
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
991+
973992
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
974993
let router = Arc::new(DefaultRouter::new(
975994
Arc::clone(&network_graph),
@@ -1303,6 +1322,7 @@ fn build_with_store_internal(
13031322
keys_manager,
13041323
network_graph,
13051324
gossip_source,
1325+
pathfinding_scores_sync_config: pathfinding_scores_sync_config.cloned(),
13061326
liquidity_source,
13071327
kv_store,
13081328
logger,

src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
5757
// The time in-between RGS sync attempts.
5858
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
5959

60+
// The time in-between external scores sync attempts.
61+
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
62+
6063
// The time in-between node announcement broadcast attempts.
6164
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
6265

@@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
7881
// The timeout after which we abort a RGS sync operation.
7982
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;
8083

84+
// The timeout after which we abort a external scores sync operation.
85+
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;
86+
8187
// The length in bytes of our wallets' keys seed.
8288
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;
8389

src/lib.rs

+31-2
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub mod logger;
8989
mod message_handler;
9090
pub mod payment;
9191
mod peer_store;
92+
mod scoring;
9293
mod sweep;
9394
mod tx_broadcaster;
9495
mod types;
@@ -122,8 +123,9 @@ pub use builder::NodeBuilder as Builder;
122123

123124
use chain::ChainSource;
124125
use config::{
125-
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
126-
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
126+
default_user_config, may_announce_channel, ChannelConfig, Config,
127+
EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
128+
NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
127129
};
128130
use connection::ConnectionManager;
129131
use event::{EventHandler, EventQueue};
@@ -137,6 +139,7 @@ use payment::{
137139
UnifiedQrPayment,
138140
};
139141
use peer_store::{PeerInfo, PeerStore};
142+
use scoring::setup_background_pathfinding_scores_sync;
140143
use types::{
141144
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph,
142145
KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet,
@@ -169,6 +172,11 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
169172
#[cfg(feature = "uniffi")]
170173
uniffi::include_scaffolding!("ldk_node");
171174

175+
#[derive(Debug, Clone)]
176+
struct PathfindingScoresSyncConfig {
177+
url: String,
178+
}
179+
172180
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
173181
///
174182
/// Needs to be initialized and instantiated through [`Builder::build`].
@@ -190,6 +198,7 @@ pub struct Node {
190198
keys_manager: Arc<KeysManager>,
191199
network_graph: Arc<Graph>,
192200
gossip_source: Arc<GossipSource>,
201+
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
193202
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
194203
kv_store: Arc<DynStore>,
195204
logger: Arc<Logger>,
@@ -304,6 +313,18 @@ impl Node {
304313
});
305314
}
306315

316+
if let Some(pathfinding_scores_sync_config) = self.pathfinding_scores_sync_config.as_ref() {
317+
setup_background_pathfinding_scores_sync(
318+
pathfinding_scores_sync_config.url.clone(),
319+
self.scorer.clone(),
320+
self.node_metrics.clone(),
321+
self.kv_store.clone(),
322+
self.logger.clone(),
323+
&runtime,
324+
&self.stop_sender,
325+
);
326+
}
327+
307328
if let Some(listening_addresses) = &self.config.listening_addresses {
308329
// Setup networking
309330
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
@@ -725,6 +746,8 @@ impl Node {
725746
locked_node_metrics.latest_fee_rate_cache_update_timestamp;
726747
let latest_rgs_snapshot_timestamp =
727748
locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64);
749+
let latest_pathfinding_scores_sync_timestamp =
750+
locked_node_metrics.latest_pathfinding_scores_sync_timestamp;
728751
let latest_node_announcement_broadcast_timestamp =
729752
locked_node_metrics.latest_node_announcement_broadcast_timestamp;
730753
let latest_channel_monitor_archival_height =
@@ -738,6 +761,7 @@ impl Node {
738761
latest_onchain_wallet_sync_timestamp,
739762
latest_fee_rate_cache_update_timestamp,
740763
latest_rgs_snapshot_timestamp,
764+
latest_pathfinding_scores_sync_timestamp,
741765
latest_node_announcement_broadcast_timestamp,
742766
latest_channel_monitor_archival_height,
743767
}
@@ -1547,6 +1571,8 @@ pub struct NodeStatus {
15471571
///
15481572
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet.
15491573
pub latest_rgs_snapshot_timestamp: Option<u64>,
1574+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores.
1575+
pub latest_pathfinding_scores_sync_timestamp: Option<u64>,
15501576
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
15511577
/// announcement.
15521578
///
@@ -1565,6 +1591,7 @@ pub(crate) struct NodeMetrics {
15651591
latest_onchain_wallet_sync_timestamp: Option<u64>,
15661592
latest_fee_rate_cache_update_timestamp: Option<u64>,
15671593
latest_rgs_snapshot_timestamp: Option<u32>,
1594+
latest_pathfinding_scores_sync_timestamp: Option<u64>,
15681595
latest_node_announcement_broadcast_timestamp: Option<u64>,
15691596
latest_channel_monitor_archival_height: Option<u32>,
15701597
}
@@ -1576,6 +1603,7 @@ impl Default for NodeMetrics {
15761603
latest_onchain_wallet_sync_timestamp: None,
15771604
latest_fee_rate_cache_update_timestamp: None,
15781605
latest_rgs_snapshot_timestamp: None,
1606+
latest_pathfinding_scores_sync_timestamp: None,
15791607
latest_node_announcement_broadcast_timestamp: None,
15801608
latest_channel_monitor_archival_height: None,
15811609
}
@@ -1584,6 +1612,7 @@ impl Default for NodeMetrics {
15841612

15851613
impl_writeable_tlv_based!(NodeMetrics, {
15861614
(0, latest_lightning_wallet_sync_timestamp, option),
1615+
(1, latest_pathfinding_scores_sync_timestamp, option),
15871616
(2, latest_onchain_wallet_sync_timestamp, option),
15881617
(4, latest_fee_rate_cache_update_timestamp, option),
15891618
(6, latest_rgs_snapshot_timestamp, option),

src/scoring.rs

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::{
2+
io::Cursor,
3+
sync::{Arc, Mutex, RwLock},
4+
time::{Duration, SystemTime},
5+
};
6+
7+
use crate::{logger::LdkLogger, NodeMetrics};
8+
use crate::{
9+
write_node_metrics, DynStore, Graph, Logger, EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL,
10+
EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
11+
};
12+
use lightning::{
13+
log_error, log_info, log_trace,
14+
routing::scoring::{ChannelLiquidities, CombinedScorer},
15+
util::ser::Readable,
16+
};
17+
18+
/// Start a background task that periodically downloads scores via an external url and merges them into the local
19+
/// pathfinding scores.
20+
pub fn setup_background_pathfinding_scores_sync(
21+
url: String, scorer: Arc<Mutex<CombinedScorer<Arc<Graph>, Arc<Logger>>>>,
22+
node_metrics: Arc<RwLock<NodeMetrics>>, kv_store: Arc<DynStore>, logger: Arc<Logger>,
23+
runtime: &tokio::runtime::Runtime, stop_sender: &tokio::sync::watch::Sender<()>,
24+
) {
25+
log_info!(logger, "External scores background syncing from {} enabled", url);
26+
27+
let logger = Arc::clone(&logger);
28+
let mut stop_sync = stop_sender.subscribe();
29+
30+
runtime.spawn(async move {
31+
let mut interval = tokio::time::interval(EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL);
32+
loop {
33+
tokio::select! {
34+
_ = stop_sync.changed() => {
35+
log_trace!(
36+
logger,
37+
"Stopping background syncing external scores.",
38+
);
39+
return;
40+
}
41+
_ = interval.tick() => {
42+
log_trace!(
43+
logger,
44+
"Background sync of external scores started.",
45+
);
46+
47+
sync_external_scores(logger.as_ref(), scorer.as_ref(), node_metrics.as_ref(), Arc::clone(&kv_store), &url).await;
48+
}
49+
}
50+
}
51+
});
52+
}
53+
54+
async fn sync_external_scores(
55+
logger: &Logger, scorer: &Mutex<CombinedScorer<Arc<Graph>, Arc<Logger>>>,
56+
node_metrics: &RwLock<NodeMetrics>, kv_store: Arc<DynStore>, url: &String,
57+
) -> () {
58+
let response = tokio::time::timeout(
59+
Duration::from_secs(EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS),
60+
reqwest::get(url),
61+
)
62+
.await;
63+
64+
match response {
65+
Ok(Ok(response)) => {
66+
let body = response.bytes().await;
67+
match body {
68+
Err(e) => {
69+
log_error!(logger, "Failed to read external scores update: {}", e);
70+
return;
71+
},
72+
Ok(body) => {
73+
let mut reader = Cursor::new(body);
74+
match ChannelLiquidities::read(&mut reader) {
75+
Ok(liquidities) => {
76+
let duration_since_epoch =
77+
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
78+
scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
79+
let mut locked_node_metrics = node_metrics.write().unwrap();
80+
locked_node_metrics.latest_pathfinding_scores_sync_timestamp =
81+
Some(duration_since_epoch.as_secs());
82+
write_node_metrics(&*locked_node_metrics, kv_store, logger)
83+
.unwrap_or_else(|e| {
84+
log_error!(logger, "Persisting node metrics failed: {}", e);
85+
});
86+
log_trace!(logger, "External scores merged successfully");
87+
},
88+
Err(e) => {
89+
log_error!(logger, "Failed to parse external scores update: {}", e);
90+
return;
91+
},
92+
}
93+
},
94+
}
95+
},
96+
Err(e) => {
97+
log_error!(logger, "Retrieving external scores timed out: {}", e);
98+
return;
99+
},
100+
Ok(Err(e)) => {
101+
log_error!(logger, "Failed to retrieve external scores update: {}", e);
102+
return;
103+
},
104+
}
105+
}

0 commit comments

Comments
 (0)