Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 81 additions & 50 deletions cli/tests/message_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ async fn dump_topology(network: &TestNetwork) {

/// Assert that the network has mesh topology (peers connected to each other, not just gateway).
/// This validates that NAT hole punching is working correctly.
/// Returns error if any peer is connected only to the gateway (star topology).
/// Polls until peers reach sufficient P2P connections or timeout expires.
async fn assert_mesh_topology(network: &TestNetwork) -> Result<()> {
let use_docker_nat = env::var_os("FREENET_TEST_DOCKER_NAT").is_some();
if !use_docker_nat {
Expand All @@ -554,64 +554,93 @@ async fn assert_mesh_topology(network: &TestNetwork) -> Result<()> {
gateway_ids.insert(network.gateway(idx).id().to_string());
}

// Check each peer's connections
let peer_count = network.peer_ws_urls().len();
let mut peers_with_p2p_connections = 0usize;
// Each peer should have at least 2 P2P connections (not counting gateway) for reliable
// subscription propagation. With min_connections=4 (including gateway), that's 3 P2P,
// but we accept 2 as the minimum for the assertion to be practical.
let min_p2p_per_peer: usize = 2;
let poll_interval = Duration::from_secs(10);
let max_wait = Duration::from_secs(90);
let start = Instant::now();

for idx in 0..peer_count {
let peer = network.peer(idx);
let peer_id = peer.id().to_string();
match fetch_connected_peers(peer).await {
Ok(connections) => {
// Count how many connections are to other peers (not gateways)
let p2p_connections: Vec<_> = connections
.iter()
.filter(|conn| !gateway_ids.contains(*conn))
.collect();
loop {
let mut peer_p2p_counts = Vec::new();
let mut all_sufficient = true;

for idx in 0..peer_count {
let peer = network.peer(idx);
let peer_id = peer.id().to_string();
match fetch_connected_peers(peer).await {
Ok(connections) => {
let p2p_count = connections
.iter()
.filter(|conn| !gateway_ids.contains(*conn))
.count();
peer_p2p_counts.push((idx, peer_id, p2p_count));
if p2p_count < min_p2p_per_peer {
all_sufficient = false;
}
}
Err(err) => {
println!("peer {} ({}) topology check ERROR: {}", idx, peer.id(), err);
peer_p2p_counts.push((idx, peer_id, 0));
all_sufficient = false;
}
}
}

if !p2p_connections.is_empty() {
peers_with_p2p_connections += 1;
println!(
"peer {} ({}) has {} P2P connections: {:?}",
idx,
peer_id,
p2p_connections.len(),
p2p_connections
);
// Print current state
for (idx, peer_id, count) in &peer_p2p_counts {
println!(
"peer {} ({}) has {} P2P connections{}",
idx,
peer_id,
count,
if *count < min_p2p_per_peer {
" (below minimum)"
} else {
println!(
"peer {} ({}) has NO P2P connections (only gateway: {:?})",
idx, peer_id, connections
);
""
}
}
Err(err) => {
println!("peer {} ({}) topology check ERROR: {}", idx, peer_id, err);
}
);
}
}

// At least some peers should have P2P connections for mesh topology.
// With 6 peers, we expect most to have at least one P2P connection.
let min_expected_p2p_peers = peer_count / 2;
if peers_with_p2p_connections < min_expected_p2p_peers {
dump_topology(network).await;
anyhow::bail!(
"Mesh topology assertion failed: only {}/{} peers have P2P connections \
(expected at least {}). This indicates NAT hole punching may not be working. \
The network has star topology (peers only connected to gateway) instead of mesh.",
peers_with_p2p_connections,
peer_count,
min_expected_p2p_peers
if all_sufficient {
println!(
"Mesh topology assertion PASSED: all {}/{} peers have >= {} P2P connections",
peer_count, peer_count, min_p2p_per_peer
);
println!("--- End mesh topology assertion ---");
return Ok(());
}

let elapsed = start.elapsed();
if elapsed >= max_wait {
let peers_below: Vec<_> = peer_p2p_counts
.iter()
.filter(|(_, _, count)| *count < min_p2p_per_peer)
.collect();
dump_topology(network).await;
anyhow::bail!(
"Mesh topology assertion failed after {:.0}s: {}/{} peers below {} P2P connections: {:?}. \
Topology manager may need more time or Docker NAT is preventing connections.",
elapsed.as_secs_f64(),
peers_below.len(),
peer_count,
min_p2p_per_peer,
peers_below
.iter()
.map(|(idx, _, count)| format!("peer{}={}", idx, count))
.collect::<Vec<_>>()
);
}

println!(
"Waiting for topology formation ({:.0}s/{:.0}s)...",
elapsed.as_secs_f64(),
max_wait.as_secs_f64()
);
sleep(poll_interval).await;
}

println!(
"Mesh topology assertion PASSED: {}/{} peers have P2P connections",
peers_with_p2p_connections, peer_count
);
println!("--- End mesh topology assertion ---");
Ok(())
}

async fn dump_subscriptions(network: &TestNetwork) {
Expand Down Expand Up @@ -960,6 +989,8 @@ async fn run_message_flow_test(peer_count: usize, rounds: usize) -> Result<()> {
let network = TestNetwork::builder()
.gateways(1)
.peers(scenario.peer_count)
.min_connections(4)
.max_connections(5)
.binary(FreenetBinary::Workspace {
path: freenet_core.clone(),
profile: BuildProfile::Debug,
Expand Down
Loading