Skip to content

Commit be92584

Browse files
committed
Add custom message handler to peer manager
1 parent 6a94ff9 commit be92584

File tree

6 files changed

+236
-75
lines changed

6 files changed

+236
-75
lines changed

fuzz/src/full_stack.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use lightning::chain::transaction::OutPoint;
3434
use lightning::chain::keysinterface::{InMemorySigner, KeysInterface};
3535
use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
3636
use lightning::ln::channelmanager::{ChainParameters, ChannelManager};
37-
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor};
37+
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,IgnoringCustomMessageHandler};
3838
use lightning::ln::msgs::DecodeError;
3939
use lightning::ln::script::ShutdownScript;
4040
use lightning::routing::router::get_route;
@@ -160,7 +160,7 @@ type ChannelMan = ChannelManager<
160160
EnforcingSigner,
161161
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
162162
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
163-
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>>;
163+
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringCustomMessageHandler>;
164164

165165
struct MoneyLossDetector<'a> {
166166
manager: Arc<ChannelMan>,
@@ -377,7 +377,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
377377
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
378378
chan_handler: channelmanager.clone(),
379379
route_handler: net_graph_msg_handler.clone(),
380-
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger)));
380+
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringCustomMessageHandler{}));
381381

382382
let mut should_forward = false;
383383
let mut payments_received: Vec<PaymentHash> = Vec::new();

lightning-background-processor/src/lib.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use lightning::chain::keysinterface::{Sign, KeysInterface};
1616
use lightning::ln::channelmanager::ChannelManager;
1717
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
1818
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
19+
use lightning::ln::peer_handler::CustomMessageHandler;
1920
use lightning::util::events::{EventHandler, EventsProvider};
2021
use lightning::util::logger::Logger;
2122
use std::sync::Arc;
@@ -123,7 +124,8 @@ impl BackgroundProcessor {
123124
CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
124125
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
125126
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
126-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L>> + Send + Sync,
127+
UMH: 'static + Deref + Send + Sync,
128+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
127129
>
128130
(persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self
129131
where
@@ -136,6 +138,7 @@ impl BackgroundProcessor {
136138
P::Target: 'static + channelmonitor::Persist<Signer>,
137139
CMH::Target: 'static + ChannelMessageHandler,
138140
RMH::Target: 'static + RoutingMessageHandler,
141+
UMH::Target: 'static + CustomMessageHandler,
139142
{
140143
let stop_thread = Arc::new(AtomicBool::new(false));
141144
let stop_thread_clone = stop_thread.clone();
@@ -244,7 +247,7 @@ mod tests {
244247
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
245248
use lightning::ln::features::InitFeatures;
246249
use lightning::ln::msgs::{ChannelMessageHandler, Init};
247-
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
250+
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringCustomMessageHandler};
248251
use lightning::util::config::UserConfig;
249252
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
250253
use lightning::util::ser::Writeable;
@@ -272,7 +275,7 @@ mod tests {
272275

273276
struct Node {
274277
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
275-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>>>,
278+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringCustomMessageHandler>>,
276279
chain_monitor: Arc<ChainMonitor>,
277280
persister: Arc<FilesystemPersister>,
278281
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -313,7 +316,7 @@ mod tests {
313316
let params = ChainParameters { network, best_block };
314317
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
315318
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
316-
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone()));
319+
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone(), IgnoringCustomMessageHandler{}));
317320
let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block };
318321
nodes.push(node);
319322
}

lightning-net-tokio/src/lib.rs

+15-10
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8080

8181
use lightning::ln::peer_handler;
8282
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
83+
use lightning::ln::peer_handler::CustomMessageHandler;
8384
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
8485
use lightning::util::logger::Logger;
8586

@@ -119,10 +120,11 @@ struct Connection {
119120
id: u64,
120121
}
121122
impl Connection {
122-
async fn schedule_read<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
123+
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
123124
CMH: ChannelMessageHandler + 'static,
124125
RMH: RoutingMessageHandler + 'static,
125-
L: Logger + 'static + ?Sized {
126+
L: Logger + 'static + ?Sized,
127+
UMH: CustomMessageHandler + 'static {
126128
// 8KB is nice and big but also should never cause any issues with stack overflowing.
127129
let mut buf = [0; 8192];
128130

@@ -222,10 +224,11 @@ impl Connection {
222224
/// The returned future will complete when the peer is disconnected and associated handling
223225
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
224226
/// not need to poll the provided future in order to make progress.
225-
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
227+
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
226228
CMH: ChannelMessageHandler + 'static + Send + Sync,
227229
RMH: RoutingMessageHandler + 'static + Send + Sync,
228-
L: Logger + 'static + ?Sized + Send + Sync {
230+
L: Logger + 'static + ?Sized + Send + Sync,
231+
UMH: CustomMessageHandler + 'static + Send + Sync {
229232
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
230233
#[cfg(debug_assertions)]
231234
let last_us = Arc::clone(&us);
@@ -262,10 +265,11 @@ pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<So
262265
/// The returned future will complete when the peer is disconnected and associated handling
263266
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
264267
/// not need to poll the provided future in order to make progress.
265-
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
268+
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
266269
CMH: ChannelMessageHandler + 'static + Send + Sync,
267270
RMH: RoutingMessageHandler + 'static + Send + Sync,
268-
L: Logger + 'static + ?Sized + Send + Sync {
271+
L: Logger + 'static + ?Sized + Send + Sync,
272+
UMH: CustomMessageHandler + 'static + Send + Sync {
269273
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
270274
#[cfg(debug_assertions)]
271275
let last_us = Arc::clone(&us);
@@ -332,10 +336,11 @@ pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<S
332336
/// disconnected and associated handling futures are freed, though, because all processing in said
333337
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
334338
/// make progress.
335-
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
339+
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
336340
CMH: ChannelMessageHandler + 'static + Send + Sync,
337341
RMH: RoutingMessageHandler + 'static + Send + Sync,
338-
L: Logger + 'static + ?Sized + Send + Sync {
342+
L: Logger + 'static + ?Sized + Send + Sync,
343+
UMH: CustomMessageHandler + 'static + Send + Sync {
339344
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
340345
Some(setup_outbound(peer_manager, their_node_id, stream))
341346
} else { None }
@@ -563,7 +568,7 @@ mod tests {
563568
let a_manager = Arc::new(PeerManager::new(MessageHandler {
564569
chan_handler: Arc::clone(&a_handler),
565570
route_handler: Arc::clone(&a_handler),
566-
}, a_key.clone(), &[1; 32], Arc::new(TestLogger())));
571+
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringCustomMessageHandler{})));
567572

568573
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
569574
let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
@@ -577,7 +582,7 @@ mod tests {
577582
let b_manager = Arc::new(PeerManager::new(MessageHandler {
578583
chan_handler: Arc::clone(&b_handler),
579584
route_handler: Arc::clone(&b_handler),
580-
}, b_key.clone(), &[2; 32], Arc::new(TestLogger())));
585+
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringCustomMessageHandler{})));
581586

582587
// We bind on localhost, hoping the environment is properly configured with a local
583588
// address. This may not always be the case in containers and the like, so if this test is

lightning/src/ln/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub(crate) mod peer_channel_encryptor;
3636

3737
mod channel;
3838
mod onion_utils;
39-
mod wire;
39+
pub mod wire;
4040

4141
// Older rustc (which we support) refuses to let us call the get_payment_preimage_hash!() macro
4242
// without the node parameter being mut. This is incorrect, and thus newer rustcs will complain

0 commit comments

Comments
 (0)