Skip to content

Commit 9063901

Browse files
committed
Decouple spending from chain notifications
To prepare for asynchronous processing of the sweep, we need to decouple the spending from the chain notifications. These notifications run in a sync context and wouldn't allow calls into an async trait. Instead we now periodically call into the sweeper, to open up the possibility to do so from an async context if desired.
1 parent f507778 commit 9063901

File tree

2 files changed

+138
-86
lines changed

2 files changed

+138
-86
lines changed

lightning-background-processor/src/lib.rs

+92-13
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3737
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
3838
use lightning::routing::utxo::UtxoLookup;
39+
use lightning::sign::{ChangeDestinationSource, OutputSpender};
3940
use lightning::util::logger::Logger;
40-
use lightning::util::persist::Persister;
41+
use lightning::util::persist::{KVStore, Persister};
42+
use lightning::util::sweep::OutputSweeper;
4143
#[cfg(feature = "std")]
4244
use lightning::util::wakers::Sleeper;
4345
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -132,6 +134,11 @@ const REBROADCAST_TIMER: u64 = 30;
132134
#[cfg(test)]
133135
const REBROADCAST_TIMER: u64 = 1;
134136

137+
#[cfg(not(test))]
138+
const SWEEPER_TIMER: u64 = 30;
139+
#[cfg(test)]
140+
const SWEEPER_TIMER: u64 = 1;
141+
135142
#[cfg(feature = "futures")]
136143
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
137144
const fn min_u64(a: u64, b: u64) -> u64 {
@@ -308,6 +315,7 @@ macro_rules! define_run_body {
308315
$channel_manager: ident, $process_channel_manager_events: expr,
309316
$onion_messenger: ident, $process_onion_message_handler_events: expr,
310317
$peer_manager: ident, $gossip_sync: ident,
318+
$process_sweeper: expr,
311319
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
312320
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
313321
) => { {
@@ -322,6 +330,7 @@ macro_rules! define_run_body {
322330
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
323331
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
324332
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
333+
let mut last_sweeper_call = $get_timer(SWEEPER_TIMER);
325334
let mut have_pruned = false;
326335
let mut have_decayed_scorer = false;
327336

@@ -465,6 +474,12 @@ macro_rules! define_run_body {
465474
$chain_monitor.rebroadcast_pending_claims();
466475
last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
467476
}
477+
478+
if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) {
479+
log_trace!($logger, "Regenerating sweeper spends if necessary");
480+
let _ = $process_sweeper;
481+
last_sweeper_call = $get_timer(SWEEPER_TIMER);
482+
}
468483
}
469484

470485
// After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -627,6 +642,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627642
/// ```
628643
/// # use lightning::io;
629644
/// # use lightning::events::ReplayEvent;
645+
/// # use lightning::util::sweep::OutputSweeper;
630646
/// # use std::sync::{Arc, RwLock};
631647
/// # use std::sync::atomic::{AtomicBool, Ordering};
632648
/// # use std::time::SystemTime;
@@ -666,6 +682,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
666682
/// # F: lightning::chain::Filter + Send + Sync + 'static,
667683
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
668684
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
685+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
686+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
687+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
669688
/// # > {
670689
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
671690
/// # event_handler: Arc<EventHandler>,
@@ -677,14 +696,18 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
677696
/// # persister: Arc<Store>,
678697
/// # logger: Arc<Logger>,
679698
/// # scorer: Arc<Scorer>,
699+
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
680700
/// # }
681701
/// #
682702
/// # async fn setup_background_processing<
683703
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
684704
/// # F: lightning::chain::Filter + Send + Sync + 'static,
685705
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
686706
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
687-
/// # >(node: Node<B, F, FE, UL>) {
707+
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
708+
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
709+
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
710+
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
688711
/// let background_persister = Arc::clone(&node.persister);
689712
/// let background_event_handler = Arc::clone(&node.event_handler);
690713
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -695,7 +718,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
695718
/// let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
696719
/// let background_logger = Arc::clone(&node.logger);
697720
/// let background_scorer = Arc::clone(&node.scorer);
698-
///
721+
/// let background_sweeper = Arc::clone(&node.sweeper);
722+
699723
/// // Setup the sleeper.
700724
#[cfg_attr(
701725
feature = "std",
@@ -729,6 +753,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
729753
/// background_gossip_sync,
730754
/// background_peer_man,
731755
/// Some(background_liquidity_manager),
756+
/// Some(background_sweeper),
732757
/// background_logger,
733758
/// Some(background_scorer),
734759
/// sleeper,
@@ -767,6 +792,10 @@ pub async fn process_events_async<
767792
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
768793
PM: 'static + Deref,
769794
LM: 'static + Deref,
795+
D: 'static + Deref,
796+
O: 'static + Deref,
797+
K: 'static + Deref,
798+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
770799
S: 'static + Deref<Target = SC> + Send + Sync,
771800
SC: for<'b> WriteableScore<'b>,
772801
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -775,12 +804,12 @@ pub async fn process_events_async<
775804
>(
776805
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
777806
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
778-
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>, sleeper: Sleeper,
779-
mobile_interruptable_platform: bool, fetch_time: FetchTime,
807+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
808+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
780809
) -> Result<(), lightning::io::Error>
781810
where
782811
UL::Target: 'static + UtxoLookup,
783-
CF::Target: 'static + chain::Filter,
812+
CF::Target: 'static + chain::Filter + Sync + Send,
784813
T::Target: 'static + BroadcasterInterface,
785814
F::Target: 'static + FeeEstimator,
786815
L::Target: 'static + Logger,
@@ -790,6 +819,9 @@ where
790819
OM::Target: AOnionMessenger,
791820
PM::Target: APeerManager,
792821
LM::Target: ALiquidityManager,
822+
O::Target: 'static + OutputSpender,
823+
D::Target: 'static + ChangeDestinationSource,
824+
K::Target: 'static + KVStore,
793825
{
794826
let mut should_break = false;
795827
let async_event_handler = |event| {
@@ -833,6 +865,13 @@ where
833865
},
834866
peer_manager,
835867
gossip_sync,
868+
{
869+
if let Some(ref sweeper) = sweeper {
870+
sweeper.regenerate_and_broadcast_spend_if_necessary()
871+
} else {
872+
Ok(())
873+
}
874+
},
836875
logger,
837876
scorer,
838877
should_break,
@@ -953,14 +992,18 @@ impl BackgroundProcessor {
953992
LM: 'static + Deref + Send,
954993
S: 'static + Deref<Target = SC> + Send + Sync,
955994
SC: for<'b> WriteableScore<'b>,
995+
D: 'static + Deref,
996+
O: 'static + Deref,
997+
K: 'static + Deref,
998+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send + Sync,
956999
>(
9571000
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
9581001
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
959-
liquidity_manager: Option<LM>, logger: L, scorer: Option<S>,
1002+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
9601003
) -> Self
9611004
where
9621005
UL::Target: 'static + UtxoLookup,
963-
CF::Target: 'static + chain::Filter,
1006+
CF::Target: 'static + chain::Filter + Sync + Send,
9641007
T::Target: 'static + BroadcasterInterface,
9651008
F::Target: 'static + FeeEstimator,
9661009
L::Target: 'static + Logger,
@@ -970,6 +1013,9 @@ impl BackgroundProcessor {
9701013
OM::Target: AOnionMessenger,
9711014
PM::Target: APeerManager,
9721015
LM::Target: ALiquidityManager,
1016+
O::Target: 'static + OutputSpender,
1017+
D::Target: 'static + ChangeDestinationSource,
1018+
K::Target: 'static + KVStore,
9731019
{
9741020
let stop_thread = Arc::new(AtomicBool::new(false));
9751021
let stop_thread_clone = stop_thread.clone();
@@ -1005,6 +1051,13 @@ impl BackgroundProcessor {
10051051
},
10061052
peer_manager,
10071053
gossip_sync,
1054+
{
1055+
if let Some(ref sweeper) = sweeper {
1056+
sweeper.regenerate_and_broadcast_spend_if_necessary()
1057+
} else {
1058+
Ok(())
1059+
}
1060+
},
10081061
logger,
10091062
scorer,
10101063
stop_thread.load(Ordering::Acquire),
@@ -1269,7 +1322,7 @@ mod tests {
12691322
Arc<test_utils::TestBroadcaster>,
12701323
Arc<TestWallet>,
12711324
Arc<test_utils::TestFeeEstimator>,
1272-
Arc<dyn Filter + Sync + Send>,
1325+
Arc<test_utils::TestChainSource>,
12731326
Arc<FilesystemStore>,
12741327
Arc<test_utils::TestLogger>,
12751328
Arc<KeysManager>,
@@ -1648,7 +1701,7 @@ mod tests {
16481701
best_block,
16491702
Arc::clone(&tx_broadcaster),
16501703
Arc::clone(&fee_estimator),
1651-
None::<Arc<dyn Filter + Sync + Send>>,
1704+
None::<Arc<test_utils::TestChainSource>>,
16521705
Arc::clone(&keys_manager),
16531706
wallet,
16541707
Arc::clone(&kv_store),
@@ -1888,6 +1941,7 @@ mod tests {
18881941
nodes[0].p2p_gossip_sync(),
18891942
nodes[0].peer_manager.clone(),
18901943
Some(Arc::clone(&nodes[0].liquidity_manager)),
1944+
Some(nodes[0].sweeper.clone()),
18911945
nodes[0].logger.clone(),
18921946
Some(nodes[0].scorer.clone()),
18931947
);
@@ -1982,6 +2036,7 @@ mod tests {
19822036
nodes[0].no_gossip_sync(),
19832037
nodes[0].peer_manager.clone(),
19842038
Some(Arc::clone(&nodes[0].liquidity_manager)),
2039+
Some(nodes[0].sweeper.clone()),
19852040
nodes[0].logger.clone(),
19862041
Some(nodes[0].scorer.clone()),
19872042
);
@@ -2025,6 +2080,7 @@ mod tests {
20252080
nodes[0].no_gossip_sync(),
20262081
nodes[0].peer_manager.clone(),
20272082
Some(Arc::clone(&nodes[0].liquidity_manager)),
2083+
Some(nodes[0].sweeper.clone()),
20282084
nodes[0].logger.clone(),
20292085
Some(nodes[0].scorer.clone()),
20302086
);
@@ -2058,6 +2114,7 @@ mod tests {
20582114
nodes[0].rapid_gossip_sync(),
20592115
nodes[0].peer_manager.clone(),
20602116
Some(Arc::clone(&nodes[0].liquidity_manager)),
2117+
Some(nodes[0].sweeper.clone()),
20612118
nodes[0].logger.clone(),
20622119
Some(nodes[0].scorer.clone()),
20632120
move |dur: Duration| {
@@ -2095,6 +2152,7 @@ mod tests {
20952152
nodes[0].p2p_gossip_sync(),
20962153
nodes[0].peer_manager.clone(),
20972154
Some(Arc::clone(&nodes[0].liquidity_manager)),
2155+
Some(nodes[0].sweeper.clone()),
20982156
nodes[0].logger.clone(),
20992157
Some(nodes[0].scorer.clone()),
21002158
);
@@ -2125,6 +2183,7 @@ mod tests {
21252183
nodes[0].no_gossip_sync(),
21262184
nodes[0].peer_manager.clone(),
21272185
Some(Arc::clone(&nodes[0].liquidity_manager)),
2186+
Some(nodes[0].sweeper.clone()),
21282187
nodes[0].logger.clone(),
21292188
Some(nodes[0].scorer.clone()),
21302189
);
@@ -2172,6 +2231,7 @@ mod tests {
21722231
nodes[0].no_gossip_sync(),
21732232
nodes[0].peer_manager.clone(),
21742233
Some(Arc::clone(&nodes[0].liquidity_manager)),
2234+
Some(nodes[0].sweeper.clone()),
21752235
nodes[0].logger.clone(),
21762236
Some(nodes[0].scorer.clone()),
21772237
);
@@ -2235,6 +2295,7 @@ mod tests {
22352295
nodes[0].no_gossip_sync(),
22362296
nodes[0].peer_manager.clone(),
22372297
Some(Arc::clone(&nodes[0].liquidity_manager)),
2298+
Some(nodes[0].sweeper.clone()),
22382299
nodes[0].logger.clone(),
22392300
Some(nodes[0].scorer.clone()),
22402301
);
@@ -2280,10 +2341,22 @@ mod tests {
22802341

22812342
advance_chain(&mut nodes[0], 3);
22822343

2344+
let tx_broadcaster = nodes[0].tx_broadcaster.clone();
2345+
let wait_for_sweep_tx = || -> Transaction {
2346+
loop {
2347+
let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
2348+
if let Some(sweep_tx) = sweep_tx {
2349+
return sweep_tx;
2350+
}
2351+
2352+
std::thread::sleep(Duration::from_millis(100));
2353+
}
2354+
};
2355+
22832356
// Check we generate an initial sweeping tx.
22842357
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2358+
let sweep_tx_0 = wait_for_sweep_tx();
22852359
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2286-
let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22872360
match tracked_output.status {
22882361
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
22892362
assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
@@ -2294,8 +2367,8 @@ mod tests {
22942367
// Check we regenerate and rebroadcast the sweeping tx each block.
22952368
advance_chain(&mut nodes[0], 1);
22962369
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2370+
let sweep_tx_1 = wait_for_sweep_tx();
22972371
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2298-
let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
22992372
match tracked_output.status {
23002373
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
23012374
assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
@@ -2306,8 +2379,8 @@ mod tests {
23062379

23072380
advance_chain(&mut nodes[0], 1);
23082381
assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
2382+
let sweep_tx_2 = wait_for_sweep_tx();
23092383
let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
2310-
let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
23112384
match tracked_output.status {
23122385
OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
23132386
assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
@@ -2387,6 +2460,7 @@ mod tests {
23872460
nodes[0].no_gossip_sync(),
23882461
nodes[0].peer_manager.clone(),
23892462
Some(Arc::clone(&nodes[0].liquidity_manager)),
2463+
Some(nodes[0].sweeper.clone()),
23902464
nodes[0].logger.clone(),
23912465
Some(nodes[0].scorer.clone()),
23922466
);
@@ -2417,6 +2491,7 @@ mod tests {
24172491
nodes[0].no_gossip_sync(),
24182492
nodes[0].peer_manager.clone(),
24192493
Some(Arc::clone(&nodes[0].liquidity_manager)),
2494+
Some(nodes[0].sweeper.clone()),
24202495
nodes[0].logger.clone(),
24212496
Some(nodes[0].scorer.clone()),
24222497
);
@@ -2513,6 +2588,7 @@ mod tests {
25132588
nodes[0].rapid_gossip_sync(),
25142589
nodes[0].peer_manager.clone(),
25152590
Some(Arc::clone(&nodes[0].liquidity_manager)),
2591+
Some(nodes[0].sweeper.clone()),
25162592
nodes[0].logger.clone(),
25172593
Some(nodes[0].scorer.clone()),
25182594
);
@@ -2546,6 +2622,7 @@ mod tests {
25462622
nodes[0].rapid_gossip_sync(),
25472623
nodes[0].peer_manager.clone(),
25482624
Some(Arc::clone(&nodes[0].liquidity_manager)),
2625+
Some(nodes[0].sweeper.clone()),
25492626
nodes[0].logger.clone(),
25502627
Some(nodes[0].scorer.clone()),
25512628
move |dur: Duration| {
@@ -2709,6 +2786,7 @@ mod tests {
27092786
nodes[0].no_gossip_sync(),
27102787
nodes[0].peer_manager.clone(),
27112788
Some(Arc::clone(&nodes[0].liquidity_manager)),
2789+
Some(nodes[0].sweeper.clone()),
27122790
nodes[0].logger.clone(),
27132791
Some(nodes[0].scorer.clone()),
27142792
);
@@ -2760,6 +2838,7 @@ mod tests {
27602838
nodes[0].no_gossip_sync(),
27612839
nodes[0].peer_manager.clone(),
27622840
Some(Arc::clone(&nodes[0].liquidity_manager)),
2841+
Some(nodes[0].sweeper.clone()),
27632842
nodes[0].logger.clone(),
27642843
Some(nodes[0].scorer.clone()),
27652844
move |dur: Duration| {

0 commit comments

Comments
 (0)