Skip to content

Commit e01663a

Browse files
authored
Merge pull request #3981 from tnull/2025-08-message-queue-notifier
`lightning-liquidity`: Introduce `MessageQueueNotifierGuard` type
2 parents 61e5819 + 309591a commit e01663a

File tree

10 files changed

+348
-398
lines changed

10 files changed

+348
-398
lines changed

lightning-liquidity/src/lsps0/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ where
5050
/// specifcation](https://github.com/lightning/blips/blob/master/blip-0050.md#lsps-specification-support-query)
5151
/// for more information.
5252
pub fn list_protocols(&self, counterparty_node_id: &PublicKey) {
53+
let mut message_queue_notifier = self.pending_messages.notifier();
54+
5355
let msg = LSPS0Message::Request(
5456
utils::generate_request_id(&self.entropy_source),
5557
LSPS0Request::ListProtocols(LSPS0ListProtocolsRequest {}),
5658
);
5759

58-
self.pending_messages.enqueue(counterparty_node_id, msg.into());
60+
message_queue_notifier.enqueue(counterparty_node_id, msg.into());
5961
}
6062

6163
fn handle_response(

lightning-liquidity/src/lsps0/service.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ impl LSPS0ServiceHandler {
4040
fn handle_request(
4141
&self, request_id: LSPSRequestId, request: LSPS0Request, counterparty_node_id: &PublicKey,
4242
) -> Result<(), lightning::ln::msgs::LightningError> {
43+
let mut message_queue_notifier = self.pending_messages.notifier();
44+
4345
match request {
4446
LSPS0Request::ListProtocols(_) => {
4547
let msg = LSPS0Message::Response(
@@ -48,7 +50,7 @@ impl LSPS0ServiceHandler {
4850
protocols: self.protocols.clone(),
4951
}),
5052
);
51-
self.pending_messages.enqueue(counterparty_node_id, msg.into());
53+
message_queue_notifier.enqueue(counterparty_node_id, msg.into());
5254
Ok(())
5355
},
5456
}

lightning-liquidity/src/lsps1/client.rs

Lines changed: 26 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ where
9090
///
9191
/// [`SupportedOptionsReady`]: crate::lsps1::event::LSPS1ClientEvent::SupportedOptionsReady
9292
pub fn request_supported_options(&self, counterparty_node_id: PublicKey) -> LSPSRequestId {
93+
let mut message_queue_notifier = self.pending_messages.notifier();
94+
9395
let request_id = crate::utils::generate_request_id(&self.entropy_source);
9496
{
9597
let mut outer_state_lock = self.per_peer_state.write().unwrap();
@@ -102,7 +104,7 @@ where
102104

103105
let request = LSPS1Request::GetInfo(LSPS1GetInfoRequest {});
104106
let msg = LSPS1Message::Request(request_id.clone(), request).into();
105-
self.pending_messages.enqueue(&counterparty_node_id, msg);
107+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
106108
request_id
107109
}
108110

@@ -198,27 +200,21 @@ where
198200
&self, counterparty_node_id: &PublicKey, order: LSPS1OrderParams,
199201
refund_onchain_address: Option<Address>,
200202
) -> LSPSRequestId {
201-
let (request_id, request_msg) = {
202-
let mut outer_state_lock = self.per_peer_state.write().unwrap();
203-
let inner_state_lock = outer_state_lock
204-
.entry(*counterparty_node_id)
205-
.or_insert(Mutex::new(PeerState::default()));
206-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
203+
let mut message_queue_notifier = self.pending_messages.notifier();
207204

208-
let request_id = crate::utils::generate_request_id(&self.entropy_source);
209-
let request = LSPS1Request::CreateOrder(LSPS1CreateOrderRequest {
210-
order,
211-
refund_onchain_address,
212-
});
213-
let msg = LSPS1Message::Request(request_id.clone(), request).into();
214-
peer_state_lock.pending_create_order_requests.insert(request_id.clone());
205+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
206+
let inner_state_lock = outer_state_lock
207+
.entry(*counterparty_node_id)
208+
.or_insert(Mutex::new(PeerState::default()));
209+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
215210

216-
(request_id, Some(msg))
217-
};
211+
let request_id = crate::utils::generate_request_id(&self.entropy_source);
212+
let request =
213+
LSPS1Request::CreateOrder(LSPS1CreateOrderRequest { order, refund_onchain_address });
214+
let msg = LSPS1Message::Request(request_id.clone(), request).into();
215+
peer_state_lock.pending_create_order_requests.insert(request_id.clone());
218216

219-
if let Some(msg) = request_msg {
220-
self.pending_messages.enqueue(&counterparty_node_id, msg);
221-
}
217+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
222218

223219
request_id
224220
}
@@ -322,26 +318,21 @@ where
322318
pub fn check_order_status(
323319
&self, counterparty_node_id: &PublicKey, order_id: LSPS1OrderId,
324320
) -> LSPSRequestId {
325-
let (request_id, request_msg) = {
326-
let mut outer_state_lock = self.per_peer_state.write().unwrap();
327-
let inner_state_lock = outer_state_lock
328-
.entry(*counterparty_node_id)
329-
.or_insert(Mutex::new(PeerState::default()));
330-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
321+
let mut message_queue_notifier = self.pending_messages.notifier();
331322

332-
let request_id = crate::utils::generate_request_id(&self.entropy_source);
333-
peer_state_lock.pending_get_order_requests.insert(request_id.clone());
323+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
324+
let inner_state_lock = outer_state_lock
325+
.entry(*counterparty_node_id)
326+
.or_insert(Mutex::new(PeerState::default()));
327+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
334328

335-
let request =
336-
LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() });
337-
let msg = LSPS1Message::Request(request_id.clone(), request).into();
329+
let request_id = crate::utils::generate_request_id(&self.entropy_source);
330+
peer_state_lock.pending_get_order_requests.insert(request_id.clone());
338331

339-
(request_id, Some(msg))
340-
};
332+
let request = LSPS1Request::GetOrder(LSPS1GetOrderRequest { order_id: order_id.clone() });
333+
let msg = LSPS1Message::Request(request_id.clone(), request).into();
341334

342-
if let Some(msg) = request_msg {
343-
self.pending_messages.enqueue(&counterparty_node_id, msg);
344-
}
335+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
345336

346337
request_id
347338
}

