Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ impl Litep2p {
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
false
);
let executor = Arc::clone(&litep2p_config.executor);
litep2p_config.executor.run(Box::pin(async move {
Expand All @@ -218,6 +219,7 @@ impl Litep2p {
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
false
);
litep2p_config.executor.run(Box::pin(async move {
RequestResponseProtocol::new(service, config).run().await
Expand All @@ -233,6 +235,7 @@ impl Litep2p {
Vec::new(),
protocol.codec(),
litep2p_config.keep_alive_timeout,
false
);
litep2p_config.executor.run(Box::pin(async move {
let _ = protocol.run(service).await;
Expand All @@ -252,6 +255,7 @@ impl Litep2p {
Vec::new(),
ping_config.codec,
litep2p_config.keep_alive_timeout,
false
);
litep2p_config.executor.run(Box::pin(async move {
Ping::new(service, ping_config).run().await
Expand All @@ -275,6 +279,7 @@ impl Litep2p {
fallback_names,
kademlia_config.codec,
litep2p_config.keep_alive_timeout,
true
);
litep2p_config.executor.run(Box::pin(async move {
let _ = Kademlia::new(service, kademlia_config).run().await;
Expand All @@ -296,6 +301,7 @@ impl Litep2p {
Vec::new(),
identify_config.codec,
litep2p_config.keep_alive_timeout,
false
);
identify_config.public = Some(litep2p_config.keypair.public().into());

Expand All @@ -316,6 +322,7 @@ impl Litep2p {
Vec::new(),
bitswap_config.codec,
litep2p_config.keep_alive_timeout,
false
);
litep2p_config.executor.run(Box::pin(async move {
Bitswap::new(service, bitswap_config).run().await
Expand Down
47 changes: 31 additions & 16 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ pub struct TransportManager {
/// All names (main and fallback(s)) of the installed protocols.
protocol_names: HashSet<ProtocolName>,

/// Protocols that need to be informed about dial failures with addresses.
address_reporting_protocols: HashSet<ProtocolName>,

/// Listen addresses.
listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,

Expand Down Expand Up @@ -294,6 +297,7 @@ impl TransportManager {
protocols: HashMap::new(),
transports: TransportContext::new(),
protocol_names: HashSet::new(),
address_reporting_protocols: HashSet::new(),
transport_manager_handle: handle.clone(),
pending_connections: HashMap::new(),
next_substream_id: Arc::new(AtomicUsize::new(0usize)),
Expand Down Expand Up @@ -332,6 +336,7 @@ impl TransportManager {
fallback_names: Vec<ProtocolName>,
codec: ProtocolCodec,
keep_alive_timeout: Duration,
needs_dial_failure_addresses: bool,
) -> TransportService {
assert!(!self.protocol_names.contains(&protocol));

Expand All @@ -341,6 +346,12 @@ impl TransportManager {
}
}

if needs_dial_failure_addresses {
self.address_reporting_protocols.insert(protocol.clone());
self.address_reporting_protocols
.extend(fallback_names.clone());
}

let (service, sender) = TransportService::new(
self.local_peer_id,
protocol.clone(),
Expand Down Expand Up @@ -1116,10 +1127,8 @@ impl TransportManager {
?protocol,
"dial failure, notify protocol",
);
match context.tx.try_send(InnerTransportEvent::DialFailure {
peer,
addresses: vec![address.clone()],
}) {

match context.tx.try_send(make_dial_failure_event(&self.address_reporting_protocols, protocol, peer, vec![address.clone()])) {
Ok(()) => {}
Err(_) => {
tracing::trace!(
Expand All @@ -1132,10 +1141,7 @@ impl TransportManager {
);
let _ = context
.tx
.send(InnerTransportEvent::DialFailure {
peer,
addresses: vec![address.clone()],
})
.send(make_dial_failure_event(&self.address_reporting_protocols, protocol, peer, vec![address.clone()]))
.await;
}
}
Expand Down Expand Up @@ -1268,10 +1274,7 @@ impl TransportManager {
for (protocol, context) in &self.protocols {
let _ = match context
.tx
.try_send(InnerTransportEvent::DialFailure {
peer,
addresses: addresses.clone(),
}) {
.try_send(make_dial_failure_event(&self.address_reporting_protocols, protocol, peer, addresses.clone())) {
Ok(_) => Ok(()),
Err(_) => {
tracing::trace!(
Expand All @@ -1284,10 +1287,7 @@ impl TransportManager {

context
.tx
.send(InnerTransportEvent::DialFailure {
peer,
addresses: addresses.clone(),
})
.send(make_dial_failure_event(&self.address_reporting_protocols, protocol, peer, addresses.clone()))
.await
}
};
Expand Down Expand Up @@ -1343,6 +1343,21 @@ impl TransportManager {
}
}

fn make_dial_failure_event(
address_reporting_protocols: &HashSet<ProtocolName>,
protocol: &ProtocolName,
peer: PeerId,
addresses: Vec<Multiaddr>,
) -> InnerTransportEvent {
let addresses = if address_reporting_protocols.contains(protocol) {
addresses
} else {
Vec::new()
};

InnerTransportEvent::DialFailure { peer, addresses }
}

#[cfg(test)]
mod tests {
use crate::transport::manager::{address::AddressStore, peer_state::SecondaryOrDialing};
Expand Down