Skip to content

Commit 1a4b051

Browse files
committed
add functionality to periodically update routing scores from an external http source
1 parent 16d1a11 commit 1a4b051

File tree

6 files changed

+169
-12
lines changed

6 files changed

+169
-12
lines changed

bindings/ldk_node.udl

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ interface Builder {
5858
void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
5959
void set_gossip_source_p2p();
6060
void set_gossip_source_rgs(string rgs_server_url);
61+
void set_pathfinding_scores_source(string url);
6162
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
6263
void set_storage_dir_path(string storage_dir_path);
6364
void set_filesystem_logger(string? log_file_path, LogLevel? max_log_level);
@@ -275,6 +276,7 @@ dictionary NodeStatus {
275276
u64? latest_onchain_wallet_sync_timestamp;
276277
u64? latest_fee_rate_cache_update_timestamp;
277278
u64? latest_rgs_snapshot_timestamp;
279+
u64? latest_pathfinding_scores_sync_timestamp;
278280
u64? latest_node_announcement_broadcast_timestamp;
279281
u32? latest_channel_monitor_archival_height;
280282
};

src/builder.rs

+36-8
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
4141
use lightning::routing::gossip::NodeAlias;
4242
use lightning::routing::router::DefaultRouter;
4343
use lightning::routing::scoring::{
44-
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
44+
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
45+
ProbabilisticScoringFeeParameters,
4546
};
4647
use lightning::sign::EntropySource;
4748

@@ -97,6 +98,11 @@ enum GossipSourceConfig {
9798
RapidGossipSync(String),
9899
}
99100

101+
#[derive(Debug, Clone)]
102+
struct PathfindingScoresSyncConfig {
103+
url: String,
104+
}
105+
100106
#[derive(Debug, Clone)]
101107
struct LiquiditySourceConfig {
102108
// LSPS2 service's (address, node_id, token)
@@ -211,6 +217,7 @@ pub struct NodeBuilder {
211217
gossip_source_config: Option<GossipSourceConfig>,
212218
liquidity_source_config: Option<LiquiditySourceConfig>,
213219
log_writer_config: Option<LogWriterConfig>,
220+
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
214221
}
215222

216223
impl NodeBuilder {
@@ -227,13 +234,15 @@ impl NodeBuilder {
227234
let gossip_source_config = None;
228235
let liquidity_source_config = None;
229236
let log_writer_config = None;
237+
let pathfinding_scores_sync_config = None;
230238
Self {
231239
config,
232240
entropy_source_config,
233241
chain_data_source_config,
234242
gossip_source_config,
235243
liquidity_source_config,
236244
log_writer_config,
245+
pathfinding_scores_sync_config,
237246
}
238247
}
239248

@@ -304,6 +313,14 @@ impl NodeBuilder {
304313
self
305314
}
306315

316+
/// Configures the [`Node`] instance to source its external scores from the given URL.
317+
///
318+
/// The external scores are merged into the local scoring system to improve routing.
319+
pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self {
320+
self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url });
321+
self
322+
}
323+
307324
/// Configures the [`Node`] instance to source its inbound liquidity from the given
308325
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
309326
/// service.
@@ -529,6 +546,7 @@ impl NodeBuilder {
529546
config,
530547
self.chain_data_source_config.as_ref(),
531548
self.gossip_source_config.as_ref(),
549+
self.pathfinding_scores_sync_config.as_ref(),
532550
self.liquidity_source_config.as_ref(),
533551
seed_bytes,
534552
logger,
@@ -551,6 +569,7 @@ impl NodeBuilder {
551569
config,
552570
self.chain_data_source_config.as_ref(),
553571
self.gossip_source_config.as_ref(),
572+
self.pathfinding_scores_sync_config.as_ref(),
554573
self.liquidity_source_config.as_ref(),
555574
seed_bytes,
556575
logger,
@@ -643,6 +662,13 @@ impl ArcedNodeBuilder {
643662
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
644663
}
645664

665+
/// Configures the [`Node`] instance to source its external scores from the given URL.
666+
///
667+
/// The external scores are merged into the local scoring system to improve routing.
668+
pub fn set_pathfinding_scores_source(&self, url: String) {
669+
self.inner.write().unwrap().set_pathfinding_scores_source(url);
670+
}
671+
646672
/// Configures the [`Node`] instance to source its inbound liquidity from the given
647673
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
648674
/// service.
@@ -802,6 +828,7 @@ impl ArcedNodeBuilder {
802828
fn build_with_store_internal(
803829
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
804830
gossip_source_config: Option<&GossipSourceConfig>,
831+
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
805832
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
806833
logger: Arc<Logger>, kv_store: Arc<DynStore>,
807834
) -> Result<Node, BuildError> {
@@ -957,26 +984,24 @@ fn build_with_store_internal(
957984
},
958985
};
959986

960-
let scorer = match io::utils::read_scorer(
987+
let local_scorer = match io::utils::read_scorer(
961988
Arc::clone(&kv_store),
962989
Arc::clone(&network_graph),
963990
Arc::clone(&logger),
964991
) {
965-
Ok(scorer) => Arc::new(Mutex::new(scorer)),
992+
Ok(scorer) => scorer,
966993
Err(e) => {
967994
if e.kind() == std::io::ErrorKind::NotFound {
968995
let params = ProbabilisticScoringDecayParameters::default();
969-
Arc::new(Mutex::new(ProbabilisticScorer::new(
970-
params,
971-
Arc::clone(&network_graph),
972-
Arc::clone(&logger),
973-
)))
996+
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
974997
} else {
975998
return Err(BuildError::ReadFailed);
976999
}
9771000
},
9781001
};
9791002

1003+
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
1004+
9801005
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
9811006
let router = Arc::new(DefaultRouter::new(
9821007
Arc::clone(&network_graph),
@@ -1282,6 +1307,8 @@ fn build_with_store_internal(
12821307
let (stop_sender, _) = tokio::sync::watch::channel(());
12831308
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
12841309

1310+
let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());
1311+
12851312
Ok(Node {
12861313
runtime,
12871314
stop_sender,
@@ -1300,6 +1327,7 @@ fn build_with_store_internal(
13001327
keys_manager,
13011328
network_graph,
13021329
gossip_source,
1330+
pathfinding_scores_sync_url,
13031331
liquidity_source,
13041332
kv_store,
13051333
logger,

src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
5757
// The time in-between RGS sync attempts.
5858
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
5959

60+
// The time in-between external scores sync attempts.
61+
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
62+
6063
// The time in-between node announcement broadcast attempts.
6164
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
6265

@@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
7881
// The timeout after which we abort a RGS sync operation.
7982
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;
8083

84+
// The timeout after which we abort a external scores sync operation.
85+
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;
86+
8187
// The length in bytes of our wallets' keys seed.
8288
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;
8389

src/lib.rs

+26-2
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub mod logger;
8989
mod message_handler;
9090
pub mod payment;
9191
mod peer_store;
92+
mod scoring;
9293
mod sweep;
9394
mod tx_broadcaster;
9495
mod types;
@@ -122,8 +123,9 @@ pub use builder::NodeBuilder as Builder;
122123

123124
use chain::ChainSource;
124125
use config::{
125-
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
126-
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
126+
default_user_config, may_announce_channel, ChannelConfig, Config,
127+
EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
128+
NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
127129
};
128130
use connection::ConnectionManager;
129131
use event::{EventHandler, EventQueue};
@@ -137,6 +139,7 @@ use payment::{
137139
UnifiedQrPayment,
138140
};
139141
use peer_store::{PeerInfo, PeerStore};
142+
use scoring::setup_background_pathfinding_scores_sync;
140143
use types::{
141144
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph,
142145
KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet,
@@ -189,6 +192,7 @@ pub struct Node {
189192
keys_manager: Arc<KeysManager>,
190193
network_graph: Arc<Graph>,
191194
gossip_source: Arc<GossipSource>,
195+
pathfinding_scores_sync_url: Option<String>,
192196
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
193197
kv_store: Arc<DynStore>,
194198
logger: Arc<Logger>,
@@ -303,6 +307,18 @@ impl Node {
303307
});
304308
}
305309

310+
if let Some(pathfinding_scores_sync_url) = self.pathfinding_scores_sync_url.as_ref() {
311+
setup_background_pathfinding_scores_sync(
312+
pathfinding_scores_sync_url.clone(),
313+
Arc::clone(&self.scorer),
314+
Arc::clone(&self.node_metrics),
315+
Arc::clone(&self.kv_store),
316+
Arc::clone(&self.logger),
317+
&runtime,
318+
self.stop_sender.subscribe(),
319+
);
320+
}
321+
306322
if let Some(listening_addresses) = &self.config.listening_addresses {
307323
// Setup networking
308324
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
@@ -724,6 +740,8 @@ impl Node {
724740
locked_node_metrics.latest_fee_rate_cache_update_timestamp;
725741
let latest_rgs_snapshot_timestamp =
726742
locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64);
743+
let latest_pathfinding_scores_sync_timestamp =
744+
locked_node_metrics.latest_pathfinding_scores_sync_timestamp;
727745
let latest_node_announcement_broadcast_timestamp =
728746
locked_node_metrics.latest_node_announcement_broadcast_timestamp;
729747
let latest_channel_monitor_archival_height =
@@ -737,6 +755,7 @@ impl Node {
737755
latest_onchain_wallet_sync_timestamp,
738756
latest_fee_rate_cache_update_timestamp,
739757
latest_rgs_snapshot_timestamp,
758+
latest_pathfinding_scores_sync_timestamp,
740759
latest_node_announcement_broadcast_timestamp,
741760
latest_channel_monitor_archival_height,
742761
}
@@ -1565,6 +1584,8 @@ pub struct NodeStatus {
15651584
///
15661585
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet.
15671586
pub latest_rgs_snapshot_timestamp: Option<u64>,
1587+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores.
1588+
pub latest_pathfinding_scores_sync_timestamp: Option<u64>,
15681589
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
15691590
/// announcement.
15701591
///
@@ -1583,6 +1604,7 @@ pub(crate) struct NodeMetrics {
15831604
latest_onchain_wallet_sync_timestamp: Option<u64>,
15841605
latest_fee_rate_cache_update_timestamp: Option<u64>,
15851606
latest_rgs_snapshot_timestamp: Option<u32>,
1607+
latest_pathfinding_scores_sync_timestamp: Option<u64>,
15861608
latest_node_announcement_broadcast_timestamp: Option<u64>,
15871609
latest_channel_monitor_archival_height: Option<u32>,
15881610
}
@@ -1594,6 +1616,7 @@ impl Default for NodeMetrics {
15941616
latest_onchain_wallet_sync_timestamp: None,
15951617
latest_fee_rate_cache_update_timestamp: None,
15961618
latest_rgs_snapshot_timestamp: None,
1619+
latest_pathfinding_scores_sync_timestamp: None,
15971620
latest_node_announcement_broadcast_timestamp: None,
15981621
latest_channel_monitor_archival_height: None,
15991622
}
@@ -1602,6 +1625,7 @@ impl Default for NodeMetrics {
16021625

16031626
impl_writeable_tlv_based!(NodeMetrics, {
16041627
(0, latest_lightning_wallet_sync_timestamp, option),
1628+
(1, latest_pathfinding_scores_sync_timestamp, option),
16051629
(2, latest_onchain_wallet_sync_timestamp, option),
16061630
(4, latest_fee_rate_cache_update_timestamp, option),
16071631
(6, latest_rgs_snapshot_timestamp, option),

src/scoring.rs

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::{
2+
io::Cursor,
3+
sync::{Arc, Mutex, RwLock},
4+
time::{Duration, SystemTime},
5+
};
6+
7+
use crate::{logger::LdkLogger, NodeMetrics, Scorer};
8+
use crate::{
9+
write_node_metrics, DynStore, Logger, EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL,
10+
EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
11+
};
12+
use lightning::{
13+
log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable,
14+
};
15+
16+
/// Start a background task that periodically downloads scores via an external url and merges them into the local
17+
/// pathfinding scores.
18+
pub fn setup_background_pathfinding_scores_sync(
19+
url: String, scorer: Arc<Mutex<crate::types::Scorer>>, node_metrics: Arc<RwLock<NodeMetrics>>,
20+
kv_store: Arc<DynStore>, logger: Arc<Logger>, runtime: &tokio::runtime::Runtime,
21+
mut stop_receiver: tokio::sync::watch::Receiver<()>,
22+
) {
23+
log_info!(logger, "External scores background syncing enabled from {}", url);
24+
25+
let logger = Arc::clone(&logger);
26+
27+
runtime.spawn(async move {
28+
let mut interval = tokio::time::interval(EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL);
29+
loop {
30+
tokio::select! {
31+
_ = stop_receiver.changed() => {
32+
log_trace!(
33+
logger,
34+
"Stopping background syncing external scores.",
35+
);
36+
return;
37+
}
38+
_ = interval.tick() => {
39+
log_trace!(
40+
logger,
41+
"Background sync of external scores started.",
42+
);
43+
44+
sync_external_scores(logger.as_ref(), scorer.as_ref(), node_metrics.as_ref(), Arc::clone(&kv_store), &url).await;
45+
}
46+
}
47+
}
48+
});
49+
}
50+
51+
async fn sync_external_scores(
52+
logger: &Logger, scorer: &Mutex<Scorer>, node_metrics: &RwLock<NodeMetrics>,
53+
kv_store: Arc<DynStore>, url: &String,
54+
) -> () {
55+
let response = tokio::time::timeout(
56+
Duration::from_secs(EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS),
57+
reqwest::get(url),
58+
)
59+
.await;
60+
61+
if let Err(e) = response {
62+
log_error!(logger, "Retrieving external scores timed out: {}", e);
63+
return;
64+
}
65+
let response = response.unwrap();
66+
67+
if let Err(e) = response {
68+
log_error!(logger, "Failed to retrieve external scores update: {}", e);
69+
return;
70+
}
71+
72+
let body = response.unwrap().bytes().await;
73+
if let Err(e) = body {
74+
log_error!(logger, "Failed to read external scores update: {}", e);
75+
return;
76+
}
77+
78+
let mut reader = Cursor::new(body.unwrap());
79+
match ChannelLiquidities::read(&mut reader) {
80+
Ok(liquidities) => {
81+
let duration_since_epoch =
82+
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
83+
scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
84+
let mut locked_node_metrics = node_metrics.write().unwrap();
85+
locked_node_metrics.latest_pathfinding_scores_sync_timestamp =
86+
Some(duration_since_epoch.as_secs());
87+
write_node_metrics(&*locked_node_metrics, kv_store, logger).unwrap_or_else(|e| {
88+
log_error!(logger, "Persisting node metrics failed: {}", e);
89+
});
90+
log_trace!(logger, "External scores merged successfully");
91+
},
92+
Err(e) => {
93+
log_error!(logger, "Failed to parse external scores update: {}", e);
94+
},
95+
}
96+
}

0 commit comments

Comments
 (0)