Skip to content

Further decouple ChannelManager from Channel state somewhat #3539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,13 @@ pub(super) struct ReestablishResponses {
pub shutdown_msg: Option<msgs::Shutdown>,
}

/// The first message we send to our peer after connection
pub(super) enum ReconnectionMsg {
Reestablish(msgs::ChannelReestablish),
Open(OpenChannelMessage),
None,
}

/// The result of a shutdown that should be handled.
#[must_use]
pub(crate) struct ShutdownResult {
Expand Down Expand Up @@ -1266,8 +1273,7 @@ impl<SP: Deref> Channel<SP> where
})
},
ChannelPhase::UnfundedInboundV1(chan) => {
let logger = WithChannelContext::from(logger, &chan.context, None);
let accept_channel = chan.signer_maybe_unblocked(&&logger);
let accept_channel = chan.signer_maybe_unblocked(logger);
Some(SignerResumeUpdates {
commitment_update: None,
revoke_and_ack: None,
Expand All @@ -1286,51 +1292,62 @@ impl<SP: Deref> Channel<SP> where
}
}

pub fn is_resumable(&self) -> bool {
match &self.phase {
/// Should be called when the peer is disconnected. Returns true if the channel can be resumed
/// when the peer reconnects (via [`Self::peer_connected_get_handshake`]). If not, the channel
/// must be immediately closed.
pub fn peer_disconnected_is_resumable<L: Deref>(&mut self, logger: &L) -> bool where L::Target: Logger {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about naming this is_disconnected_peer_resumable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not stateless, though, I wanted to communicate that its a "this peer has disconnected" notification, plus "should i discard the channel". Not sure how best to communicate it, this name is a bit awkward.

match &mut self.phase {
ChannelPhase::Undefined => unreachable!(),
ChannelPhase::Funded(_) => false,
ChannelPhase::Funded(chan) => chan.remove_uncommitted_htlcs_and_mark_paused(logger).is_ok(),
// If we get disconnected and haven't yet committed to a funding
// transaction, we can replay the `open_channel` on reconnection, so don't
// bother dropping the channel here. However, if we already committed to
// the funding transaction we don't yet support replaying the funding
// handshake (and bailing if the peer rejects it), so we force-close in
// that case.
ChannelPhase::UnfundedOutboundV1(chan) => chan.is_resumable(),
ChannelPhase::UnfundedInboundV1(_) => false,
ChannelPhase::UnfundedV2(_) => false,
}
}

pub fn maybe_get_open_channel<L: Deref>(
/// Should be called when the peer re-connects, returning an initial message which we should
/// send our peer to begin the channel reconnection process.
pub fn peer_connected_get_handshake<L: Deref>(
&mut self, chain_hash: ChainHash, logger: &L,
) -> Option<OpenChannelMessage> where L::Target: Logger {
) -> ReconnectionMsg where L::Target: Logger {
match &mut self.phase {
ChannelPhase::Undefined => unreachable!(),
ChannelPhase::Funded(_) => None,
ChannelPhase::Funded(chan) =>
ReconnectionMsg::Reestablish(chan.get_channel_reestablish(logger)),
ChannelPhase::UnfundedOutboundV1(chan) => {
let logger = WithChannelContext::from(logger, &chan.context, None);
chan.get_open_channel(chain_hash, &&logger)
.map(|msg| OpenChannelMessage::V1(msg))
chan.get_open_channel(chain_hash, logger)
.map(|msg| ReconnectionMsg::Open(OpenChannelMessage::V1(msg)))
.unwrap_or(ReconnectionMsg::None)
},
ChannelPhase::UnfundedInboundV1(_) => {
// Since unfunded inbound channel maps are cleared upon disconnecting a peer,
// they are not persisted and won't be recovered after a crash.
// Therefore, they shouldn't exist at this point.
debug_assert!(false);
None
ReconnectionMsg::None
},
#[cfg(dual_funding)]
ChannelPhase::UnfundedV2(chan) => {
if chan.context.is_outbound() {
Some(OpenChannelMessage::V2(chan.get_open_channel_v2(chain_hash)))
ReconnectionMsg::Open(OpenChannelMessage::V2(
chan.get_open_channel_v2(chain_hash)
))
} else {
// Since unfunded inbound channel maps are cleared upon disconnecting a peer,
// they are not persisted and won't be recovered after a crash.
// Therefore, they shouldn't exist at this point.
debug_assert!(false);
None
ReconnectionMsg::None
}
},
#[cfg(not(dual_funding))]
ChannelPhase::UnfundedV2(_) => {
debug_assert!(false);
None
},
ChannelPhase::UnfundedV2(_) => ReconnectionMsg::None,
}
}

Expand Down Expand Up @@ -6166,7 +6183,7 @@ impl<SP: Deref> FundedChannel<SP> where
/// No further message handling calls may be made until a channel_reestablish dance has
/// completed.
/// May return `Err(())`, which implies [`ChannelContext::force_shutdown`] should be called immediately.
pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) -> Result<(), ()> where L::Target: Logger {
fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) -> Result<(), ()> where L::Target: Logger {
assert!(!matches!(self.context.channel_state, ChannelState::ShutdownComplete));
if self.context.channel_state.is_pre_funded_state() {
return Err(())
Expand Down Expand Up @@ -8093,7 +8110,7 @@ impl<SP: Deref> FundedChannel<SP> where

/// May panic if called on a channel that wasn't immediately-previously
/// self.remove_uncommitted_htlcs_and_mark_paused()'d
pub fn get_channel_reestablish<L: Deref>(&mut self, logger: &L) -> msgs::ChannelReestablish where L::Target: Logger {
fn get_channel_reestablish<L: Deref>(&mut self, logger: &L) -> msgs::ChannelReestablish where L::Target: Logger {
assert!(self.context.channel_state.is_peer_disconnected());
assert_ne!(self.context.cur_counterparty_commitment_transaction_number, INITIAL_COMMITMENT_NUMBER);
// This is generally the first function which gets called on any given channel once we're
Expand Down
183 changes: 81 additions & 102 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::events::{self, Event, EventHandler, EventsProvider, InboundChannelFun
use crate::ln::inbound_payment;
use crate::ln::types::ChannelId;
use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret};
use crate::ln::channel::{self, Channel, ChannelError, ChannelUpdateStatus, FundedChannel, ShutdownResult, UpdateFulfillCommitFetch, OutboundV1Channel, InboundV1Channel, WithChannelContext};
use crate::ln::channel::{self, Channel, ChannelError, ChannelUpdateStatus, FundedChannel, ShutdownResult, UpdateFulfillCommitFetch, OutboundV1Channel, ReconnectionMsg, InboundV1Channel, WithChannelContext};
#[cfg(any(dual_funding, splicing))]
use crate::ln::channel::PendingV2Channel;
use crate::ln::channel_state::ChannelDetails;
Expand Down Expand Up @@ -6513,12 +6513,11 @@ where
chan.context_mut().maybe_expire_prev_config();
let unfunded_context = chan.unfunded_context_mut().expect("channel should be unfunded");
if unfunded_context.should_expire_unfunded_channel() {
let context = chan.context();
let context = chan.context_mut();
let logger = WithChannelContext::from(&self.logger, context, None);
log_error!(logger,
"Force-closing pending channel with ID {} for not establishing in a timely manner",
context.channel_id());
let context = chan.context_mut();
let mut close_res = context.force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) });
locked_close_channel!(self, peer_state, context, close_res);
shutdown_channels.push(close_res);
Expand Down Expand Up @@ -9373,49 +9372,65 @@ This indicates a bug inside LDK. Please report this error at https://github.com/

// Returns whether we should remove this channel as it's just been closed.
let unblock_chan = |chan: &mut Channel<SP>, pending_msg_events: &mut Vec<MessageSendEvent>| -> Option<ShutdownResult> {
let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
let node_id = chan.context().get_counterparty_node_id();
match (chan.signer_maybe_unblocked(self.chain_hash, &self.logger), chan.as_funded()) {
(Some(msgs), Some(funded_chan)) => {
let cu_msg = msgs.commitment_update.map(|updates| events::MessageSendEvent::UpdateHTLCs {
if let Some(msgs) = chan.signer_maybe_unblocked(self.chain_hash, &&logger) {
if let Some(msg) = msgs.open_channel {
pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
node_id,
updates,
msg,
});
let raa_msg = msgs.revoke_and_ack.map(|msg| events::MessageSendEvent::SendRevokeAndACK {
}
if let Some(msg) = msgs.funding_created {
pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
node_id,
msg,
});
match (cu_msg, raa_msg) {
(Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::CommitmentFirst => {
pending_msg_events.push(cu);
pending_msg_events.push(raa);
},
(Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::RevokeAndACKFirst => {
pending_msg_events.push(raa);
pending_msg_events.push(cu);
},
(Some(cu), _) => pending_msg_events.push(cu),
(_, Some(raa)) => pending_msg_events.push(raa),
(_, _) => {},
}
if let Some(msg) = msgs.funding_signed {
pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
node_id,
msg,
});
}
}
if let Some(msg) = msgs.accept_channel {
pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
node_id,
msg,
});
}
let cu_msg = msgs.commitment_update.map(|updates| events::MessageSendEvent::UpdateHTLCs {
node_id,
updates,
});
let raa_msg = msgs.revoke_and_ack.map(|msg| events::MessageSendEvent::SendRevokeAndACK {
node_id,
msg,
});
match (cu_msg, raa_msg) {
(Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::CommitmentFirst => {
pending_msg_events.push(cu);
pending_msg_events.push(raa);
},
(Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::RevokeAndACKFirst => {
pending_msg_events.push(raa);
pending_msg_events.push(cu);
},
(Some(cu), _) => pending_msg_events.push(cu),
(_, Some(raa)) => pending_msg_events.push(raa),
(_, _) => {},
}
if let Some(msg) = msgs.funding_signed {
pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
node_id,
msg,
});
}
if let Some(msg) = msgs.closing_signed {
pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
node_id,
msg,
});
}
if let Some(funded_chan) = chan.as_funded() {
if let Some(msg) = msgs.channel_ready {
send_channel_ready!(self, pending_msg_events, funded_chan, msg);
}
if let Some(msg) = msgs.closing_signed {
pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
node_id,
msg,
});
}
if let Some(broadcast_tx) = msgs.signed_closing_tx {
let channel_id = funded_chan.context.channel_id();
let counterparty_node_id = funded_chan.context.get_counterparty_node_id();
let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None);
log_info!(logger, "Broadcasting closing tx {}", log_tx!(broadcast_tx));
self.tx_broadcaster.broadcast_transactions(&[&broadcast_tx]);

Expand All @@ -9425,30 +9440,15 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
});
}
}
msgs.shutdown_result
},
(Some(msgs), None) => {
if let Some(msg) = msgs.open_channel {
pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
node_id,
msg,
});
}
if let Some(msg) = msgs.funding_created {
pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
node_id,
msg,
});
}
if let Some(msg) = msgs.accept_channel {
pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel {
node_id,
msg,
});
}
None
} else {
// We don't know how to handle a channel_ready or signed_closing_tx for a
// non-funded channel.
debug_assert!(msgs.channel_ready.is_none());
debug_assert!(msgs.signed_closing_tx.is_none());
}
(None, _) => None,
msgs.shutdown_result
} else {
None
}
};

