
Commit 252b2d3

refactor hashmap to channelliquidities struct
Wrap the liquidities hash map into a struct so that decay and serialization functionality can be attached to it. This allows external data to be deserialized into this struct and decayed, making it comparable and mergeable with locally collected data.
1 parent c5fd164 commit 252b2d3
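
A minimal sketch of the intended import flow, assuming `ChannelLiquidities` and its `time_passed` method are reachable from outside the crate (in this commit they are still crate-internal) and that `external_bytes` holds liquidity data some other node wrote via `Writeable`:

use core::time::Duration;

use lightning::ln::msgs::DecodeError;
use lightning::routing::scoring::{ChannelLiquidities, ProbabilisticScoringDecayParameters};
use lightning::util::ser::Readable;

// Hypothetical helper (ours, not from the commit): deserialize externally
// collected liquidity data and decay it to the current time so it can be
// compared with, and merged into, locally collected estimates.
fn import_external_scores(
	external_bytes: &[u8], now: Duration,
) -> Result<ChannelLiquidities, DecodeError> {
	let mut reader = external_bytes;
	let mut liquidities = ChannelLiquidities::read(&mut reader)?;
	// Apply the same decay the scorer itself performs on `time_passed`.
	liquidities.time_passed(now, ProbabilisticScoringDecayParameters::default());
	Ok(liquidities)
}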

File tree

1 file changed (+86 −32 lines)


lightning/src/routing/scoring.rs

+86 −32
@@ -57,13 +57,14 @@ use crate::routing::router::{Path, CandidateRouteHop, PublicHopCandidate};
 use crate::routing::log_approx;
 use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
 use crate::util::logger::Logger;
-
 use crate::prelude::*;
+use crate::prelude::hash_map::Entry;
 use core::{cmp, fmt};
 use core::ops::{Deref, DerefMut};
 use core::time::Duration;
 use crate::io::{self, Read};
 use crate::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, DirectedHistoricalLiquidityTracker, HistoricalLiquidityTracker};
 #[cfg(not(c_bindings))]
 use {
 	core::cell::{RefCell, RefMut, Ref},
@@ -474,7 +475,86 @@ where L::Target: Logger {
 	decay_params: ProbabilisticScoringDecayParameters,
 	network_graph: G,
 	logger: L,
-	channel_liquidities: HashMap<u64, ChannelLiquidity>,
+	channel_liquidities: ChannelLiquidities,
+}
+/// Container for live and historical liquidity bounds for each channel.
+pub struct ChannelLiquidities(HashMap<u64, ChannelLiquidity>);
+
+impl ChannelLiquidities {
+	fn new() -> Self {
+		Self(new_hash_map())
+	}
+
+	fn time_passed(&mut self, duration_since_epoch: Duration, decay_params: ProbabilisticScoringDecayParameters) {
+		self.0.retain(|_scid, liquidity| {
+			liquidity.min_liquidity_offset_msat =
+				liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
+			liquidity.max_liquidity_offset_msat =
+				liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
+			liquidity.last_updated = duration_since_epoch;
+
+			// Only decay the historical buckets if there hasn't been new data for a while. This ties back to our
+			// earlier conclusion that fixed half-lives for scoring data are inherently flawed. They tend to be either
+			// too fast or too slow. Ideally, historical buckets should only decay as new data is added, which naturally
+			// happens when fresh data arrives. However, scoring a channel based on month-old data while treating it the
+			// same as one with minute-old data is problematic. To address this, we introduced a decay mechanism, but it
+			// runs very slowly and only activates when no new data has been received for a while, as our preference is
+			// to decay based on incoming data.
+			let elapsed_time =
+				duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
+			if elapsed_time > decay_params.historical_no_updates_half_life {
+				let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
+				if half_life != 0.0 {
+					liquidity.liquidity_history.decay_buckets(elapsed_time.as_secs_f64() / half_life);
+					liquidity.offset_history_last_updated = duration_since_epoch;
+				}
+			}
+			liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
+				liquidity.liquidity_history.has_datapoints()
+		});
+	}
+
+	fn get(&self, short_channel_id: &u64) -> Option<&ChannelLiquidity> {
+		self.0.get(short_channel_id)
+	}
+
+	fn insert(&mut self, short_channel_id: u64, liquidity: ChannelLiquidity) -> Option<ChannelLiquidity> {
+		self.0.insert(short_channel_id, liquidity)
+	}
+
+	fn iter(&self) -> impl Iterator<Item = (&u64, &ChannelLiquidity)> {
+		self.0.iter()
+	}
+
+	fn entry(&mut self, short_channel_id: u64) -> Entry<u64, ChannelLiquidity, RandomState> {
+		self.0.entry(short_channel_id)
+	}
+
+	#[cfg(test)]
+	fn get_mut(&mut self, short_channel_id: &u64) -> Option<&mut ChannelLiquidity> {
+		self.0.get_mut(short_channel_id)
+	}
+}
+
+impl Readable for ChannelLiquidities {
+	#[inline]
+	fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
+		let mut channel_liquidities = new_hash_map();
+		read_tlv_fields!(r, {
+			(0, channel_liquidities, required),
+		});
+		Ok(ChannelLiquidities(channel_liquidities))
+	}
+}
+
+impl Writeable for ChannelLiquidities {
+	#[inline]
+	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+		write_tlv_fields!(w, {
+			(0, self.0, required),
+		});
+		Ok(())
+	}
 }
 
 /// Parameters for configuring [`ProbabilisticScorer`].
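
The closing expression of the `retain` closure above is the pruning rule: an entry survives only while some signal remains. A standalone toy of just that rule (struct and names ours, not from the commit):

// Toy model of the pruning decision in `time_passed`: once both live offsets
// have decayed to zero and no historical datapoints remain, drop the entry.
struct Liquidity {
	min_liquidity_offset_msat: u64,
	max_liquidity_offset_msat: u64,
	has_historical_datapoints: bool,
}

fn keep(liq: &Liquidity) -> bool {
	liq.min_liquidity_offset_msat != 0
		|| liq.max_liquidity_offset_msat != 0
		|| liq.has_historical_datapoints
}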
@@ -849,7 +929,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
 			decay_params,
 			network_graph,
 			logger,
-			channel_liquidities: new_hash_map(),
+			channel_liquidities: ChannelLiquidities::new(),
 		}
 	}

@@ -1603,26 +1683,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
 	}
 
 	fn time_passed(&mut self, duration_since_epoch: Duration) {
-		let decay_params = self.decay_params;
-		self.channel_liquidities.retain(|_scid, liquidity| {
-			liquidity.min_liquidity_offset_msat =
-				liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
-			liquidity.max_liquidity_offset_msat =
-				liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
-			liquidity.last_updated = duration_since_epoch;
-
-			let elapsed_time =
-				duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
-			if elapsed_time > decay_params.historical_no_updates_half_life {
-				let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
-				if half_life != 0.0 {
-					liquidity.liquidity_history.decay_buckets(elapsed_time.as_secs_f64() / half_life);
-					liquidity.offset_history_last_updated = duration_since_epoch;
-				}
-			}
-			liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
-				liquidity.liquidity_history.has_datapoints()
-		});
+		self.channel_liquidities.time_passed(duration_since_epoch, self.decay_params);
 	}
 }
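
The value handed to `decay_buckets` is simply elapsed time measured in half-lives. A quick standalone check of that arithmetic (numbers illustrative, ours):

// Elapsed time expressed in half-lives, as computed in `time_passed`.
fn half_lives(elapsed_secs: f64, half_life_secs: f64) -> f64 {
	elapsed_secs / half_life_secs
}

fn main() {
	let half_life = 14.0 * 86_400.0; // e.g. a 14-day no-updates half-life
	let elapsed = 28.0 * 86_400.0; // 28 days without fresh data
	let n = half_lives(elapsed, half_life); // 2.0 half-lives
	// Halving once per half-life leaves 2^-n of the buckets' weight.
	assert!((0.5f64.powf(n) - 0.25).abs() < 1e-12);
}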

@@ -2060,15 +2121,11 @@ mod bucketed_history {
 		}
 	}
 }
-use bucketed_history::{LegacyHistoricalBucketRangeTracker, HistoricalBucketRangeTracker, DirectedHistoricalLiquidityTracker, HistoricalLiquidityTracker};
 
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Writeable for ProbabilisticScorer<G, L> where L::Target: Logger {
 	#[inline]
 	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
-		write_tlv_fields!(w, {
-			(0, self.channel_liquidities, required),
-		});
-		Ok(())
+		self.channel_liquidities.write(w)
 	}
 }

@@ -2079,10 +2136,7 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore
 		r: &mut R, args: (ProbabilisticScoringDecayParameters, G, L)
 	) -> Result<Self, DecodeError> {
 		let (decay_params, network_graph, logger) = args;
-		let mut channel_liquidities = new_hash_map();
-		read_tlv_fields!(r, {
-			(0, channel_liquidities, required),
-		});
+		let channel_liquidities = ChannelLiquidities::read(r)?;
 		Ok(Self {
 			decay_params,
 			network_graph,
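
Since `Writeable` and `ReadableArgs` for the scorer now both delegate to `ChannelLiquidities`, a persisted scorer and a bare `ChannelLiquidities` share one byte format. A hedged round-trip sketch (helper is ours; `encode` is the stock `Writeable` convenience that serializes into a `Vec<u8>`):

use lightning::routing::scoring::ChannelLiquidities;
use lightning::util::ser::{Readable, Writeable};

// Hypothetical check: bytes produced by `ProbabilisticScorer::write` should
// now decode as a plain `ChannelLiquidities`, and vice versa.
fn roundtrip(liquidities: &ChannelLiquidities) -> ChannelLiquidities {
	let bytes = liquidities.encode(); // a single TLV stream, field 0 = the map
	ChannelLiquidities::read(&mut &bytes[..]).expect("freshly written bytes should decode")
}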
