Skip to content

Commit 8b1f640

Browse files
authored
Selective peer connection block (#1581)
1 parent 33d1c76 commit 8b1f640

File tree

14 files changed

+359
-506
lines changed

14 files changed

+359
-506
lines changed

Cargo.lock

+258-476
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/freenet-ping/Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/freenet-ping/app/tests/run_app.rs

+11
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ async fn base_node_test_config(
4646
gateways: Vec<String>,
4747
public_port: Option<u16>,
4848
ws_api_port: u16,
49+
// New parameter to specify addresses this node should block
50+
blocked_addresses: Option<Vec<std::net::SocketAddr>>,
4951
) -> anyhow::Result<(ConfigArgs, PresetConfig)> {
5052
if is_gateway {
5153
assert!(public_port.is_some());
@@ -72,6 +74,9 @@ async fn base_node_test_config(
7274
address: Some(Ipv4Addr::LOCALHOST.into()),
7375
network_port: public_port,
7476
bandwidth_limit: None,
77+
// Assuming the new field 'blocked_addresses' is added to NetworkArgs
78+
// and it takes Option<Vec<SocketAddr>>
79+
blocked_addresses,
7580
},
7681
config_paths: {
7782
freenet::config::ConfigPathsArgs {
@@ -154,6 +159,7 @@ async fn test_ping_multi_node() -> TestResult {
154159
vec![],
155160
Some(network_socket_gw.local_addr()?.port()),
156161
ws_api_port_socket_gw.local_addr()?.port(),
162+
None, // blocked_addresses
157163
)
158164
.await?;
159165
let public_port = cfg.network_api.public_port.unwrap();
@@ -168,6 +174,7 @@ async fn test_ping_multi_node() -> TestResult {
168174
vec![serde_json::to_string(&config_gw_info)?],
169175
None,
170176
ws_api_port_socket_node1.local_addr()?.port(),
177+
None, // blocked_addresses
171178
)
172179
.await?;
173180
let ws_api_port_node1 = config_node1.ws_api.ws_api_port.unwrap();
@@ -178,6 +185,7 @@ async fn test_ping_multi_node() -> TestResult {
178185
vec![serde_json::to_string(&config_gw_info)?],
179186
None,
180187
ws_api_port_socket_node2.local_addr()?.port(),
188+
None, // blocked_addresses
181189
)
182190
.await?;
183191
let ws_api_port_node2 = config_node2.ws_api.ws_api_port.unwrap();
@@ -667,6 +675,7 @@ async fn test_ping_application_loop() -> TestResult {
667675
vec![],
668676
Some(network_socket_gw.local_addr()?.port()),
669677
ws_api_port_socket_gw.local_addr()?.port(),
678+
None, // blocked_addresses
670679
)
671680
.await?;
672681
let public_port = cfg.network_api.public_port.unwrap();
@@ -681,6 +690,7 @@ async fn test_ping_application_loop() -> TestResult {
681690
vec![serde_json::to_string(&config_gw_info)?],
682691
None,
683692
ws_api_port_socket_node1.local_addr()?.port(),
693+
None, // blocked_addresses
684694
)
685695
.await?;
686696
let ws_api_port_node1 = config_node1.ws_api.ws_api_port.unwrap();
@@ -691,6 +701,7 @@ async fn test_ping_application_loop() -> TestResult {
691701
vec![serde_json::to_string(&config_gw_info)?],
692702
None,
693703
ws_api_port_socket_node2.local_addr()?.port(),
704+
None, // blocked_addresses
694705
)
695706
.await?;
696707
let ws_api_port_node2 = config_node2.ws_api.ws_api_port.unwrap();

crates/core/src/config/mod.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ impl Default for ConfigArgs {
9797
gateways: None,
9898
location: None,
9999
bandwidth_limit: None,
100+
blocked_addresses: None,
100101
},
101102
ws_api: WebsocketApiArgs {
102103
address: Some(default_listening_address()),
@@ -320,6 +321,10 @@ impl ConfigArgs {
320321
public_port: self.network_api.public_port,
321322
ignore_protocol_version: self.network_api.ignore_protocol_checking,
322323
bandwidth_limit: self.network_api.bandwidth_limit,
324+
blocked_addresses: self
325+
.network_api
326+
.blocked_addresses
327+
.map(|addrs| addrs.into_iter().collect()),
323328
},
324329
ws_api: WebsocketApiConfig {
325330
// the websocket API is always local
@@ -486,6 +491,10 @@ pub struct NetworkArgs {
486491
/// Hard limit the bandwidth usage for upstream traffic.
487492
#[arg(long)]
488493
pub bandwidth_limit: Option<usize>,
494+
495+
/// List of IP:port addresses to refuse connections to/from.
496+
#[arg(long, num_args = 0..)]
497+
pub blocked_addresses: Option<Vec<SocketAddr>>,
489498
}
490499

491500
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -518,7 +527,7 @@ impl NetworkArgs {
518527
}
519528
}
520529

521-
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
530+
#[derive(Debug, Clone, Serialize, Deserialize)]
522531
pub struct NetworkApiConfig {
523532
/// Address to listen to locally
524533
#[serde(default = "default_listening_address", rename = "network-address")]
@@ -545,6 +554,10 @@ pub struct NetworkApiConfig {
545554

546555
/// Hard limit the bandwidth usage for upstream traffic.
547556
pub bandwidth_limit: Option<usize>,
557+
558+
/// List of IP:port addresses to refuse connections to/from.
559+
#[serde(skip_serializing_if = "Option::is_none")]
560+
pub blocked_addresses: Option<HashSet<SocketAddr>>,
548561
}
549562

550563
mod port_allocation;

crates/core/src/node/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ pub struct NodeConfig {
125125
pub(crate) min_number_conn: Option<usize>,
126126
pub(crate) max_upstream_bandwidth: Option<Rate>,
127127
pub(crate) max_downstream_bandwidth: Option<Rate>,
128+
pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
128129
}
129130

130131
impl NodeConfig {
@@ -170,13 +171,14 @@ impl NodeConfig {
170171
network_listener_ip: config.network_api.address,
171172
network_listener_port: config.network_api.port,
172173
location: config.location.map(Location::new),
173-
config: Arc::new(config),
174+
config: Arc::new(config.clone()),
174175
max_hops_to_live: None,
175176
rnd_if_htl_above: None,
176177
max_number_conn: None,
177178
min_number_conn: None,
178179
max_upstream_bandwidth: None,
179180
max_downstream_bandwidth: None,
181+
blocked_addresses: config.network_api.blocked_addresses.clone(),
180182
})
181183
}
182184

crates/core/src/node/network_bridge.rs

+3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ pub(crate) enum ConnectionError {
4848
FailedConnectOp,
4949
#[error("unwanted connection")]
5050
UnwantedConnection,
51+
#[error("connection to/from address {0} blocked by local policy")]
52+
AddressBlocked(std::net::SocketAddr),
5153

5254
// errors produced while handling the connection:
5355
#[error("IO error: {0}")]
@@ -80,6 +82,7 @@ impl Clone for ConnectionError {
8082
Self::TransportError(err) => Self::TransportError(err.clone()),
8183
Self::FailedConnectOp => Self::FailedConnectOp,
8284
Self::UnwantedConnection => Self::UnwantedConnection,
85+
Self::AddressBlocked(addr) => Self::AddressBlocked(*addr),
8386
}
8487
}
8588
}

crates/core/src/node/network_bridge/handshake.rs

+17
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub(super) enum HandshakeError {
4949
TransportError(#[from] TransportError),
5050
#[error("receibed an unexpected message at this point: {0}")]
5151
UnexpectedMessage(Box<NetMessage>),
52+
#[error("connection error: {0}")]
53+
ConnectionError(#[from] super::ConnectionError),
5254
}
5355

5456
#[derive(Debug)]
@@ -124,6 +126,7 @@ pub(super) enum ExternConnection {
124126
Dropped {
125127
peer: PeerId,
126128
},
129+
DropConnectionByAddr(SocketAddr),
127130
}
128131

129132
/// Used for communicating with the HandshakeHandler.
@@ -149,6 +152,14 @@ impl HanshakeHandlerMsg {
149152
.map_err(|_| HandshakeError::ChannelClosed)?;
150153
Ok(())
151154
}
155+
156+
pub async fn drop_connection_by_addr(&self, remote_addr: SocketAddr) -> Result<()> {
157+
self.0
158+
.send(ExternConnection::DropConnectionByAddr(remote_addr))
159+
.await
160+
.map_err(|_| HandshakeError::ChannelClosed)?;
161+
Ok(())
162+
}
152163
}
153164

154165
type OutboundMessageSender = mpsc::Sender<NetMessage>;
@@ -526,6 +537,12 @@ impl HandshakeHandler {
526537
Some(ExternConnection::Dropped { peer }) => {
527538
self.connected.remove(&peer.addr);
528539
self.outbound_messages.remove(&peer.addr);
540+
self.connecting.remove(&peer.addr);
541+
}
542+
Some(ExternConnection::DropConnectionByAddr(addr)) => {
543+
self.connected.remove(&addr);
544+
self.outbound_messages.remove(&addr);
545+
self.connecting.remove(&addr);
529546
}
530547
None => return Err(HandshakeError::ChannelClosed),
531548
}

crates/core/src/node/network_bridge/p2p_protoc.rs

+41-17
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ pub(in crate::node) struct P2pConnManager {
125125
this_location: Option<Location>,
126126
check_version: bool,
127127
bandwidth_limit: Option<usize>,
128+
blocked_addresses: Option<HashSet<SocketAddr>>,
128129
}
129130

130131
impl P2pConnManager {
@@ -154,6 +155,7 @@ impl P2pConnManager {
154155
this_location: config.location,
155156
check_version: !config.config.network_api.ignore_protocol_version,
156157
bandwidth_limit: config.config.network_api.bandwidth_limit,
158+
blocked_addresses: config.blocked_addresses.clone(),
157159
})
158160
}
159161

@@ -245,10 +247,6 @@ impl P2pConnManager {
245247
}
246248
}
247249
}
248-
249-
ConnEvent::HandshakeAction(action) => {
250-
self.handle_handshake_action(action, &mut state).await?;
251-
}
252250
ConnEvent::ClosedChannel => {
253251
tracing::info!("Notification channel closed");
254252
break;
@@ -334,7 +332,7 @@ impl P2pConnManager {
334332
&mut self,
335333
state: &mut EventListenerState,
336334
handshake_handler: &mut HandshakeHandler,
337-
handshake_handler_msg: &HanshakeHandlerMsg,
335+
handshake_handler_msg: &HanshakeHandlerMsg, // already passed here
338336
notification_channel: &mut EventLoopNotificationsReceiver,
339337
node_controller: &mut Receiver<NodeEvent>,
340338
client_wait_for_transaction: &mut ContractHandlerChannel<WaitingResolution>,
@@ -353,8 +351,18 @@ impl P2pConnManager {
353351
msg = self.conn_bridge_rx.recv() => {
354352
Ok(self.handle_bridge_msg(msg))
355353
}
356-
msg = handshake_handler.wait_for_events() => {
357-
Ok(self.handle_handshake_msg(msg))
354+
handshake_event_res = handshake_handler.wait_for_events() => {
355+
match handshake_event_res {
356+
Ok(event) => {
357+
self.handle_handshake_action(event, state, handshake_handler_msg).await?;
358+
Ok(EventResult::Continue)
359+
}
360+
Err(HandshakeError::ChannelClosed) => Ok(EventResult::Event(ConnEvent::ClosedChannel)),
361+
Err(e) => {
362+
tracing::warn!("Handshake error: {:?}", e);
363+
Ok(EventResult::Continue)
364+
}
365+
}
358366
}
359367
msg = node_controller.recv() => {
360368
Ok(self.handle_node_controller_msg(msg))
@@ -460,13 +468,25 @@ impl P2pConnManager {
460468
async fn handle_connect_peer(
461469
&mut self,
462470
peer: PeerId,
463-
callback: Box<dyn ConnectResultSender>,
471+
mut callback: Box<dyn ConnectResultSender>,
464472
tx: Transaction,
465473
handshake_handler_msg: &HanshakeHandlerMsg,
466474
state: &mut EventListenerState,
467475
is_gw: bool,
468476
) -> anyhow::Result<()> {
469477
tracing::info!(tx = %tx, remote = %peer, "Connecting to peer");
478+
if let Some(blocked_addrs) = &self.blocked_addresses {
479+
if blocked_addrs.contains(&peer.addr) {
480+
tracing::info!(tx = %tx, remote = %peer.addr, "Outgoing connection to peer blocked by local policy");
481+
// Ensure ConnectionError is correctly namespaced if HandshakeError::ConnectionError expects it directly
482+
callback
483+
.send_result(Err(HandshakeError::ConnectionError(
484+
crate::node::network_bridge::ConnectionError::AddressBlocked(peer.addr),
485+
)))
486+
.await?;
487+
return Ok(());
488+
}
489+
}
470490
state.awaiting_connection.insert(peer.addr, callback);
471491
let res = timeout(
472492
Duration::from_secs(10),
@@ -492,6 +512,7 @@ impl P2pConnManager {
492512
&mut self,
493513
event: HandshakeEvent,
494514
state: &mut EventListenerState,
515+
handshake_handler_msg: &HanshakeHandlerMsg, // Parameter added
495516
) -> anyhow::Result<()> {
496517
match event {
497518
HandshakeEvent::InboundConnection {
@@ -502,6 +523,16 @@ impl P2pConnManager {
502523
op,
503524
forward_info,
504525
} => {
526+
if let Some(blocked_addrs) = &self.blocked_addresses {
527+
if blocked_addrs.contains(&joiner.addr) {
528+
tracing::info!(%id, remote = %joiner.addr, "Inbound connection from peer blocked by local policy");
529+
// Not proceeding with adding connection or processing the operation.
530+
handshake_handler_msg
531+
.drop_connection_by_addr(joiner.addr)
532+
.await?;
533+
return Ok(());
534+
}
535+
}
505536
let (tx, rx) = mpsc::channel(1);
506537
self.connections.insert(joiner.clone(), tx);
507538
let was_reserved = {
@@ -733,21 +764,15 @@ impl P2pConnManager {
733764
}
734765
}
735766

736-
fn handle_handshake_msg(&self, msg: Result<HandshakeEvent, HandshakeError>) -> EventResult {
737-
match msg {
738-
Ok(event) => EventResult::Event(ConnEvent::HandshakeAction(event)),
739-
Err(HandshakeError::ChannelClosed) => EventResult::Event(ConnEvent::ClosedChannel),
740-
_ => EventResult::Continue,
741-
}
742-
}
743-
744767
fn handle_node_controller_msg(&self, msg: Option<NodeEvent>) -> EventResult {
745768
match msg {
746769
Some(msg) => EventResult::Event(ConnEvent::NodeAction(msg)),
747770
None => EventResult::Event(ConnEvent::ClosedChannel),
748771
}
749772
}
750773

774+
// Removed handle_handshake_msg as it's integrated into wait_for_event
775+
751776
fn handle_client_transaction_subscription(
752777
&self,
753778
event_id: Result<(ClientId, WaitingTransaction), anyhow::Error>,
@@ -875,7 +900,6 @@ enum EventResult {
875900
enum ConnEvent {
876901
InboundMessage(NetMessage),
877902
OutboundMessage(NetMessage),
878-
HandshakeAction(HandshakeEvent),
879903
NodeAction(NodeEvent),
880904
ClosedChannel,
881905
}

crates/core/tests/operations.rs

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ async fn base_node_test_config(
7171
address: Some(Ipv4Addr::LOCALHOST.into()),
7272
network_port: public_port,
7373
bandwidth_limit: None,
74+
blocked_addresses: None,
7475
},
7576
config_paths: {
7677
freenet::config::ConfigPathsArgs {

tests/test-app-1/Cargo.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/test-contract-1/Cargo.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)