Skip to content

Commit 0cf5774

Browse files
committed
add functionality to periodically update routing scores from an external http source
1 parent c64d8c5 commit 0cf5774

File tree

8 files changed

+193
-12
lines changed

8 files changed

+193
-12
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ log = { version = "0.4.22", default-features = false, features = ["std"]}
8484

8585
vss-client = "0.3"
8686
prost = { version = "0.11.6", default-features = false}
87+
bytes = "1.9.0"
8788

8889
[target.'cfg(windows)'.dependencies]
8990
winapi = { version = "0.3", features = ["winbase"] }

bindings/ldk_node.udl

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ interface Builder {
5858
void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
5959
void set_gossip_source_p2p();
6060
void set_gossip_source_rgs(string rgs_server_url);
61+
void set_scoring_source(string url);
6162
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
6263
void set_storage_dir_path(string storage_dir_path);
6364
void set_filesystem_logger(string? log_file_path, LogLevel? log_level);
@@ -273,6 +274,7 @@ dictionary NodeStatus {
273274
u64? latest_onchain_wallet_sync_timestamp;
274275
u64? latest_fee_rate_cache_update_timestamp;
275276
u64? latest_rgs_snapshot_timestamp;
277+
u64? latest_scores_sync_timestamp;
276278
u64? latest_node_announcement_broadcast_timestamp;
277279
u32? latest_channel_monitor_archival_height;
278280
};

src/builder.rs

+44-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger}
2323
use crate::message_handler::NodeCustomMessageHandler;
2424
use crate::payment::store::PaymentStore;
2525
use crate::peer_store::PeerStore;
26+
use crate::scoring::ScoringSource;
2627
use crate::tx_broadcaster::TransactionBroadcaster;
2728
use crate::types::{
2829
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
@@ -41,7 +42,8 @@ use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
4142
use lightning::routing::gossip::NodeAlias;
4243
use lightning::routing::router::DefaultRouter;
4344
use lightning::routing::scoring::{
44-
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
45+
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
46+
ProbabilisticScoringFeeParameters,
4547
};
4648
use lightning::sign::EntropySource;
4749

@@ -97,6 +99,11 @@ enum GossipSourceConfig {
9799
RapidGossipSync(String),
98100
}
99101

102+
#[derive(Debug, Clone)]
103+
struct ScoringSourceConfig {
104+
url: String,
105+
}
106+
100107
#[derive(Debug, Clone)]
101108
struct LiquiditySourceConfig {
102109
// LSPS2 service's (address, node_id, token)
@@ -229,6 +236,7 @@ pub struct NodeBuilder {
229236
gossip_source_config: Option<GossipSourceConfig>,
230237
liquidity_source_config: Option<LiquiditySourceConfig>,
231238
log_writer_config: Option<LogWriterConfig>,
239+
scoring_source_config: Option<ScoringSourceConfig>,
232240
}
233241

234242
impl NodeBuilder {
@@ -245,13 +253,15 @@ impl NodeBuilder {
245253
let gossip_source_config = None;
246254
let liquidity_source_config = None;
247255
let log_writer_config = None;
256+
let scoring_source_config = None;
248257
Self {
249258
config,
250259
entropy_source_config,
251260
chain_data_source_config,
252261
gossip_source_config,
253262
liquidity_source_config,
254263
log_writer_config,
264+
scoring_source_config,
255265
}
256266
}
257267

@@ -322,6 +332,13 @@ impl NodeBuilder {
322332
self
323333
}
324334

335+
/// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to
336+
/// augment the internal pathfinding scoring system to improve routing.
337+
pub fn set_scoring_source(&mut self, url: String) -> &mut Self {
338+
self.scoring_source_config = Some(ScoringSourceConfig { url });
339+
self
340+
}
341+
325342
/// Configures the [`Node`] instance to source its inbound liquidity from the given
326343
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
327344
/// service.
@@ -540,6 +557,7 @@ impl NodeBuilder {
540557
config,
541558
self.chain_data_source_config.as_ref(),
542559
self.gossip_source_config.as_ref(),
560+
self.scoring_source_config.as_ref(),
543561
self.liquidity_source_config.as_ref(),
544562
seed_bytes,
545563
logger,
@@ -562,6 +580,7 @@ impl NodeBuilder {
562580
config,
563581
self.chain_data_source_config.as_ref(),
564582
self.gossip_source_config.as_ref(),
583+
self.scoring_source_config.as_ref(),
565584
self.liquidity_source_config.as_ref(),
566585
seed_bytes,
567586
logger,
@@ -654,6 +673,12 @@ impl ArcedNodeBuilder {
654673
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
655674
}
656675

676+
/// Configures the [`Node`] instance to source its external scores from the given URL. These scores are used to
677+
/// augment the internal pathfinding scoring system to improve routing.
678+
pub fn set_scoring_source(&self, url: String) {
679+
self.inner.write().unwrap().scoring_source_config = Some(ScoringSourceConfig { url });
680+
}
681+
657682
/// Configures the [`Node`] instance to source its inbound liquidity from the given
658683
/// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md)
659684
/// service.
@@ -806,6 +831,7 @@ impl ArcedNodeBuilder {
806831
fn build_with_store_internal(
807832
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
808833
gossip_source_config: Option<&GossipSourceConfig>,
834+
scoring_source_config: Option<&ScoringSourceConfig>,
809835
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
810836
logger: Arc<Logger>, kv_store: Arc<DynStore>,
811837
) -> Result<Node, BuildError> {
@@ -950,26 +976,24 @@ fn build_with_store_internal(
950976
},
951977
};
952978

953-
let scorer = match io::utils::read_scorer(
979+
let local_scorer = match io::utils::read_scorer(
954980
Arc::clone(&kv_store),
955981
Arc::clone(&network_graph),
956982
Arc::clone(&logger),
957983
) {
958-
Ok(scorer) => Arc::new(Mutex::new(scorer)),
984+
Ok(scorer) => scorer,
959985
Err(e) => {
960986
if e.kind() == std::io::ErrorKind::NotFound {
961987
let params = ProbabilisticScoringDecayParameters::default();
962-
Arc::new(Mutex::new(ProbabilisticScorer::new(
963-
params,
964-
Arc::clone(&network_graph),
965-
Arc::clone(&logger),
966-
)))
988+
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
967989
} else {
968990
return Err(BuildError::ReadFailed);
969991
}
970992
},
971993
};
972994

995+
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
996+
973997
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
974998
let router = Arc::new(DefaultRouter::new(
975999
Arc::clone(&network_graph),
@@ -1129,6 +1153,17 @@ fn build_with_store_internal(
11291153
},
11301154
};
11311155

1156+
let scoring_source = if let Some(config) = scoring_source_config {
1157+
Some(Arc::new(ScoringSource::new(
1158+
config.url.clone(),
1159+
Arc::clone(&scorer),
1160+
Arc::clone(&node_metrics),
1161+
Arc::clone(&logger),
1162+
)))
1163+
} else {
1164+
None
1165+
};
1166+
11321167
let liquidity_source = liquidity_source_config.as_ref().and_then(|lsc| {
11331168
lsc.lsps2_service.as_ref().map(|(address, node_id, token)| {
11341169
let lsps2_client_config = Some(LSPS2ClientConfig {});
@@ -1303,6 +1338,7 @@ fn build_with_store_internal(
13031338
keys_manager,
13041339
network_graph,
13051340
gossip_source,
1341+
scoring_source,
13061342
liquidity_source,
13071343
kv_store,
13081344
logger,

src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
5757
// The time in-between RGS sync attempts.
5858
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
5959

60+
// The time in-between external scores sync attempts.
61+
pub(crate) const EXTERNAL_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
62+
6063
// The time in-between node announcement broadcast attempts.
6164
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
6265

@@ -78,6 +81,9 @@ pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5;
7881
// The timeout after which we abort a RGS sync operation.
7982
pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;
8083

84+
// The timeout after which we abort a external scores sync operation.
85+
pub(crate) const EXTERNAL_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;
86+
8187
// The length in bytes of our wallets' keys seed.
8288
pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64;
8389

src/io/utils.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@ use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
2020
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
2121
use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
2222

23+
use bytes::Bytes;
2324
use lightning::io::Cursor;
2425
use lightning::ln::msgs::DecodeError;
2526
use lightning::routing::gossip::NetworkGraph;
26-
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
27+
use lightning::routing::scoring::{
28+
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
29+
};
2730
use lightning::util::persist::{
2831
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
2932
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -166,6 +169,12 @@ where
166169
})
167170
}
168171

172+
/// Read external scores from a Bytes object.
173+
pub(crate) fn read_external_scores(bytes: Bytes) -> Result<ChannelLiquidities, DecodeError> {
174+
let mut reader = Cursor::new(bytes);
175+
ChannelLiquidities::read(&mut reader)
176+
}
177+
169178
/// Read previously persisted events from the store.
170179
pub(crate) fn read_event_queue<L: Deref + Clone>(
171180
kv_store: Arc<DynStore>, logger: L,

src/lib.rs

+53-1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub mod logger;
8989
mod message_handler;
9090
pub mod payment;
9191
mod peer_store;
92+
mod scoring;
9293
mod sweep;
9394
mod tx_broadcaster;
9495
mod types;
@@ -122,7 +123,8 @@ pub use builder::NodeBuilder as Builder;
122123

123124
use chain::ChainSource;
124125
use config::{
125-
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
126+
default_user_config, may_announce_channel, ChannelConfig, Config,
127+
EXTERNAL_SCORES_SYNC_INTERVAL, EXTERNAL_SCORES_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL,
126128
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
127129
};
128130
use connection::ConnectionManager;
@@ -137,6 +139,7 @@ use payment::{
137139
UnifiedQrPayment,
138140
};
139141
use peer_store::{PeerInfo, PeerStore};
142+
use scoring::ScoringSource;
140143
use types::{
141144
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph,
142145
KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet,
@@ -190,6 +193,7 @@ pub struct Node {
190193
keys_manager: Arc<KeysManager>,
191194
network_graph: Arc<Graph>,
192195
gossip_source: Arc<GossipSource>,
196+
scoring_source: Option<Arc<ScoringSource>>,
193197
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
194198
kv_store: Arc<DynStore>,
195199
logger: Arc<Logger>,
@@ -304,6 +308,11 @@ impl Node {
304308
});
305309
}
306310

311+
// If scoring source is set
312+
if self.scoring_source.is_some() {
313+
self.setup_external_scores_syncing(&runtime);
314+
}
315+
307316
if let Some(listening_addresses) = &self.config.listening_addresses {
308317
// Setup networking
309318
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
@@ -634,6 +643,42 @@ impl Node {
634643
Ok(())
635644
}
636645

646+
/// Spawn a background task to sync external scores from the given `url`.
647+
fn setup_external_scores_syncing(&self, runtime: &tokio::runtime::Runtime) {
648+
let scoring_source = Arc::clone(&self.scoring_source.as_ref().unwrap());
649+
log_info!(
650+
self.logger,
651+
"External scores background syncing from {} enabled",
652+
scoring_source.get_url()
653+
);
654+
655+
let external_scores_sync_logger = Arc::clone(&self.logger);
656+
let mut stop_sync = self.stop_sender.subscribe();
657+
658+
runtime.spawn(async move {
659+
let mut interval = tokio::time::interval(EXTERNAL_SCORES_SYNC_INTERVAL);
660+
loop {
661+
tokio::select! {
662+
_ = stop_sync.changed() => {
663+
log_trace!(
664+
external_scores_sync_logger,
665+
"Stopping background syncing external scores.",
666+
);
667+
return;
668+
}
669+
_ = interval.tick() => {
670+
log_trace!(
671+
external_scores_sync_logger,
672+
"Background sync of external scores started.",
673+
);
674+
675+
scoring_source.sync_external_scores().await;
676+
}
677+
}
678+
}
679+
});
680+
}
681+
637682
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
638683
///
639684
/// After this returns most API methods will return [`Error::NotRunning`].
@@ -725,6 +770,7 @@ impl Node {
725770
locked_node_metrics.latest_fee_rate_cache_update_timestamp;
726771
let latest_rgs_snapshot_timestamp =
727772
locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64);
773+
let latest_scores_sync_timestamp = locked_node_metrics.latest_scores_sync_timestamp;
728774
let latest_node_announcement_broadcast_timestamp =
729775
locked_node_metrics.latest_node_announcement_broadcast_timestamp;
730776
let latest_channel_monitor_archival_height =
@@ -738,6 +784,7 @@ impl Node {
738784
latest_onchain_wallet_sync_timestamp,
739785
latest_fee_rate_cache_update_timestamp,
740786
latest_rgs_snapshot_timestamp,
787+
latest_scores_sync_timestamp,
741788
latest_node_announcement_broadcast_timestamp,
742789
latest_channel_monitor_archival_height,
743790
}
@@ -1547,6 +1594,8 @@ pub struct NodeStatus {
15471594
///
15481595
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet.
15491596
pub latest_rgs_snapshot_timestamp: Option<u64>,
1597+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores.
1598+
pub latest_scores_sync_timestamp: Option<u64>,
15501599
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
15511600
/// announcement.
15521601
///
@@ -1565,6 +1614,7 @@ pub(crate) struct NodeMetrics {
15651614
latest_onchain_wallet_sync_timestamp: Option<u64>,
15661615
latest_fee_rate_cache_update_timestamp: Option<u64>,
15671616
latest_rgs_snapshot_timestamp: Option<u32>,
1617+
latest_scores_sync_timestamp: Option<u64>,
15681618
latest_node_announcement_broadcast_timestamp: Option<u64>,
15691619
latest_channel_monitor_archival_height: Option<u32>,
15701620
}
@@ -1576,6 +1626,7 @@ impl Default for NodeMetrics {
15761626
latest_onchain_wallet_sync_timestamp: None,
15771627
latest_fee_rate_cache_update_timestamp: None,
15781628
latest_rgs_snapshot_timestamp: None,
1629+
latest_scores_sync_timestamp: None,
15791630
latest_node_announcement_broadcast_timestamp: None,
15801631
latest_channel_monitor_archival_height: None,
15811632
}
@@ -1589,6 +1640,7 @@ impl_writeable_tlv_based!(NodeMetrics, {
15891640
(6, latest_rgs_snapshot_timestamp, option),
15901641
(8, latest_node_announcement_broadcast_timestamp, option),
15911642
(10, latest_channel_monitor_archival_height, option),
1643+
(12, latest_scores_sync_timestamp, option),
15921644
});
15931645

15941646
pub(crate) fn total_anchor_channels_reserve_sats(

0 commit comments

Comments
 (0)