-
Notifications
You must be signed in to change notification settings - Fork 26
Add Bitswap client #501
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Add Bitswap client #501
Conversation
src/protocol/libp2p/bitswap/mod.rs
Outdated
| let (version, rest) = unsigned_varint::decode::u64(prefix_bytes).ok()?; | ||
| let (codec, rest) = unsigned_varint::decode::u64(rest).ok()?; | ||
| let (multihash_type, rest) = unsigned_varint::decode::u64(rest).ok()?; | ||
| let (multihash_len, _rest) = unsigned_varint::decode::u64(rest).ok()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this means we ignore the unconsumed bytes, would that be expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good question. Do you think we should check that _rest.len() == 0 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't have a strong opinion here, but if prefix is extended in the future in a backward-compatible fashion by appending more fields, allowing _rest.len() != 0 here is kind of "future-compatible".
| cid: cid.to_bytes(), | ||
| r#type: presence as i32, | ||
| }); | ||
| Err(error) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We might have an obscure race that have seen in other components/kademlia:
- We are disconnected at T0 on like 351 while
self.service.open_substream(peer) - However we had a pending conntions initiated by some other protocol
- By the time we
self.service.dial(&peer)the peer might have connected - Maybe its worth for
Err(crate::Error::AlreadyConnected)to rerequetself.service.open_substream(peer)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, completely forgot about it! Will handle this case.
| }; | ||
|
|
||
| let message = request.encode_to_vec().into(); | ||
| if let Ok(Ok(())) = tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this looks a bit odd?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
send_framed() returns Result and tokio::time::timeout wraps it in its own Result, so we need to check that both succeeded.
src/protocol/libp2p/bitswap/mod.rs
Outdated
| let no_pending_substream = pending_actions.is_empty(); | ||
| pending_actions.push(SubstreamAction::SendRequest(cids)); | ||
|
|
||
| if no_pending_substream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dq: If the send_request above fails, then we drop the substream via entry.remove()
- However here we are deterining to open substreams only if we had prior actions in
pending_outbound - If we do not, we will not open a new substream and leave the current cids as pending
Should we alreays request a new substream on entry.remove() code paths?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic here is if there are pending actions, we already requested opening a substream (or dialing a peer then opening a substream), so we shouldn't request opening the second substream.
I will add a comment explaining this.
| ) { | ||
| let Some(entries) = self.pending_outbound.remove(&substream_id) else { | ||
| let Some(actions) = self.pending_outbound.remove(&peer) else { | ||
| tracing::warn!(target: LOG_TARGET, ?peer, ?substream_id, "pending outbound entry doesn't exist"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dq: Should we check the pending_dials here? Thinking of cases where we might receive this before on_connection_established but looks unlikely
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TransportManager should deliver events in order, so this should not be possible.
| } | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dq: Unrelated to these changes, we have an self.inbound.remove(&peer) above. However, there's a high likelyhood that if the inbound is broken, the outboud is as well. Should cleanup the outbound and force a reconnection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There may be legitimate cases where inbound fails (i.e., some implementations can close the outbound after sending just one request, like litep2p did before this PR), but outbound is fine.
The current code will fail fast if writing to the outbound fails and request a reconnection, so it shouldn't be a problem anyway.
lexnv
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks solid! Left some questions about the bitswap to understand our implementation a bit better 🙏
Implement Bitswap client. This is needed to benchmark data retrieval from transaction storage.
Additionally, outbound substreams are now reused for subsequent messages (there is a limit of one outbound substream per peer).