Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 619c9c9

Browse files
committedFeb 3, 2025·
add functionality to periodically update routing scores from an external http source
1 parent c64d8c5 commit 619c9c9

File tree

8 files changed

+192
-12
lines changed

8 files changed

+192
-12
lines changed
 

‎Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ log = { version = "0.4.22", default-features = false, features = ["std"]}
8484

8585
vss-client = "0.3"
8686
prost = { version = "0.11.6", default-features = false}
87+
bytes = "1.9.0"
8788

8889
[target.'cfg(windows)'.dependencies]
8990
winapi = { version = "0.3", features = ["winbase"] }

‎bindings/ldk_node.udl

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ interface Builder {
5858
void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
5959
void set_gossip_source_p2p();
6060
void set_gossip_source_rgs(string rgs_server_url);
61+
void set_scoring_source(string url);
6162
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
6263
void set_storage_dir_path(string storage_dir_path);
6364
void set_filesystem_logger(string? log_file_path, LogLevel? log_level);
@@ -273,6 +274,7 @@ dictionary NodeStatus {
273274
u64? latest_onchain_wallet_sync_timestamp;
274275
u64? latest_fee_rate_cache_update_timestamp;
275276
u64? latest_rgs_snapshot_timestamp;
277+
u64? latest_scores_sync_timestamp;
276278
u64? latest_node_announcement_broadcast_timestamp;
277279
u32? latest_channel_monitor_archival_height;
278280
};

‎src/builder.rs

+44-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger}
2323
use crate::message_handler::NodeCustomMessageHandler;
2424
use crate::payment::store::PaymentStore;
2525
use crate::peer_store::PeerStore;
26+
use crate::scoring::ScoringSource;
2627
use crate::tx_broadcaster::TransactionBroadcaster;
2728
use crate::types::{
2829
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
@@ -41,7 +42,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
4142
use lightning::routing::gossip::NodeAlias;
4243
use lightning::routing::router::DefaultRouter;
4344
use lightning::routing::scoring::{
44-
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
45+
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
46+
ProbabilisticScoringFeeParameters,
4547
};
4648
use lightning::sign::EntropySource;
4749

@@ -97,6 +99,11 @@ enum GossipSourceConfig {
9799
RapidGossipSync(String),
98100
}
99101

102+
#[derive(Debug, Clone)]
103+
struct ScoringSourceConfig {
104+
url: String,
105+
}
106+
100107
#[derive(Debug, Clone)]
101108
struct LiquiditySourceConfig {
102109
// LSPS2 service's (address, node_id, token)
@@ -229,6 +236,7 @@ pub struct NodeBuilder {
229236
gossip_source_config: Option<GossipSourceConfig>,
230237
liquidity_source_config: Option<LiquiditySourceConfig>,
231238
log_writer_config: Option<LogWriterConfig>,
239+
scoring_source_config: Option<ScoringSourceConfig>,
232240
}
233241

234242
impl NodeBuilder {
@@ -245,13 +253,15 @@ impl NodeBuilder {
245253
let gossip_source_config = None;
246254
let liquidity_source_config = None;
247255
let log_writer_config = None;
256+
let scoring_source_config = None;
248257
Self {
249258
config,
250259
entropy_source_config,
251260
chain_data_source_config,
252261
gossip_source_config,
253262
liquidity_source_config,
254263
log_writer_config,
264+
scoring_source_config,
255265
}
256266
}
257267

@@ -322,6 +332,13 @@ impl NodeBuilder {
322332
self
323333
}
324334

335+
/// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to
336+
/// augment the internal pathfinding scoring system to improve routing.
337+
pub fn set_scoring_source(&mut self, url: String) -> &mut Self {
338+
self.scoring_source_config = Some(ScoringSourceConfig { url });
339+
self
340+
}
341+
325342
/// Configures the [`Node`] instance to source its inbound liquidity from the given
326343
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
327344
/// service.
@@ -540,6 +557,7 @@ impl NodeBuilder {
540557
config,
541558
self.chain_data_source_config.as_ref(),
542559
self.gossip_source_config.as_ref(),
560+
self.scoring_source_config.as_ref(),
543561
self.liquidity_source_config.as_ref(),
544562
seed_bytes,
545563
logger,
@@ -562,6 +580,7 @@ impl NodeBuilder {
562580
config,
563581
self.chain_data_source_config.as_ref(),
564582
self.gossip_source_config.as_ref(),
583+
self.scoring_source_config.as_ref(),
565584
self.liquidity_source_config.as_ref(),
566585
seed_bytes,
567586
logger,
@@ -654,6 +673,12 @@ impl ArcedNodeBuilder {
654673
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
655674
}
656675

676+
/// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to
677+
/// augment the internal pathfinding scoring system to improve routing.
678+
pub fn set_scoring_source(&self, url: String) {
679+
self.inner.write().unwrap().scoring_source_config = Some(ScoringSourceConfig { url });
680+
}
681+
657682
/// Configures the [`Node`] instance to source its inbound liquidity from the given
658683
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
659684
/// service.
@@ -806,6 +831,7 @@ impl ArcedNodeBuilder {
806831
fn build_with_store_internal(
807832
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
808833
gossip_source_config: Option<&GossipSourceConfig>,
834+
scoring_source_config: Option<&ScoringSourceConfig>,
809835
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
810836
logger: Arc<Logger>, kv_store: Arc<DynStore>,
811837
) -> Result<Node, BuildError> {
@@ -950,26 +976,24 @@ fn build_with_store_internal(
950976
},
951977
};
952978

953-
let scorer = match io::utils::read_scorer(
979+
let local_scorer = match io::utils::read_scorer(
954980
Arc::clone(&kv_store),
955981
Arc::clone(&network_graph),
956982
Arc::clone(&logger),
957983
) {
958-
Ok(scorer) => Arc::new(Mutex::new(scorer)),
984+
Ok(scorer) => scorer,
959985
Err(e) => {
960986
if e.kind() == std::io::ErrorKind::NotFound {
961987
let params = ProbabilisticScoringDecayParameters::default();
962-
Arc::new(Mutex::new(ProbabilisticScorer::new(
963-
params,
964-
Arc::clone(&network_graph),
965-
Arc::clone(&logger),
966-
)))
988+
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
967989
} else {
968990
return Err(BuildError::ReadFailed);
969991
}
970992
},
971993
};
972994

995+
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
996+
973997
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
974998
let router = Arc::new(DefaultRouter::new(
975999
Arc::clone(&network_graph),
@@ -1129,6 +1153,17 @@ fn build_with_store_internal(
11291153
},
11301154
};
11311155

1156+
let scoring_source = if let Some(config) = scoring_source_config {
1157+
Some(Arc::new(ScoringSource::new(
1158+
config.url.clone(),
1159+
Arc::clone(&scorer),
1160+
Arc::clone(&node_metrics),
1161+
Arc::clone(&logger),
1162+
)))
1163+
} else {
1164+
None
1165+
};
1166+
11321167
let liquidity_source = liquidity_source_config.as_ref().and_then(|lsc| {
11331168
lsc.lsps2_service.as_ref().map(|(address, node_id, token)| {
11341169
let lsps2_client_config = Some(LSPS2ClientConfig {});
@@ -1303,6 +1338,7 @@ fn build_with_store_internal(
13031338
keys_manager,
13041339
network_graph,
13051340
gossip_source,
1341+
scoring_source,
13061342
liquidity_source,
13071343
kv_store,
13081344
logger,

‎src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
5757
// The time in-between RGS sync attempts.
5858
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
5959

60+
// The time in-between external scores sync attempts.
61+
pub(crate) const EXTERNAL_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
62+
6063
// The time in-between node announcement broadcast attempts.
6164
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
6265

@@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
7881
// The timeout after which we abort a RGS sync operation.
7982
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;
8083

84+
// The timeout after which we abort a external scores sync operation.
85+
pub(crate) const EXTERNAL_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;
86+
8187
// The length in bytes of our wallets' keys seed.
8288
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;
8389

‎src/io/utils.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@ use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
2020
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
2121
use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
2222

23+
use bytes::Bytes;
2324
use lightning::io::Cursor;
2425
use lightning::ln::msgs::DecodeError;
2526
use lightning::routing::gossip::NetworkGraph;
26-
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
27+
use lightning::routing::scoring::{
28+
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
29+
};
2730
use lightning::util::persist::{
2831
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
2932
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -166,6 +169,12 @@ where
166169
})
167170
}
168171

172+
/// Read external scores from a Bytes object.
173+
pub(crate) fn read_external_scores(bytes: Bytes) -> Result<ChannelLiquidities, DecodeError> {
174+
let mut reader = Cursor::new(bytes);
175+
ChannelLiquidities::read(&mut reader)
176+
}
177+
169178
/// Read previously persisted events from the store.
170179
pub(crate) fn read_event_queue<L: Deref + Clone>(
171180
kv_store: Arc<DynStore>, logger: L,

‎src/lib.rs

+52-1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub mod logger;
8989
mod message_handler;
9090
pub mod payment;
9191
mod peer_store;
92+
mod scoring;
9293
mod sweep;
9394
mod tx_broadcaster;
9495
mod types;
@@ -122,7 +123,8 @@ pub use builder::NodeBuilder as Builder;
122123

123124
use chain::ChainSource;
124125
use config::{
125-
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
126+
default_user_config, may_announce_channel, ChannelConfig, Config,
127+
EXTERNAL_SCORES_SYNC_INTERVAL, EXTERNAL_SCORES_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL,
126128
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
127129
};
128130
use connection::ConnectionManager;
@@ -137,6 +139,7 @@ use payment::{
137139
UnifiedQrPayment,
138140
};
139141
use peer_store::{PeerInfo, PeerStore};
142+
use scoring::ScoringSource;
140143
use types::{
141144
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph,
142145
KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet,
@@ -190,6 +193,7 @@ pub struct Node {
190193
keys_manager: Arc<KeysManager>,
191194
network_graph: Arc<Graph>,
192195
gossip_source: Arc<GossipSource>,
196+
scoring_source: Option<Arc<ScoringSource>>,
193197
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
194198
kv_store: Arc<DynStore>,
195199
logger: Arc<Logger>,
@@ -304,6 +308,10 @@ impl Node {
304308
});
305309
}
306310

311+
if self.scoring_source.is_some() {
312+
self.setup_external_scores_syncing(&runtime);
313+
}
314+
307315
if let Some(listening_addresses) = &self.config.listening_addresses {
308316
// Setup networking
309317
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
@@ -634,6 +642,42 @@ impl Node {
634642
Ok(())
635643
}
636644

645+
/// Spawn a background task to sync external scores.
646+
fn setup_external_scores_syncing(&self, runtime: &tokio::runtime::Runtime) {
647+
let scoring_source = Arc::clone(&self.scoring_source.as_ref().unwrap());
648+
log_info!(
649+
self.logger,
650+
"External scores background syncing from {} enabled",
651+
scoring_source.get_url()
652+
);
653+
654+
let external_scores_sync_logger = Arc::clone(&self.logger);
655+
let mut stop_sync = self.stop_sender.subscribe();
656+
657+
runtime.spawn(async move {
658+
let mut interval = tokio::time::interval(EXTERNAL_SCORES_SYNC_INTERVAL);
659+
loop {
660+
tokio::select! {
661+
_ = stop_sync.changed() => {
662+
log_trace!(
663+
external_scores_sync_logger,
664+
"Stopping background syncing external scores.",
665+
);
666+
return;
667+
}
668+
_ = interval.tick() => {
669+
log_trace!(
670+
external_scores_sync_logger,
671+
"Background sync of external scores started.",
672+
);
673+
674+
scoring_source.sync_external_scores().await;
675+
}
676+
}
677+
}
678+
});
679+
}
680+
637681
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
638682
///
639683
/// After this returns most API methods will return [`Error::NotRunning`].
@@ -725,6 +769,7 @@ impl Node {
725769
locked_node_metrics.latest_fee_rate_cache_update_timestamp;
726770
let latest_rgs_snapshot_timestamp =
727771
locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64);
772+
let latest_scores_sync_timestamp = locked_node_metrics.latest_scores_sync_timestamp;
728773
let latest_node_announcement_broadcast_timestamp =
729774
locked_node_metrics.latest_node_announcement_broadcast_timestamp;
730775
let latest_channel_monitor_archival_height =
@@ -738,6 +783,7 @@ impl Node {
738783
latest_onchain_wallet_sync_timestamp,
739784
latest_fee_rate_cache_update_timestamp,
740785
latest_rgs_snapshot_timestamp,
786+
latest_scores_sync_timestamp,
741787
latest_node_announcement_broadcast_timestamp,
742788
latest_channel_monitor_archival_height,
743789
}
@@ -1547,6 +1593,8 @@ pub struct NodeStatus {
15471593
///
15481594
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet.
15491595
pub latest_rgs_snapshot_timestamp: Option<u64>,
1596+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores.
1597+
pub latest_scores_sync_timestamp: Option<u64>,
15501598
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
15511599
/// announcement.
15521600
///
@@ -1565,6 +1613,7 @@ pub(crate) struct NodeMetrics {
15651613
latest_onchain_wallet_sync_timestamp: Option<u64>,
15661614
latest_fee_rate_cache_update_timestamp: Option<u64>,
15671615
latest_rgs_snapshot_timestamp: Option<u32>,
1616+
latest_scores_sync_timestamp: Option<u64>,
15681617
latest_node_announcement_broadcast_timestamp: Option<u64>,
15691618
latest_channel_monitor_archival_height: Option<u32>,
15701619
}
@@ -1576,6 +1625,7 @@ impl Default for NodeMetrics {
15761625
latest_onchain_wallet_sync_timestamp: None,
15771626
latest_fee_rate_cache_update_timestamp: None,
15781627
latest_rgs_snapshot_timestamp: None,
1628+
latest_scores_sync_timestamp: None,
15791629
latest_node_announcement_broadcast_timestamp: None,
15801630
latest_channel_monitor_archival_height: None,
15811631
}
@@ -1589,6 +1639,7 @@ impl_writeable_tlv_based!(NodeMetrics, {
15891639
(6, latest_rgs_snapshot_timestamp, option),
15901640
(8, latest_node_announcement_broadcast_timestamp, option),
15911641
(10, latest_channel_monitor_archival_height, option),
1642+
(12, latest_scores_sync_timestamp, option),
15921643
});
15931644

15941645
pub(crate) fn total_anchor_channels_reserve_sats(

‎src/scoring.rs

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::{
2+
sync::{Arc, Mutex, RwLock},
3+
time::{Duration, SystemTime},
4+
};
5+
6+
use crate::{io::utils::read_external_scores, Graph, Logger, EXTERNAL_SCORES_SYNC_TIMEOUT_SECS};
7+
use crate::{logger::LdkLogger, NodeMetrics};
8+
use lightning::{log_error, log_trace, routing::scoring::CombinedScorer};
9+
10+
pub struct ScoringSource {
11+
logger: Arc<Logger>,
12+
scorer: Arc<Mutex<CombinedScorer<Arc<Graph>, Arc<Logger>>>>,
13+
metrics: Arc<RwLock<NodeMetrics>>,
14+
url: String,
15+
}
16+
17+
impl ScoringSource {
18+
pub fn new(
19+
url: String, scorer: Arc<Mutex<CombinedScorer<Arc<Graph>, Arc<Logger>>>>,
20+
metrics: Arc<RwLock<NodeMetrics>>, logger: Arc<Logger>,
21+
) -> Self {
22+
Self { logger, scorer, metrics, url }
23+
}
24+
25+
pub fn get_url(&self) -> String {
26+
return self.url.clone();
27+
}
28+
29+
pub async fn sync_external_scores(&self) -> () {
30+
let response = tokio::time::timeout(
31+
Duration::from_secs(EXTERNAL_SCORES_SYNC_TIMEOUT_SECS),
32+
reqwest::get(&self.url),
33+
)
34+
.await;
35+
36+
match response {
37+
Ok(Ok(response)) => {
38+
let body = response.bytes().await;
39+
match body {
40+
Err(e) => {
41+
log_error!(self.logger, "Failed to read external scores update: {}", e);
42+
return;
43+
},
44+
Ok(body) => match read_external_scores(body) {
45+
Ok(liquidities) => {
46+
let duration_since_epoch =
47+
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
48+
self.scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
49+
self.metrics.write().unwrap().latest_scores_sync_timestamp =
50+
Some(duration_since_epoch.as_secs());
51+
log_trace!(self.logger, "External scores merged successfully");
52+
},
53+
Err(e) => {
54+
log_error!(
55+
self.logger,
56+
"Failed to parse external scores update: {}",
57+
e
58+
);
59+
return;
60+
},
61+
},
62+
}
63+
},
64+
Err(e) => {
65+
log_error!(self.logger, "Retrieving external scores timed out: {}", e);
66+
return;
67+
},
68+
Ok(Err(e)) => {
69+
log_error!(self.logger, "Failed to retrieve external scores update: {}", e);
70+
return;
71+
},
72+
}
73+
}
74+
}

‎src/types.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use lightning::ln::peer_handler::IgnoringMessageHandler;
2121
use lightning::ln::types::ChannelId;
2222
use lightning::routing::gossip;
2323
use lightning::routing::router::DefaultRouter;
24-
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
24+
use lightning::routing::scoring::CombinedScorer;
25+
use lightning::routing::scoring::ProbabilisticScoringFeeParameters;
2526
use lightning::sign::InMemorySigner;
2627
use lightning::util::persist::KVStore;
2728
use lightning::util::ser::{Readable, Writeable, Writer};
@@ -88,7 +89,7 @@ pub(crate) type Router = DefaultRouter<
8889
ProbabilisticScoringFeeParameters,
8990
Scorer,
9091
>;
91-
pub(crate) type Scorer = ProbabilisticScorer<Arc<Graph>, Arc<Logger>>;
92+
pub(crate) type Scorer = CombinedScorer<Arc<Graph>, Arc<Logger>>;
9293

9394
pub(crate) type Graph = gossip::NetworkGraph<Arc<Logger>>;
9495

0 commit comments

Comments
 (0)
Please sign in to comment.