Skip to content

Commit

Permalink
Merge branch 'master' into new_tx_pool_architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
AurelienFT authored Feb 20, 2025
2 parents 374341f + 2471df6 commit 8c02012
Show file tree
Hide file tree
Showing 12 changed files with 399 additions and 256 deletions.
1 change: 1 addition & 0 deletions .changes/changed/2388.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rework the P2P service codecs to avoid unnecessary coupling between components. The refactoring makes it explicit that the Gossipsub and RequestResponse codecs only share encoding/decoding functionalities from the Postcard codec. It also makes handling Gossipsub and RequestResponse messages completely independent of each other.
17 changes: 14 additions & 3 deletions crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use fuel_core_chain_config::{
StateConfig,
};
use fuel_core_p2p::{
codecs::postcard::PostcardCodec,
codecs::{
gossipsub::GossipsubMessageHandler,
request_response::RequestResponseMessageHandler,
},
network_service::FuelP2PService,
p2p_service::FuelP2PEvent,
request_response::messages::{
Expand Down Expand Up @@ -172,10 +175,18 @@ impl Bootstrap {
/// Spawn a bootstrap node.
pub async fn new(node_config: &Config) -> anyhow::Result<Self> {
let bootstrap_config = extract_p2p_config(node_config).await;
let codec = PostcardCodec::new(bootstrap_config.max_block_size);
let request_response_codec =
RequestResponseMessageHandler::new(bootstrap_config.max_block_size);
let gossipsub_codec = GossipsubMessageHandler::new();
let (sender, _) =
broadcast::channel(bootstrap_config.reserved_nodes.len().saturating_add(1));
let mut bootstrap = FuelP2PService::new(sender, bootstrap_config, codec).await?;
let mut bootstrap = FuelP2PService::new(
sender,
bootstrap_config,
gossipsub_codec,
request_response_codec,
)
.await?;
bootstrap.start().await?;

let listeners = bootstrap.multiaddrs();
Expand Down
14 changes: 8 additions & 6 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
codecs::{
postcard::PostcardCodec,
NetworkCodec,
request_response::RequestResponseMessageHandler,
RequestResponseProtocols,
},
config::Config,
discovery,
Expand Down Expand Up @@ -67,7 +68,8 @@ pub struct FuelBehaviour {
discovery: discovery::Behaviour,

/// RequestResponse protocol
request_response: request_response::Behaviour<PostcardCodec>,
request_response:
request_response::Behaviour<RequestResponseMessageHandler<PostcardCodec>>,

/// The Behaviour to manage connection limits.
connection_limits: connection_limits::Behaviour,
Expand All @@ -76,8 +78,8 @@ pub struct FuelBehaviour {
impl FuelBehaviour {
pub(crate) fn new(
p2p_config: &Config,
codec: PostcardCodec,
) -> anyhow::Result<Self, anyhow::Error> {
request_response_codec: RequestResponseMessageHandler<PostcardCodec>,
) -> anyhow::Result<Self> {
let local_public_key = p2p_config.keypair.public();
let local_peer_id = PeerId::from_public_key(&local_public_key);

Expand Down Expand Up @@ -131,7 +133,7 @@ impl FuelBehaviour {
.with_max_established(Some(MAX_ESTABLISHED_CONNECTIONS)),
);

let req_res_protocol = codec
let req_res_protocol = request_response_codec
.get_req_res_protocols()
.map(|protocol| (protocol, ProtocolSupport::Full));

Expand All @@ -140,7 +142,7 @@ impl FuelBehaviour {
.with_max_concurrent_streams(p2p_config.max_concurrent_streams);

let request_response = request_response::Behaviour::with_codec(
codec.clone(),
request_response_codec.clone(),
req_res_protocol,
req_res_config,
);
Expand Down
70 changes: 44 additions & 26 deletions crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,48 @@
pub mod gossipsub;
pub mod postcard;
pub mod request_response;

use crate::{
gossipsub::messages::{
GossipTopicTag,
GossipsubBroadcastRequest,
GossipsubMessage,
},
request_response::messages::{
RequestMessage,
V2ResponseMessage,
},
use crate::gossipsub::messages::GossipTopicTag;
use libp2p::request_response as libp2p_request_response;

use std::{
borrow::Cow,
io,
};
use libp2p::request_response;
use std::io;

pub trait Encoder: Send {
/// Returns the serialized object as a slice.
fn into_bytes(self) -> Vec<u8>;
}

/// The trait encodes the type to the bytes and passes it to the `Encoder`,
/// which stores it and provides a reference to it. That allows gives more
/// flexibility and more performant encoding, allowing the use of slices and arrays
/// instead of vectors in some cases. Since the [`Encoder`] returns `Cow<[u8]>`,
/// it is always possible to take ownership of the serialized value.
pub trait Encode<T: ?Sized> {
type Error;
/// The encoder type that stores serialized object.
type Encoder<'a>: Encoder
where
T: 'a;

/// Encodes the object to the bytes and passes it to the `Encoder`.
fn encode<'a>(&self, t: &'a T) -> Result<Self::Encoder<'a>, Self::Error>;
}

/// The trait decodes the type from the bytes.
pub trait Decode<T> {
type Error;
/// Decodes the type `T` from the bytes.
fn decode(&self, bytes: &[u8]) -> Result<T, Self::Error>;
}

impl<'a> Encoder for Cow<'a, [u8]> {
fn into_bytes(self) -> Vec<u8> {
self.into_owned()
}
}

/// Implement this in order to handle serialization & deserialization of Gossipsub messages
pub trait GossipsubCodec {
Expand All @@ -28,22 +58,10 @@ pub trait GossipsubCodec {
) -> Result<Self::ResponseMessage, io::Error>;
}

// TODO: https://github.com/FuelLabs/fuel-core/issues/2368
// Remove this trait
/// Main Codec trait
/// Needs to be implemented and provided to FuelBehaviour
pub trait NetworkCodec:
GossipsubCodec<
RequestMessage = GossipsubBroadcastRequest,
ResponseMessage = GossipsubMessage,
> + request_response::Codec<Request = RequestMessage, Response = V2ResponseMessage>
+ Clone
+ Send
+ 'static
{
pub trait RequestResponseProtocols: libp2p_request_response::Codec {
/// Returns RequestResponse's Protocol
/// Needed for initialization of RequestResponse Behaviour
fn get_req_res_protocols(
&self,
) -> impl Iterator<Item = <Self as request_response::Codec>::Protocol>;
) -> impl Iterator<Item = <Self as libp2p_request_response::Codec>::Protocol>;
}
52 changes: 52 additions & 0 deletions crates/services/p2p/src/codecs/gossipsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::io;

use fuel_core_types::fuel_tx::Transaction;

use crate::gossipsub::messages::{
GossipTopicTag,
GossipsubBroadcastRequest,
GossipsubMessage,
};

use super::{
Decode,
Encode,
Encoder,
GossipsubCodec,
};

#[derive(Debug, Clone, Default)]
pub struct GossipsubMessageHandler<Codec> {
pub(crate) codec: Codec,
}

impl<Codec> GossipsubCodec for GossipsubMessageHandler<Codec>
where
Codec:
Encode<Transaction, Error = io::Error> + Decode<Transaction, Error = io::Error>,
{
type RequestMessage = GossipsubBroadcastRequest;
type ResponseMessage = GossipsubMessage;

fn encode(&self, data: Self::RequestMessage) -> Result<Vec<u8>, io::Error> {
match data {
GossipsubBroadcastRequest::NewTx(tx) => {
Ok(self.codec.encode(&tx)?.into_bytes())
}
}
}

fn decode(
&self,
encoded_data: &[u8],
gossipsub_tag: GossipTopicTag,
) -> Result<Self::ResponseMessage, io::Error> {
let decoded_response = match gossipsub_tag {
GossipTopicTag::NewTx => {
GossipsubMessage::NewTx(self.codec.decode(encoded_data)?)
}
};

Ok(decoded_response)
}
}
Loading

0 comments on commit 8c02012

Please sign in to comment.