Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 39 additions & 108 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,14 @@ struct LatestMonitorState {
/// which we haven't yet completed. We're allowed to reload with those as well, at least until
/// they're completed.
persisted_monitor_id: u64,
/// The latest serialized `ChannelMonitor` that we told LDK we persisted.
persisted_monitor: Vec<u8>,
/// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
/// The latest `ChannelMonitor` that we told LDK we persisted, if any.
persisted_monitor: Option<channelmonitor::ChannelMonitor<TestChannelSigner>>,
/// A set of (monitor id, `ChannelMonitor`)s which we're currently "persisting",
/// from LDK's perspective.
pending_monitors: Vec<(u64, Vec<u8>)>,
pending_monitors: Vec<(u64, channelmonitor::ChannelMonitor<TestChannelSigner>)>,
}

struct TestChainMonitor {
pub logger: Arc<dyn Logger>,
pub keys: Arc<KeyProvider>,
pub persister: Arc<TestPersister>,
pub chain_monitor: Arc<
chainmonitor::ChainMonitor<
Expand All @@ -277,15 +275,13 @@ impl TestChainMonitor {
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
None,
broadcaster,
logger.clone(),
logger,
feeest,
Arc::clone(&persister),
Arc::clone(&keys),
keys.get_peer_storage_key(),
false,
)),
logger,
keys,
persister,
latest_monitors: Mutex::new(new_hash_map()),
}
Expand All @@ -295,20 +291,19 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(
&self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
let mut ser = VecWriter(Vec::new());
monitor.write(&mut ser).unwrap();
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(channel_id, monitor);
let mon = self.persister.take_latest_monitor(&channel_id);
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
persisted_monitor_id: monitor_id,
persisted_monitor: ser.0,
persisted_monitor: Some(mon),
pending_monitors: Vec::new(),
},
Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState {
persisted_monitor_id: monitor_id,
persisted_monitor: Vec::new(),
pending_monitors: vec![(monitor_id, ser.0)],
persisted_monitor: None,
pending_monitors: vec![(monitor_id, mon)],
},
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
Err(()) => panic!(),
Expand All @@ -324,37 +319,15 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
) -> chain::ChannelMonitorUpdateStatus {
let mut map_lock = self.latest_monitors.lock().unwrap();
let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call");
let latest_monitor_data = map_entry
.pending_monitors
.last()
.as_ref()
.map(|(_, data)| data)
.unwrap_or(&map_entry.persisted_monitor);
let deserialized_monitor =
<(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
&mut &latest_monitor_data[..],
(&*self.keys, &*self.keys),
)
.unwrap()
.1;
deserialized_monitor
.update_monitor(
update,
&&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) },
&&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) },
&self.logger,
)
.unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
let res = self.chain_monitor.update_channel(channel_id, update);
let mon = self.persister.take_latest_monitor(&channel_id);
Comment on lines 322 to +323
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage reduction: The old update_channel independently deserialized the latest stored monitor, applied the update via update_monitor(), re-serialized, and stored the result. This verified on every update that:

  1. The stored monitor data round-trips correctly through serialization
  2. Updates can be successfully applied to a round-tripped monitor

The new code delegates entirely to chain_monitor.update_channel and captures the result from the persister, deferring round-trip verification to reload boundaries only (line 974–981).

If a particular update introduces a serialization issue that a subsequent update happens to mask before the next reload, the new code won't catch it. This is likely an acceptable trade-off for the 3-4x speedup (more iterations compensate for reduced per-iteration verification), but worth noting as a deliberate coverage change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did indeed think that serialization by itself is covered well enough elsewhere.

match res {
chain::ChannelMonitorUpdateStatus::Completed => {
map_entry.persisted_monitor_id = update.update_id;
map_entry.persisted_monitor = ser.0;
map_entry.persisted_monitor = Some(mon);
},
chain::ChannelMonitorUpdateStatus::InProgress => {
map_entry.pending_monitors.push((update.update_id, ser.0));
map_entry.pending_monitors.push((update.update_id, mon));
},
chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
}
Expand Down Expand Up @@ -864,9 +837,8 @@ fn assert_action_timeout_awaiting_response(action: &msgs::ErrorAction) {

#[inline]
pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
data: &[u8], underlying_out: Out, anchors: bool,
data: &[u8], out: Out, anchors: bool,
) {
let out = SearchingOutput::new(underlying_out);
let broadcast_a = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) });
let broadcast_b = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) });
let broadcast_c = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) });
Expand Down Expand Up @@ -915,9 +887,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
$broadcaster.clone(),
logger.clone(),
$fee_estimator.clone(),
Arc::new(TestPersister {
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
}),
Arc::new(TestPersister::new(mon_style[$node_id as usize].borrow().clone())),
Arc::clone(&keys_manager),
));

