217 changes: 109 additions & 108 deletions lightning/src/ln/channelmanager.rs
@@ -17224,18 +17224,10 @@ pub(super) struct ChannelManagerData<SP: SignerProvider> {
best_block_height: u32,
best_block_hash: BlockHash,
channels: Vec<FundedChannel<SP>>,
// Marked `_legacy` because in versions > 0.2 we are taking steps to remove the requirement of
// regularly persisting the `ChannelManager` and instead rebuild the set of HTLC forwards from
// `Channel{Monitor}` data. See [`ChannelManager::read`].
forward_htlcs_legacy: HashMap<u64, Vec<HTLCForwardInfo>>,
claimable_htlcs_list: Vec<(PaymentHash, Vec<ClaimableHTLC>)>,
claimable_htlcs_list: Vec<(PaymentHash, Vec<ClaimableHTLC>, Option<RecipientOnionFields>)>,
peer_init_features: Vec<(PublicKey, InitFeatures)>,
pending_events_read: VecDeque<(events::Event, Option<EventCompletionAction>)>,
highest_seen_timestamp: u32,
// Marked `_legacy` because in versions > 0.2 we are taking steps to remove the requirement of
// regularly persisting the `ChannelManager` and instead rebuild the set of HTLC forwards from
// `Channel{Monitor}` data. See [`ChannelManager::read`].
pending_intercepted_htlcs_legacy: HashMap<InterceptId, PendingAddHTLCInfo>,
pending_outbound_payments: HashMap<PaymentId, PendingOutboundPayment>,
pending_claiming_payments: HashMap<PaymentHash, ClaimingPayment>,
received_network_pubkey: Option<PublicKey>,
@@ -17244,15 +17236,16 @@ pub(super) struct ChannelManagerData<SP: SignerProvider> {
fake_scid_rand_bytes: Option<[u8; 32]>,
claimable_htlc_purposes: Option<Vec<events::PaymentPurpose>>,
probing_cookie_secret: Option<[u8; 32]>,
claimable_htlc_onion_fields: Option<Vec<Option<RecipientOnionFields>>>,
inbound_payment_id_secret: Option<[u8; 32]>,
in_flight_monitor_updates: HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>,
peer_storage_dir: Vec<(PublicKey, Vec<u8>)>,
async_receive_offer_cache: AsyncReceiveOfferCache,
// Marked `_legacy` because in versions > 0.2 we are taking steps to remove the requirement of
// regularly persisting the `ChannelManager` and instead rebuild the set of HTLC forwards from
// `Channel{Monitor}` data. See [`ChannelManager::read`].
Contributor:

I don't think ChannelManager::read adds anything to the understanding of what's going on here, can probably remove that sentence

Contributor Author:

Yes, this was pointing to the reconstruct boolean, but can't link that.

forward_htlcs_legacy: HashMap<u64, Vec<HTLCForwardInfo>>,
pending_intercepted_htlcs_legacy: HashMap<InterceptId, PendingAddHTLCInfo>,
decode_update_add_htlcs_legacy: HashMap<u64, Vec<msgs::UpdateAddHTLC>>,
inbound_payment_id_secret: Option<[u8; 32]>,
in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>>,
peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>>,
async_receive_offer_cache: AsyncReceiveOfferCache,
}

/// Arguments for deserializing [`ChannelManagerData`].
@@ -17404,9 +17397,9 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger>
let mut probing_cookie_secret: Option<[u8; 32]> = None;
let mut claimable_htlc_purposes = None;
let mut claimable_htlc_onion_fields = None;
let mut pending_claiming_payments = Some(new_hash_map());
let mut pending_claiming_payments = None;
let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> =
Some(Vec::new());
None;
let mut events_override = None;
let mut legacy_in_flight_monitor_updates: Option<
HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>,
@@ -17487,6 +17480,20 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger>
// Resolve events_override: if present, it replaces pending_events.
let pending_events_read = events_override.unwrap_or(pending_events_read);

// Zip claimable_htlcs_list with claimable_htlc_onion_fields if present.
let claimable_htlcs_list = if let Some(onion_fields) = claimable_htlc_onion_fields {
if onion_fields.len() != claimable_htlcs_list.len() {
return Err(DecodeError::InvalidValue);
}
claimable_htlcs_list
.into_iter()
.zip(onion_fields)
.map(|((hash, htlcs), onion)| (hash, htlcs, onion))
.collect()
Contributor:

Hmm, we introduce an intermediate allocation here. Wonder if the whole creation of claimable_payments can move here, to avoid it? It feels somewhat fitting.

Contributor Author:

I've considered that, but it would need the keys for the legacy handling branch. But perhaps that's ok? The distinction between stage 1 and 2 isn't carved in stone.

} else {
claimable_htlcs_list.into_iter().map(|(hash, htlcs)| (hash, htlcs, None)).collect()
};
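The thread above discusses the intermediate allocation. A rough sketch of the direction it suggests — building the keyed collection directly while zipping the two parallel lists, so the intermediate `Vec` of tuples is skipped — using hypothetical simplified stand-in types rather than LDK's actual ones (as the author notes, the real code would also need the payment purposes from the legacy branch):

```rust
use std::collections::HashMap;

// Hypothetical simplified stand-ins for the real payment types; a sketch of the
// reviewer's suggestion, not LDK's actual implementation.
type PaymentHash = [u8; 32];
type Htlc = u64;
type Onion = String;

struct ClaimablePayment {
    htlcs: Vec<Htlc>,
    onion_fields: Option<Onion>,
}

// Build the map directly from the two parallel lists instead of first collecting an
// intermediate Vec of (hash, htlcs, onion) tuples.
fn build_claimables(
    list: Vec<(PaymentHash, Vec<Htlc>)>, onion_fields: Option<Vec<Option<Onion>>>,
) -> Result<HashMap<PaymentHash, ClaimablePayment>, ()> {
    if let Some(ref fields) = onion_fields {
        if fields.len() != list.len() {
            // Mirrors the DecodeError::InvalidValue length check above.
            return Err(());
        }
    }
    let mut onion_iter = onion_fields.map(|fields| fields.into_iter());
    let mut map = HashMap::with_capacity(list.len());
    for (hash, htlcs) in list {
        let onion = onion_iter.as_mut().and_then(|it| it.next()).flatten();
        if map.insert(hash, ClaimablePayment { htlcs, onion_fields: onion }).is_some() {
            // Duplicate payment hash in the serialized data.
            return Err(());
        }
    }
    Ok(map)
}
```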

Ok(ChannelManagerData {
chain_hash,
best_block_height,
@@ -17500,21 +17507,18 @@
pending_intercepted_htlcs_legacy: pending_intercepted_htlcs_legacy
.unwrap_or_else(new_hash_map),
pending_outbound_payments,
// unwrap safety: pending_claiming_payments is guaranteed to be `Some` after read_tlv_fields
pending_claiming_payments: pending_claiming_payments.unwrap(),
pending_claiming_payments: pending_claiming_payments.unwrap_or_else(new_hash_map),
received_network_pubkey,
// unwrap safety: monitor_update_blocked_actions_per_peer is guaranteed to be `Some` after read_tlv_fields
monitor_update_blocked_actions_per_peer: monitor_update_blocked_actions_per_peer
.unwrap(),
.unwrap_or_else(Vec::new),
fake_scid_rand_bytes,
claimable_htlc_purposes,
probing_cookie_secret,
claimable_htlc_onion_fields,
decode_update_add_htlcs_legacy: decode_update_add_htlcs_legacy
.unwrap_or_else(new_hash_map),
inbound_payment_id_secret,
in_flight_monitor_updates,
peer_storage_dir,
in_flight_monitor_updates: in_flight_monitor_updates.unwrap_or_default(),
Contributor:

Will this use HashMap::new() or new_hash_map (which I think is what we want)? CI seems fine though...

peer_storage_dir: peer_storage_dir.unwrap_or_default(),
async_receive_offer_cache,
})
}
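On the `unwrap_or_default()` question in the thread above: which constructor actually runs is decided by the `Default` impl of the field's concrete map type, not by the call site. A minimal standalone illustration of the two spellings — `new_hash_map` below is a hypothetical stand-in for LDK's helper, which may seed a non-default hasher (the reviewer's concern):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for LDK's `new_hash_map`; the real helper may construct the map
// with a specific hasher, which is exactly why `unwrap_or_default()` may differ from it.
fn new_hash_map<K, V>() -> HashMap<K, V> {
    HashMap::new()
}

fn main() {
    let missing: Option<HashMap<u32, String>> = None;

    // `unwrap_or_default()` routes through `<HashMap<_, _, S> as Default>::default()`,
    // i.e. whatever hasher `S` the field's type names.
    let via_default: HashMap<u32, String> = missing.clone().unwrap_or_default();

    // `unwrap_or_else(new_hash_map)` routes through the crate's own constructor, so it
    // always matches however the rest of the codebase builds its maps.
    let via_helper: HashMap<u32, String> = missing.unwrap_or_else(new_hash_map);

    assert!(via_default.is_empty() && via_helper.is_empty());
}
```

If the field's type alias already pins the intended hasher, the two spellings are equivalent; otherwise `unwrap_or_else(new_hash_map)` is the conservative choice.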
@@ -17802,7 +17806,6 @@ impl<
mut fake_scid_rand_bytes,
claimable_htlc_purposes,
mut probing_cookie_secret,
claimable_htlc_onion_fields,
mut decode_update_add_htlcs_legacy,
mut inbound_payment_id_secret,
mut in_flight_monitor_updates,
@@ -17823,6 +17826,12 @@
is_connected: false,
};

// === Channel/Monitor Reconciliation ===
//
// Validate each deserialized channel against its corresponding ChannelMonitor to ensure
// consistency. Channels that are behind their monitors are force-closed. Channels without
// monitors (except those awaiting initial persistence) are rejected. Monitors without
Comment on lines +17832 to +17833

Contributor:

I don't think it's true that channels without monitors will be rejected -- instead, we'll DecodeError::InvalidValue on the entire manager read. Were these comments added by Claude? I'm kinda not sold on them, they add to the review quite a bit and seem error prone/at risk of becoming out-of-date
// channels get force-close updates queued.
const MAX_ALLOC_SIZE: usize = 1024 * 64;
let mut failed_htlcs = Vec::new();
let channel_count = channels.len();
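To make the reconciliation rules above concrete, a rough per-channel decision sketch with hypothetical simplified types, folding in the reviewer's correction that a missing monitor fails the entire read rather than rejecting just that channel — an illustration only, not LDK's actual logic (the monitor-without-channel case is handled separately, per monitor):

```rust
// Hypothetical simplified types; an illustration of the per-channel reconciliation
// decision, not LDK's actual implementation.
enum Reconcile {
    Keep,
    // The channel's state is older than its monitor's: force-close and rebuild from
    // the monitor.
    ForceClose,
    // No monitor exists for a channel that should have one: per the review comment,
    // deserialization of the whole manager fails with DecodeError::InvalidValue.
    FailEntireRead,
}

struct ChannelState {
    latest_monitor_update_id: u64,
    awaiting_initial_persist: bool,
}

struct MonitorState {
    latest_update_id: u64,
}

fn reconcile(chan: &ChannelState, monitor: Option<&MonitorState>) -> Reconcile {
    match monitor {
        None if chan.awaiting_initial_persist => Reconcile::Keep,
        None => Reconcile::FailEntireRead,
        Some(mon) if chan.latest_monitor_update_id < mon.latest_update_id => {
            Reconcile::ForceClose
        },
        Some(_) => Reconcile::Keep,
    }
}
```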
@@ -18083,14 +18092,17 @@
}
}

// Apply peer features from deserialized data
// Apply peer features from deserialized data.
for (peer_pubkey, latest_features) in peer_init_features {
if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
peer_state.get_mut().unwrap().latest_features = latest_features;
}
}

// Post-deserialization processing
// === Initialize Missing Optional Fields ===
//
// Fields that were added in later versions may be None when reading older data.
// Generate fresh random values for these fields to maintain functionality.
let mut decode_update_add_htlcs = new_hash_map();
if fake_scid_rand_bytes.is_none() {
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
@@ -18110,14 +18122,14 @@

let pending_outbounds = OutboundPayments::new(pending_outbound_payments);

if let Some(peer_storage_dir) = peer_storage_dir {
for (peer_pubkey, peer_storage) in peer_storage_dir {
if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
peer_state.get_mut().unwrap().peer_storage = peer_storage;
}
for (peer_pubkey, peer_storage) in peer_storage_dir {
if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
peer_state.get_mut().unwrap().peer_storage = peer_storage;
}
}

// === Replay In-Flight Monitor Updates ===
//
// We have to replay (or skip, if they were completed after we wrote the `ChannelManager`)
// each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to
// check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we
@@ -18193,22 +18205,20 @@ impl<
.get(chan_id)
.expect("We already checked for monitor presence when loading channels");
let mut max_in_flight_update_id = monitor.get_latest_update_id();
if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
if let Some(mut chan_in_flight_upds) =
in_flight_upds.remove(&(*counterparty_id, *chan_id))
{
max_in_flight_update_id = cmp::max(
max_in_flight_update_id,
handle_in_flight_updates!(
*counterparty_id,
chan_in_flight_upds,
monitor,
peer_state,
logger,
""
),
);
}
if let Some(mut chan_in_flight_upds) =
in_flight_monitor_updates.remove(&(*counterparty_id, *chan_id))
{
max_in_flight_update_id = cmp::max(
max_in_flight_update_id,
handle_in_flight_updates!(
*counterparty_id,
chan_in_flight_upds,
monitor,
peer_state,
logger,
""
),
);
}
if funded_chan.get_latest_unblocked_monitor_update_id()
> max_in_flight_update_id
@@ -18237,44 +18247,38 @@
}
}

if let Some(in_flight_upds) = in_flight_monitor_updates {
for ((counterparty_id, channel_id), mut chan_in_flight_updates) in in_flight_upds {
let logger =
WithContext::from(&args.logger, Some(counterparty_id), Some(channel_id), None);
if let Some(monitor) = args.channel_monitors.get(&channel_id) {
// Now that we've removed all the in-flight monitor updates for channels that are
// still open, we need to replay any monitor updates that are for closed channels,
// creating the necessary peer_state entries as we go.
let peer_state_mutex = per_peer_state
.entry(counterparty_id)
.or_insert_with(|| Mutex::new(empty_peer_state()));
let mut peer_state = peer_state_mutex.lock().unwrap();
handle_in_flight_updates!(
counterparty_id,
chan_in_flight_updates,
monitor,
peer_state,
logger,
"closed "
);
} else {
log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
log_error!(
logger,
" The ChannelMonitor for channel {} is missing.",
channel_id
);
log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
log_error!(
logger,
" Pending in-flight updates are: {:?}",
chan_in_flight_updates
);
return Err(DecodeError::InvalidValue);
}
for ((counterparty_id, channel_id), mut chan_in_flight_updates) in in_flight_monitor_updates
{
let logger =
WithContext::from(&args.logger, Some(counterparty_id), Some(channel_id), None);
if let Some(monitor) = args.channel_monitors.get(&channel_id) {
// Now that we've removed all the in-flight monitor updates for channels that are
// still open, we need to replay any monitor updates that are for closed channels,
// creating the necessary peer_state entries as we go.
let peer_state_mutex = per_peer_state
.entry(counterparty_id)
.or_insert_with(|| Mutex::new(empty_peer_state()));
let mut peer_state = peer_state_mutex.lock().unwrap();
handle_in_flight_updates!(
counterparty_id,
chan_in_flight_updates,
monitor,
peer_state,
logger,
"closed "
);
} else {
log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
log_error!(logger, " The ChannelMonitor for channel {} is missing.", channel_id);
log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
log_error!(
logger,
" Without the latest ChannelMonitor we cannot continue without risking funds."
);
log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
log_error!(logger, " Pending in-flight updates are: {:?}", chan_in_flight_updates);
return Err(DecodeError::InvalidValue);
}
}
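A minimal sketch of the replay-or-skip rule described at the top of this block, with hypothetical simplified types — the real `handle_in_flight_updates!` macro does considerably more bookkeeping:

```rust
// Hypothetical simplified types illustrating the replay-or-skip rule; not LDK's actual
// monitor-update machinery.
struct Monitor {
    latest_update_id: u64,
}

struct MonitorUpdate {
    update_id: u64,
}

/// Returns the highest in-flight update id seen. Updates the monitor already persisted
/// (i.e. completed after the manager snapshot was written) are skipped; the rest are
/// replayed.
fn replay_or_skip(monitor: &mut Monitor, in_flight: Vec<MonitorUpdate>) -> u64 {
    let mut max_in_flight_update_id = monitor.latest_update_id;
    for upd in in_flight {
        max_in_flight_update_id = max_in_flight_update_id.max(upd.update_id);
        if upd.update_id > monitor.latest_update_id {
            // Not yet applied when the manager was serialized: replay it now.
            monitor.latest_update_id = upd.update_id;
        }
    }
    max_in_flight_update_id
}

fn main() {
    let mut monitor = Monitor { latest_update_id: 5 };
    let pending = vec![MonitorUpdate { update_id: 5 }, MonitorUpdate { update_id: 6 }];
    assert_eq!(replay_or_skip(&mut monitor, pending), 6);
    assert_eq!(monitor.latest_update_id, 6);
}
```

The returned maximum is what each channel's own latest unblocked update id is then checked against; a channel ahead of every update its monitor knows about indicates the monitor data is stale.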

@@ -18344,6 +18348,8 @@ impl<
pending_background_events.push(new_event);
}

// === Reconstruct HTLC State From Monitors ===
//
// In LDK 0.2 and below, the `ChannelManager` would track all payments and HTLCs internally and
// persist that state, relying on it being up-to-date on restart. Newer versions are moving
// towards reducing this reliance on regular persistence of the `ChannelManager`, and instead
@@ -18748,42 +18754,30 @@
}
}

// === Reconstruct Claimable Payments ===
//
// Combine claimable_htlcs_list with their purposes. For very old data (pre-0.0.107) that
// lacks purposes, reconstruct them from legacy hop data.
let expanded_inbound_key = args.node_signer.get_expanded_key();

let mut claimable_payments = hash_map_with_capacity(claimable_htlcs_list.len());
if let Some(purposes) = claimable_htlc_purposes {
if purposes.len() != claimable_htlcs_list.len() {
return Err(DecodeError::InvalidValue);
}
if let Some(onion_fields) = claimable_htlc_onion_fields {
if onion_fields.len() != claimable_htlcs_list.len() {
for (purpose, (payment_hash, htlcs, onion_fields)) in
purposes.into_iter().zip(claimable_htlcs_list.into_iter())
{
let claimable = ClaimablePayment { purpose, htlcs, onion_fields };
let existing_payment = claimable_payments.insert(payment_hash, claimable);
if existing_payment.is_some() {
return Err(DecodeError::InvalidValue);
}
for (purpose, (onion, (payment_hash, htlcs))) in purposes
.into_iter()
.zip(onion_fields.into_iter().zip(claimable_htlcs_list.into_iter()))
{
let claimable = ClaimablePayment { purpose, htlcs, onion_fields: onion };
let existing_payment = claimable_payments.insert(payment_hash, claimable);
if existing_payment.is_some() {
return Err(DecodeError::InvalidValue);
}
}
} else {
for (purpose, (payment_hash, htlcs)) in
purposes.into_iter().zip(claimable_htlcs_list.into_iter())
{
let claimable = ClaimablePayment { purpose, htlcs, onion_fields: None };
let existing_payment = claimable_payments.insert(payment_hash, claimable);
if existing_payment.is_some() {
return Err(DecodeError::InvalidValue);
}
}
}
} else {
// LDK versions prior to 0.0.107 did not write a `pending_htlc_purposes`, but do
// include a `_legacy_hop_data` in the `OnionPayload`.
for (payment_hash, htlcs) in claimable_htlcs_list.drain(..) {
for (payment_hash, htlcs, _) in claimable_htlcs_list.drain(..) {
if htlcs.is_empty() {
return Err(DecodeError::InvalidValue);
}
@@ -19073,6 +19067,9 @@ impl<
}
}

// === Construct the ChannelManager ===
//
// All data has been validated and reconciled. Build the final ChannelManager struct.
let best_block = BestBlock::new(best_block_hash, best_block_height);
let flow = OffersMessageFlow::new(
chain_hash,
@@ -19151,6 +19148,10 @@ impl<
testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()),
};

// === Replay Pending Claims and Fail HTLCs ===
//
// Process any payment preimages stored in monitors that need to be claimed on the inbound
// edge. Also fail any HTLCs that were dropped during channel reconciliation.
let mut processed_claims: HashSet<Vec<MPPClaimHTLCSource>> = new_hash_set();
for (_, monitor) in args.channel_monitors.iter() {
for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages()