Skip to content

Commit a23feec

Browse files
committed
fixup! Reset LSPS5 persistence_in_flight counter on persist errors
1 parent fe04494 commit a23feec

1 file changed

Lines changed: 71 additions & 65 deletions

File tree

lightning-liquidity/src/lsps5/service.rs

Lines changed: 71 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -251,85 +251,91 @@ where
251251
return Ok(false);
252252
}
253253

254-
let res = self.do_persist().await;
255-
debug_assert!(res.is_err() || self.persistence_in_flight.load(Ordering::Acquire) == 0);
256-
self.persistence_in_flight.store(0, Ordering::Release);
257-
res
258-
}
259-
260-
async fn do_persist(&self) -> Result<bool, lightning::io::Error> {
261254
let mut did_persist = false;
262255

263256
loop {
264-
let mut need_remove = Vec::new();
265-
let mut need_persist = Vec::new();
257+
match self.do_persist().await {
258+
Ok(pass_did_persist) => did_persist |= pass_did_persist,
259+
Err(e) => {
260+
self.persistence_in_flight.store(0, Ordering::Release);
261+
return Err(e);
262+
},
263+
}
266264

267-
self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
268-
{
269-
let outer_state_lock = self.per_peer_state.read().unwrap();
270-
271-
for (client_id, peer_state) in outer_state_lock.iter() {
272-
let is_prunable = peer_state.is_prunable();
273-
let has_open_channel = self.client_has_open_channel(client_id);
274-
if is_prunable && !has_open_channel {
275-
need_remove.push(*client_id);
276-
} else if peer_state.needs_persist {
277-
need_persist.push(*client_id);
278-
}
279-
}
265+
if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
266+
// If another thread incremented the state while we were running we should go
267+
// around again, but only once.
268+
self.persistence_in_flight.store(1, Ordering::Release);
269+
continue;
280270
}
271+
break;
272+
}
281273

282-
for client_id in need_persist.into_iter() {
283-
debug_assert!(!need_remove.contains(&client_id));
284-
self.persist_peer_state(client_id).await?;
285-
did_persist = true;
274+
Ok(did_persist)
275+
}
276+
277+
async fn do_persist(&self) -> Result<bool, lightning::io::Error> {
278+
let mut did_persist = false;
279+
let mut need_remove = Vec::new();
280+
let mut need_persist = Vec::new();
281+
282+
self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
283+
{
284+
let outer_state_lock = self.per_peer_state.read().unwrap();
285+
286+
for (client_id, peer_state) in outer_state_lock.iter() {
287+
let is_prunable = peer_state.is_prunable();
288+
let has_open_channel = self.client_has_open_channel(client_id);
289+
if is_prunable && !has_open_channel {
290+
need_remove.push(*client_id);
291+
} else if peer_state.needs_persist {
292+
need_persist.push(*client_id);
293+
}
286294
}
295+
}
287296

288-
for client_id in need_remove {
289-
let mut future_opt = None;
290-
{
291-
// We need to take the `per_peer_state` write lock to remove an entry, but also
292-
// have to hold it until after the `remove` call returns (but not through
293-
// future completion) to ensure that writes for the peer's state are
294-
// well-ordered with other `persist_peer_state` calls even across the removal
295-
// itself.
296-
let mut per_peer_state = self.per_peer_state.write().unwrap();
297-
if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
298-
let state = entry.get_mut();
299-
if state.is_prunable() && !self.client_has_open_channel(&client_id) {
300-
entry.remove();
301-
let key = client_id.to_string();
302-
future_opt = Some(self.kv_store.remove(
303-
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
304-
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
305-
&key,
306-
true,
307-
));
308-
} else {
309-
// If the peer was re-added, force a re-persist of the current state.
310-
state.needs_persist = true;
311-
}
297+
for client_id in need_persist.into_iter() {
298+
debug_assert!(!need_remove.contains(&client_id));
299+
self.persist_peer_state(client_id).await?;
300+
did_persist = true;
301+
}
302+
303+
for client_id in need_remove {
304+
let mut future_opt = None;
305+
{
306+
// We need to take the `per_peer_state` write lock to remove an entry, but also
307+
// have to hold it until after the `remove` call returns (but not through
308+
// future completion) to ensure that writes for the peer's state are
309+
// well-ordered with other `persist_peer_state` calls even across the removal
310+
// itself.
311+
let mut per_peer_state = self.per_peer_state.write().unwrap();
312+
if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
313+
let state = entry.get_mut();
314+
if state.is_prunable() && !self.client_has_open_channel(&client_id) {
315+
entry.remove();
316+
let key = client_id.to_string();
317+
future_opt = Some(self.kv_store.remove(
318+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
319+
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
320+
&key,
321+
true,
322+
));
312323
} else {
313-
// This should never happen, we can only have one `persist` call
314-
// in-progress at once and map entries are only removed by it.
315-
debug_assert!(false);
324+
// If the peer was re-added, force a re-persist of the current state.
325+
state.needs_persist = true;
316326
}
317-
}
318-
if let Some(future) = future_opt {
319-
future.await?;
320-
did_persist = true;
321327
} else {
322-
self.persist_peer_state(client_id).await?;
328+
// This should never happen, we can only have one `persist` call
329+
// in-progress at once and map entries are only removed by it.
330+
debug_assert!(false);
323331
}
324332
}
325-
326-
if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
327-
// If another thread incremented the state while we were running we should go
328-
// around again, but only once.
329-
self.persistence_in_flight.store(1, Ordering::Release);
330-
continue;
333+
if let Some(future) = future_opt {
334+
future.await?;
335+
did_persist = true;
336+
} else {
337+
self.persist_peer_state(client_id).await?;
331338
}
332-
break;
333339
}
334340

335341
Ok(did_persist)

0 commit comments

Comments
 (0)