Expand Down Expand Up @@ -11499,26 +11499,10 @@ where
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
peer_state.channel_by_id.retain(|_, chan| {
match chan.as_funded_mut() {
Some(funded_chan) => {
let logger = WithChannelContext::from(&self.logger, &funded_chan.context, None);
if funded_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger).is_ok() {
// We only retain funded channels that are not shutdown.
return true;
}
},
// If we get disconnected and haven't yet committed to a funding
// transaction, we can replay the `open_channel` on reconnection, so don't
// bother dropping the channel here. However, if we already committed to
// the funding transaction we don't yet support replaying the funding
// handshake (and bailing if the peer rejects it), so we force-close in
// that case.
None => {
if chan.is_resumable() {
return true;
}
},
};
let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
if chan.peer_disconnected_is_resumable(&&logger) {
return true;
}
// Clean up for removal.
let context = chan.context_mut();
let mut close_res = context.force_shutdown(false, ClosureReason::DisconnectedPeer);
Expand Down Expand Up @@ -11662,30 +11646,25 @@ where
let pending_msg_events = &mut peer_state.pending_msg_events;

for (_, chan) in peer_state.channel_by_id.iter_mut() {
match chan.as_funded_mut() {
Some(funded_chan) => {
let logger = WithChannelContext::from(&self.logger, &funded_chan.context, None);
let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
ReconnectionMsg::Reestablish(msg) =>
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
node_id: funded_chan.context.get_counterparty_node_id(),
msg: funded_chan.get_channel_reestablish(&&logger),
});
},
None => match chan.maybe_get_open_channel(self.chain_hash, &self.logger) {
Some(OpenChannelMessage::V1(msg)) => {
pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
node_id: chan.context().get_counterparty_node_id(),
msg,
});
},
#[cfg(dual_funding)]
Some(OpenChannelMessage::V2(msg)) => {
pending_msg_events.push(events::MessageSendEvent::SendOpenChannelV2 {
node_id: chan.context().get_counterparty_node_id(),
msg,
});
},
None => {},
},
node_id: chan.context().get_counterparty_node_id(),
msg,
}),
ReconnectionMsg::Open(OpenChannelMessage::V1(msg)) =>
pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
node_id: chan.context().get_counterparty_node_id(),
msg,
}),
#[cfg(dual_funding)]
ReconnectionMsg::Open(OpenChannelMessage::V2(msg)) =>
pending_msg_events.push(events::MessageSendEvent::SendOpenChannelV2 {
node_id: chan.context().get_counterparty_node_id(),
msg,
}),
ReconnectionMsg::None => {},
}
}
}
Expand Down