lightning-liquidity/src/lsps1/service.rs

Lines changed: 74 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ where
177177
fn handle_get_info_request(
178178
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
179179
) -> Result<(), LightningError> {
180+
let mut message_queue_notifier = self.pending_messages.notifier();
181+
180182
let response = LSPS1Response::GetInfo(LSPS1GetInfoResponse {
181183
options: self
182184
.config
@@ -190,15 +192,17 @@ where
190192
});
191193

192194
let msg = LSPS1Message::Response(request_id, response).into();
193-
self.pending_messages.enqueue(counterparty_node_id, msg);
195+
message_queue_notifier.enqueue(counterparty_node_id, msg);
194196
Ok(())
195197
}
196198

197199
fn handle_create_order_request(
198200
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
199201
params: LSPS1CreateOrderRequest,
200202
) -> Result<(), LightningError> {
203+
let mut message_queue_notifier = self.pending_messages.notifier();
201204
let event_queue_notifier = self.pending_events.notifier();
205+
202206
if !is_valid(&params.order, &self.config.supported_options.as_ref().unwrap()) {
203207
let response = LSPS1Response::CreateOrderError(LSPSResponseError {
204208
code: LSPS1_CREATE_ORDER_REQUEST_ORDER_MISMATCH_ERROR_CODE,
@@ -209,7 +213,7 @@ where
209213
)),
210214
});
211215
let msg = LSPS1Message::Response(request_id, response).into();
212-
self.pending_messages.enqueue(counterparty_node_id, msg);
216+
message_queue_notifier.enqueue(counterparty_node_id, msg);
213217
return Err(LightningError {
214218
err: format!(
215219
"Client order does not match any supported options: {:?}",
@@ -250,66 +254,47 @@ where
250254
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
251255
payment: LSPS1PaymentInfo, created_at: LSPSDateTime,
252256
) -> Result<(), APIError> {
253-
let (result, response) = {
254-
let outer_state_lock = self.per_peer_state.read().unwrap();
255-
256-
match outer_state_lock.get(counterparty_node_id) {
257-
Some(inner_state_lock) => {
258-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
259-
260-
match peer_state_lock.pending_requests.remove(&request_id) {
261-
Some(LSPS1Request::CreateOrder(params)) => {
262-
let order_id = self.generate_order_id();
263-
let channel = OutboundCRChannel::new(
264-
params.order.clone(),
265-
created_at.clone(),
266-
order_id.clone(),
267-
payment.clone(),
268-
);
269-
270-
peer_state_lock.insert_outbound_channel(order_id.clone(), channel);
271-
272-
let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse {
273-
order: params.order,
274-
order_id,
275-
order_state: LSPS1OrderState::Created,
276-
created_at,
277-
payment,
278-
channel: None,
279-
});
280-
281-
(Ok(()), Some(response))
282-
},
283-
284-
_ => (
285-
Err(APIError::APIMisuseError {
286-
err: format!(
287-
"No pending buy request for request_id: {:?}",
288-
request_id
289-
),
290-
}),
291-
None,
292-
),
293-
}
294-
},
295-
None => (
296-
Err(APIError::APIMisuseError {
297-
err: format!(
298-
"No state for the counterparty exists: {:?}",
299-
counterparty_node_id
300-
),
301-
}),
302-
None,
303-
),
304-
}
305-
};
257+
let mut message_queue_notifier = self.pending_messages.notifier();
306258

307-
if let Some(response) = response {
308-
let msg = LSPS1Message::Response(request_id, response).into();
309-
self.pending_messages.enqueue(counterparty_node_id, msg);
310-
}
259+
let outer_state_lock = self.per_peer_state.read().unwrap();
260+
match outer_state_lock.get(counterparty_node_id) {
261+
Some(inner_state_lock) => {
262+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
263+
264+
match peer_state_lock.pending_requests.remove(&request_id) {
265+
Some(LSPS1Request::CreateOrder(params)) => {
266+
let order_id = self.generate_order_id();
267+
let channel = OutboundCRChannel::new(
268+
params.order.clone(),
269+
created_at.clone(),
270+
order_id.clone(),
271+
payment.clone(),
272+
);
273+
274+
peer_state_lock.insert_outbound_channel(order_id.clone(), channel);
275+
276+
let response = LSPS1Response::CreateOrder(LSPS1CreateOrderResponse {
277+
order: params.order,
278+
order_id,
279+
order_state: LSPS1OrderState::Created,
280+
created_at,
281+
payment,
282+
channel: None,
283+
});
284+
let msg = LSPS1Message::Response(request_id, response).into();
285+
message_queue_notifier.enqueue(counterparty_node_id, msg);
286+
Ok(())
287+
},
311288

312-
result
289+
_ => Err(APIError::APIMisuseError {
290+
err: format!("No pending buy request for request_id: {:?}", request_id),
291+
}),
292+
}
293+
},
294+
None => Err(APIError::APIMisuseError {
295+
err: format!("No state for the counterparty exists: {:?}", counterparty_node_id),
296+
}),
297+
}
313298
}
314299

315300
fn handle_get_order_request(
@@ -376,54 +361,40 @@ where
376361
&self, request_id: LSPSRequestId, counterparty_node_id: PublicKey, order_id: LSPS1OrderId,
377362
order_state: LSPS1OrderState, channel: Option<LSPS1ChannelInfo>,
378363
) -> Result<(), APIError> {
379-
let (result, response) = {
380-
let outer_state_lock = self.per_peer_state.read().unwrap();
364+
let mut message_queue_notifier = self.pending_messages.notifier();
381365

382-
match outer_state_lock.get(&counterparty_node_id) {
383-
Some(inner_state_lock) => {
384-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
366+
let outer_state_lock = self.per_peer_state.read().unwrap();
385367

386-
if let Some(outbound_channel) =
387-
peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id)
388-
{
389-
let config = &outbound_channel.config;
368+
match outer_state_lock.get(&counterparty_node_id) {
369+
Some(inner_state_lock) => {
370+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
390371

391-
let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse {
392-
order_id,
393-
order: config.order.clone(),
394-
order_state,
395-
created_at: config.created_at.clone(),
396-
payment: config.payment.clone(),
397-
channel,
398-
});
399-
(Ok(()), Some(response))
400-
} else {
401-
(
402-
Err(APIError::APIMisuseError {
403-
err: format!("Channel with order_id {} not found", order_id.0),
404-
}),
405-
None,
406-
)
407-
}
408-
},
409-
None => (
372+
if let Some(outbound_channel) =
373+
peer_state_lock.outbound_channels_by_order_id.get_mut(&order_id)
374+
{
375+
let config = &outbound_channel.config;
376+
377+
let response = LSPS1Response::GetOrder(LSPS1CreateOrderResponse {
378+
order_id,
379+
order: config.order.clone(),
380+
order_state,
381+
created_at: config.created_at.clone(),
382+
payment: config.payment.clone(),
383+
channel,
384+
});
385+
let msg = LSPS1Message::Response(request_id, response).into();
386+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
387+
Ok(())
388+
} else {
410389
Err(APIError::APIMisuseError {
411-
err: format!(
412-
"No existing state with counterparty {}",
413-
counterparty_node_id
414-
),
415-
}),
416-
None,
417-
),
418-
}
419-
};
420-
421-
if let Some(response) = response {
422-
let msg = LSPS1Message::Response(request_id, response).into();
423-
self.pending_messages.enqueue(&counterparty_node_id, msg);
390+
err: format!("Channel with order_id {} not found", order_id.0),
391+
})
392+
}
393+
},
394+
None => Err(APIError::APIMisuseError {
395+
err: format!("No existing state with counterparty {}", counterparty_node_id),
396+
}),
424397
}
425-
426-
result
427398
}
428399

