Skip to content

Commit b80059d

Browse files
committed
liquidity: Allow setting process_events callback in c_bindings
To trigger message processing, we previously had the user set a callback to `PeerHandler::process_events` via an `Fn()` callback. This is however not supported by `c_bindings`. Here, we therefore introduce as `ProcessMesssagesCallback` trait that can be used via `LiquidityManager::set_process_msgs_callback_fn`, which is exposed in `c_bindings`.
1 parent f92c4dc commit b80059d

File tree

3 files changed

+42
-74
lines changed

3 files changed

+42
-74
lines changed

lightning-liquidity/src/manager.rs

+13-60
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::lsps0::ser::{
77
LSPS_MESSAGE_TYPE_ID,
88
};
99
use crate::lsps0::service::LSPS0ServiceHandler;
10-
use crate::message_queue::MessageQueue;
10+
use crate::message_queue::{MessageQueue, ProcessMessagesCallback};
1111

1212
use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
1313
use crate::lsps1::msgs::LSPS1Message;
@@ -17,7 +17,7 @@ use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
1717
use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler};
1818
use crate::lsps2::msgs::LSPS2Message;
1919
use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler};
20-
use crate::prelude::{new_hash_map, new_hash_set, HashMap, HashSet, ToString, Vec};
20+
use crate::prelude::{new_hash_map, new_hash_set, Box, HashMap, HashSet, ToString, Vec};
2121
use crate::sync::{Arc, Mutex, RwLock};
2222

