Skip to content

Commit bc3e565

Browse files
committed
f Trigger ChainMonitor's event_notifier on failure
1 parent cef636a commit bc3e565

File tree

2 files changed

+26
-14
lines changed

2 files changed

+26
-14
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -551,8 +551,13 @@ where C::Target: chain::Filter,
551551
let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
552552
for funding_txo in mons_to_process {
553553
let mut ev;
554-
super::channelmonitor::process_events_body!(
555-
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
554+
match super::channelmonitor::process_events_body!(
555+
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
556+
Ok(()) => {},
557+
Err(ReplayEvent ()) => {
558+
self.event_notifier.notify();
559+
}
560+
}
556561
}
557562
}
558563

@@ -879,7 +884,12 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
879884
/// [`BumpTransaction`]: events::Event::BumpTransaction
880885
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
881886
for monitor_state in self.monitors.read().unwrap().values() {
882-
monitor_state.monitor.process_pending_events(&handler);
887+
match monitor_state.monitor.process_pending_events(&handler) {
888+
Ok(()) => {},
889+
Err(ReplayEvent ()) => {
890+
self.event_notifier.notify();
891+
}
892+
}
883893
}
884894
}
885895
}

lightning/src/chain/channelmonitor.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,28 +1159,28 @@ impl<Signer: EcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signer> {
11591159
macro_rules! _process_events_body {
11601160
($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
11611161
loop {
1162+
let mut handling_res = Ok(());
11621163
let (pending_events, repeated_events);
11631164
if let Some(us) = $self_opt {
11641165
let mut inner = us.inner.lock().unwrap();
11651166
if inner.is_processing_pending_events {
1166-
break;
1167+
break handling_res;
11671168
}
11681169
inner.is_processing_pending_events = true;
11691170

11701171
pending_events = inner.pending_events.clone();
11711172
repeated_events = inner.get_repeated_events();
1172-
} else { break; }
1173+
} else { break handling_res; }
11731174

11741175
let mut num_handled_events = 0;
1175-
let mut handling_failed = false;
11761176
for event in pending_events.into_iter() {
11771177
$event_to_handle = event;
11781178
match $handle_event {
11791179
Ok(()) => num_handled_events += 1,
1180-
Err(_) => {
1180+
Err(e) => {
11811181
// If we encounter an error we stop handling events and make sure to replay
11821182
// any unhandled events on the next invocation.
1183-
handling_failed = true;
1183+
handling_res = Err(e);
11841184
break;
11851185
}
11861186
}
@@ -1197,13 +1197,13 @@ macro_rules! _process_events_body {
11971197
let mut inner = us.inner.lock().unwrap();
11981198
inner.pending_events.drain(..num_handled_events);
11991199
inner.is_processing_pending_events = false;
1200-
if !handling_failed && !inner.pending_events.is_empty() {
1200+
if handling_res.is_ok() && !inner.pending_events.is_empty() {
12011201
// If there's more events to process and we didn't fail so far, go ahead and do
12021202
// so.
12031203
continue;
12041204
}
12051205
}
1206-
break;
1206+
break handling_res;
12071207
}
12081208
}
12091209
}
@@ -1515,21 +1515,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
15151515
/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
15161516
/// order to handle these events.
15171517
///
1518+
/// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried.
1519+
///
15181520
/// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
15191521
/// [`BumpTransaction`]: crate::events::Event::BumpTransaction
1520-
pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
1522+
pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
15211523
let mut ev;
1522-
process_events_body!(Some(self), ev, handler.handle_event(ev));
1524+
process_events_body!(Some(self), ev, handler.handle_event(ev))
15231525
}
15241526

15251527
/// Processes any events asynchronously.
15261528
///
15271529
/// See [`Self::process_pending_events`] for more information.
15281530
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
15291531
&self, handler: &H
1530-
) {
1532+
) -> Result<(), ReplayEvent> {
15311533
let mut ev;
1532-
process_events_body!(Some(self), ev, { handler(ev).await });
1534+
process_events_body!(Some(self), ev, { handler(ev).await })
15331535
}
15341536

15351537
#[cfg(test)]

0 commit comments

Comments
 (0)