Skip to content

Commit

Permalink
P2P Service: Add /fuel/req_res/0.0.2 that sends error codes in respon…
Browse files Browse the repository at this point in the history
…ses to unsuccessful requests
  • Loading branch information
acerone85 committed Oct 17, 2024
1 parent e8c563c commit f50a4d3
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 41 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl Bootstrap {
if request_message == RequestMessage::TxPoolAllTransactionsIds {
let _ = bootstrap.send_response_msg(
request_id,
ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])),
ResponseMessage::TxPoolAllTransactionsIds(Ok(vec![])),
);
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_with = { workspace = true }
sha2 = "0.10"
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = "1.0.47"
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
Expand Down
7 changes: 4 additions & 3 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,16 @@ impl FuelBehaviour {
BlockHeight::default(),
);

let req_res_protocol =
core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full));
let req_res_protocol = codec
.get_req_res_protocols()
.map(|protocol| (protocol, ProtocolSupport::Full));

let req_res_config = request_response::Config::default()
.with_request_timeout(p2p_config.set_request_timeout)
.with_max_concurrent_streams(p2p_config.max_concurrent_streams);

let request_response = request_response::Behaviour::with_codec(
codec,
codec.clone(),
req_res_protocol,
req_res_config,
);
Expand Down
4 changes: 3 additions & 1 deletion crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ pub trait NetworkCodec:
{
/// Returns RequestResponse's Protocol
/// Needed for initialization of RequestResponse Behaviour
fn get_req_res_protocol(&self) -> <Self as request_response::Codec>::Protocol;
fn get_req_res_protocols(
&self,
) -> impl Iterator<Item = <Self as request_response::Codec>::Protocol>;
}
54 changes: 42 additions & 12 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use crate::{
GossipsubMessage,
},
request_response::messages::{
LegacyResponseMessage,
RequestMessage,
ResponseMessage,
REQUEST_RESPONSE_PROTOCOL_ID,
REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID,
},
};
use async_trait::async_trait;
Expand All @@ -26,6 +28,8 @@ use serde::{
Serialize,
};
use std::io;
use strum::IntoEnumIterator;
use strum_macros::EnumIter;

/// Helper method for decoding data
/// Reusable across `RequestResponseCodec` and `GossipsubCodec`
Expand Down Expand Up @@ -69,13 +73,13 @@ impl PostcardCodec {
/// run into a timeout waiting for the response.
#[async_trait]
impl request_response::Codec for PostcardCodec {
type Protocol = MessageExchangePostcardProtocol;
type Protocol = PostcardProtocol;
type Request = RequestMessage;
type Response = ResponseMessage;

async fn read_request<T>(
&mut self,
_: &Self::Protocol,
_protocol: &Self::Protocol,
socket: &mut T,
) -> io::Result<Self::Request>
where
Expand All @@ -91,7 +95,7 @@ impl request_response::Codec for PostcardCodec {

async fn read_response<T>(
&mut self,
_: &Self::Protocol,
protocol: &Self::Protocol,
socket: &mut T,
) -> io::Result<Self::Response>
where
Expand All @@ -103,7 +107,13 @@ impl request_response::Codec for PostcardCodec {
.read_to_end(&mut response)
.await?;

deserialize(&response)
match protocol {
PostcardProtocol::V1 => {
let legacy_response = deserialize::<LegacyResponseMessage>(&response)?;
Ok(legacy_response.into())
}
PostcardProtocol::V2 => deserialize::<ResponseMessage>(&response),
}
}

async fn write_request<T>(
Expand All @@ -122,14 +132,20 @@ impl request_response::Codec for PostcardCodec {

async fn write_response<T>(
&mut self,
_protocol: &Self::Protocol,
protocol: &Self::Protocol,
socket: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: futures::AsyncWrite + Unpin + Send,
{
let encoded_data = serialize(&res)?;
let encoded_data = match protocol {
PostcardProtocol::V1 => {
let legacy_response: LegacyResponseMessage = res.into();
serialize(&legacy_response)?
}
PostcardProtocol::V2 => serialize(&res)?,
};
socket.write_all(&encoded_data).await?;
Ok(())
}
Expand Down Expand Up @@ -161,17 +177,31 @@ impl GossipsubCodec for PostcardCodec {
}

impl NetworkCodec for PostcardCodec {
fn get_req_res_protocol(&self) -> <Self as request_response::Codec>::Protocol {
MessageExchangePostcardProtocol {}
fn get_req_res_protocols(
&self,
) -> impl Iterator<Item = <Self as request_response::Codec>::Protocol> {
PostcardProtocol::iter()
}
}

#[derive(Default, Debug, Clone)]
pub struct MessageExchangePostcardProtocol;
#[derive(Debug, Clone, EnumIter)]
pub enum PostcardProtocol {
V1,
V2,
}

impl AsRef<str> for MessageExchangePostcardProtocol {
impl Default for PostcardProtocol {
fn default() -> Self {
PostcardProtocol::V1
}
}

impl AsRef<str> for PostcardProtocol {
fn as_ref(&self) -> &str {
REQUEST_RESPONSE_PROTOCOL_ID
match self {
PostcardProtocol::V1 => REQUEST_RESPONSE_PROTOCOL_ID,
PostcardProtocol::V2 => REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID,
}
}
}

Expand Down
19 changes: 10 additions & 9 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@ impl FuelP2PService {
let send_ok = match channel {
ResponseSender::SealedHeaders(c) => match response {
ResponseMessage::SealedHeaders(v) => {
c.send((peer, Ok(v))).is_ok()
// TODO[AC]: Change type of ResponseSender and remove the .ok() here
c.send((peer, Ok(v.ok()))).is_ok()
}
_ => {
warn!(
Expand All @@ -687,7 +688,7 @@ impl FuelP2PService {
},
ResponseSender::Transactions(c) => match response {
ResponseMessage::Transactions(v) => {
c.send((peer, Ok(v))).is_ok()
c.send((peer, Ok(v.ok()))).is_ok()
}
_ => {
warn!(
Expand All @@ -699,7 +700,7 @@ impl FuelP2PService {
},
ResponseSender::TxPoolAllTransactionsIds(c) => match response {
ResponseMessage::TxPoolAllTransactionsIds(v) => {
c.send((peer, Ok(v))).is_ok()
c.send((peer, Ok(v.ok()))).is_ok()
}
_ => {
warn!(
Expand All @@ -711,7 +712,7 @@ impl FuelP2PService {
},
ResponseSender::TxPoolFullTransactions(c) => match response {
ResponseMessage::TxPoolFullTransactions(v) => {
c.send((peer, Ok(v))).is_ok()
c.send((peer, Ok(v.ok()))).is_ok()
}
_ => {
warn!(
Expand Down Expand Up @@ -1778,16 +1779,16 @@ mod tests {
RequestMessage::SealedHeaders(range) => {
let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone());

let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers)));
let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Ok(sealed_headers)));
}
RequestMessage::Transactions(_) => {
let txs = (0..5).map(|_| Transaction::default_test_tx()).collect();
let transactions = vec![Transactions(txs)];
let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions)));
let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Ok(transactions)));
}
RequestMessage::TxPoolAllTransactionsIds => {
let tx_ids = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect();
let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Some(tx_ids)));
let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Ok(tx_ids)));
}
RequestMessage::TxPoolFullTransactions(tx_ids) => {
let txs = tx_ids.iter().enumerate().map(|(i, _)| {
Expand All @@ -1797,7 +1798,7 @@ mod tests {
Some(NetworkableTransactionPool::Transaction(Transaction::default_test_tx()))
}
}).collect();
let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Some(txs)));
let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Ok(txs)));
}
}
}
Expand Down Expand Up @@ -1905,7 +1906,7 @@ mod tests {
// 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator
if let Some(FuelP2PEvent::InboundRequestMessage{ request_id, request_message: _ }) = &node_b_event {
let sealed_headers: Vec<_> = arbitrary_headers_for_range(1..3);
let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers)));
let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Ok(sealed_headers)));
}

