Skip to content

Commit f43cf34

Browse files
committed
Have LiqudidityManager wake BackgroundProcessor for msg processing
Instead of doing the callback dance, we have `lightning-background-processor` take `lightning-liquidity` as a dependency and wake the BP whenever we enqueue new messages to the `MessageQueue`.
1 parent dddd810 commit f43cf34

File tree

7 files changed

+124
-128
lines changed

7 files changed

+124
-128
lines changed

lightning-background-processor/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1515

1616
[features]
1717
futures = [ ]
18-
std = ["lightning/std", "bitcoin-io/std", "bitcoin_hashes/std"]
18+
std = ["lightning/std", "lightning-liquidity/std", "bitcoin-io/std", "bitcoin_hashes/std"]
1919

2020
default = ["std"]
2121

@@ -25,6 +25,7 @@ bitcoin_hashes = { version = "0.14.0", default-features = false }
2525
bitcoin-io = { version = "0.1.2", default-features = false }
2626
lightning = { version = "0.2.0", path = "../lightning", default-features = false }
2727
lightning-rapid-gossip-sync = { version = "0.2.0", path = "../lightning-rapid-gossip-sync", default-features = false }
28+
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false }
2829

2930
[dev-dependencies]
3031
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }

lightning-background-processor/src/lib.rs

+82-17
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use lightning::util::persist::Persister;
4242
use lightning::util::wakers::Sleeper;
4343
use lightning_rapid_gossip_sync::RapidGossipSync;
4444

45+
use lightning_liquidity::ALiquidityManager;
46+
4547
use core::ops::Deref;
4648
use core::time::Duration;
4749

