From 066167ab7cec5da3d96d141220fe8371cc63492f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 16:25:14 +0000 Subject: [PATCH 1/8] address: Add bucketed address store for holding addresses efficiently Signed-off-by: Alexandru Vasile --- src/transport/manager/address.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/transport/manager/address.rs b/src/transport/manager/address.rs index 2c3cb5d0..0345871c 100644 --- a/src/transport/manager/address.rs +++ b/src/transport/manager/address.rs @@ -261,6 +261,25 @@ impl AddressStore { } } +/// Buckets for storing addresses based on dial results. +/// +/// This is a more optimized version of [`AddressStore`] that separates addresses +/// based on their dial results (success, unknown, failure). +/// +/// It allows for more efficient management of addresses based on their dial outcomes, +/// reducing the need for sorting and filtering during address selection. +#[derive(Debug, Clone, Default)] +pub struct AddressStoreBuckets { + /// Addresses with successful dials. + pub success: HashMap, + + /// Addresses not yet dialed. + pub unknown: HashMap, + + /// Addresses with dial failures. + pub failure: HashMap, +} + #[cfg(test)] mod tests { use std::{ From 2f535955fbe58ab2b3513c645e7a55a23b6be5f3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 16:40:08 +0000 Subject: [PATCH 2/8] address: Add logic for insert() Signed-off-by: Alexandru Vasile --- src/transport/manager/address.rs | 67 ++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/src/transport/manager/address.rs b/src/transport/manager/address.rs index 0345871c..01e71431 100644 --- a/src/transport/manager/address.rs +++ b/src/transport/manager/address.rs @@ -23,11 +23,20 @@ use crate::{error::DialError, PeerId}; use multiaddr::{Multiaddr, Protocol}; use multihash::Multihash; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; /// Maximum number of addresses tracked for a peer. const MAX_ADDRESSES: usize = 64; +/// Maximum number of addresses tracked for a peer in the success bucket. +const MAX_SUCCESS_ADDRESSES: usize = 32; + +/// Maximum number of addresses tracked for a peer in the unknown bucket. +const MAX_UNKNOWN_ADDRESSES: usize = 16; + +/// Maximum number of addresses tracked for a peer in the failure bucket. +const MAX_FAILURE_ADDRESSES: usize = 16; + /// Scores for address records. pub mod scores { /// Score indicating that the connection was successfully established. @@ -271,13 +280,63 @@ impl AddressStore { #[derive(Debug, Clone, Default)] pub struct AddressStoreBuckets { /// Addresses with successful dials. - pub success: HashMap, + pub success: HashSet, /// Addresses not yet dialed. - pub unknown: HashMap, + pub unknown: HashSet, /// Addresses with dial failures. - pub failure: HashMap, + pub failure: HashSet, +} + +impl AddressStoreBuckets { + /// Create new [`AddressStoreBuckets`]. + pub fn new() -> Self { + Self { + success: HashSet::with_capacity(MAX_SUCCESS_ADDRESSES), + unknown: HashSet::with_capacity(MAX_UNKNOWN_ADDRESSES), + failure: HashSet::with_capacity(MAX_FAILURE_ADDRESSES), + } + } + + /// Get the score for a given error. + pub fn error_score(_error: &DialError) -> i32 { + scores::CONNECTION_FAILURE + } + + /// Insert an address record into the appropriate bucket based on its score. + pub fn insert(&mut self, record: AddressRecord) { + let AddressRecord { score, address } = record; + + match score { + score if score > 0 => { + // Moves directly to the success bucket. + self.unknown.remove(&address); + self.failure.remove(&address); + + self.success.insert(address); + } + 0 => { + // Moves to the unknown bucket. + self.success.remove(&address); + self.failure.remove(&address); + + self.unknown.insert(address); + } + _ => { + // Moves to the failure bucket. + self.success.remove(&address); + self.unknown.remove(&address); + + self.failure.insert(address); + } + } + } + + /// Check if the store is empty. + pub fn is_empty(&self) -> bool { + self.success.is_empty() && self.unknown.is_empty() && self.failure.is_empty() + } } #[cfg(test)] From ed527a7db2ee743a7c035f7f3f353e715a1acb92 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 16:42:30 +0000 Subject: [PATCH 3/8] address: Add method to fetch addresses Signed-off-by: Alexandru Vasile --- src/transport/manager/address.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/transport/manager/address.rs b/src/transport/manager/address.rs index 01e71431..6d4a0d10 100644 --- a/src/transport/manager/address.rs +++ b/src/transport/manager/address.rs @@ -337,6 +337,15 @@ impl AddressStoreBuckets { pub fn is_empty(&self) -> bool { self.success.is_empty() && self.unknown.is_empty() && self.failure.is_empty() } + + /// Return the available addresses from all buckets. + pub fn addresses(&self, limit: usize) -> impl Iterator { + self.success + .iter() + .chain(self.unknown.iter()) + .chain(self.failure.iter()) + .take(limit) + } } #[cfg(test)] From 47294b8ebe78fa602288090fb0f079a13447d7bb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 16:50:20 +0000 Subject: [PATCH 4/8] kad: Switch address store of peers to bucketed Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/record.rs | 9 ++------- src/protocol/libp2p/kademlia/types.rs | 23 +++++------------------ src/transport/manager/address.rs | 11 +++++++++++ 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/src/protocol/libp2p/kademlia/record.rs b/src/protocol/libp2p/kademlia/record.rs index 322553d4..643d5679 100644 --- a/src/protocol/libp2p/kademlia/record.rs +++ b/src/protocol/libp2p/kademlia/record.rs @@ -23,7 +23,7 @@ use crate::{ protocol::libp2p::kademlia::types::{ ConnectionType, Distance, KademliaPeer, Key as KademliaKey, }, - transport::manager::address::{AddressRecord, AddressStore}, + transport::manager::address::AddressStoreBuckets, Multiaddr, PeerId, }; @@ -170,15 +170,10 @@ pub struct ContentProvider { impl From for KademliaPeer { fn from(provider: ContentProvider) -> Self { - let mut address_store = AddressStore::new(); - for address in provider.addresses.iter() { - address_store.insert(AddressRecord::from_raw_multiaddr(address.clone())); - } - Self { key: KademliaKey::from(provider.peer), peer: provider.peer, - address_store, + address_store: AddressStoreBuckets::from_unknown(provider.addresses), connection: ConnectionType::NotConnected, } } diff --git a/src/protocol/libp2p/kademlia/types.rs b/src/protocol/libp2p/kademlia/types.rs index c71bb878..5f5e498b 100644 --- a/src/protocol/libp2p/kademlia/types.rs +++ b/src/protocol/libp2p/kademlia/types.rs @@ -26,7 +26,7 @@ use crate::{ protocol::libp2p::kademlia::schema, - transport::manager::address::{AddressRecord, AddressStore}, + transport::manager::address::{AddressRecord, AddressStore, AddressStoreBuckets}, PeerId, }; @@ -254,7 +254,7 @@ pub struct KademliaPeer { pub(super) peer: PeerId, /// Known addresses of peer. - pub(super) address_store: AddressStore, + pub(super) address_store: AddressStoreBuckets, /// Connection type. pub(super) connection: ConnectionType, @@ -263,15 +263,9 @@ pub struct KademliaPeer { impl KademliaPeer { /// Create new [`KademliaPeer`]. pub fn new(peer: PeerId, addresses: Vec, connection: ConnectionType) -> Self { - let mut address_store = AddressStore::new(); - - for address in addresses.into_iter() { - address_store.insert(AddressRecord::from_raw_multiaddr(address)); - } - Self { peer, - address_store, + address_store: AddressStoreBuckets::from_unknown(addresses), connection, key: Key::from(peer), } @@ -295,19 +289,12 @@ impl TryFrom<&schema::kademlia::Peer> for KademliaPeer { fn try_from(record: &schema::kademlia::Peer) -> Result { let peer = PeerId::from_bytes(&record.id).map_err(|_| ())?; - - let mut address_store = AddressStore::new(); - for address in record.addrs.iter() { - let Ok(address) = Multiaddr::try_from(address.clone()) else { - continue; - }; - address_store.insert(AddressRecord::from_raw_multiaddr(address)); - } + let addresses = record.addrs.into_iter().filter_map(|addr| Multiaddr::try_from(addr).ok()); Ok(KademliaPeer { key: Key::from(peer), peer, - address_store, + address_store: AddressStoreBuckets::from_unknown(addresses), connection: ConnectionType::try_from(record.connection)?, }) } diff --git a/src/transport/manager/address.rs b/src/transport/manager/address.rs index 6d4a0d10..c108dfeb 100644 --- a/src/transport/manager/address.rs +++ b/src/transport/manager/address.rs @@ -299,6 +299,17 @@ impl AddressStoreBuckets { } } + /// Create [`AddressStoreBuckets`] from a set of unknown addresses. + /// + /// If the addresses exceed the maximum capacity, they will be truncated. + pub fn from_unknown(addresses: impl IntoIterator) -> Self { + let mut store = Self::new(); + for address in addresses.take(MAX_UNKNOWN_ADDRESSES) { + store.unknown.insert(address); + } + store + } + /// Get the score for a given error. pub fn error_score(_error: &DialError) -> i32 { scores::CONNECTION_FAILURE From ca58bf77c6b088264bce535595784b899df041c3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 16:52:27 +0000 Subject: [PATCH 5/8] kad/types: Avoid cloning over the wire peers Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/message.rs | 10 +++++----- src/protocol/libp2p/kademlia/types.rs | 5 ++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index de8665f5..e6f72a9b 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -247,7 +247,7 @@ impl KademliaMessage { 4 => { let peers = message .closer_peers - .iter() + .into_iter() .filter_map(|peer| KademliaPeer::try_from(peer).ok()) .take(replication_factor) .collect(); @@ -285,7 +285,7 @@ impl KademliaMessage { record, peers: message .closer_peers - .iter() + .into_iter() .filter_map(|peer| KademliaPeer::try_from(peer).ok()) .take(replication_factor) .collect(), @@ -296,7 +296,7 @@ impl KademliaMessage { let key = (!message.key.is_empty()).then_some(message.key.into())?; let providers = message .provider_peers - .iter() + .into_iter() .filter_map(|peer| KademliaPeer::try_from(peer).ok()) .take(replication_factor) .collect(); @@ -308,13 +308,13 @@ impl KademliaMessage { let key = (!message.key.is_empty()).then_some(message.key.into()); let peers = message .closer_peers - .iter() + .into_iter() .filter_map(|peer| KademliaPeer::try_from(peer).ok()) .take(replication_factor) .collect(); let providers = message .provider_peers - .iter() + .into_iter() .filter_map(|peer| KademliaPeer::try_from(peer).ok()) .take(replication_factor) .collect(); diff --git a/src/protocol/libp2p/kademlia/types.rs b/src/protocol/libp2p/kademlia/types.rs index 5f5e498b..3d8c3dbf 100644 --- a/src/protocol/libp2p/kademlia/types.rs +++ b/src/protocol/libp2p/kademlia/types.rs @@ -284,10 +284,10 @@ impl KademliaPeer { } } -impl TryFrom<&schema::kademlia::Peer> for KademliaPeer { +impl TryFrom for KademliaPeer { type Error = (); - fn try_from(record: &schema::kademlia::Peer) -> Result { + fn try_from(record: schema::kademlia::Peer) -> Result { let peer = PeerId::from_bytes(&record.id).map_err(|_| ())?; let addresses = record.addrs.into_iter().filter_map(|addr| Multiaddr::try_from(addr).ok()); @@ -307,7 +307,6 @@ impl From<&KademliaPeer> for schema::kademlia::Peer { addrs: peer .address_store .addresses(MAX_ADDRESSES) - .iter() .map(|address| address.to_vec()) .collect(), connection: peer.connection.into(), From 6adbaf9200c1bf56f9c1a47f5da75b8583ac8e72 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 17:03:04 +0000 Subject: [PATCH 6/8] kad: Adjust to the new ref API Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 6 +++--- src/protocol/libp2p/kademlia/query/get_providers.rs | 5 ++++- src/protocol/libp2p/kademlia/types.rs | 4 ++-- src/transport/manager/address.rs | 7 +------ 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index fcfb6a88..13567fbf 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -408,7 +408,7 @@ impl Kademlia { .await; for info in peers { - let addresses = info.addresses(); + let addresses: Vec = info.addresses().cloned().collect(); self.service.add_known_address(&info.peer, addresses.clone().into_iter()); if std::matches!(self.update_mode, RoutingTableUpdateMode::Automatic) { @@ -544,7 +544,7 @@ impl Kademlia { match (providers.len(), providers.pop()) { (1, Some(provider)) => { - let addresses = provider.addresses(); + let addresses: Vec = provider.addresses().cloned().collect(); if provider.peer == peer { self.store.put_provider( @@ -797,7 +797,7 @@ impl Kademlia { query_id: query, peers: peers .into_iter() - .map(|info| (info.peer, info.addresses())) + .map(|info| (info.peer, info.addresses().cloned().collect())) .collect(), }) .await; diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs index d6874bd1..3c31ae80 100644 --- a/src/protocol/libp2p/kademlia/query/get_providers.rs +++ b/src/protocol/libp2p/kademlia/query/get_providers.rs @@ -117,7 +117,10 @@ impl GetProvidersContext { // Merge addresses of different provider records of the same peer. let mut providers = HashMap::>::new(); found_providers.into_iter().for_each(|provider| { - providers.entry(provider.peer).or_default().extend(provider.addresses()) + providers + .entry(provider.peer) + .or_default() + .extend(provider.addresses().cloned()) }); // Convert into `Vec` diff --git a/src/protocol/libp2p/kademlia/types.rs b/src/protocol/libp2p/kademlia/types.rs index 3d8c3dbf..b4949f2c 100644 --- a/src/protocol/libp2p/kademlia/types.rs +++ b/src/protocol/libp2p/kademlia/types.rs @@ -26,7 +26,7 @@ use crate::{ protocol::libp2p::kademlia::schema, - transport::manager::address::{AddressRecord, AddressStore, AddressStoreBuckets}, + transport::manager::address::{AddressRecord, AddressStoreBuckets}, PeerId, }; @@ -279,7 +279,7 @@ impl KademliaPeer { } /// Returns the addresses of the peer. - pub fn addresses(&self) -> Vec { + pub fn addresses(&self) -> impl Iterator { self.address_store.addresses(MAX_ADDRESSES) } } diff --git a/src/transport/manager/address.rs b/src/transport/manager/address.rs index c108dfeb..be3d00ce 100644 --- a/src/transport/manager/address.rs +++ b/src/transport/manager/address.rs @@ -304,17 +304,12 @@ impl AddressStoreBuckets { /// If the addresses exceed the maximum capacity, they will be truncated. pub fn from_unknown(addresses: impl IntoIterator) -> Self { let mut store = Self::new(); - for address in addresses.take(MAX_UNKNOWN_ADDRESSES) { + for address in addresses.into_iter().take(MAX_UNKNOWN_ADDRESSES) { store.unknown.insert(address); } store } - /// Get the score for a given error. - pub fn error_score(_error: &DialError) -> i32 { - scores::CONNECTION_FAILURE - } - /// Insert an address record into the appropriate bucket based on its score. pub fn insert(&mut self, record: AddressRecord) { let AddressRecord { score, address } = record; From 559d91a0752c62e205e7ab915f56d5f2ebf956ed Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 17:08:18 +0000 Subject: [PATCH 7/8] kad/tests: Adjust testing Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 11 +++++++---- src/protocol/libp2p/kademlia/routing_table.rs | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 13567fbf..5a92e5a1 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1431,7 +1431,10 @@ mod tests { // Check peer addresses. match kademlia.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => { - assert_eq!(entry.addresses(), vec![address_a.clone()]); + assert_eq!( + entry.addresses().cloned().collect::>(), + vec![address_a.clone()] + ); } _ => panic!("Peer not found in routing table"), }; @@ -1450,7 +1453,7 @@ mod tests { match kademlia.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => { assert_eq!( - entry.addresses(), + entry.addresses().cloned().collect::>(), vec![address_b.clone(), address_a.clone()] ); } @@ -1469,7 +1472,7 @@ mod tests { match kademlia.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => { assert_eq!( - entry.addresses(), + entry.addresses().cloned().collect::>(), vec![address_b.clone(), address_a.clone()] ); } @@ -1483,7 +1486,7 @@ mod tests { match kademlia.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => { assert_eq!( - entry.addresses(), + entry.addresses().cloned().collect::>(), vec![address_a.clone(), address_b.clone()] ); } diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index e012318e..dd317dd4 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -400,7 +400,7 @@ mod tests { KBucketEntry::Occupied(entry) => { assert_eq!(entry.key, key); assert_eq!(entry.peer, peer); - assert_eq!(entry.addresses(), addresses); + assert_eq!(entry.addresses().cloned().collect::>(), addresses); assert_eq!(entry.connection, ConnectionType::Connected); } state => panic!("invalid state for `KBucketEntry`: {state:?}"), @@ -418,7 +418,7 @@ mod tests { KBucketEntry::Occupied(entry) => { assert_eq!(entry.key, key); assert_eq!(entry.peer, peer); - assert_eq!(entry.addresses(), addresses); + assert_eq!(entry.addresses().cloned().collect::>(), addresses); assert_eq!(entry.connection, ConnectionType::NotConnected); } state => panic!("invalid state for `KBucketEntry`: {state:?}"), @@ -508,7 +508,7 @@ mod tests { KBucketEntry::Occupied(entry) => { assert_eq!(entry.key, key); assert_eq!(entry.peer, peer); - assert_eq!(entry.addresses(), addresses); + assert_eq!(entry.addresses().cloned().collect::>(), addresses); assert_eq!(entry.connection, ConnectionType::CanConnect); } state => panic!("invalid state for `KBucketEntry`: {state:?}"), From 75782f96a0141a9bf4d69b0e276c46eea1a84329 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 3 Jul 2025 17:35:59 +0000 Subject: [PATCH 8/8] address: Bound maximum addresses to capacity Signed-off-by: Alexandru Vasile --- src/transport/manager/address.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/transport/manager/address.rs b/src/transport/manager/address.rs index be3d00ce..f78bbf57 100644 --- a/src/transport/manager/address.rs +++ b/src/transport/manager/address.rs @@ -320,6 +320,7 @@ impl AddressStoreBuckets { self.unknown.remove(&address); self.failure.remove(&address); + Self::ensure_space(&mut self.success); self.success.insert(address); } 0 => { @@ -327,6 +328,7 @@ impl AddressStoreBuckets { self.success.remove(&address); self.failure.remove(&address); + Self::ensure_space(&mut self.unknown); self.unknown.insert(address); } _ => { @@ -334,11 +336,24 @@ impl AddressStoreBuckets { self.success.remove(&address); self.unknown.remove(&address); + Self::ensure_space(&mut self.failure); self.failure.insert(address); } } } + /// Ensure that there is space in the bucket. + fn ensure_space(bucket: &mut HashSet) { + if bucket.len() < bucket.capacity() { + return; + } + + // Remove the first element to ensure space. + if let Some(first) = bucket.iter().next().cloned() { + bucket.remove(&first); + } + } + /// Check if the store is empty. pub fn is_empty(&self) -> bool { self.success.is_empty() && self.unknown.is_empty() && self.failure.is_empty()