Skip to content

Commit 5ae6a35

Browse files
committed
cache external pathfinding scores
Save external pathfinding scores in a cache so that they will be available immediately after a node restart. Otherwise there might be a time window where new scores need to be downloaded still and the node operates on local data only.
1 parent 51915a9 commit 5ae6a35

File tree

3 files changed

+84
-9
lines changed

3 files changed

+84
-9
lines changed

src/builder.rs

+18-2
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ use crate::event::EventQueue;
1616
use crate::fee_estimator::OnchainFeeEstimator;
1717
use crate::gossip::GossipSource;
1818
use crate::io::sqlite_store::SqliteStore;
19-
use crate::io::utils::{read_node_metrics, write_node_metrics};
19+
use crate::io::utils::{
20+
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
21+
};
2022
use crate::io::vss_store::VssStore;
2123
use crate::liquidity::LiquiditySource;
22-
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
24+
use crate::logger::{log_error, log_info, log_trace, LdkLogger, LogLevel, LogWriter, Logger};
2325
use crate::message_handler::NodeCustomMessageHandler;
2426
use crate::payment::store::PaymentStore;
2527
use crate::peer_store::PeerStore;
@@ -1002,6 +1004,20 @@ fn build_with_store_internal(
10021004

10031005
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
10041006

1007+
// Restore external pathfinding scores from cache if possible.
1008+
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
1009+
Ok(external_scores) => {
1010+
scorer.lock().unwrap().merge(external_scores, cur_time);
1011+
log_trace!(logger, "External scores from cache merged successfully");
1012+
},
1013+
Err(e) => {
1014+
if e.kind() != std::io::ErrorKind::NotFound {
1015+
log_error!(logger, "Error while reading external scores from cache: {}", e);
1016+
return Err(BuildError::ReadFailed);
1017+
}
1018+
},
1019+
}
1020+
10051021
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
10061022
let router = Arc::new(DefaultRouter::new(
10071023
Arc::clone(&network_graph),

src/io/utils.rs

+50-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
2323
use lightning::io::Cursor;
2424
use lightning::ln::msgs::DecodeError;
2525
use lightning::routing::gossip::NetworkGraph;
26-
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
26+
use lightning::routing::scoring::{
27+
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
28+
};
2729
use lightning::util::persist::{
2830
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
2931
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -52,6 +54,8 @@ use std::ops::Deref;
5254
use std::path::Path;
5355
use std::sync::Arc;
5456

57+
pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache";
58+
5559
/// Generates a random [BIP 39] mnemonic.
5660
///
5761
/// The result may be used to initialize the [`Node`] entropy, i.e., can be given to
@@ -166,6 +170,51 @@ where
166170
})
167171
}
168172

173+
/// Read previously persisted external pathfinding scores from the cache.
174+
pub(crate) fn read_external_pathfinding_scores_from_cache<L: Deref>(
175+
kv_store: Arc<DynStore>, logger: L,
176+
) -> Result<ChannelLiquidities, std::io::Error>
177+
where
178+
L::Target: LdkLogger,
179+
{
180+
let mut reader = Cursor::new(kv_store.read(
181+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
182+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
183+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
184+
)?);
185+
ChannelLiquidities::read(&mut reader).map_err(|e| {
186+
log_error!(logger, "Failed to deserialize scorer: {}", e);
187+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer")
188+
})
189+
}
190+
191+
/// Persist external pathfinding scores to the cache.
192+
pub(crate) fn write_external_pathfinding_scores_to_cache<L: Deref>(
193+
kv_store: Arc<DynStore>, data: &ChannelLiquidities, logger: L,
194+
) -> Result<(), Error>
195+
where
196+
L::Target: LdkLogger,
197+
{
198+
kv_store
199+
.write(
200+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
201+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
202+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
203+
&data.encode(),
204+
)
205+
.map_err(|e| {
206+
log_error!(
207+
logger,
208+
"Writing data to key {}/{}/{} failed due to: {}",
209+
NODE_METRICS_PRIMARY_NAMESPACE,
210+
NODE_METRICS_SECONDARY_NAMESPACE,
211+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
212+
e
213+
);
214+
Error::PersistenceFailed
215+
})
216+
}
217+
169218
/// Read previously persisted events from the store.
170219
pub(crate) fn read_event_queue<L: Deref + Clone>(
171220
kv_store: Arc<DynStore>, logger: L,

src/scoring.rs

+16-6
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ use std::{
44
time::{Duration, SystemTime},
55
};
66

7-
use crate::{logger::LdkLogger, NodeMetrics, Scorer};
7+
use crate::io::utils::write_external_pathfinding_scores_to_cache;
8+
use crate::logger::LdkLogger;
89
use crate::{
9-
write_node_metrics, DynStore, Logger, EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL,
10-
EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
10+
write_node_metrics, DynStore, Logger, NodeMetrics, Scorer,
11+
EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
1112
};
1213
use lightning::{
1314
log_error, log_info, log_trace, routing::scoring::ChannelLiquidities, util::ser::Readable,
@@ -78,15 +79,24 @@ async fn sync_external_scores(
7879
let mut reader = Cursor::new(body.unwrap());
7980
match ChannelLiquidities::read(&mut reader) {
8081
Ok(liquidities) => {
82+
if let Err(e) = write_external_pathfinding_scores_to_cache(
83+
Arc::clone(&kv_store),
84+
&liquidities,
85+
logger,
86+
) {
87+
log_error!(logger, "Failed to persist external scores to cache: {}", e);
88+
}
89+
8190
let duration_since_epoch =
8291
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
8392
scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
8493
let mut locked_node_metrics = node_metrics.write().unwrap();
8594
locked_node_metrics.latest_pathfinding_scores_sync_timestamp =
8695
Some(duration_since_epoch.as_secs());
87-
write_node_metrics(&*locked_node_metrics, kv_store, logger).unwrap_or_else(|e| {
88-
log_error!(logger, "Persisting node metrics failed: {}", e);
89-
});
96+
write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), logger)
97+
.unwrap_or_else(|e| {
98+
log_error!(logger, "Persisting node metrics failed: {}", e);
99+
});
90100
log_trace!(logger, "External scores merged successfully");
91101
},
92102
Err(e) => {

0 commit comments

Comments
 (0)