Skip to content

Commit 90256c6

Browse files
committed
Introduce EventQueueNotifier RAII type
Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while ourselves still holding some locks, e.g., on the peer state. Here, we instead introduce a `EventQueueNotifier` RAII type that will notify about pending events if necesssary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks.
1 parent 082ebde commit 90256c6

File tree

5 files changed

+71
-16
lines changed

5 files changed

+71
-16
lines changed

lightning-liquidity/src/events.rs

+47-15
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub(crate) struct EventQueue {
3131
queue: Arc<Mutex<VecDeque<Event>>>,
3232
waker: Arc<Mutex<Option<Waker>>>,
3333
#[cfg(feature = "std")]
34-
condvar: crate::sync::Condvar,
34+
condvar: Arc<crate::sync::Condvar>,
3535
}
3636

3737
impl EventQueue {
@@ -40,28 +40,20 @@ impl EventQueue {
4040
let waker = Arc::new(Mutex::new(None));
4141
#[cfg(feature = "std")]
4242
{
43-
let condvar = crate::sync::Condvar::new();
43+
let condvar = Arc::new(crate::sync::Condvar::new());
4444
Self { queue, waker, condvar }
4545
}
4646
#[cfg(not(feature = "std"))]
4747
Self { queue, waker }
4848
}
4949

5050
pub fn enqueue(&self, event: Event) {
51-
{
52-
let mut queue = self.queue.lock().unwrap();
53-
if queue.len() < MAX_EVENT_QUEUE_SIZE {
54-
queue.push_back(event);
55-
} else {
56-
return;
57-
}
58-
}
59-
60-
if let Some(waker) = self.waker.lock().unwrap().take() {
61-
waker.wake();
51+
let mut queue = self.queue.lock().unwrap();
52+
if queue.len() < MAX_EVENT_QUEUE_SIZE {
53+
queue.push_back(event);
54+
} else {
55+
return;
6256
}
63-
#[cfg(feature = "std")]
64-
self.condvar.notify_one();
6557
}
6658

6759
pub fn next_event(&self) -> Option<Event> {
@@ -98,6 +90,46 @@ impl EventQueue {
9890
pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
9991
self.queue.lock().unwrap().split_off(0).into()
10092
}
93+
94+
// Returns an [`EventQueueNotifier`] that will notify about new event when dropped.
95+
pub fn notifier(&self) -> EventQueueNotifier {
96+
#[cfg(feature = "std")]
97+
{
98+
EventQueueNotifier {
99+
queue: Arc::clone(&self.queue),
100+
waker: Arc::clone(&self.waker),
101+
condvar: Arc::clone(&self.condvar),
102+
}
103+
}
104+
#[cfg(not(feature = "std"))]
105+
{
106+
EventQueueNotifier { queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }
107+
}
108+
}
109+
}
110+
111+
// An RAII type that will notify about new events when dropped.
112+
#[must_use]
113+
pub(crate) struct EventQueueNotifier {
114+
queue: Arc<Mutex<VecDeque<Event>>>,
115+
waker: Arc<Mutex<Option<Waker>>>,
116+
#[cfg(feature = "std")]
117+
condvar: Arc<crate::sync::Condvar>,
118+
}
119+
120+
impl Drop for EventQueueNotifier {
121+
fn drop(&mut self) {
122+
let should_notify = !self.queue.lock().unwrap().is_empty();
123+
124+
if should_notify {
125+
if let Some(waker) = self.waker.lock().unwrap().take() {
126+
waker.wake();
127+
}
128+
129+
#[cfg(feature = "std")]
130+
self.condvar.notify_one();
131+
}
132+
}
101133
}
102134

103135
/// An event which you should probably take some action in response to.

lightning-liquidity/src/lsps0/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ where
6161
fn handle_response(
6262
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
6363
) -> Result<(), LightningError> {
64+
let _event_queue_notifier = self.pending_events.notifier();
65+
6466
match response {
6567
LSPS0Response::ListProtocols(ListProtocolsResponse { protocols }) => {
6668
self.pending_events.enqueue(Event::LSPS0Client(

lightning-liquidity/src/lsps1/client.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,9 @@ where
104104
fn handle_get_info_response(
105105
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse,
106106
) -> Result<(), LightningError> {
107-
let outer_state_lock = self.per_peer_state.write().unwrap();
107+
let _event_queue_notifier = self.pending_events.notifier();
108108

109+
let outer_state_lock = self.per_peer_state.write().unwrap();
109110
match outer_state_lock.get(counterparty_node_id) {
110111
Some(inner_state_lock) => {
111112
let mut peer_state_lock = inner_state_lock.lock().unwrap();
@@ -142,6 +143,8 @@ where
142143
fn handle_get_info_error(
143144
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
144145
) -> Result<(), LightningError> {
146+
let _event_queue_notifier = self.pending_events.notifier();
147+
145148
let outer_state_lock = self.per_peer_state.read().unwrap();
146149
match outer_state_lock.get(counterparty_node_id) {
147150
Some(inner_state_lock) => {
@@ -213,6 +216,8 @@ where
213216
&self, request_id: RequestId, counterparty_node_id: &PublicKey,
214217
response: CreateOrderResponse,
215218
) -> Result<(), LightningError> {
219+
let _event_queue_notifier = self.pending_events.notifier();
220+
216221
let outer_state_lock = self.per_peer_state.read().unwrap();
217222
match outer_state_lock.get(counterparty_node_id) {
218223
Some(inner_state_lock) => {
@@ -254,6 +259,8 @@ where
254259
fn handle_create_order_error(
255260
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
256261
) -> Result<(), LightningError> {
262+
let _event_queue_notifier = self.pending_events.notifier();
263+
257264
let outer_state_lock = self.per_peer_state.read().unwrap();
258265
match outer_state_lock.get(counterparty_node_id) {
259266
Some(inner_state_lock) => {
@@ -326,6 +333,8 @@ where
326333
&self, request_id: RequestId, counterparty_node_id: &PublicKey,
327334
response: CreateOrderResponse,
328335
) -> Result<(), LightningError> {
336+
let _event_queue_notifier = self.pending_events.notifier();
337+
329338
let outer_state_lock = self.per_peer_state.read().unwrap();
330339
match outer_state_lock.get(counterparty_node_id) {
331340
Some(inner_state_lock) => {
@@ -367,6 +376,8 @@ where
367376
fn handle_get_order_error(
368377
&self, request_id: RequestId, counterparty_node_id: &PublicKey, error: ResponseError,
369378
) -> Result<(), LightningError> {
379+
let _event_queue_notifier = self.pending_events.notifier();
380+
370381
let outer_state_lock = self.per_peer_state.read().unwrap();
371382
match outer_state_lock.get(counterparty_node_id) {
372383
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/client.rs

+4
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ where
185185
fn handle_get_info_response(
186186
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: GetInfoResponse,
187187
) -> Result<(), LightningError> {
188+
let _event_queue_notifier = self.pending_events.notifier();
189+
188190
let outer_state_lock = self.per_peer_state.read().unwrap();
189191
match outer_state_lock.get(counterparty_node_id) {
190192
Some(inner_state_lock) => {
@@ -251,6 +253,8 @@ where
251253
fn handle_buy_response(
252254
&self, request_id: RequestId, counterparty_node_id: &PublicKey, result: BuyResponse,
253255
) -> Result<(), LightningError> {
256+
let _event_queue_notifier = self.pending_events.notifier();
257+
254258
let outer_state_lock = self.per_peer_state.read().unwrap();
255259
match outer_state_lock.get(counterparty_node_id) {
256260
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/service.rs

+6
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,8 @@ where
740740
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
741741
payment_hash: PaymentHash,
742742
) -> Result<(), APIError> {
743+
let _event_queue_notifier = self.pending_events.notifier();
744+
743745
let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
744746
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
745747
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -1029,6 +1031,8 @@ where
10291031
&self, request_id: RequestId, counterparty_node_id: &PublicKey, params: GetInfoRequest,
10301032
) -> Result<(), LightningError> {
10311033
let _msg_queue_notifier = self.pending_messages.notifier();
1034+
let _event_queue_notifier = self.pending_events.notifier();
1035+
10321036
let mut outer_state_lock = self.per_peer_state.write().unwrap();
10331037
let inner_state_lock =
10341038
get_or_insert_peer_state_entry!(self, outer_state_lock, counterparty_node_id);
@@ -1056,6 +1060,8 @@ where
10561060
&self, request_id: RequestId, counterparty_node_id: &PublicKey, params: BuyRequest,
10571061
) -> Result<(), LightningError> {
10581062
let _msg_queue_notifier = self.pending_messages.notifier();
1063+
let _event_queue_notifier = self.pending_events.notifier();
1064+
10591065
if let Some(payment_size_msat) = params.payment_size_msat {
10601066
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
10611067
let response = LSPS2Response::BuyError(ResponseError {

0 commit comments

Comments
 (0)