Expand Down Expand Up @@ -967,9 +937,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
broadcaster.clone(),
logger.clone(),
Arc::clone(fee_estimator),
Arc::new(TestPersister {
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
}),
Arc::new(TestPersister::new(ChannelMonitorUpdateStatus::Completed)),
Arc::clone(keys),
));

Expand All @@ -984,30 +952,35 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
let mut monitors = new_hash_map();
let mut old_monitors = old_monitors.latest_monitors.lock().unwrap();
for (channel_id, mut prev_state) in old_monitors.drain() {
let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 {
let old_mon =
prev_state.persisted_monitor.map(|m| (prev_state.persisted_monitor_id, m));
let (mon_id, mon) = if use_old_mons % 3 == 0 {
// Reload with the oldest `ChannelMonitor` (the one that we already told
// `ChannelManager` we finished persisting).
(prev_state.persisted_monitor_id, prev_state.persisted_monitor)
old_mon.expect("no persisted monitor to reload")
} else if use_old_mons % 3 == 1 {
// Reload with the second-oldest `ChannelMonitor`
let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor);
prev_state.pending_monitors.drain(..).next().unwrap_or(old_mon)
prev_state.pending_monitors.drain(..).next().or(old_mon)
.expect("no monitor to reload")
} else {
// Reload with the newest `ChannelMonitor`
let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor);
prev_state.pending_monitors.pop().unwrap_or(old_mon)
prev_state.pending_monitors.pop().or(old_mon)
.expect("no monitor to reload")
};
// Use a different value of `use_old_mons` if we have another monitor (only for node B)
// by shifting `use_old_mons` one in base-3.
use_old_mons /= 3;
let mon = <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
&mut &serialized_mon[..],
// Serialize and deserialize the monitor to verify round-trip correctness.
let mut ser = VecWriter(Vec::new());
mon.write(&mut ser).unwrap();
let (_, deserialized_mon) = <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
&mut &ser.0[..],
(&**keys, &**keys),
)
.expect("Failed to read monitor");
monitors.insert(channel_id, mon.1);
monitors.insert(channel_id, deserialized_mon);
// Update the latest `ChannelMonitor` state to match what we just told LDK.
prev_state.persisted_monitor = serialized_mon;
prev_state.persisted_monitor = Some(mon);
prev_state.persisted_monitor_id = mon_id;
// Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
// considering them discarded. LDK should replay these for us as they're stored in
Expand Down Expand Up @@ -1054,7 +1027,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
if id >= state.persisted_monitor_id {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
state.persisted_monitor = Some(data);
}
}
}
Expand Down Expand Up @@ -1793,11 +1766,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
// Can be generated as a result of calling `timer_tick_occurred` enough
// times while peers are disconnected
},
_ => if out.may_fail.load(atomic::Ordering::Acquire) {
return;
} else {
panic!("Unhandled message event {:?}", event)
},
_ => panic!("Unhandled message event {:?}", event),
}
if $limit_events != ProcessMessages::AllMessages {
break;
Expand Down Expand Up @@ -1837,13 +1806,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
MessageSendEvent::HandleError { ref action, .. } => {
assert_action_timeout_awaiting_response(action);
},
_ => {
if out.may_fail.load(atomic::Ordering::Acquire) {
return;
} else {
panic!("Unhandled message event")
}
},
_ => panic!("Unhandled message event"),
}
}
push_excess_b_events!(
Expand All @@ -1865,13 +1828,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
MessageSendEvent::HandleError { ref action, .. } => {
assert_action_timeout_awaiting_response(action);
},
_ => {
if out.may_fail.load(atomic::Ordering::Acquire) {
return;
} else {
panic!("Unhandled message event")
}
},
_ => panic!("Unhandled message event"),
}
}
push_excess_b_events!(
Expand Down Expand Up @@ -1980,13 +1937,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
},
events::Event::SpliceFailed { .. } => {},