@@ -492,27 +494,31 @@ pub(crate) mod futures_util {
492494
A: Future<Output = ()> + Unpin,
493495
B: Future<Output = ()> + Unpin,
494496
C: Future<Output = ()> + Unpin,
495-
D: Future<Output = bool> + Unpin,
497+
D: Future<Output = ()> + Unpin,
498+
E: Future<Output = bool> + Unpin,
496499
> {
497500
pub a: A,
498501
pub b: B,
499502
pub c: C,
500503
pub d: D,
504+
pub e: E,
501505
}
502506

503507
pub(crate) enum SelectorOutput {
504508
A,
505509
B,
506510
C,
507-
D(bool),
511+
D,
512+
E(bool),
508513
}
509514

510515
impl<
511516
A: Future<Output = ()> + Unpin,
512517
B: Future<Output = ()> + Unpin,
513518
C: Future<Output = ()> + Unpin,
514-
D: Future<Output = bool> + Unpin,
515-
> Future for Selector<A, B, C, D>
519+
D: Future<Output = ()> + Unpin,
520+
E: Future<Output = bool> + Unpin,
521+
> Future for Selector<A, B, C, D, E>
516522
{
517523
type Output = SelectorOutput;
518524
fn poll(
@@ -537,8 +543,14 @@ pub(crate) mod futures_util {
537543
Poll::Pending => {},
538544
}
539545
match Pin::new(&mut self.d).poll(ctx) {
546+
Poll::Ready(()) => {
547+
return Poll::Ready(SelectorOutput::D);
548+
},
549+
Poll::Pending => {},
550+
}
551+
match Pin::new(&mut self.e).poll(ctx) {
540552
Poll::Ready(res) => {
541-
return Poll::Ready(SelectorOutput::D(res));
553+
return Poll::Ready(SelectorOutput::E(res));
542554
},
543555
Poll::Pending => {},
544556
}
@@ -648,6 +660,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
648660
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
649661
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
650662
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
663+
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
651664
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
652665
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
653666
/// #
@@ -661,6 +674,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
661674
/// # event_handler: Arc<EventHandler>,
662675
/// # channel_manager: Arc<ChannelManager<B, F, FE>>,
663676
/// # onion_messenger: Arc<OnionMessenger<B, F, FE>>,
677+
/// # liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
664678
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
665679
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
666680
/// # persister: Arc<Store>,
@@ -681,6 +695,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
681695
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
682696
/// let background_peer_man = Arc::clone(&node.peer_manager);
683697
/// let background_onion_messenger = Arc::clone(&node.onion_messenger);
698+
/// let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
684699
/// let background_logger = Arc::clone(&node.logger);
685700
/// let background_scorer = Arc::clone(&node.scorer);
686701
///
@@ -708,6 +723,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
708723
/// Some(background_onion_messenger),
709724
/// background_gossip_sync,
710725
/// background_peer_man,
726+
/// Some(background_liquidity_manager),
711727
/// background_logger,
712728
/// Some(background_scorer),
713729
/// sleeper,
@@ -745,6 +761,7 @@ pub async fn process_events_async<
745761
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
746762
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
747763
PM: 'static + Deref + Send + Sync,
764+
LM: 'static + Deref + Send + Sync,
748765
S: 'static + Deref<Target = SC> + Send + Sync,
749766
SC: for<'b> WriteableScore<'b>,
750767
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -753,8 +770,8 @@ pub async fn process_events_async<
753770
>(
754771
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
755772
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
756-
logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
757-
fetch_time: FetchTime,
773+
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>, sleeper: Sleeper,
774+
mobile_interruptable_platform: bool, fetch_time: FetchTime,
758775
) -> Result<(), lightning::io::Error>
759776
where
760777
UL::Target: 'static + UtxoLookup,
@@ -767,6 +784,7 @@ where
767784
CM::Target: AChannelManager + Send + Sync,
768785
OM::Target: AOnionMessenger + Send + Sync,
769786
PM::Target: APeerManager + Send + Sync,
787+
LM::Target: ALiquidityManager + Send + Sync,
770788
{
771789
let mut should_break = false;
772790
let async_event_handler = |event| {
@@ -820,19 +838,26 @@ where
820838
} else {
821839
OptionalSelector { optional_future: None }
822840
};
841+
let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
842+
let fut = lm.get_lm().get_pending_msgs_future();
843+
OptionalSelector { optional_future: Some(fut) }
844+
} else {
845+
OptionalSelector { optional_future: None }
846+
};
823847
let fut = Selector {
824848
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
825849
b: chain_monitor.get_update_future(),
826850
c: om_fut,
827-
d: sleeper(if mobile_interruptable_platform {
851+
d: lm_fut,
852+
e: sleeper(if mobile_interruptable_platform {
828853
Duration::from_millis(100)
829854
} else {
830855
Duration::from_secs(FASTEST_TIMER)
831856
}),
832857
};
833858
match fut.await {
834-
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
835-
SelectorOutput::D(exit) => {
859+
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
860+
SelectorOutput::E(exit) => {
836861
should_break = exit;
837862
},
838863
}
@@ -920,12 +945,13 @@ impl BackgroundProcessor {
920945
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
921946
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
922947
PM: 'static + Deref + Send + Sync,
948+
LM: 'static + Deref + Send + Sync,
923949
S: 'static + Deref<Target = SC> + Send + Sync,
924950
SC: for<'b> WriteableScore<'b>,
925951
>(
926952
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
927953
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
928-
logger: L, scorer: Option<S>,
954+
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>,
929955
) -> Self
930956
where
931957
UL::Target: 'static + UtxoLookup,
@@ -938,6 +964,7 @@ impl BackgroundProcessor {
938964
CM::Target: AChannelManager + Send + Sync,
939965
OM::Target: AOnionMessenger + Send + Sync,
940966
PM::Target: APeerManager + Send + Sync,
967+
LM::Target: ALiquidityManager + Send + Sync,
941968
{
942969
let stop_thread = Arc::new(AtomicBool::new(false));
943970
let stop_thread_clone = stop_thread.clone();
@@ -977,17 +1004,27 @@ impl BackgroundProcessor {
9771004
scorer,
9781005
stop_thread.load(Ordering::Acquire),
9791006
{
980-
let sleeper = if let Some(om) = onion_messenger.as_ref() {
981-
Sleeper::from_three_futures(
1007+
let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
1008+
(Some(om), Some(lm)) => Sleeper::from_four_futures(
9821009
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
9831010
&chain_monitor.get_update_future(),
9841011
&om.get_om().get_update_future(),
985-
)
986-
} else {
987-
Sleeper::from_two_futures(
1012+
&lm.get_lm().get_pending_msgs_future(),
1013+
),
1014+
(Some(om), None) => Sleeper::from_three_futures(
9881015
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
9891016
&chain_monitor.get_update_future(),
990-
)
1017+
&om.get_om().get_update_future(),
1018+
),
1019+
(None, Some(lm)) => Sleeper::from_three_futures(
1020+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1021+
&chain_monitor.get_update_future(),
1022+
&lm.get_lm().get_pending_msgs_future(),
1023+
),
1024+
(None, None) => Sleeper::from_two_futures(
1025+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1026+
&chain_monitor.get_update_future(),
1027+
),
9911028
};
9921029
sleeper.wait_timeout(Duration::from_millis(100));
9931030
},
@@ -1102,6 +1139,7 @@ mod tests {
11021139
use lightning::util::sweep::{OutputSpendStatus, OutputSweeper, PRUNE_DELAY_BLOCKS};
11031140
use lightning::util::test_utils;
11041141
use lightning::{get_event, get_event_msg};
1142+
use lightning_liquidity::LiquidityManager;
11051143
use lightning_persister::fs_store::FilesystemStore;
11061144
use lightning_rapid_gossip_sync::RapidGossipSync;
11071145
use std::collections::VecDeque;
@@ -1196,6 +1234,9 @@ mod tests {
11961234
IgnoringMessageHandler,
11971235
>;
11981236

1237+
type LM =
1238+
LiquidityManager<Arc<KeysManager>, Arc<ChannelManager>, Arc<dyn Filter + Sync + Send>>;
1239+
11991240
struct Node {
12001241
node: Arc<ChannelManager>,
12011242
messenger: Arc<OM>,
@@ -1212,6 +1253,7 @@ mod tests {
12121253
Arc<KeysManager>,
12131254
>,
12141255
>,
1256+
liquidity_manager: Arc<LM>,
12151257
chain_monitor: Arc<ChainMonitor>,
12161258
kv_store: Arc<FilesystemStore>,
12171259
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -1631,11 +1673,20 @@ mod tests {
16311673
logger.clone(),
16321674
keys_manager.clone(),
16331675
));
1676+
let liquidity_manager = Arc::new(LiquidityManager::new(
1677+
Arc::clone(&keys_manager),
1678+
Arc::clone(&manager),
1679+
None,
1680+
None,
1681+
None,
1682+
None,
1683+
));
16341684
let node = Node {
16351685
node: manager,
16361686
p2p_gossip_sync,
16371687
rapid_gossip_sync,
16381688
peer_manager,
1689+
liquidity_manager,
16391690
chain_monitor,
16401691
kv_store,
16411692
tx_broadcaster,
@@ -1833,6 +1884,7 @@ mod tests {
18331884
Some(nodes[0].messenger.clone()),
18341885
nodes[0].p2p_gossip_sync(),
18351886
nodes[0].peer_manager.clone(),
1887+
Some(Arc::clone(&nodes[0].liquidity_manager)),
18361888
nodes[0].logger.clone(),
18371889
Some(nodes[0].scorer.clone()),
18381890
);
@@ -1926,6 +1978,7 @@ mod tests {
19261978
Some(nodes[0].messenger.clone()),
19271979
nodes[0].no_gossip_sync(),
19281980
nodes[0].peer_manager.clone(),
1981+
Some(Arc::clone(&nodes[0].liquidity_manager)),
19291982
nodes[0].logger.clone(),
19301983
Some(nodes[0].scorer.clone()),
19311984
);
@@ -1968,6 +2021,7 @@ mod tests {
19682021
Some(nodes[0].messenger.clone()),
19692022
nodes[0].no_gossip_sync(),
19702023
nodes[0].peer_manager.clone(),
2024+
Some(Arc::clone(&nodes[0].liquidity_manager)),
19712025
nodes[0].logger.clone(),
19722026
Some(nodes[0].scorer.clone()),
19732027
);
@@ -2000,6 +2054,7 @@ mod tests {
20002054
Some(nodes[0].messenger.clone()),
20012055
nodes[0].rapid_gossip_sync(),
20022056
nodes[0].peer_manager.clone(),
2057+
Some(Arc::clone(&nodes[0].liquidity_manager)),
20032058
nodes[0].logger.clone(),
20042059
Some(nodes[0].scorer.clone()),
20052060
move |dur: Duration| {
@@ -2036,6 +2091,7 @@ mod tests {
20362091
Some(nodes[0].messenger.clone()),
20372092
nodes[0].p2p_gossip_sync(),
20382093
nodes[0].peer_manager.clone(),
2094+
Some(Arc::clone(&nodes[0].liquidity_manager)),
20392095
nodes[0].logger.clone(),
20402096
Some(nodes[0].scorer.clone()),
20412097
);
@@ -2065,6 +2121,7 @@ mod tests {
20652121
Some(nodes[0].messenger.clone()),
20662122
nodes[0].no_gossip_sync(),
20672123
nodes[0].peer_manager.clone(),
2124+
Some(Arc::clone(&nodes[0].liquidity_manager)),
20682125
nodes[0].logger.clone(),
20692126
Some(nodes[0].scorer.clone()),
20702127
);
@@ -2111,6 +2168,7 @@ mod tests {
21112168
Some(nodes[0].messenger.clone()),
21122169
nodes[0].no_gossip_sync(),
21132170
nodes[0].peer_manager.clone(),
2171+
Some(Arc::clone(&nodes[0].liquidity_manager)),
21142172
nodes[0].logger.clone(),
21152173
Some(nodes[0].scorer.clone()),
21162174
);
@@ -2173,6 +2231,7 @@ mod tests {
21732231
Some(nodes[0].messenger.clone()),
21742232
nodes[0].no_gossip_sync(),
21752233
nodes[0].peer_manager.clone(),
2234+
Some(Arc::clone(&nodes[0].liquidity_manager)),
21762235
nodes[0].logger.clone(),
21772236
Some(nodes[0].scorer.clone()),
21782237
);
@@ -2324,6 +2383,7 @@ mod tests {
23242383
Some(nodes[0].messenger.clone()),
23252384
nodes[0].no_gossip_sync(),
23262385
nodes[0].peer_manager.clone(),
2386+
Some(Arc::clone(&nodes[0].liquidity_manager)),
23272387
nodes[0].logger.clone(),
23282388
Some(nodes[0].scorer.clone()),
23292389
);
@@ -2353,6 +2413,7 @@ mod tests {
23532413
Some(nodes[0].messenger.clone()),
23542414
nodes[0].no_gossip_sync(),
23552415
nodes[0].peer_manager.clone(),
2416+
Some(Arc::clone(&nodes[0].liquidity_manager)),
23562417
nodes[0].logger.clone(),
23572418
Some(nodes[0].scorer.clone()),
23582419
);
@@ -2448,6 +2509,7 @@ mod tests {
24482509
Some(nodes[0].messenger.clone()),
24492510
nodes[0].rapid_gossip_sync(),
24502511
nodes[0].peer_manager.clone(),
2512+
Some(Arc::clone(&nodes[0].liquidity_manager)),
24512513
nodes[0].logger.clone(),
24522514
Some(nodes[0].scorer.clone()),
24532515
);
@@ -2480,6 +2542,7 @@ mod tests {
24802542
Some(nodes[0].messenger.clone()),
24812543
nodes[0].rapid_gossip_sync(),
24822544
nodes[0].peer_manager.clone(),
2545+
Some(Arc::clone(&nodes[0].liquidity_manager)),
24832546
nodes[0].logger.clone(),
24842547
Some(nodes[0].scorer.clone()),
24852548
move |dur: Duration| {
@@ -2638,6 +2701,7 @@ mod tests {
26382701
Some(nodes[0].messenger.clone()),
26392702
nodes[0].no_gossip_sync(),
26402703
nodes[0].peer_manager.clone(),
2704+
Some(Arc::clone(&nodes[0].liquidity_manager)),
26412705
nodes[0].logger.clone(),
26422706
Some(nodes[0].scorer.clone()),
26432707
);
@@ -2688,6 +2752,7 @@ mod tests {
26882752
Some(nodes[0].messenger.clone()),
26892753
nodes[0].no_gossip_sync(),
26902754
nodes[0].peer_manager.clone(),
2755+
Some(Arc::clone(&nodes[0].liquidity_manager)),
26912756
nodes[0].logger.clone(),
26922757
Some(nodes[0].scorer.clone()),
26932758
move |dur: Duration| {

lightning-liquidity/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,6 @@ mod sync;
7474
mod tests;
7575
mod utils;
7676

77-
pub use manager::{LiquidityClientConfig, LiquidityManager, LiquidityServiceConfig};
77+
pub use manager::{
78+
ALiquidityManager, LiquidityClientConfig, LiquidityManager, LiquidityServiceConfig,
79+
};

0 commit comments

Comments
 (0)