diff --git a/Cargo.lock b/Cargo.lock index deab9d3a4..25649d006 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1414,6 +1414,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "keccak" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +dependencies = [ + "cpufeatures", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2073,6 +2082,7 @@ dependencies = [ "multihash-derive", "serde", "sha2", + "sha3", "unsigned-varint 0.7.2", ] @@ -3333,6 +3343,16 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha3" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60" +dependencies = [ + "digest 0.10.7", + "keccak", +] + [[package]] name = "sharded-slab" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 61c026e73..05e3a2ca2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ indexmap = { version = "2.9.0", features = ["std"] } libc = "0.2.158" mockall = "0.13.1" multiaddr = "0.17.0" -multihash = { version = "0.17.0", default-features = false, features = ["std", "multihash-impl", "identity", "sha2", "blake2b"] } +multihash = { version = "0.17.0", default-features = false, features = ["std", "multihash-impl", "identity", "sha2", "sha3", "blake2b"] } network-interface = "2.0.1" parking_lot = "0.12.3" pin-project = "1.1.10" diff --git a/src/protocol/libp2p/bitswap/handle.rs b/src/protocol/libp2p/bitswap/handle.rs index a89b45b51..630c8d7f0 100644 --- a/src/protocol/libp2p/bitswap/handle.rs +++ b/src/protocol/libp2p/bitswap/handle.rs @@ -44,10 +44,19 @@ pub enum BitswapEvent { /// Requested CIDs. cids: Vec<(Cid, WantType)>, }, + + /// Bitswap response. + Response { + /// Peer ID. + peer: PeerId, + + /// Response entries: vector of CIDs with either block data or block presence. + responses: Vec, + }, } /// Response type for received bitswap request. -#[derive(Debug)] +#[derive(Debug, Clone)] #[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))] pub enum ResponseType { /// Block. @@ -73,6 +82,15 @@ pub enum ResponseType { #[derive(Debug)] #[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))] pub enum BitswapCommand { + /// Send bitswap request. + SendRequest { + /// Peer ID. + peer: PeerId, + + /// Requested CIDs. + cids: Vec<(Cid, WantType)>, + }, + /// Send bitswap response. SendResponse { /// Peer ID. @@ -99,10 +117,8 @@ impl BitswapHandle { } /// Send `request` to `peer`. - /// - /// Not supported by the current implementation. - pub async fn send_request(&self, _peer: PeerId, _request: Vec) { - unimplemented!("bitswap requests are not supported"); + pub async fn send_request(&self, peer: PeerId, cids: Vec<(Cid, WantType)>) { + let _ = self.cmd_tx.send(BitswapCommand::SendRequest { peer, cids }).await; } /// Send `response` to `peer`. diff --git a/src/protocol/libp2p/bitswap/mod.rs b/src/protocol/libp2p/bitswap/mod.rs index 039f0e159..c0f30c2d1 100644 --- a/src/protocol/libp2p/bitswap/mod.rs +++ b/src/protocol/libp2p/bitswap/mod.rs @@ -21,10 +21,13 @@ //! [`/ipfs/bitswap/1.2.0`](https://github.com/ipfs/specs/blob/main/BITSWAP.md) implementation. use crate::{ - error::Error, + error::{Error, ImmediateDialError}, protocol::{Direction, TransportEvent, TransportService}, substream::Substream, - types::SubstreamId, + types::{ + multihash::{Code, MultihashDigest}, + SubstreamId, + }, PeerId, }; @@ -36,7 +39,10 @@ use tokio_stream::{StreamExt, StreamMap}; pub use config::Config; pub use handle::{BitswapCommand, BitswapEvent, BitswapHandle, ResponseType}; pub use schema::bitswap::{wantlist::WantType, BlockPresenceType}; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + time::Duration, +}; mod config; mod handle; @@ -91,6 +97,36 @@ impl Prefix { res.extend_from_slice(multihash_len); res } + + /// Parse byte representation of prefix. + pub fn from_bytes(prefix_bytes: &[u8]) -> Option { + let (version, rest) = unsigned_varint::decode::u64(prefix_bytes).ok()?; + let (codec, rest) = unsigned_varint::decode::u64(rest).ok()?; + let (multihash_type, rest) = unsigned_varint::decode::u64(rest).ok()?; + let (multihash_len, rest) = unsigned_varint::decode::u64(rest).ok()?; + if !rest.is_empty() { + return None; + } + + let version = Version::try_from(version).ok()?; + let multihash_len = u8::try_from(multihash_len).ok()?; + + Some(Prefix { + version, + codec, + multihash_type, + multihash_len, + }) + } +} + +/// Action to perform when substream is opened. +#[derive(Debug)] +enum SubstreamAction { + /// Send a request. + SendRequest(Vec<(Cid, WantType)>), + /// Send a response. + SendResponse(Vec), } /// Bitswap protocol. @@ -104,11 +140,17 @@ pub(crate) struct Bitswap { /// RX channel for receiving commands from `BitswapHandle`. cmd_rx: Receiver, - /// Pending outbound substreams. - pending_outbound: HashMap>, + /// Pending outbound actions. + pending_outbound: HashMap>, /// Inbound substreams. inbound: StreamMap, + + /// Outbound substreams. + outbound: HashMap, + + /// Peers waiting for dial. + pending_dials: HashSet, } impl Bitswap { @@ -120,6 +162,8 @@ impl Bitswap { event_tx: config.event_tx, pending_outbound: HashMap::new(), inbound: StreamMap::new(), + outbound: HashMap::new(), + pending_dials: HashSet::new(), } } @@ -147,92 +191,262 @@ impl Bitswap { let message = schema::bitswap::Message::decode(message)?; - let Some(wantlist) = message.wantlist else { - tracing::debug!(target: LOG_TARGET, "bitswap message doesn't contain `WantList`"); - return Err(Error::InvalidData); - }; + // Check if this is a request (has wantlist with entries). + if let Some(wantlist) = &message.wantlist { + if !wantlist.entries.is_empty() { + let cids = wantlist + .entries + .iter() + .filter_map(|entry| { + let cid = Cid::read_bytes(entry.block.as_slice()).ok()?; + + let want_type = match entry.want_type { + 0 => WantType::Block, + 1 => WantType::Have, + _ => return None, + }; + + Some((cid, want_type)) + }) + .collect::>(); + + if !cids.is_empty() { + let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await; + } + } + } - let cids = wantlist - .entries - .into_iter() - .filter_map(|entry| { - let cid = Cid::read_bytes(entry.block.as_slice()).ok()?; + // Check if this is a response (has payload or block presences). + if !message.payload.is_empty() || !message.block_presences.is_empty() { + let mut responses = Vec::new(); + + // Process payload (blocks). + for block in message.payload { + let Some(Prefix { + version, + codec, + multihash_type, + multihash_len: _, + }) = Prefix::from_bytes(&block.prefix) + else { + tracing::trace!(target: LOG_TARGET, ?peer, "invalid CID prefix received"); + continue; + }; - let want_type = match entry.want_type { - 0 => WantType::Block, - 1 => WantType::Have, - _ => return None, + // Create multihash from the block data. + let Ok(code) = Code::try_from(multihash_type) else { + tracing::trace!( + target: LOG_TARGET, + ?peer, + multihash_type, + "usupported multihash type", + ); + continue; }; - Some((cid, want_type)) - }) - .collect::>(); + let multihash = code.digest(&block.data); + + // We need to convert multihash to version supported by `cid` crate. + let Ok(multihash) = + cid::multihash::Multihash::wrap(multihash.code(), multihash.digest()) + else { + tracing::trace!( + target: LOG_TARGET, + ?peer, + multihash_type, + "multihash size > 64 unsupported", + ); + continue; + }; - let _ = self.event_tx.send(BitswapEvent::Request { peer, cids }).await; + match Cid::new(version, codec, multihash) { + Ok(cid) => responses.push(ResponseType::Block { + cid, + block: block.data, + }), + Err(error) => tracing::trace!( + target: LOG_TARGET, + ?peer, + ?error, + "invalid CID received", + ), + } + } + + // Process block presences. + for presence in message.block_presences { + if let Ok(cid) = Cid::read_bytes(&presence.cid[..]) { + let presence_type = match presence.r#type { + 0 => BlockPresenceType::Have, + 1 => BlockPresenceType::DontHave, + _ => continue, + }; + + responses.push(ResponseType::Presence { + cid, + presence: presence_type, + }); + } + } + + if !responses.is_empty() { + let _ = self.event_tx.send(BitswapEvent::Response { peer, responses }).await; + } + } Ok(()) } - /// Send response to bitswap request. + /// Handle opened outbound substream. async fn on_outbound_substream( &mut self, peer: PeerId, substream_id: SubstreamId, mut substream: Substream, ) { - let Some(entries) = self.pending_outbound.remove(&substream_id) else { + let Some(actions) = self.pending_outbound.remove(&peer) else { tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist"); return; }; - let mut response = schema::bitswap::Message { - // `wantlist` field must always be present. This is what the official Kubo IPFS - // implementation does. - wantlist: Some(Default::default()), - ..Default::default() - }; + tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream"); - for entry in entries { - match entry { - ResponseType::Block { cid, block } => { - let prefix = Prefix { - version: cid.version(), - codec: cid.codec(), - multihash_type: cid.hash().code(), - multihash_len: cid.hash().size(), + for action in actions { + match action { + SubstreamAction::SendRequest(cids) => { + if send_request(&mut substream, cids).await.is_err() { + // Drop the substream and all actions in case of sending error. + tracing::debug!(target: LOG_TARGET, ?peer, "bitswap request failed"); + return; } - .to_bytes(); + } + SubstreamAction::SendResponse(entries) => { + if send_response(&mut substream, entries).await.is_err() { + // Drop the substream and all actions in case of sending error. + tracing::debug!(target: LOG_TARGET, ?peer, "bitswap response failed"); + return; + } + } + } + } - response.payload.push(schema::bitswap::Block { - prefix, - data: block, - }); + self.outbound.insert(peer, substream); + } + + /// Handle connection established event. + fn on_connection_established(&mut self, peer: PeerId) { + // If we have pending actions for this peer, open a substream. + if self.pending_dials.remove(&peer) { + tracing::trace!( + target: LOG_TARGET, + ?peer, + "open substream after connection established", + ); + + if let Err(error) = self.service.open_substream(peer) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?error, + "failed to open substream after connection established", + ); + // Drop all pending actions; they are not going to be handled anyway, and we need + // the entry to be empty to properly open subsequent substreams. + self.pending_outbound.remove(&peer); + } + } + } + + /// Open substream or dial a peer. + fn open_substream_or_dial(&mut self, peer: PeerId) { + tracing::trace!(target: LOG_TARGET, ?peer, "open substream"); + + if let Err(error) = self.service.open_substream(peer) { + tracing::trace!( + target: LOG_TARGET, + ?peer, + ?error, + "failed to open substream, dialing peer", + ); + + // Failed to open substream, try to dial the peer. + match self.service.dial(&peer) { + Ok(()) => { + // Store the peer to open a substream once it is connected. + self.pending_dials.insert(peer); } - ResponseType::Presence { cid, presence } => { - response.block_presences.push(schema::bitswap::BlockPresence { - cid: cid.to_bytes(), - r#type: presence as i32, - }); + Err(ImmediateDialError::AlreadyConnected) => { + // By the time we tried to dial peer, it got connected. + if let Err(error) = self.service.open_substream(peer) { + tracing::trace!( + target: LOG_TARGET, + ?peer, + ?error, + "failed to open substream for a second time", + ); + } + } + Err(error) => { + tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer"); } } } + } + + /// Handle bitswap request. + async fn on_bitswap_request(&mut self, peer: PeerId, cids: Vec<(Cid, WantType)>) { + // Try to send request over existing substream first. + if let Entry::Occupied(mut entry) = self.outbound.entry(peer) { + if send_request(entry.get_mut(), cids.clone()).await.is_ok() { + return; + } else { + tracing::debug!( + target: LOG_TARGET, + ?peer, + "failed to send request over existing substream", + ); + entry.remove(); + } + } + + // Store pending actions for once the substream is opened. + let pending_actions = self.pending_outbound.entry(peer).or_default(); + // If we inserted the default empty entry above, this means no pending substream + // was requested by previous calls to `on_bitswap_request`. We will request a substream + // in this case below. + let no_substream_pending = pending_actions.is_empty(); - let message = response.encode_to_vec().into(); - let _ = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await; + pending_actions.push(SubstreamAction::SendRequest(cids)); - substream.close().await; + if no_substream_pending { + self.open_substream_or_dial(peer); + } } /// Handle bitswap response. - fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec) { - match self.service.open_substream(peer) { - Err(error) => { - tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream to peer") - } - Ok(substream_id) => { - self.pending_outbound.insert(substream_id, responses); + async fn on_bitswap_response(&mut self, peer: PeerId, responses: Vec) { + // Try to send response over existing substream first. + if let Entry::Occupied(mut entry) = self.outbound.entry(peer) { + if send_response(entry.get_mut(), responses.clone()).await.is_ok() { + return; + } else { + tracing::debug!( + target: LOG_TARGET, + ?peer, + "failed to send response over existing substream", + ); + entry.remove(); } } + + // Store pending actions for later and open substream if not requested already. + let pending_actions = self.pending_outbound.entry(peer).or_default(); + let no_pending_substream = pending_actions.is_empty(); + pending_actions.push(SubstreamAction::SendResponse(responses)); + + if no_pending_substream { + self.open_substream_or_dial(peer); + } } /// Start [`Bitswap`] event loop. @@ -242,6 +456,9 @@ impl Bitswap { loop { tokio::select! { event = self.service.next() => match event { + Some(TransportEvent::ConnectionEstablished { peer, .. }) => { + self.on_connection_established(peer); + } Some(TransportEvent::SubstreamOpened { peer, substream, @@ -256,8 +473,11 @@ impl Bitswap { event => tracing::trace!(target: LOG_TARGET, ?event, "unhandled event"), }, command = self.cmd_rx.recv() => match command { + Some(BitswapCommand::SendRequest { peer, cids }) => { + self.on_bitswap_request(peer, cids).await; + } Some(BitswapCommand::SendResponse { peer, responses }) => { - self.on_bitswap_response(peer, responses); + self.on_bitswap_response(peer, responses).await; } None => return, }, @@ -287,3 +507,70 @@ impl Bitswap { } } } + +async fn send_request(substream: &mut Substream, cids: Vec<(Cid, WantType)>) -> Result<(), ()> { + let request = schema::bitswap::Message { + wantlist: Some(schema::bitswap::Wantlist { + entries: cids + .into_iter() + .map(|(cid, want_type)| schema::bitswap::wantlist::Entry { + block: cid.to_bytes(), + priority: 1, + cancel: false, + want_type: want_type as i32, + send_dont_have: false, + }) + .collect(), + full: false, + }), + ..Default::default() + }; + + let message = request.encode_to_vec().into(); + if let Ok(Ok(())) = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { + Ok(()) + } else { + Err(()) + } +} + +async fn send_response(substream: &mut Substream, entries: Vec) -> Result<(), ()> { + let mut response = schema::bitswap::Message { + // `wantlist` field must always be present. This is what the official Kubo + // IPFS implementation does. + wantlist: Some(Default::default()), + ..Default::default() + }; + + for entry in entries { + match entry { + ResponseType::Block { cid, block } => { + let prefix = Prefix { + version: cid.version(), + codec: cid.codec(), + multihash_type: cid.hash().code(), + multihash_len: cid.hash().size(), + } + .to_bytes(); + + response.payload.push(schema::bitswap::Block { + prefix, + data: block, + }); + } + ResponseType::Presence { cid, presence } => { + response.block_presences.push(schema::bitswap::BlockPresence { + cid: cid.to_bytes(), + r#type: presence as i32, + }); + } + } + } + + let message = response.encode_to_vec().into(); + if let Ok(Ok(())) = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { + Ok(()) + } else { + Err(()) + } +} diff --git a/src/protocol/libp2p/schema/bitswap.proto b/src/protocol/libp2p/schema/bitswap.proto index 727525b49..fa31a1313 100644 --- a/src/protocol/libp2p/schema/bitswap.proto +++ b/src/protocol/libp2p/schema/bitswap.proto @@ -1,3 +1,5 @@ +// Bitswap 1.2.0 Wire Format + syntax = "proto3"; package bitswap; @@ -9,19 +11,19 @@ message Wantlist { } message Entry { - bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0) + bytes block = 1; // CID of the block int32 priority = 2; // the priority (normalized). default to 1 - bool cancel = 3; // whether this revokes an entry + bool cancel = 3; // whether this revokes an entry WantType wantType = 4; // Note: defaults to enum 0, ie Block bool sendDontHave = 5; // Note: defaults to false } repeated Entry entries = 1; // a list of wantlist entries - bool full = 2; // whether this is the full wantlist. default to false + bool full = 2; // whether this is the full wantlist. default to false } message Block { - bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length) + bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length) bytes data = 2; } @@ -37,8 +39,8 @@ message BlockPresence { message Message { Wantlist wantlist = 1; - repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 - repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 + repeated bytes blocks = 2; + repeated Block payload = 3; repeated BlockPresence blockPresences = 4; int32 pendingBytes = 5; }