_ => {
if out.may_fail.load(atomic::Ordering::Acquire) {
return;
} else {
panic!("Unhandled event")
}
},
_ => panic!("Unhandled event"),
}
}
while nodes[$node].needs_pending_htlc_processing() {
Expand All @@ -2004,10 +1955,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(

let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
type PendingMonitors = Vec<(u64, channelmonitor::ChannelMonitor<TestChannelSigner>)>;
let complete_monitor_update =
|monitor: &Arc<TestChainMonitor>,
chan_funding,
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
compl_selector: &dyn Fn(&mut PendingMonitors) -> Option<(u64, channelmonitor::ChannelMonitor<TestChannelSigner>)>| {
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
Expand All @@ -2017,7 +1969,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
if id > state.persisted_monitor_id {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
state.persisted_monitor = Some(data);
}
}
}
Expand All @@ -2033,7 +1985,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap();
if id > state.persisted_monitor_id {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
state.persisted_monitor = Some(data);
}
}
}
Expand Down Expand Up @@ -2809,27 +2761,6 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
}
}

/// We actually have different behavior based on if a certain log string has been seen, so we have
/// to do a bit more tracking.
#[derive(Clone)]
struct SearchingOutput<O: Output> {
output: O,
may_fail: Arc<atomic::AtomicBool>,
}
impl<O: Output> Output for SearchingOutput<O> {
fn locked_write(&self, data: &[u8]) {
// We hit a design limitation of LN state machine (see CONCURRENT_INBOUND_HTLC_FEE_BUFFER)
if std::str::from_utf8(data).unwrap().contains("Outbound update_fee HTLC buffer overflow - counterparty should force-close this channel") {
self.may_fail.store(true, atomic::Ordering::Release);
}
self.output.locked_write(data)
}
}
impl<O: Output> SearchingOutput<O> {
pub fn new(output: O) -> Self {
Self { output, may_fail: Arc::new(atomic::AtomicBool::new(false)) }
}
}

pub fn chanmon_consistency_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
do_test(data, out.clone(), false);
Expand Down
2 changes: 1 addition & 1 deletion fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger + MaybeSend + MaybeSync>
broadcast.clone(),
Arc::clone(&logger),
fee_est.clone(),
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }),
Arc::new(TestPersister::new(ChannelMonitorUpdateStatus::Completed)),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
false,
Expand Down
4 changes: 4 additions & 0 deletions fuzz/src/utils/test_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// licenses.

use lightning::util::logger::{Logger, Record};
use std::any::TypeId;
use std::io::Write;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -66,6 +67,9 @@ impl<'a, Out: Output> Write for LockedWriteAdapter<'a, Out> {

impl<Out: Output> Logger for TestLogger<Out> {
fn log(&self, record: Record) {
if TypeId::of::<Out>() == TypeId::of::<DevNull>() {
return;
}
writeln!(LockedWriteAdapter(&self.out), "{:<6} {}", self.id, record).unwrap();
}
}
24 changes: 22 additions & 2 deletions fuzz/src/utils/test_persister.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
use lightning::chain;
use lightning::chain::{chainmonitor, channelmonitor};
use lightning::ln::types::ChannelId;
use lightning::util::persist::MonitorName;
use lightning::util::test_channel_signer::TestChannelSigner;

use std::collections::HashMap;
use std::sync::Mutex;

pub struct TestPersister {
pub update_ret: Mutex<chain::ChannelMonitorUpdateStatus>,
latest_monitors: Mutex<HashMap<ChannelId, channelmonitor::ChannelMonitor<TestChannelSigner>>>,
}
impl TestPersister {
pub fn new(update_ret: chain::ChannelMonitorUpdateStatus) -> Self {
Self {
update_ret: Mutex::new(update_ret),
latest_monitors: Mutex::new(HashMap::new()),
}
}

pub fn take_latest_monitor(
&self, channel_id: &ChannelId,
) -> channelmonitor::ChannelMonitor<TestChannelSigner> {
self.latest_monitors.lock().unwrap().remove(channel_id)
.expect("Persister should have monitor for channel")
}
}
impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
fn persist_new_channel(
&self, _monitor_name: MonitorName,
_data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
self.latest_monitors.lock().unwrap().insert(data.channel_id(), data.clone());
self.update_ret.lock().unwrap().clone()
}

fn update_persisted_channel(
&self, _monitor_name: MonitorName, _update: Option<&channelmonitor::ChannelMonitorUpdate>,
_data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
data: &channelmonitor::ChannelMonitor<TestChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
self.latest_monitors.lock().unwrap().insert(data.channel_id(), data.clone());
self.update_ret.lock().unwrap().clone()
}

Expand Down
Loading