Skip to content

Commit 4f15f5b

Browse files
committed
Introduce MessageQueueNotifierGuard type
Previously, when enqueuing new messages to the `MessageQueue`, we'd directly attempt to call back into `PeerHandler::process_events` (via the `process_msgs_callback`), which always held the risk of deadlocks on re-entrancy, i.e., when we would still hold the peer state mutexes when doing so. To mitigate this we had to always implement ~ugly patterns ensuring we don't hold the `Mutex` when we call `MessageQueue::enqueue`. Here, we instead introduce a `MessageQueueNotifierGuard` type that will call `process_msgs_callback` when dropped, which allows us to clean up the code and reduces the risk of reintroducing such deadlock bugs: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks.
1 parent aa2c6fe commit 4f15f5b

File tree

6 files changed

+208
-258
lines changed

6 files changed

+208
-258
lines changed

lightning-liquidity/src/lsps0/client.rs

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ where
4949
/// specifcation](https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS0#lsps-specification-support-query)
5050
/// for more information.
5151
pub fn list_protocols(&self, counterparty_node_id: &PublicKey) {
52+
let _msg_queue_notifier = self.pending_messages.notifier();
5253
let msg = LSPS0Message::Request(
5354
utils::generate_request_id(&self.entropy_source),
5455
LSPS0Request::ListProtocols(ListProtocolsRequest {}),

lightning-liquidity/src/lsps0/service.rs

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl LSPS0ServiceHandler {
3939
fn handle_request(
4040
&self, request_id: RequestId, request: LSPS0Request, counterparty_node_id: &PublicKey,
4141
) -> Result<(), lightning::ln::msgs::LightningError> {
42+
let _msg_queue_notifier = self.pending_messages.notifier();
4243
match request {
4344
LSPS0Request::ListProtocols(_) => {
4445
let msg = LSPS0Message::Response(

lightning-liquidity/src/lsps1/client.rs

+24-35
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ where
8484
///
8585
/// [`SupportedOptionsReady`]: crate::lsps1::event::LSPS1ClientEvent::SupportedOptionsReady
8686
pub fn request_supported_options(&self, counterparty_node_id: PublicKey) -> RequestId {
87+
let _msg_queue_notifier = self.pending_messages.notifier();
8788
let request_id = crate::utils::generate_request_id(&self.entropy_source);
8889
{
8990
let mut outer_state_lock = self.per_peer_state.write().unwrap();
@@ -191,25 +192,19 @@ where
191192
&self, counterparty_node_id: &PublicKey, order: OrderParameters,
192193
refund_onchain_address: Option<Address>,
193194
) -> RequestId {
194-
let (request_id, request_msg) = {
195-
let mut outer_state_lock = self.per_peer_state.write().unwrap();
196-
let inner_state_lock = outer_state_lock
197-
.entry(*counterparty_node_id)
198-
.or_insert(Mutex::new(PeerState::default()));
199-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
200-
201-
let request_id = crate::utils::generate_request_id(&self.entropy_source);
202-
let request =
203-
LSPS1Request::CreateOrder(CreateOrderRequest { order, refund_onchain_address });
204-
let msg = LSPS1Message::Request(request_id.clone(), request).into();
205-
peer_state_lock.pending_create_order_requests.insert(request_id.clone());
195+
let _msg_queue_notifier = self.pending_messages.notifier();
196+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
197+
let inner_state_lock = outer_state_lock
198+
.entry(*counterparty_node_id)
199+
.or_insert(Mutex::new(PeerState::default()));
200+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
206201

207-
(request_id, Some(msg))
208-
};
209-
210-
if let Some(msg) = request_msg {
211-
self.pending_messages.enqueue(&counterparty_node_id, msg);
212-
}
202+
let request_id = crate::utils::generate_request_id(&self.entropy_source);
203+
let request =
204+
LSPS1Request::CreateOrder(CreateOrderRequest { order, refund_onchain_address });
205+
let msg = LSPS1Message::Request(request_id.clone(), request).into();
206+
self.pending_messages.enqueue(&counterparty_node_id, msg);
207+
peer_state_lock.pending_create_order_requests.insert(request_id.clone());
213208

214209
request_id
215210
}
@@ -310,25 +305,19 @@ where
310305
pub fn check_order_status(
311306
&self, counterparty_node_id: &PublicKey, order_id: OrderId,
312307
) -> RequestId {
313-
let (request_id, request_msg) = {
314-
let mut outer_state_lock = self.per_peer_state.write().unwrap();
315-
let inner_state_lock = outer_state_lock
316-
.entry(*counterparty_node_id)
317-
.or_insert(Mutex::new(PeerState::default()));
318-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
319-
320-
let request_id = crate::utils::generate_request_id(&self.entropy_source);
321-
peer_state_lock.pending_get_order_requests.insert(request_id.clone());
308+
let _msg_queue_notifier = self.pending_messages.notifier();
309+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
310+
let inner_state_lock = outer_state_lock
311+
.entry(*counterparty_node_id)
312+
.or_insert(Mutex::new(PeerState::default()));
313+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
322314

323-
let request = LSPS1Request::GetOrder(GetOrderRequest { order_id: order_id.clone() });
324-
let msg = LSPS1Message::Request(request_id.clone(), request).into();
325-
326-
(request_id, Some(msg))
327-
};
315+
let request_id = crate::utils::generate_request_id(&self.entropy_source);
316+
peer_state_lock.pending_get_order_requests.insert(request_id.clone());
328317

329-
if let Some(msg) = request_msg {
330-
self.pending_messages.enqueue(&counterparty_node_id, msg);
331-
}
318+
let request = LSPS1Request::GetOrder(GetOrderRequest { order_id: order_id.clone() });
319+
let msg = LSPS1Message::Request(request_id.clone(), request).into();
320+
self.pending_messages.enqueue(&counterparty_node_id, msg);
332321

333322
request_id
334323
}

lightning-liquidity/src/lsps2/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ where
111111
pub fn request_opening_params(
112112
&self, counterparty_node_id: PublicKey, token: Option<String>,
113113
) -> RequestId {
114+
let _msg_queue_notifier = self.pending_messages.notifier();
114115
let request_id = crate::utils::generate_request_id(&self.entropy_source);
115116

116117
{
@@ -151,6 +152,7 @@ where
151152
&self, counterparty_node_id: PublicKey, payment_size_msat: Option<u64>,
152153
opening_fee_params: OpeningFeeParams,
153154
) -> Result<RequestId, APIError> {
155+
let _msg_queue_notifier = self.pending_messages.notifier();
154156
let request_id = crate::utils::generate_request_id(&self.entropy_source);
155157

156158
{

0 commit comments

Comments
 (0)