Skip to content

Commit 8cfa0ae

Browse files
committed
Introduce EventQueueNotifierGuard type
Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while ourselves still holding some locks, e.g., on the peer state. Here, we instead introduce a `EventQueueNotifierGuard` type that will notify about pending events if necesssary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks.
1 parent d16ee36 commit 8cfa0ae

File tree

5 files changed

+125
-20
lines changed

5 files changed

+125
-20
lines changed

lightning-liquidity/src/events/event_queue.rs

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use alloc::collections::VecDeque;
55
use alloc::vec::Vec;
66

77
use core::future::Future;
8+
#[cfg(debug_assertions)]
9+
use core::sync::atomic::{AtomicU8, Ordering};
810
use core::task::{Poll, Waker};
911

1012
/// The maximum queue size we allow before starting to drop events.
@@ -14,37 +16,40 @@ pub(crate) struct EventQueue {
1416
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
1517
waker: Arc<Mutex<Option<Waker>>>,
1618
#[cfg(feature = "std")]
17-
condvar: crate::sync::Condvar,
19+
condvar: Arc<crate::sync::Condvar>,
20+
#[cfg(debug_assertions)]
21+
num_held_notifier_guards: Arc<AtomicU8>,
1822
}
1923

2024
impl EventQueue {
2125
pub fn new() -> Self {
2226
let queue = Arc::new(Mutex::new(VecDeque::new()));
2327
let waker = Arc::new(Mutex::new(None));
24-
#[cfg(feature = "std")]
25-
{
26-
let condvar = crate::sync::Condvar::new();
27-
Self { queue, waker, condvar }
28+
Self {
29+
queue,
30+
waker,
31+
#[cfg(feature = "std")]
32+
condvar: Arc::new(crate::sync::Condvar::new()),
33+
#[cfg(debug_assertions)]
34+
num_held_notifier_guards: Arc::new(AtomicU8::new(0)),
2835
}
29-
#[cfg(not(feature = "std"))]
30-
Self { queue, waker }
3136
}
3237

3338
pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
39+
#[cfg(debug_assertions)]
3440
{
35-
let mut queue = self.queue.lock().unwrap();
36-
if queue.len() < MAX_EVENT_QUEUE_SIZE {
37-
queue.push_back(event.into());
38-
} else {
39-
return;
40-
}
41+
let num_held_notifier_guards = self.num_held_notifier_guards.load(Ordering::Relaxed);
42+
debug_assert!(
43+
num_held_notifier_guards > 0,
44+
"We should be holding at least one notifier guard whenever enqueuing new events"
45+
);
4146
}
42-
43-
if let Some(waker) = self.waker.lock().unwrap().take() {
44-
waker.wake();
47+
let mut queue = self.queue.lock().unwrap();
48+
if queue.len() < MAX_EVENT_QUEUE_SIZE {
49+
queue.push_back(event.into());
50+
} else {
51+
return;
4552
}
46-
#[cfg(feature = "std")]
47-
self.condvar.notify_one();
4853
}
4954

5055
pub fn next_event(&self) -> Option<LiquidityEvent> {
@@ -83,6 +88,81 @@ impl EventQueue {
8388
pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
8489
self.queue.lock().unwrap().split_off(0).into()
8590
}
91+
92+
// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
93+
pub fn notifier(&self) -> EventQueueNotifierGuard {
94+
#[cfg(debug_assertions)]
95+
{
96+
self.num_held_notifier_guards.fetch_add(1, Ordering::Relaxed);
97+
}
98+
EventQueueNotifierGuard {
99+
queue: Arc::clone(&self.queue),
100+
waker: Arc::clone(&self.waker),
101+
#[cfg(feature = "std")]
102+
condvar: Arc::clone(&self.condvar),
103+
#[cfg(debug_assertions)]
104+
num_held_notifier_guards: Arc::clone(&self.num_held_notifier_guards),
105+
}
106+
}
107+
}
108+
109+
impl Drop for EventQueue {
110+
fn drop(&mut self) {
111+
#[cfg(debug_assertions)]
112+
{
113+
let num_held_notifier_guards = self.num_held_notifier_guards.load(Ordering::Relaxed);
114+
debug_assert!(
115+
num_held_notifier_guards == 0,
116+
"We should not be holding any notifier guards when the event queue is dropped"
117+
);
118+
}
119+
}
120+
}
121+
122+
// A guard type that will notify about new events when dropped.
123+
#[must_use]
124+
pub(crate) struct EventQueueNotifierGuard {
125+
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
126+
waker: Arc<Mutex<Option<Waker>>>,
127+
#[cfg(feature = "std")]
128+
condvar: Arc<crate::sync::Condvar>,
129+
#[cfg(debug_assertions)]
130+
num_held_notifier_guards: Arc<AtomicU8>,
131+
}
132+
133+
impl Drop for EventQueueNotifierGuard {
134+
fn drop(&mut self) {
135+
let should_notify = !self.queue.lock().unwrap().is_empty();
136+
137+
if should_notify {
138+
if let Some(waker) = self.waker.lock().unwrap().take() {
139+
waker.wake();
140+
}
141+
142+
#[cfg(feature = "std")]
143+
self.condvar.notify_one();
144+
}
145+
146+
#[cfg(debug_assertions)]
147+
{
148+
let res = self.num_held_notifier_guards.fetch_update(
149+
Ordering::Relaxed,
150+
Ordering::Relaxed,
151+
|x| Some(x.saturating_sub(1)),
152+
);
153+
match res {
154+
Ok(previous_value) if previous_value == 0 => debug_assert!(
155+
false,
156+
"num_held_notifier_guards counter out-of-sync! This should never happen!"
157+
),
158+
Err(_) => debug_assert!(
159+
false,
160+
"num_held_notifier_guards counter out-of-sync! This should never happen!"
161+
),
162+
_ => {},
163+
}
164+
}
165+
}
86166
}
87167

88168
struct EventFuture {
@@ -129,6 +209,7 @@ mod tests {
129209
});
130210

131211
for _ in 0..3 {
212+
let _guard = event_queue.notifier();
132213
event_queue.enqueue(expected_event.clone());
133214
}
134215

@@ -154,13 +235,15 @@ mod tests {
154235
let mut delayed_enqueue = false;
155236

156237
for _ in 0..25 {
238+
let _guard = event_queue.notifier();
157239
event_queue.enqueue(expected_event.clone());
158240
enqueued_events.fetch_add(1, Ordering::SeqCst);
159241
}
160242

161243
loop {
162244
tokio::select! {
163245
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
246+
let _guard = event_queue.notifier();
164247
event_queue.enqueue(expected_event.clone());
165248
enqueued_events.fetch_add(1, Ordering::SeqCst);
166249
delayed_enqueue = true;
@@ -169,6 +252,7 @@ mod tests {
169252
assert_eq!(e, expected_event);
170253
received_events.fetch_add(1, Ordering::SeqCst);
171254

255+
let _guard = event_queue.notifier();
172256
event_queue.enqueue(expected_event.clone());
173257
enqueued_events.fetch_add(1, Ordering::SeqCst);
174258
}
@@ -201,6 +285,7 @@ mod tests {
201285
std::thread::spawn(move || {
202286
// Sleep a bit before we enqueue the events everybody is waiting for.
203287
std::thread::sleep(Duration::from_millis(20));
288+
let _guard = thread_queue.notifier();
204289
thread_queue.enqueue(thread_event.clone());
205290
thread_queue.enqueue(thread_event.clone());
206291
});

lightning-liquidity/src/lsps0/client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ where
6161
fn handle_response(
6262
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
6363
) -> Result<(), LightningError> {
64+
let _event_queue_notifier = self.pending_events.notifier();
65+
6466
match response {
6567
LSPS0Response::ListProtocols(LSPS0ListProtocolsResponse { protocols }) => {
6668
self.pending_events.enqueue(LSPS0ClientEvent::ListProtocolsResponse {

lightning-liquidity/src/lsps1/client.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,9 @@ where
110110
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
111111
result: LSPS1GetInfoResponse,
112112
) -> Result<(), LightningError> {
113-
let outer_state_lock = self.per_peer_state.write().unwrap();
113+
let _event_queue_notifier = self.pending_events.notifier();
114114

115+
let outer_state_lock = self.per_peer_state.write().unwrap();
115116
match outer_state_lock.get(counterparty_node_id) {
116117
Some(inner_state_lock) => {
117118
let mut peer_state_lock = inner_state_lock.lock().unwrap();
@@ -147,6 +148,8 @@ where
147148
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
148149
error: LSPSResponseError,
149150
) -> Result<(), LightningError> {
151+
let _event_queue_notifier = self.pending_events.notifier();
152+
150153
let outer_state_lock = self.per_peer_state.read().unwrap();
151154
match outer_state_lock.get(counterparty_node_id) {
152155
Some(inner_state_lock) => {
@@ -224,6 +227,8 @@ where
224227
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
225228
response: LSPS1CreateOrderResponse,
226229
) -> Result<(), LightningError> {
230+
let _event_queue_notifier = self.pending_events.notifier();
231+
227232
let outer_state_lock = self.per_peer_state.read().unwrap();
228233
match outer_state_lock.get(counterparty_node_id) {
229234
Some(inner_state_lock) => {
@@ -266,6 +271,8 @@ where
266271
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
267272
error: LSPSResponseError,
268273
) -> Result<(), LightningError> {
274+
let _event_queue_notifier = self.pending_events.notifier();
275+
269276
let outer_state_lock = self.per_peer_state.read().unwrap();
270277
match outer_state_lock.get(counterparty_node_id) {
271278
Some(inner_state_lock) => {
@@ -343,6 +350,8 @@ where
343350
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
344351
response: LSPS1CreateOrderResponse,
345352
) -> Result<(), LightningError> {
353+
let _event_queue_notifier = self.pending_events.notifier();
354+
346355
let outer_state_lock = self.per_peer_state.read().unwrap();
347356
match outer_state_lock.get(counterparty_node_id) {
348357
Some(inner_state_lock) => {
@@ -385,6 +394,8 @@ where
385394
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
386395
error: LSPSResponseError,
387396
) -> Result<(), LightningError> {
397+
let _event_queue_notifier = self.pending_events.notifier();
398+
388399
let outer_state_lock = self.per_peer_state.read().unwrap();
389400
match outer_state_lock.get(counterparty_node_id) {
390401
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/client.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ where
191191
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
192192
result: LSPS2GetInfoResponse,
193193
) -> Result<(), LightningError> {
194+
let _event_queue_notifier = self.pending_events.notifier();
195+
194196
let outer_state_lock = self.per_peer_state.read().unwrap();
195197
match outer_state_lock.get(counterparty_node_id) {
196198
Some(inner_state_lock) => {
@@ -257,6 +259,8 @@ where
257259
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
258260
result: LSPS2BuyResponse,
259261
) -> Result<(), LightningError> {
262+
let _event_queue_notifier = self.pending_events.notifier();
263+
260264
let outer_state_lock = self.per_peer_state.read().unwrap();
261265
match outer_state_lock.get(counterparty_node_id) {
262266
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/service.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,8 @@ where
777777
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
778778
payment_hash: PaymentHash,
779779
) -> Result<(), APIError> {
780+
let _event_queue_notifier = self.pending_events.notifier();
781+
780782
let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
781783
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
782784
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -1066,6 +1068,7 @@ where
10661068
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
10671069
params: LSPS2GetInfoRequest,
10681070
) -> Result<(), LightningError> {
1071+
let _event_queue_notifier = self.pending_events.notifier();
10691072
let (result, response) = {
10701073
let mut outer_state_lock = self.per_peer_state.write().unwrap();
10711074
let inner_state_lock =
@@ -1085,7 +1088,6 @@ where
10851088
token: params.token,
10861089
};
10871090
self.pending_events.enqueue(event);
1088-
10891091
(Ok(()), msg)
10901092
},
10911093
(e, msg) => (e, msg),
@@ -1102,6 +1104,7 @@ where
11021104
fn handle_buy_request(
11031105
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
11041106
) -> Result<(), LightningError> {
1107+
let _event_queue_notifier = self.pending_events.notifier();
11051108
if let Some(payment_size_msat) = params.payment_size_msat {
11061109
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
11071110
let response = LSPS2Response::BuyError(LSPSResponseError {

0 commit comments

Comments
 (0)