tracing::info!("Node B Event: {:?}", node_b_event);
Expand Down
67 changes: 66 additions & 1 deletion crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use thiserror::Error;
use tokio::sync::oneshot;

pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1";
pub(crate) const REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID: &str =
"/fuel/req_res/0.0.2";

/// Max Size in Bytes of the Request Message
#[cfg(test)]
Expand All @@ -32,14 +34,77 @@ pub enum RequestMessage {
TxPoolFullTransactions(Vec<TxId>),
}

// TODO: Do we want explicit status codes or an Error type?
#[derive(Error, Debug, Clone, Serialize, Deserialize)]
pub enum ResponseMessageErrorCode {
/// The peer sent an empty response using protocol `/fuel/req_res/0.0.1`
#[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")]
ProtocolV1EmptyResponse = 0,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseMessage {
pub enum LegacyResponseMessage {
SealedHeaders(Option<Vec<SealedBlockHeader>>),
Transactions(Option<Vec<Transactions>>),
TxPoolAllTransactionsIds(Option<Vec<TxId>>),
TxPoolFullTransactions(Option<Vec<Option<NetworkableTransactionPool>>>),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseMessage {
SealedHeaders(Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>),
Transactions(Result<Vec<Transactions>, ResponseMessageErrorCode>),
TxPoolAllTransactionsIds(Result<Vec<TxId>, ResponseMessageErrorCode>),
TxPoolFullTransactions(
Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
),
}

impl From<LegacyResponseMessage> for ResponseMessage {
fn from(v1_response: LegacyResponseMessage) -> Self {
match v1_response {
LegacyResponseMessage::SealedHeaders(sealed_headers) => {
ResponseMessage::SealedHeaders(
sealed_headers
.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
)
}
LegacyResponseMessage::Transactions(vec) => ResponseMessage::Transactions(
vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
),
LegacyResponseMessage::TxPoolAllTransactionsIds(vec) => {
ResponseMessage::TxPoolAllTransactionsIds(
vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
)
}
LegacyResponseMessage::TxPoolFullTransactions(vec) => {
ResponseMessage::TxPoolFullTransactions(
vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse),
)
}
}
}
}

impl From<ResponseMessage> for LegacyResponseMessage {
fn from(response: ResponseMessage) -> Self {
match response {
ResponseMessage::SealedHeaders(sealed_headers) => {
LegacyResponseMessage::SealedHeaders(sealed_headers.ok())
}
ResponseMessage::Transactions(transactions) => {
LegacyResponseMessage::Transactions(transactions.ok())
}
ResponseMessage::TxPoolAllTransactionsIds(tx_ids) => {
LegacyResponseMessage::TxPoolAllTransactionsIds(tx_ids.ok())
}
ResponseMessage::TxPoolFullTransactions(tx_pool) => {
LegacyResponseMessage::TxPoolFullTransactions(tx_pool.ok())
}
}
}
}

pub type OnResponse<T> = oneshot::Sender<(PeerId, Result<T, ResponseError>)>;

#[derive(Debug)]
Expand Down
Loading

0 comments on commit f50a4d3

Please sign in to comment.