diff --git a/src/config.rs b/src/config.rs index c2956021..e1872752 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,7 @@ use crate::{ crypto::ed25519::Keypair, executor::{DefaultExecutor, Executor}, + metrics::MetricsRegistry, protocol::{ libp2p::{bitswap, identify, kademlia, ping}, mdns::Config as MdnsConfig, @@ -83,6 +84,9 @@ pub struct ConfigBuilder { #[cfg(feature = "websocket")] websocket: Option, + /// Metrics registry + metrics_registry: Option, + /// Keypair. keypair: Option, @@ -143,6 +147,7 @@ impl ConfigBuilder { webrtc: None, #[cfg(feature = "websocket")] websocket: None, + metrics_registry: None, keypair: None, ping: None, identify: None, @@ -187,6 +192,12 @@ impl ConfigBuilder { self } + /// Add metrics registry. + pub fn with_metrics_registry(mut self, registry: MetricsRegistry) -> Self { + self.metrics_registry = Some(registry); + self + } + /// Add keypair. /// /// If no keypair is specified, litep2p creates a new keypair. @@ -295,6 +306,7 @@ impl ConfigBuilder { webrtc: self.webrtc.take(), #[cfg(feature = "websocket")] websocket: self.websocket.take(), + metrics_registry: self.metrics_registry.take(), ping: self.ping.take(), identify: self.identify.take(), kademlia: self.kademlia.take(), @@ -328,6 +340,9 @@ pub struct Litep2pConfig { #[cfg(feature = "websocket")] pub(crate) websocket: Option, + /// Prometheus metrics registry. + pub(crate) metrics_registry: Option, + /// Keypair. pub(crate) keypair: Keypair, diff --git a/src/error.rs b/src/error.rs index 604d00e9..c96868b3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -127,6 +127,8 @@ pub enum Error { ConnectionLimit(ConnectionLimitsError), #[error("Failed to dial peer immediately")] ImmediateDialError(#[from] ImmediateDialError), + #[error("Invalid metric: `{0}`")] + MetricError(String), } /// Error type for address parsing. diff --git a/src/lib.rs b/src/lib.rs index 66e03289..4262cefa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,6 +72,7 @@ pub mod config; pub mod crypto; pub mod error; pub mod executor; +pub mod metrics; pub mod protocol; pub mod substream; pub mod transport; @@ -164,7 +165,8 @@ impl Litep2p { bandwidth_sink.clone(), litep2p_config.max_parallel_dials, litep2p_config.connection_limits, - ); + litep2p_config.metrics_registry.clone(), + )?; // add known addresses to `TransportManager`, if any exist if !litep2p_config.known_addresses.is_empty() { @@ -188,9 +190,14 @@ impl Litep2p { litep2p_config.keep_alive_timeout, ); let executor = Arc::clone(&litep2p_config.executor); - litep2p_config.executor.run(Box::pin(async move { - NotificationProtocol::new(service, config, executor).run().await - })); + let notification = NotificationProtocol::new( + service, + config, + executor, + litep2p_config.metrics_registry.clone(), + )?; + + litep2p_config.executor.run(Box::pin(async move { notification.run().await })); } // start request-response protocol event loops @@ -207,9 +214,15 @@ impl Litep2p { config.codec, litep2p_config.keep_alive_timeout, ); - litep2p_config.executor.run(Box::pin(async move { - RequestResponseProtocol::new(service, config).run().await - })); + let request_response = RequestResponseProtocol::new( + service, + config, + litep2p_config.metrics_registry.clone(), + )?; + + litep2p_config + .executor + .run(Box::pin(async move { request_response.run().await })); } // start user protocol event loops @@ -241,9 +254,13 @@ impl Litep2p { ping_config.codec, litep2p_config.keep_alive_timeout, ); - litep2p_config.executor.run(Box::pin(async move { - Ping::new(service, ping_config).run().await - })); + let ping = Ping::new( + service, + ping_config, + litep2p_config.metrics_registry.clone(), + )?; + + litep2p_config.executor.run(Box::pin(async move { ping.run().await })); } // start kademlia protocol event loop if enabled @@ -264,8 +281,14 @@ impl Litep2p { kademlia_config.codec, litep2p_config.keep_alive_timeout, ); + let kad = Kademlia::new( + service, + kademlia_config, + litep2p_config.metrics_registry.clone(), + )?; + litep2p_config.executor.run(Box::pin(async move { - let _ = Kademlia::new(service, kademlia_config).run().await; + let _ = kad.run().await; })); } @@ -313,8 +336,11 @@ impl Litep2p { // enable tcp transport if the config exists if let Some(config) = litep2p_config.tcp.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); - let (transport, transport_listen_addresses) = - ::new(handle, config)?; + let (transport, transport_listen_addresses) = ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); @@ -330,8 +356,11 @@ impl Litep2p { #[cfg(feature = "quic")] if let Some(config) = litep2p_config.quic.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); - let (transport, transport_listen_addresses) = - ::new(handle, config)?; + let (transport, transport_listen_addresses) = ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); @@ -348,7 +377,11 @@ impl Litep2p { if let Some(config) = litep2p_config.webrtc.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); let (transport, transport_listen_addresses) = - ::new(handle, config)?; + ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); @@ -365,7 +398,11 @@ impl Litep2p { if let Some(config) = litep2p_config.websocket.take() { let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); let (transport, transport_listen_addresses) = - ::new(handle, config)?; + ::new( + handle, + config, + litep2p_config.metrics_registry.clone(), + )?; for address in transport_listen_addresses { transport_manager.register_listen_address(address.clone()); @@ -390,7 +427,11 @@ impl Litep2p { // if identify was enabled, give it the enabled protocols and listen addresses and start it if let Some((service, mut identify_config)) = identify_info.take() { identify_config.protocols = transport_manager.protocols().cloned().collect(); - let identify = Identify::new(service, identify_config); + let identify = Identify::new( + service, + identify_config, + litep2p_config.metrics_registry.clone(), + )?; litep2p_config.executor.run(Box::pin(async move { let _ = identify.run().await; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 00000000..559e02a9 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,150 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A generic module for handling the metrics exposed by litep2p. +//! +//! Contains the traits and types that are used to define and interact with metrics. + +use crate::{utils::futures_stream::FuturesStream, Error}; +use futures::{Stream, StreamExt}; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +pub type MetricCounter = Arc; + +pub type MetricGauge = Arc; + +pub type MetricsRegistry = Arc; + +/// Represents a metric that can only go up. +pub trait MetricCounterT: Send + Sync { + /// Increment the counter by `value`. + fn inc(&self, value: u64); +} + +/// Represents a metric that can arbitrarily go up and down. +pub trait MetricGaugeT: Send + Sync { + /// Set the gauge to `value`. + fn set(&self, value: u64); + + /// Increment the gauge. + fn inc(&self); + + /// Decrement the gauge. + fn dec(&self); + + /// Add `value` to the gauge. + fn add(&self, value: u64); + + /// Subtract `value` from the gauge. + fn sub(&self, value: u64); +} + +/// A registry for metrics. +pub trait MetricsRegistryT: Send + Sync { + /// Register a new counter. + fn register_counter(&self, name: String, help: String) -> Result; + + /// Register a new gauge. + fn register_gauge(&self, name: String, help: String) -> Result; +} + +/// A scope for metrics that modifies a provided gauge in an RAII fashion. +/// +/// The gauge is incremented when constructed and decremented when the object is dropped. +#[derive(Clone)] +pub struct ScopeGaugeMetric { + inner: MetricGauge, +} + +impl ScopeGaugeMetric { + /// Create a new [`ScopeGaugeMetric`]. + pub fn new(inner: MetricGauge) -> Self { + inner.inc(); + ScopeGaugeMetric { inner } + } +} + +impl Drop for ScopeGaugeMetric { + fn drop(&mut self) { + self.inner.dec(); + } +} + +/// Wrapper around `FuturesStream` that provides information to the given metric. +#[derive(Default)] +pub struct MeteredFuturesStream { + stream: FuturesStream, + metric: Option, +} + +impl MeteredFuturesStream { + pub fn new(metric: Option) -> Self { + MeteredFuturesStream { + stream: FuturesStream::new(), + metric, + } + } + + pub fn push(&mut self, future: F) { + if let Some(ref metric) = self.metric { + metric.inc(); + } + + self.stream.push(future); + } + + /// Number of futures in the stream. + pub fn len(&self) -> usize { + self.stream.len() + } + + /// Returns `true` if the stream is empty. + pub fn is_empty(&self) -> bool { + self.stream.len() == 0 + } +} + +impl Stream for MeteredFuturesStream { + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let result = self.stream.poll_next_unpin(cx); + if result.is_ready() { + if let Some(ref metric) = self.metric { + metric.dec(); + } + } + result + } +} + +impl Drop for MeteredFuturesStream { + fn drop(&mut self) { + if let Some(ref metric) = self.metric { + metric.sub(self.len() as u64); + } + } +} diff --git a/src/protocol/libp2p/identify.rs b/src/protocol/libp2p/identify.rs index 39f74581..0813c37d 100644 --- a/src/protocol/libp2p/identify.rs +++ b/src/protocol/libp2p/identify.rs @@ -24,6 +24,7 @@ use crate::{ codec::ProtocolCodec, crypto::PublicKey, error::{Error, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, protocol::{Direction, TransportEvent, TransportService}, substream::Substream, transport::Endpoint, @@ -186,12 +187,42 @@ pub(crate) struct Identify { /// Pending inbound substreams. pending_inbound: FuturesStream>, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + pending_outbound: MetricGauge, + pending_inbound: MetricGauge, } impl Identify { /// Create new [`Identify`] protocol. - pub(crate) fn new(service: TransportService, config: Config) -> Self { - Self { + pub(crate) fn new( + service: TransportService, + config: Config, + registry: Option, + ) -> Result { + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_identify_peers".into(), "Connected peers".into())?, + pending_outbound: registry.register_gauge( + "litep2p_identify_pending_outbound".into(), + "Pending outbound substreams".into(), + )?, + pending_inbound: registry.register_gauge( + "litep2p_identify_pending_inbound".into(), + "Pending inbound substreams".into(), + )?, + }) + } else { + None + }; + + Ok(Self { service, tx: config.tx_event, peers: HashMap::new(), @@ -201,7 +232,8 @@ impl Identify { pending_inbound: FuturesStream::new(), pending_outbound: FuturesStream::new(), protocols: config.protocols.iter().map(|protocol| protocol.to_string()).collect(), - } + metrics, + }) } /// Connection established to remote peer. @@ -355,6 +387,12 @@ impl Identify { tracing::debug!(target: LOG_TARGET, "starting identify event loop"); loop { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.len() as u64); + metrics.pending_inbound.set(self.pending_inbound.len() as u64); + metrics.pending_outbound.set(self.pending_outbound.len() as u64); + } + tokio::select! { event = self.service.next() => match event { None => { diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index e94a09f2..155c8316 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -89,6 +89,11 @@ impl QueryExecutor { } } + /// Get number of pending futures. + pub fn futures_len(&self) -> usize { + self.futures.len() + } + /// Send message to remote peer. pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) { self.futures.push(Box::pin(async move { diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 03b98ea2..1587d027 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -22,6 +22,7 @@ use crate::{ error::{Error, ImmediateDialError, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, protocol::{ libp2p::kademlia::{ bucket::KBucketEntry, @@ -168,11 +169,30 @@ pub(crate) struct Kademlia { /// Query executor. executor: QueryExecutor, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + + mem_store_records: MetricGauge, + mem_store_providers: MetricGauge, + mem_store_local_providers: MetricGauge, + mem_store_provider_refresh: MetricGauge, + + engine_queries: MetricGauge, + executor_queries: MetricGauge, } impl Kademlia { /// Create new [`Kademlia`]. - pub(crate) fn new(mut service: TransportService, config: Config) -> Self { + pub(crate) fn new( + mut service: TransportService, + config: Config, + registry: Option, + ) -> Result { let local_peer_id = service.local_peer_id(); let local_key = Key::from(service.local_peer_id()); let mut routing_table = RoutingTable::new(local_key.clone()); @@ -193,7 +213,40 @@ impl Kademlia { }, ); - Self { + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_kad_peers".into(), "Connected peers".into())?, + mem_store_records: registry.register_gauge( + "litep2p_kad_mem_store_records".into(), + "Number of records in memory store".into(), + )?, + mem_store_providers: registry.register_gauge( + "litep2p_kad_mem_store_providers".into(), + "Number of providers in memory store".into(), + )?, + mem_store_local_providers: registry.register_gauge( + "litep2p_kad_mem_store_local_providers".into(), + "Number of local providers in memory store".into(), + )?, + mem_store_provider_refresh: registry.register_gauge( + "litep2p_kad_mem_store_provider_refresh".into(), + "Number of provider refresh futures".into(), + )?, + engine_queries: registry.register_gauge( + "litep2p_kad_engine_queries".into(), + "Number of queries in the query engine".into(), + )?, + executor_queries: registry.register_gauge( + "litep2p_kad_executor_queries".into(), + "Number of queries in the query executor".into(), + )?, + }) + } else { + None + }; + + Ok(Self { service, routing_table, peers: HashMap::new(), @@ -210,7 +263,8 @@ impl Kademlia { record_ttl: config.record_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), - } + metrics, + }) } /// Allocate next query ID. @@ -877,6 +931,20 @@ impl Kademlia { tracing::debug!(target: LOG_TARGET, "starting kademlia event loop"); loop { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.len() as u64); + + metrics.mem_store_records.set(self.store.records_len() as u64); + metrics.mem_store_providers.set(self.store.provider_keys_len() as u64); + metrics.mem_store_local_providers.set(self.store.local_providers_len() as u64); + metrics + .mem_store_provider_refresh + .set(self.store.pending_provider_refresh_len() as u64); + + metrics.engine_queries.set(self.engine.active_queries() as u64); + metrics.executor_queries.set(self.executor.futures_len() as u64); + } + // poll `QueryEngine` for next actions. while let Some(action) = self.engine.next_action() { if let Err((query, peer)) = self.on_query_action(action).await { @@ -1233,7 +1301,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let (transport_service, _tx) = TransportService::new( @@ -1264,7 +1334,7 @@ mod tests { }; ( - Kademlia::new(transport_service, config), + Kademlia::new(transport_service, config, None).unwrap(), Context { _cmd_tx, event_rx }, manager, ) diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 9f14a6de..f95a176c 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -216,6 +216,11 @@ impl QueryEngine { } } + /// Get number of active queries. + pub fn active_queries(&self) -> usize { + self.queries.len() + } + /// Start `FIND_NODE` query. pub fn start_find_node( &mut self, diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index efb39f0f..8286c0ad 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -350,6 +350,26 @@ impl MemoryStore { } }) } + + /// Get the number of records in the store. + pub fn records_len(&self) -> usize { + self.records.len() + } + + /// Get the number of provider keys in the store. + pub fn provider_keys_len(&self) -> usize { + self.provider_keys.len() + } + + /// Get the number of local providers in the store. + pub fn local_providers_len(&self) -> usize { + self.local_providers.len() + } + + /// Get the number of pending provider refreshes in the store. + pub fn pending_provider_refresh_len(&self) -> usize { + self.pending_provider_refresh.len() + } } pub struct MemoryStoreConfig { diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index efe6fad0..7358bc34 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -22,6 +22,7 @@ use crate::{ error::{Error, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, protocol::{Direction, TransportEvent, TransportService}, substream::Substream, types::SubstreamId, @@ -77,19 +78,50 @@ pub(crate) struct Ping { /// Pending inbound substreams. pending_inbound: FuturesUnordered>>, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + pending_outbound: MetricGauge, + pending_inbound: MetricGauge, } impl Ping { /// Create new [`Ping`] protocol. - pub fn new(service: TransportService, config: Config) -> Self { - Self { + pub fn new( + service: TransportService, + config: Config, + registry: Option, + ) -> Result { + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_ping_peers".into(), "Connected peers".into())?, + pending_outbound: registry.register_gauge( + "litep2p_ping_pending_outbound".into(), + "Pending outbound substreams".into(), + )?, + pending_inbound: registry.register_gauge( + "litep2p_ping_pending_inbound".into(), + "Pending inbound substreams".into(), + )?, + }) + } else { + None + }; + + Ok(Self { service, tx: config.tx_event, peers: HashSet::new(), pending_outbound: FuturesUnordered::new(), pending_inbound: FuturesUnordered::new(), _max_failures: config.max_failures, - } + metrics, + }) } /// Connection established to remote peer. @@ -168,6 +200,12 @@ impl Ping { tracing::debug!(target: LOG_TARGET, "starting ping event loop"); loop { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.len() as u64); + metrics.pending_inbound.set(self.pending_inbound.len() as u64); + metrics.pending_outbound.set(self.pending_outbound.len() as u64); + } + tokio::select! { event = self.service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. }) => { @@ -192,7 +230,8 @@ impl Ping { Some(_) => {} None => return, }, - _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {} + _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => { + } event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => { match event { Some(Ok((peer, elapsed))) => { diff --git a/src/protocol/mdns.rs b/src/protocol/mdns.rs index d305a3f7..f5a8562d 100644 --- a/src/protocol/mdns.rs +++ b/src/protocol/mdns.rs @@ -355,7 +355,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let mdns1 = Mdns::new( handle1, @@ -378,7 +380,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let mdns2 = Mdns::new( handle2, diff --git a/src/protocol/notification/mod.rs b/src/protocol/notification/mod.rs index 33108d3f..d1d18385 100644 --- a/src/protocol/notification/mod.rs +++ b/src/protocol/notification/mod.rs @@ -23,6 +23,7 @@ use crate::{ error::{Error, SubstreamError}, executor::Executor, + metrics::{MetricGauge, MetricsRegistry}, protocol::{ self, notification::{ @@ -282,6 +283,19 @@ pub(crate) struct NotificationProtocol { /// Should `NotificationProtocol` attempt to dial the peer. should_dial: bool, + + /// Metrics. + metrics: Option, +} + +/// Request-response protocol metrics. +struct Metrics { + connected_peers: MetricGauge, + pending_outbound_num: MetricGauge, + pending_outbound_handshake_num: MetricGauge, + ready_substreams_handshake_num: MetricGauge, + pending_validations_num: MetricGauge, + timers_num: MetricGauge, } impl NotificationProtocol { @@ -289,10 +303,50 @@ impl NotificationProtocol { service: TransportService, config: Config, executor: Arc, - ) -> Self { + registry: Option, + ) -> Result { let (shutdown_tx, shutdown_rx) = channel(DEFAULT_CHANNEL_SIZE); - Self { + let metrics = if let Some(registry) = registry { + let protocol_name = config.protocol_name.to_metric_string(); + + Some(Metrics { + connected_peers: registry.register_gauge( + format!("litep2p_notif{}_connected_peers", protocol_name), + "Number of connected peers".to_string(), + )?, + pending_outbound_num: registry.register_gauge( + format!("litep2p_notif{}_pending_outbound_num", protocol_name), + "Number of pending outbound substreams".to_string(), + )?, + pending_outbound_handshake_num: registry.register_gauge( + format!( + "litep2p_notif{}_pending_outbound_handshake_num", + protocol_name + ), + "Number of pending outbound substreams with handshake".to_string(), + )?, + ready_substreams_handshake_num: registry.register_gauge( + format!( + "litep2p_notif{}_ready_substreams_handshake_num", + protocol_name + ), + "Number of ready substreams with handshake".to_string(), + )?, + pending_validations_num: registry.register_gauge( + format!("litep2p_notif{}_pending_validations_num", protocol_name), + "Number of pending substream validations".to_string(), + )?, + timers_num: registry.register_gauge( + format!("litep2p_notif{}_timers_num", protocol_name), + "Number of pending timers".to_string(), + )?, + }) + } else { + None + }; + + Ok(Self { service, shutdown_tx, shutdown_rx, @@ -310,7 +364,8 @@ impl NotificationProtocol { sync_channel_size: config.sync_channel_size, async_channel_size: config.async_channel_size, should_dial: config.should_dial, - } + metrics, + }) } /// Connection established to remote node. @@ -1605,8 +1660,24 @@ impl NotificationProtocol { } } + /// Report metrics. + fn report_metrics(&self) { + if let Some(metrics) = &self.metrics { + metrics.connected_peers.set(self.peers.len() as u64); + metrics.pending_outbound_num.set(self.pending_outbound.len() as u64); + metrics + .pending_outbound_handshake_num + .set(self.negotiation.substreams_len() as u64); + metrics.ready_substreams_handshake_num.set(self.negotiation.ready_len() as u64); + metrics.pending_validations_num.set(self.pending_validations.len() as u64); + metrics.timers_num.set(self.timers.len() as u64); + } + } + /// Handle next notification event. async fn next_event(&mut self) { + self.report_metrics(); + // biased select is used because the substream events must be prioritized above other events // that is because a closed substream is detected by either `substreams` or `negotiation` // and if that event is not handled with priority but, e.g., inbound substream is diff --git a/src/protocol/notification/negotiation.rs b/src/protocol/notification/negotiation.rs index 9c53c760..c57a61db 100644 --- a/src/protocol/notification/negotiation.rs +++ b/src/protocol/notification/negotiation.rs @@ -116,6 +116,16 @@ impl HandshakeService { } } + /// Number of substreams in the [`HandshakeService`]. + pub fn substreams_len(&self) -> usize { + self.substreams.len() + } + + /// Number of ready substreams in the [`HandshakeService`]. + pub fn ready_len(&self) -> usize { + self.ready.len() + } + /// Remove outbound substream from [`HandshakeService`]. pub fn remove_outbound(&mut self, peer: &PeerId) -> Option { self.substreams diff --git a/src/protocol/notification/tests/mod.rs b/src/protocol/notification/tests/mod.rs index 4aa48aa4..7f972b46 100644 --- a/src/protocol/notification/tests/mod.rs +++ b/src/protocol/notification/tests/mod.rs @@ -57,7 +57,9 @@ fn make_notification_protocol() -> ( BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let (transport_service, tx) = TransportService::new( @@ -84,7 +86,9 @@ fn make_notification_protocol() -> ( transport_service, config, std::sync::Arc::new(DefaultExecutor {}), - ), + None, + ) + .unwrap(), handle, manager, tx, diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index 3a64a386..8b12307b 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -22,6 +22,7 @@ use crate::{ error::{Error, NegotiationError, SubstreamError}, + metrics::{MetricGauge, MetricsRegistry}, multistream_select::NegotiationError::Failed as MultistreamFailed, protocol::{ request_response::handle::{InnerRequestResponseEvent, RequestResponseCommand}, @@ -183,12 +184,70 @@ pub(crate) struct RequestResponseProtocol { /// Maximum concurrent inbound requests, if specified. max_concurrent_inbound_requests: Option, + + /// Metrics. + metrics: Option, +} + +/// Request-response protocol metrics. +struct Metrics { + connected_peers: MetricGauge, + pending_outbound_num: MetricGauge, + pending_outbound_responses_num: MetricGauge, + pending_outbound_cancels_num: MetricGauge, + pending_inbound_num: MetricGauge, + pending_inbound_requests_num: MetricGauge, + pending_dials_num: MetricGauge, } impl RequestResponseProtocol { /// Create new [`RequestResponseProtocol`]. - pub(crate) fn new(service: TransportService, config: Config) -> Self { - Self { + pub(crate) fn new( + service: TransportService, + config: Config, + registry: Option, + ) -> Result { + let metrics = if let Some(registry) = registry { + let protocol_name = config.protocol_name.to_metric_string(); + + Some(Metrics { + connected_peers: registry.register_gauge( + format!("litep2p_req_res{}_connected_peers", protocol_name), + "Litep2p number of connected peers".into(), + )?, + pending_outbound_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_outbound", protocol_name), + "Litep2p number of pending outbound requests".into(), + )?, + pending_outbound_responses_num: registry.register_gauge( + format!( + "litep2p_req_res{}_pending_outbound_responses", + protocol_name + ), + "Litep2p number of pending outbound responses".into(), + )?, + pending_outbound_cancels_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_outbound_cancels", protocol_name), + "Litep2p number of pending outbound cancels".into(), + )?, + pending_inbound_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_inbound", protocol_name), + "Litep2p number of pending inbound requests".into(), + )?, + pending_inbound_requests_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_inbound_requests", protocol_name), + "Litep2p number of pending inbound requests".into(), + )?, + pending_dials_num: registry.register_gauge( + format!("litep2p_req_res{}_pending_dials", protocol_name), + "Litep2p number of pending dials".into(), + )?, + }) + } else { + None + }; + + Ok(Self { service, peers: HashMap::new(), timeout: config.timeout, @@ -203,7 +262,8 @@ impl RequestResponseProtocol { pending_inbound_requests: SubstreamSet::new(), pending_outbound_responses: FuturesUnordered::new(), max_concurrent_inbound_requests: config.max_concurrent_inbound_request, - } + metrics, + }) } /// Get next ephemeral request ID. @@ -1016,11 +1076,32 @@ impl RequestResponseProtocol { } } + /// Report metrics. + fn report_metrics(&self) { + if let Some(metrics) = &self.metrics { + metrics.connected_peers.set(self.peers.len() as u64); + metrics.pending_outbound_num.set(self.pending_outbound.len() as u64); + metrics + .pending_outbound_responses_num + .set(self.pending_outbound_responses.len() as u64); + metrics + .pending_outbound_cancels_num + .set(self.pending_outbound_cancels.len() as u64); + metrics.pending_inbound_num.set(self.pending_inbound.len() as u64); + metrics + .pending_inbound_requests_num + .set(self.pending_inbound_requests.len() as u64); + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + } + /// Start [`RequestResponseProtocol`] event loop. pub async fn run(mut self) { tracing::debug!(target: LOG_TARGET, "starting request-response event loop"); loop { + self.report_metrics(); + tokio::select! { // events coming from the network have higher priority than user commands as all user commands are // responses to network behaviour so ensure that the commands operate on the most up to date information. diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 03cd891e..d8bca2b5 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -55,7 +55,9 @@ fn protocol() -> ( BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let (transport_service, tx) = TransportService::new( @@ -70,7 +72,7 @@ fn protocol() -> ( ConfigBuilder::new(ProtocolName::from("/req/1")).with_max_size(1024).build(); ( - RequestResponseProtocol::new(transport_service, config), + RequestResponseProtocol::new(transport_service, config, None).unwrap(), handle, manager, tx, diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 0af49eb1..a2dec9d1 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -81,6 +81,16 @@ impl ConnectionLimits { } } + /// Returns the number of established incoming connections. + pub fn num_incoming_connections(&self) -> usize { + self.incoming_connections.len() + } + + /// Returns the number of established outgoing connections. + pub fn num_outgoing_connections(&self) -> usize { + self.outgoing_connections.len() + } + /// Called when dialing an address. /// /// Returns the number of outgoing connections permitted to be established. diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index d7eba036..a15f6177 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -24,6 +24,7 @@ use crate::{ crypto::ed25519::Keypair, error::{AddressError, DialError, Error}, executor::Executor, + metrics::{MetricGauge, MetricsRegistry}, protocol::{InnerTransportEvent, TransportService}, transport::{ manager::{ @@ -254,6 +255,17 @@ pub struct TransportManager { /// Opening connections errors. opening_errors: HashMap>, + + /// Metrics. + metrics: Option, +} + +struct Metrics { + peers: MetricGauge, + pending_connections: MetricGauge, + incoming_connections: MetricGauge, + outgoing_connections: MetricGauge, + opening_errors: MetricGauge, } impl TransportManager { @@ -265,7 +277,8 @@ impl TransportManager { bandwidth_sink: BandwidthSink, max_parallel_dials: usize, connection_limits_config: limits::ConnectionLimitsConfig, - ) -> (Self, TransportManagerHandle) { + registry: Option, + ) -> Result<(Self, TransportManagerHandle), Error> { let local_peer_id = PeerId::from_public_key(&keypair.public().into()); let peers = Arc::new(RwLock::new(HashMap::new())); let (cmd_tx, cmd_rx) = channel(256); @@ -281,7 +294,32 @@ impl TransportManager { public_addresses.clone(), ); - ( + let metrics = if let Some(registry) = registry { + Some(Metrics { + peers: registry + .register_gauge("litep2p_manager_peers".into(), "Connected peers".into())?, + pending_connections: registry.register_gauge( + "litep2p_manager_pending_connections".into(), + "Pending connections".into(), + )?, + incoming_connections: registry.register_gauge( + "litep2p_manager_incoming_connections".into(), + "Incoming connections".into(), + )?, + outgoing_connections: registry.register_gauge( + "litep2p_manager_outgoing_connections".into(), + "Outgoing connections".into(), + )?, + opening_errors: registry.register_gauge( + "litep2p_manager_opening_errors".into(), + "Opening errors".into(), + )?, + }) + } else { + None + }; + + Ok(( Self { peers, cmd_rx, @@ -302,9 +340,10 @@ impl TransportManager { next_connection_id: Arc::new(AtomicUsize::new(0usize)), connection_limits: limits::ConnectionLimits::new(connection_limits_config), opening_errors: HashMap::new(), + metrics, }, handle, - ) + )) } /// Get iterator to installed protocols. @@ -999,9 +1038,28 @@ impl TransportManager { Ok(None) } + /// Report metrics. + fn report_metrics(&self) { + if let Some(metrics) = &self.metrics { + metrics.peers.set(self.peers.read().len() as u64); + metrics.pending_connections.set(self.pending_connections.len() as u64); + + metrics + .incoming_connections + .set(self.connection_limits.num_incoming_connections() as u64); + metrics + .outgoing_connections + .set(self.connection_limits.num_outgoing_connections() as u64); + + metrics.opening_errors.set(self.opening_errors.len() as u64); + } + } + /// Poll next event from [`crate::transport::manager::TransportManager`]. pub async fn next(&mut self) -> Option { loop { + self.report_metrics(); + tokio::select! { event = self.event_rx.recv() => { let Some(event) = event else { @@ -1508,8 +1566,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_protocol( ProtocolName::from("/notif/1"), Vec::new(), @@ -1535,8 +1594,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_protocol( ProtocolName::from("/notif/1"), Vec::new(), @@ -1565,8 +1625,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_protocol( ProtocolName::from("/notif/1"), vec![ @@ -1598,8 +1659,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); } @@ -1615,8 +1677,9 @@ mod tests { sink, 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); assert!(manager.dial(local_peer_id).await.is_err()); } @@ -1628,7 +1691,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1658,7 +1723,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -1720,7 +1787,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1751,7 +1820,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1796,7 +1867,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1815,7 +1888,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1848,8 +1923,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); // ipv6 let address = Multiaddr::empty() .with(Protocol::Ip6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1))) @@ -1910,7 +1986,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1977,7 +2055,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2064,7 +2144,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2149,7 +2231,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2258,7 +2342,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2354,7 +2440,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2463,7 +2551,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2567,7 +2657,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); let peer = PeerId::random(); @@ -2711,8 +2803,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager.on_dial_failure(ConnectionId::random()).unwrap(); } @@ -2730,7 +2823,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.on_connection_closed(PeerId::random(), ConnectionId::random()).unwrap(); } @@ -2748,7 +2843,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager .on_connection_opened( SupportedTransport::Tcp, @@ -2772,7 +2869,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2796,7 +2895,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2823,8 +2924,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); manager .on_open_failure(SupportedTransport::Tcp, ConnectionId::random()) .unwrap(); @@ -2844,7 +2946,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2864,8 +2968,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); assert!(manager.next().await.is_none()); } @@ -2877,8 +2982,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); let peer = { let peer = PeerId::random(); let mut peers = manager.peers.write(); @@ -2925,8 +3031,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); let peer = { let peer = PeerId::random(); let mut peers = manager.peers.write(); @@ -2988,8 +3095,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); let peer = { let peer = PeerId::random(); let mut peers = manager.peers.write(); @@ -3031,8 +3139,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); // transport doesn't start with ip/dns { let address = Multiaddr::empty().with(Protocol::P2p(Multihash::from(PeerId::random()))); @@ -3097,8 +3206,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); - + None, + ) + .unwrap(); async fn call_manager(manager: &mut TransportManager, address: Multiaddr) { match manager.dial_address(address).await { Err(Error::AddressError(AddressError::PeerIdMissing)) => {} @@ -3151,7 +3261,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -3237,7 +3349,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -3327,7 +3441,9 @@ mod tests { ConnectionLimitsConfig::default() .max_incoming_connections(Some(3)) .max_outgoing_connections(Some(2)), - ); + None, + ) + .unwrap(); // The connection limit is agnostic of the underlying transports. manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3403,7 +3519,9 @@ mod tests { ConnectionLimitsConfig::default() .max_incoming_connections(Some(3)) .max_outgoing_connections(Some(2)), - ); + None, + ) + .unwrap(); // The connection limit is agnostic of the underlying transports. manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -3490,7 +3608,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); // Random peer ID. @@ -3543,7 +3663,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); // Random peer ID. @@ -3695,7 +3817,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let dial_address = Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) @@ -3781,7 +3905,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let peer = PeerId::random(); let connection_id = ConnectionId::from(0); diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 1d61ca9d..7a0f1eea 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -20,7 +20,10 @@ //! Transport protocol implementations provided by [`Litep2p`](`crate::Litep2p`). -use crate::{error::DialError, transport::manager::TransportHandle, types::ConnectionId, PeerId}; +use crate::{ + error::DialError, metrics::MetricsRegistry, transport::manager::TransportHandle, + types::ConnectionId, PeerId, +}; use futures::Stream; use multiaddr::Multiaddr; @@ -177,7 +180,11 @@ pub(crate) trait TransportBuilder { type Transport: Transport; /// Create new [`Transport`] object. - fn new(context: TransportHandle, config: Self::Config) -> crate::Result<(Self, Vec)> + fn new( + context: TransportHandle, + config: Self::Config, + registry: Option, + ) -> crate::Result<(Self, Vec)> where Self: Sized; } diff --git a/src/transport/quic/mod.rs b/src/transport/quic/mod.rs index 0cf5e255..12c5e0db 100644 --- a/src/transport/quic/mod.rs +++ b/src/transport/quic/mod.rs @@ -215,6 +215,7 @@ impl TransportBuilder for QuicTransport { fn new( context: TransportHandle, mut config: Self::Config, + _registry: Option, ) -> crate::Result<(Self, Vec)> where Self: Sized, @@ -623,7 +624,7 @@ mod tests { }; let (mut transport1, listen_addresses) = - QuicTransport::new(handle1, Default::default()).unwrap(); + QuicTransport::new(handle1, Default::default(), None).unwrap(); let listen_address = listen_addresses[0].clone(); let keypair2 = Keypair::generate(); @@ -648,7 +649,7 @@ mod tests { )]), }; - let (mut transport2, _) = QuicTransport::new(handle2, Default::default()).unwrap(); + let (mut transport2, _) = QuicTransport::new(handle2, Default::default(), None).unwrap(); let peer1: PeerId = PeerId::from_public_key(&keypair1.public().into()); let _peer2: PeerId = PeerId::from_public_key(&keypair2.public().into()); let listen_address = listen_address.with(Protocol::P2p( diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index d0d21fde..eb7aa831 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -25,6 +25,7 @@ use crate::{ noise::{self, NoiseSocket}, }, error::{Error, NegotiationError, SubstreamError}, + metrics::{MeteredFuturesStream, MetricGauge, ScopeGaugeMetric}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -37,11 +38,7 @@ use crate::{ BandwidthSink, PeerId, }; -use futures::{ - future::BoxFuture, - stream::{FuturesUnordered, StreamExt}, - AsyncRead, AsyncWrite, -}; +use futures::{future::BoxFuture, stream::StreamExt, AsyncRead, AsyncWrite}; use multiaddr::{Multiaddr, Protocol}; use tokio::net::TcpStream; use tokio_util::compat::{ @@ -140,6 +137,16 @@ impl NegotiatedConnection { } } +/// Connection specific metrics. +pub struct TcpConnectionMetrics { + /// Metric for the number of pending substreams that are negotiated. + pub pending_substreams_num: MetricGauge, + + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + pub _active_connections_num: ScopeGaugeMetric, +} + /// TCP connection. pub struct TcpConnection { /// Protocol context. @@ -168,7 +175,11 @@ pub struct TcpConnection { /// Pending substreams. pending_substreams: - FuturesUnordered>>, + MeteredFuturesStream>>, + + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + _active_connections_num: Option, } impl fmt::Debug for TcpConnection { @@ -187,6 +198,7 @@ impl TcpConnection { protocol_set: ProtocolSet, bandwidth_sink: BandwidthSink, next_substream_id: Arc, + metrics: Option, ) -> Self { let NegotiatedConnection { connection, @@ -196,6 +208,15 @@ impl TcpConnection { substream_open_timeout, } = context; + let (pending_substreams_num, _active_connections_num) = if let Some(metrics) = metrics { + ( + Some(metrics.pending_substreams_num), + Some(metrics._active_connections_num), + ) + } else { + (None, None) + }; + Self { protocol_set, connection, @@ -204,8 +225,9 @@ impl TcpConnection { endpoint, bandwidth_sink, next_substream_id, - pending_substreams: FuturesUnordered::new(), substream_open_timeout, + pending_substreams: MeteredFuturesStream::new(pending_substreams_num), + _active_connections_num, } } @@ -733,8 +755,12 @@ impl TcpConnection { return Ok(()); } }, - substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { - self.handle_negotiated_substream(substream).await; + substream = self.pending_substreams.next(), if !self.pending_substreams.is_empty() => { + let Some(substream) = substream else { + continue; + }; + + self.handle_negotiated_substream(substream).await; } protocol = self.protocol_set.next() => { if self.handle_protocol_command(protocol).await? { diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 748e138d..10130d7b 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -24,6 +24,7 @@ use crate::{ config::Role, error::{DialError, Error}, + metrics::{MetricGauge, MetricsRegistry, ScopeGaugeMetric}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, TcpAddress}, manager::TransportHandle, @@ -37,6 +38,7 @@ use crate::{ utils::futures_stream::FuturesStream, }; +use connection::TcpConnectionMetrics; use futures::{ future::BoxFuture, stream::{AbortHandle, FuturesUnordered, Stream, StreamExt}, @@ -129,6 +131,34 @@ pub(crate) struct TcpTransport { /// Connections which have been opened and negotiated but are being validated by the /// `TransportManager`. pending_open: HashMap, + + /// Tcp metrics. + metrics: Option, +} + +/// TCP specific metrics. +struct TcpMetrics { + /// The following metrics are used for the transport itself. + pending_dials_num: MetricGauge, + pending_inbound_connections_num: MetricGauge, + pending_connections_num: MetricGauge, + pending_raw_connections_num: MetricGauge, + open_raw_connections_num: MetricGauge, + cancel_futures_num: MetricGauge, + pending_open_num: MetricGauge, + + /// The following metrics are shared with all TCP connections. + active_connections_num: MetricGauge, + pending_substreams_num: MetricGauge, +} + +impl TcpMetrics { + fn to_connection_metrics(&self) -> TcpConnectionMetrics { + TcpConnectionMetrics { + _active_connections_num: ScopeGaugeMetric::new(self.active_connections_num.clone()), + pending_substreams_num: self.pending_substreams_num.clone(), + } + } } impl TcpTransport { @@ -271,6 +301,7 @@ impl TransportBuilder for TcpTransport { fn new( context: TransportHandle, mut config: Self::Config, + registry: Option, ) -> crate::Result<(Self, Vec)> { tracing::debug!( target: LOG_TARGET, @@ -285,6 +316,51 @@ impl TransportBuilder for TcpTransport { config.nodelay, ); + let metrics = if let Some(registry) = registry { + Some(TcpMetrics { + pending_dials_num: registry.register_gauge( + "litep2p_tcp_pending_dials".into(), + "Litep2p number of pending dials".into(), + )?, + pending_inbound_connections_num: registry.register_gauge( + "litep2p_tcp_pending_inbound_connections".into(), + "Litep2p number of pending inbound connections".into(), + )?, + pending_connections_num: registry.register_gauge( + "litep2p_tcp_pending_connections".into(), + "Litep2p number of pending connections".into(), + )?, + pending_raw_connections_num: registry.register_gauge( + "litep2p_tcp_pending_raw_connections".into(), + "Litep2p number of pending raw connections".into(), + )?, + open_raw_connections_num: registry.register_gauge( + "litep2p_tcp_open_raw_connections".into(), + "Litep2p number of open raw connections".into(), + )?, + cancel_futures_num: registry.register_gauge( + "litep2p_tcp_cancel_futures".into(), + "Litep2p number of cancel futures".into(), + )?, + pending_open_num: registry.register_gauge( + "litep2p_tcp_pending_open".into(), + "Litep2p number of pending open connections".into(), + )?, + + active_connections_num: registry.register_gauge( + "litep2p_tcp_active_connections".into(), + "Litep2p number of active connections".into(), + )?, + + pending_substreams_num: registry.register_gauge( + "litep2p_tcp_pending_substreams".into(), + "Litep2p number of pending substreams".into(), + )?, + }) + } else { + None + }; + Ok(( Self { listener, @@ -298,6 +374,7 @@ impl TransportBuilder for TcpTransport { pending_connections: FuturesStream::new(), pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), + metrics, }, listen_addresses, )) @@ -319,6 +396,7 @@ impl Transport for TcpTransport { let nodelay = self.config.nodelay; self.pending_dials.insert(connection_id, address.clone()); + self.pending_connections.push(Box::pin(async move { let (_, stream) = TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout, nodelay) @@ -359,11 +437,17 @@ impl Transport for TcpTransport { "start connection", ); + let metrics = self.metrics.as_ref().map(|metrics| metrics.to_connection_metrics()); self.context.executor.run(Box::pin(async move { - if let Err(error) = - TcpConnection::new(context, protocol_set, bandwidth_sink, next_substream_id) - .start() - .await + if let Err(error) = TcpConnection::new( + context, + protocol_set, + bandwidth_sink, + next_substream_id, + metrics, + ) + .start() + .await { tracing::debug!( target: LOG_TARGET, @@ -500,6 +584,7 @@ impl Transport for TcpTransport { ); self.pending_dials.insert(connection_id, address); + self.pending_connections.push(Box::pin(async move { match tokio::time::timeout(connection_open_timeout, async move { TcpConnection::negotiate_connection( @@ -541,6 +626,20 @@ impl Stream for TcpTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + metrics + .pending_inbound_connections_num + .set(self.pending_inbound_connections.len() as u64); + metrics.pending_connections_num.set(self.pending_connections.len() as u64); + metrics + .pending_raw_connections_num + .set(self.pending_raw_connections.len() as u64); + metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); + metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); + metrics.pending_open_num.set(self.pending_open.len() as u64); + } + if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { return match event { None => { @@ -652,6 +751,10 @@ impl Stream for TcpTransport { let peer = connection.peer(); let endpoint = connection.endpoint(); self.pending_dials.remove(&connection.connection_id()); + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + self.pending_open.insert(connection.connection_id(), connection); return Poll::Ready(Some(TransportEvent::ConnectionEstablished { @@ -661,6 +764,10 @@ impl Stream for TcpTransport { } Err((connection_id, error)) => { if let Some(address) = self.pending_dials.remove(&connection_id) { + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + } + return Poll::Ready(Some(TransportEvent::DialFailure { connection_id, address, @@ -729,7 +836,7 @@ mod tests { }; let (mut transport1, listen_addresses) = - TcpTransport::new(handle1, transport_config1).unwrap(); + TcpTransport::new(handle1, transport_config1, None).unwrap(); let listen_address = listen_addresses[0].clone(); let keypair2 = Keypair::generate(); @@ -758,7 +865,7 @@ mod tests { ..Default::default() }; - let (mut transport2, _) = TcpTransport::new(handle2, transport_config2).unwrap(); + let (mut transport2, _) = TcpTransport::new(handle2, transport_config2, None).unwrap(); transport2.dial(ConnectionId::new(), listen_address).unwrap(); let (tx, mut from_transport2) = channel(64); @@ -822,7 +929,7 @@ mod tests { }; let (mut transport1, listen_addresses) = - TcpTransport::new(handle1, transport_config1).unwrap(); + TcpTransport::new(handle1, transport_config1, None).unwrap(); let listen_address = listen_addresses[0].clone(); let keypair2 = Keypair::generate(); @@ -851,7 +958,7 @@ mod tests { ..Default::default() }; - let (mut transport2, _) = TcpTransport::new(handle2, transport_config2).unwrap(); + let (mut transport2, _) = TcpTransport::new(handle2, transport_config2, None).unwrap(); transport2.dial(ConnectionId::new(), listen_address).unwrap(); let (tx, mut from_transport2) = channel(64); @@ -904,7 +1011,7 @@ mod tests { }, )]), }; - let (mut transport1, _) = TcpTransport::new(handle1, Default::default()).unwrap(); + let (mut transport1, _) = TcpTransport::new(handle1, Default::default(), None).unwrap(); tokio::spawn(async move { while let Some(event) = transport1.next().await { @@ -941,7 +1048,7 @@ mod tests { )]), }; - let (mut transport2, _) = TcpTransport::new(handle2, Default::default()).unwrap(); + let (mut transport2, _) = TcpTransport::new(handle2, Default::default(), None).unwrap(); let peer1: PeerId = PeerId::from_public_key(&keypair1.public().into()); let peer2: PeerId = PeerId::from_public_key(&keypair2.public().into()); @@ -980,7 +1087,9 @@ mod tests { BandwidthSink::new(), 8usize, ConnectionLimitsConfig::default(), - ); + None, + ) + .unwrap(); let handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport( SupportedTransport::Tcp, @@ -992,6 +1101,7 @@ mod tests { listen_addresses: vec!["/ip4/127.0.0.1/tcp/0".parse().unwrap()], ..Default::default() }, + None, ) .unwrap(); diff --git a/src/transport/webrtc/mod.rs b/src/transport/webrtc/mod.rs index 7dce743d..b401291e 100644 --- a/src/transport/webrtc/mod.rs +++ b/src/transport/webrtc/mod.rs @@ -423,7 +423,11 @@ impl TransportBuilder for WebRtcTransport { type Transport = WebRtcTransport; /// Create new [`Transport`] object. - fn new(context: TransportHandle, config: Self::Config) -> crate::Result<(Self, Vec)> + fn new( + context: TransportHandle, + config: Self::Config, + _registry: Option, + ) -> crate::Result<(Self, Vec)> where Self: Sized, { diff --git a/src/transport/websocket/connection.rs b/src/transport/websocket/connection.rs index 3635e655..c1df2032 100644 --- a/src/transport/websocket/connection.rs +++ b/src/transport/websocket/connection.rs @@ -25,6 +25,7 @@ use crate::{ noise::{self, NoiseSocket}, }, error::{Error, NegotiationError, SubstreamError}, + metrics::{MeteredFuturesStream, MetricGauge, ScopeGaugeMetric}, multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}, protocol::{Direction, Permit, ProtocolCommand, ProtocolSet}, substream, @@ -36,7 +37,7 @@ use crate::{ BandwidthSink, PeerId, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, AsyncRead, AsyncWrite, StreamExt}; +use futures::{future::BoxFuture, AsyncRead, AsyncWrite, StreamExt}; use multiaddr::{multihash::Multihash, Multiaddr, Protocol}; use tokio::net::TcpStream; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; @@ -130,6 +131,16 @@ impl NegotiatedConnection { } } +/// Connection specific metrics. +pub struct WebSocketConnectionMetrics { + /// Metric for the number of pending substreams that are negotiated. + pub pending_substreams_num: MetricGauge, + + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + pub _active_connections_num: ScopeGaugeMetric, +} + /// WebSocket connection. pub(crate) struct WebSocketConnection { /// Protocol context. @@ -159,7 +170,11 @@ pub(crate) struct WebSocketConnection { /// Pending substreams. pending_substreams: - FuturesUnordered>>, + MeteredFuturesStream>>, + + /// Metric incremented when the connection starts + /// and decremented when the connection closes. + _active_connections_num: Option, } impl WebSocketConnection { @@ -169,6 +184,7 @@ impl WebSocketConnection { protocol_set: ProtocolSet, bandwidth_sink: BandwidthSink, substream_open_timeout: Duration, + metrics: Option, ) -> Self { let NegotiatedConnection { peer, @@ -177,6 +193,15 @@ impl WebSocketConnection { control, } = connection; + let (pending_substreams_num, _active_connections_num) = if let Some(metrics) = metrics { + ( + Some(metrics.pending_substreams_num), + Some(metrics._active_connections_num), + ) + } else { + (None, None) + }; + Self { connection_id: endpoint.connection_id(), protocol_set, @@ -186,7 +211,8 @@ impl WebSocketConnection { endpoint, bandwidth_sink, substream_open_timeout, - pending_substreams: FuturesUnordered::new(), + pending_substreams: MeteredFuturesStream::new(pending_substreams_num), + _active_connections_num, } } @@ -423,7 +449,7 @@ impl WebSocketConnection { /// Start connection event loop. pub(crate) async fn start(mut self) -> crate::Result<()> { self.protocol_set - .report_connection_established(self.peer, self.endpoint) + .report_connection_established(self.peer, self.endpoint.clone()) .await?; loop { @@ -474,7 +500,11 @@ impl WebSocketConnection { } }, // TODO: move this to a function - substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => { + substream = self.pending_substreams.next(), if !self.pending_substreams.is_empty() => { + let Some(substream) = substream else { + continue; + }; + match substream { // TODO: return error to protocol Err(error) => { diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 2435f639..fc2fcef8 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -23,6 +23,7 @@ use crate::{ config::Role, error::{AddressError, Error, NegotiationError}, + metrics::{MetricGauge, ScopeGaugeMetric}, transport::{ common::listener::{DialAddresses, GetSocketAddr, SocketListener, WebSocketAddress}, manager::TransportHandle, @@ -37,6 +38,7 @@ use crate::{ DialError, PeerId, }; +use connection::WebSocketConnectionMetrics; use futures::{ future::BoxFuture, stream::{AbortHandle, FuturesUnordered}, @@ -132,6 +134,34 @@ pub(crate) struct WebSocketTransport { /// Negotiated connections waiting validation. pending_open: HashMap, + + /// Websocket metrics. + metrics: Option, +} + +/// Websocket specific metrics. +struct WebSocketMetrics { + /// The following metrics are used for the transport itself. + pending_dials_num: MetricGauge, + pending_inbound_connections_num: MetricGauge, + pending_connections_num: MetricGauge, + pending_raw_connections_num: MetricGauge, + open_raw_connections_num: MetricGauge, + cancel_futures_num: MetricGauge, + pending_open_num: MetricGauge, + + /// The following metrics are shared with all TCP connections. + active_connections_num: MetricGauge, + pending_substreams_num: MetricGauge, +} + +impl WebSocketMetrics { + fn to_connection_metrics(&self) -> WebSocketConnectionMetrics { + WebSocketConnectionMetrics { + _active_connections_num: ScopeGaugeMetric::new(self.active_connections_num.clone()), + pending_substreams_num: self.pending_substreams_num.clone(), + } + } } impl WebSocketTransport { @@ -300,6 +330,7 @@ impl TransportBuilder for WebSocketTransport { fn new( context: TransportHandle, mut config: Self::Config, + registry: Option, ) -> crate::Result<(Self, Vec)> where Self: Sized, @@ -315,6 +346,51 @@ impl TransportBuilder for WebSocketTransport { config.nodelay, ); + let metrics = if let Some(registry) = registry { + Some(WebSocketMetrics { + pending_dials_num: registry.register_gauge( + "litep2p_websocket_pending_dials".into(), + "Litep2p number of pending dials".into(), + )?, + pending_inbound_connections_num: registry.register_gauge( + "litep2p_websocket_pending_inbound_connections".into(), + "Litep2p number of pending inbound connections".into(), + )?, + pending_connections_num: registry.register_gauge( + "litep2p_websocket_pending_connections".into(), + "Litep2p number of pending connections".into(), + )?, + pending_raw_connections_num: registry.register_gauge( + "litep2p_websocket_pending_raw_connections".into(), + "Litep2p number of pending raw connections".into(), + )?, + open_raw_connections_num: registry.register_gauge( + "litep2p_websocket_open_raw_connections".into(), + "Litep2p number of open raw connections".into(), + )?, + cancel_futures_num: registry.register_gauge( + "litep2p_websocket_cancel_futures".into(), + "Litep2p number of cancel futures".into(), + )?, + pending_open_num: registry.register_gauge( + "litep2p_websocket_pending_open".into(), + "Litep2p number of pending open connections".into(), + )?, + + active_connections_num: registry.register_gauge( + "litep2p_websocket_active_connections".into(), + "Litep2p number of active connections".into(), + )?, + + pending_substreams_num: registry.register_gauge( + "litep2p_websocket_pending_substreams".into(), + "Litep2p number of pending substreams".into(), + )?, + }) + } else { + None + }; + Ok(( Self { listener, @@ -328,6 +404,7 @@ impl TransportBuilder for WebSocketTransport { pending_connections: FuturesStream::new(), pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), + metrics, }, listen_addresses, )) @@ -400,12 +477,14 @@ impl Transport for WebSocketTransport { "start connection", ); + let metrics = self.metrics.as_ref().map(|metrics| metrics.to_connection_metrics()); self.context.executor.run(Box::pin(async move { if let Err(error) = WebSocketConnection::new( context, protocol_set, bandwidth_sink, substream_open_timeout, + metrics, ) .start() .await @@ -587,6 +666,20 @@ impl Stream for WebSocketTransport { type Item = TransportEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(metrics) = &self.metrics { + metrics.pending_dials_num.set(self.pending_dials.len() as u64); + metrics + .pending_inbound_connections_num + .set(self.pending_inbound_connections.len() as u64); + metrics.pending_connections_num.set(self.pending_connections.len() as u64); + metrics + .pending_raw_connections_num + .set(self.pending_raw_connections.len() as u64); + metrics.open_raw_connections_num.set(self.opened_raw.len() as u64); + metrics.cancel_futures_num.set(self.cancel_futures.len() as u64); + metrics.pending_open_num.set(self.pending_open.len() as u64); + } + if let Poll::Ready(event) = self.listener.poll_next_unpin(cx) { return match event { None => { diff --git a/src/types/protocol.rs b/src/types/protocol.rs index adbfe8b1..426c01ad 100644 --- a/src/types/protocol.rs +++ b/src/types/protocol.rs @@ -85,6 +85,13 @@ impl PartialEq for ProtocolName { impl Eq for ProtocolName {} +impl ProtocolName { + /// Get the protocol name as a metrics valid string. + pub fn to_metric_string(&self) -> String { + self.to_string().replace("/", "_").replace("-", "_") + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/utils/futures_stream.rs b/src/utils/futures_stream.rs index 60ef7172..7f134794 100644 --- a/src/utils/futures_stream.rs +++ b/src/utils/futures_stream.rs @@ -45,7 +45,6 @@ impl FuturesStream { } /// Number of futures in the stream. - #[cfg(test)] pub fn len(&self) -> usize { self.futures.len() }