429400
fn generate_order_id(&self) -> LSPS1OrderId {

lightning-liquidity/src/lsps2/client.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ where
118118
pub fn request_opening_params(
119119
&self, counterparty_node_id: PublicKey, token: Option<String>,
120120
) -> LSPSRequestId {
121+
let mut message_queue_notifier = self.pending_messages.notifier();
122+
121123
let request_id = crate::utils::generate_request_id(&self.entropy_source);
122124

123125
{
@@ -131,7 +133,7 @@ where
131133

132134
let request = LSPS2Request::GetInfo(LSPS2GetInfoRequest { token });
133135
let msg = LSPS2Message::Request(request_id.clone(), request).into();
134-
self.pending_messages.enqueue(&counterparty_node_id, msg);
136+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
135137

136138
request_id
137139
}
@@ -160,6 +162,8 @@ where
160162
&self, counterparty_node_id: PublicKey, payment_size_msat: Option<u64>,
161163
opening_fee_params: LSPS2OpeningFeeParams,
162164
) -> Result<LSPSRequestId, APIError> {
165+
let mut message_queue_notifier = self.pending_messages.notifier();
166+
163167
let request_id = crate::utils::generate_request_id(&self.entropy_source);
164168

165169
{
@@ -184,7 +188,7 @@ where
184188

185189
let request = LSPS2Request::Buy(LSPS2BuyRequest { opening_fee_params, payment_size_msat });
186190
let msg = LSPS2Message::Request(request_id.clone(), request).into();
187-
self.pending_messages.enqueue(&counterparty_node_id, msg);
191+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
188192

189193
Ok(request_id)
190194
}

0 commit comments

Comments
 (0)