-
Notifications
You must be signed in to change notification settings - Fork 23
kademlia: Track success of PUT_VALUE queries
#427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 11 commits
05318b9
f4259c8
0874d72
269a807
7b2b2b4
de6e235
22fad0b
5dc45cc
2459646
765883c
6644f83
09317fc
531926f
2bb7e88
988270b
b2586d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,7 +93,7 @@ enum PeerAction { | |
| SendFindNode(QueryId), | ||
|
|
||
| /// Send `PUT_VALUE` message to peer. | ||
| SendPutValue(Bytes), | ||
| SendPutValue(QueryId, Bytes), | ||
|
|
||
| /// Send `ADD_PROVIDER` message to peer. | ||
| SendAddProvider(Bytes), | ||
|
|
@@ -368,10 +368,10 @@ impl Kademlia { | |
| } | ||
| } | ||
| } | ||
| Some(PeerAction::SendPutValue(message)) => { | ||
| Some(PeerAction::SendPutValue(query, message)) => { | ||
| tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message"); | ||
|
|
||
| self.executor.send_message(peer, message, substream); | ||
| self.executor.send_request_read_response(peer, Some(query), message, substream); | ||
| } | ||
| Some(PeerAction::SendAddProvider(message)) => { | ||
| tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message"); | ||
|
|
@@ -472,20 +472,44 @@ impl Kademlia { | |
| } | ||
| } | ||
| } | ||
| KademliaMessage::PutValue { record } => { | ||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| ?peer, | ||
| record_key = ?record.key, | ||
| "handle `PUT_VALUE` message", | ||
| ); | ||
| KademliaMessage::PutValue { record } => match query_id { | ||
| Some(query_id) => { | ||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| ?peer, | ||
| query = ?query_id, | ||
| record_key = ?record.key, | ||
| "handle `PUT_VALUE` response", | ||
| ); | ||
|
|
||
| if let IncomingRecordValidationMode::Automatic = self.validation_mode { | ||
| self.store.put(record.clone()); | ||
| self.engine.register_response( | ||
| query_id, | ||
| peer, | ||
| KademliaMessage::PutValue { | ||
| record: record.clone(), | ||
| }, | ||
| ); | ||
| } | ||
| None => { | ||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| ?peer, | ||
| record_key = ?record.key, | ||
| "handle `PUT_VALUE` request", | ||
| ); | ||
|
|
||
| let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await; | ||
| } | ||
| if let IncomingRecordValidationMode::Automatic = self.validation_mode { | ||
| self.store.put(record.clone()); | ||
| } | ||
|
|
||
| // Send ACK even if the record was/will be filtered out to not reveal any | ||
| // internal state. | ||
| let message = KademliaMessage::put_value(record.clone()); | ||
| self.executor.send_message(peer, message, substream); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dq: Is libp2p sending the ACK as well? Im wondering if this is really needed, since we clone the record here and use some bandwidth? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is what libp2p does. I don't understand either, why the whole message is sent back instead of a small ACK message. Due to these ACKs not currentty being sent by litep2p we will likely need a two-stage upgrade process: first start sending out ACKs and use some alternative mechanism for determining if the record was received (not relying on ACKs being received), and after most of the network upgrades switch to using received ACKs. I am going to implement "stage 1" in a follow-up PR. |
||
|
|
||
| let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await; | ||
| } | ||
| }, | ||
| KademliaMessage::GetRecord { key, record, peers } => { | ||
| match (query_id, key) { | ||
| (Some(query_id), key) => { | ||
|
|
@@ -749,7 +773,7 @@ impl Kademlia { | |
| Ok(()) | ||
| } | ||
| Err(err) => { | ||
| tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time"); | ||
| tracing::debug!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time"); | ||
| Err(err.into()) | ||
| } | ||
| } | ||
|
|
@@ -803,20 +827,26 @@ impl Kademlia { | |
| .await; | ||
| Ok(()) | ||
| } | ||
| QueryAction::PutRecordToFoundNodes { record, peers } => { | ||
| QueryAction::PutRecordToFoundNodes { | ||
| query, | ||
| record, | ||
| peers, | ||
| quorum, | ||
| } => { | ||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| ?query, | ||
| record_key = ?record.key, | ||
| num_peers = ?peers.len(), | ||
| "store record to found peers", | ||
| ); | ||
| let key = record.key.clone(); | ||
| let message = KademliaMessage::put_value(record); | ||
|
|
||
| for peer in peers { | ||
| for peer in &peers { | ||
dmitry-markin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if let Err(error) = self.open_substream_or_dial( | ||
| peer.peer, | ||
| PeerAction::SendPutValue(message.clone()), | ||
| PeerAction::SendPutValue(query, message.clone()), | ||
| None, | ||
| ) { | ||
| tracing::debug!( | ||
|
|
@@ -829,6 +859,25 @@ impl Kademlia { | |
| } | ||
| } | ||
|
|
||
| self.engine.start_put_record_to_found_nodes_requests_tracking( | ||
| query, | ||
| key, | ||
| peers.into_iter().map(|peer| peer.peer).collect(), | ||
| quorum, | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
| QueryAction::PutRecordQuerySucceeded { query, key } => { | ||
| tracing::debug!(target: LOG_TARGET, ?query, "`PUT_VALUE` query succeeded"); | ||
|
|
||
| let _ = self | ||
| .event_tx | ||
| .send(KademliaEvent::PutRecordSuccess { | ||
| query_id: query, | ||
| key, | ||
| }) | ||
| .await; | ||
| Ok(()) | ||
| } | ||
| QueryAction::AddProviderToFoundNodes { | ||
|
|
@@ -1015,7 +1064,7 @@ impl Kademlia { | |
| .into() | ||
| ); | ||
| } | ||
| Some(KademliaCommand::PutRecord { mut record, query_id }) => { | ||
| Some(KademliaCommand::PutRecord { mut record, quorum, query_id }) => { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| query = ?query_id, | ||
|
|
@@ -1040,13 +1089,15 @@ impl Kademlia { | |
| query_id, | ||
| record, | ||
| self.routing_table.closest(&key, self.replication_factor).into(), | ||
| quorum, | ||
| ); | ||
| } | ||
| Some(KademliaCommand::PutRecordToPeers { | ||
| mut record, | ||
| query_id, | ||
| peers, | ||
| update_local_store, | ||
| quorum, | ||
| }) => { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
|
|
@@ -1082,6 +1133,7 @@ impl Kademlia { | |
| query_id, | ||
| record, | ||
| peers, | ||
| quorum, | ||
| ); | ||
| } | ||
| Some(KademliaCommand::StartProviding { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.