2323
use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
@@ -310,74 +310,27 @@ where {
310310
/// let process_msgs_pm = Arc::clone(&my_peer_manager);
311311
/// let process_msgs_callback = move || process_msgs_pm.process_events();
312312
///
313-
/// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback);
313+
/// my_liquidity_manager.set_process_msgs_callback(Box::new(process_msgs_callback));
314314
/// # }
315315
/// ```
316316
///
317317
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
318-
#[cfg(feature = "std")]
319-
pub fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
320-
self.pending_messages.set_process_msgs_callback(callback)
318+
pub fn set_process_msgs_callback(&self, callback: Box<dyn ProcessMessagesCallback>) {
319+
self.pending_messages.set_process_msgs_callback(callback);
321320
}
322321

323322
/// Allows to set a callback that will be called after new messages are pushed to the message
324323
/// queue.
325324
///
326-
/// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the
327-
/// message queue. For example:
325+
/// C bindings don't (currently) know how to map `Box<dyn Trait>`, and while it could add the
326+
/// following wrapper, doing it in the bindings is currently much more work than simply doing it
327+
/// here.
328328
///
329-
/// ```
330-
/// # use lightning::io;
331-
/// # use lightning_liquidity::LiquidityManager;
332-
/// # use std::sync::{Arc, RwLock};
333-
/// # use std::sync::atomic::{AtomicBool, Ordering};
334-
/// # use std::time::SystemTime;
335-
/// # struct MyStore {}
336-
/// # impl lightning::util::persist::KVStore for MyStore {
337-
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
338-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
339-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
340-
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
341-
/// # }
342-
/// # struct MyEntropySource {}
343-
/// # impl lightning::sign::EntropySource for MyEntropySource {
344-
/// # fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] }
345-
/// # }
346-
/// # struct MyEventHandler {}
347-
/// # impl MyEventHandler {
348-
/// # async fn handle_event(&self, _: lightning::events::Event) {}
349-
/// # }
350-
/// # #[derive(Eq, PartialEq, Clone, Hash)]
351-
/// # struct MySocketDescriptor {}
352-
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
353-
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
354-
/// # fn disconnect_socket(&mut self) {}
355-
/// # }
356-
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface;
357-
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
358-
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner;
359-
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup;
360-
/// # type MyFilter = dyn lightning::chain::Filter;
361-
/// # type MyLogger = dyn lightning::util::logger::Logger;
362-
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
363-
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
364-
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
365-
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
366-
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
367-
/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
368-
/// # type MyLiquidityManager = LiquidityManager<Arc<MyEntropySource>, Arc<MyChannelManager>, Arc<MyFilter>>;
369-
/// # fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_logger: Arc<MyLogger>, my_peer_manager: Arc<MyPeerManager>, my_liquidity_manager: Arc<MyLiquidityManager>) {
370-
/// let process_msgs_pm = Arc::clone(&my_peer_manager);
371-
/// let process_msgs_callback = move || process_msgs_pm.process_events();
372-
///
373-
/// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback);
374-
/// # }
375-
/// ```
376-
///
377-
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
378-
#[cfg(not(feature = "std"))]
379-
pub fn set_process_msgs_callback(&self, callback: impl Fn() + 'static) {
380-
self.pending_messages.set_process_msgs_callback(callback)
329+
/// Hence we simply allow setting a callback function that will be set via
330+
/// [`Self::set_process_msgs_callback`] internally.
331+
#[cfg(c_bindings)]
332+
pub fn set_process_msgs_callback_fn<F: 'static + ProcessMessagesCallback>(&self, callback: F) {
333+
self.set_process_msgs_callback(Box::new(callback));
381334
}
382335

383336
/// Blocks the current thread until next event is ready and returns it.

lightning-liquidity/src/message_queue.rs

+25-13
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ use bitcoin::secp256k1::PublicKey;
1111
/// [`LiquidityManager`]: crate::LiquidityManager
1212
pub struct MessageQueue {
1313
queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
14-
#[cfg(feature = "std")]
15-
process_msgs_callback: RwLock<Option<Box<dyn Fn() + Send + Sync + 'static>>>,
16-
#[cfg(not(feature = "std"))]
17-
process_msgs_callback: RwLock<Option<Box<dyn Fn() + 'static>>>,
14+
process_msgs_callback: RwLock<Option<Box<dyn ProcessMessagesCallback>>>,
1815
}
1916

2017
impl MessageQueue {
@@ -24,14 +21,8 @@ impl MessageQueue {
2421
Self { queue, process_msgs_callback }
2522
}
2623

27-
#[cfg(feature = "std")]
28-
pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
29-
*self.process_msgs_callback.write().unwrap() = Some(Box::new(callback));
30-
}
31-
32-
#[cfg(not(feature = "std"))]
33-
pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + 'static) {
34-
*self.process_msgs_callback.write().unwrap() = Some(Box::new(callback));
24+
pub(crate) fn set_process_msgs_callback(&self, callback: Box<dyn ProcessMessagesCallback>) {
25+
*self.process_msgs_callback.write().unwrap() = Some(callback);
3526
}
3627

3728
pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
@@ -45,7 +36,28 @@ impl MessageQueue {
4536
}
4637

4738
if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() {
48-
(process_msgs_callback)()
39+
process_msgs_callback.call()
4940
}
5041
}
5142
}
43+
44+
macro_rules! define_callback { ($($bounds: path),*) => {
45+
/// A callback which will be called to trigger network message processing.
46+
///
47+
/// Usually, this should call [`PeerHandler::process_events`].
48+
///
49+
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
50+
pub trait ProcessMessagesCallback : $($bounds +)* {
51+
/// The method which is called.
52+
fn call(&self);
53+
}
54+
55+
impl<F: Fn() $(+ $bounds)*> ProcessMessagesCallback for F {
56+
fn call(&self) { (self)(); }
57+
}
58+
} }
59+
60+
#[cfg(feature = "std")]
61+
define_callback!(Send, Sync);
62+
#[cfg(not(feature = "std"))]
63+
define_callback!();

lightning-liquidity/tests/common/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,10 @@ pub(crate) fn create_liquidity_node(
478478

479479
let process_msgs_flag = Arc::clone(&check_msgs_processed);
480480
let process_msgs_callback = move || process_msgs_flag.store(true, Ordering::Release);
481-
liquidity_manager.set_process_msgs_callback(process_msgs_callback);
481+
#[cfg(not(c_bindings))]
482+
liquidity_manager.set_process_msgs_callback(Box::new(process_msgs_callback));
483+
#[cfg(c_bindings)]
484+
liquidity_manager.set_process_msgs_callback_fn(process_msgs_callback);
482485

483486
Node {
484487
channel_manager,

0 commit comments

Comments
 (0)