diff --git a/cli/tests/message_flow.rs b/cli/tests/message_flow.rs index 91073160..21a74f38 100644 --- a/cli/tests/message_flow.rs +++ b/cli/tests/message_flow.rs @@ -538,7 +538,7 @@ async fn dump_topology(network: &TestNetwork) { /// Assert that the network has mesh topology (peers connected to each other, not just gateway). /// This validates that NAT hole punching is working correctly. -/// Returns error if any peer is connected only to the gateway (star topology). +/// Polls until peers reach sufficient P2P connections or timeout expires. async fn assert_mesh_topology(network: &TestNetwork) -> Result<()> { let use_docker_nat = env::var_os("FREENET_TEST_DOCKER_NAT").is_some(); if !use_docker_nat { @@ -554,64 +554,93 @@ async fn assert_mesh_topology(network: &TestNetwork) -> Result<()> { gateway_ids.insert(network.gateway(idx).id().to_string()); } - // Check each peer's connections let peer_count = network.peer_ws_urls().len(); - let mut peers_with_p2p_connections = 0usize; + // Each peer should have at least 2 P2P connections (not counting gateway) for reliable + // subscription propagation. With min_connections=4 (including gateway), that's 3 P2P, + // but we accept 2 as the minimum for the assertion to be practical. + let min_p2p_per_peer: usize = 2; + let poll_interval = Duration::from_secs(10); + let max_wait = Duration::from_secs(90); + let start = Instant::now(); - for idx in 0..peer_count { - let peer = network.peer(idx); - let peer_id = peer.id().to_string(); - match fetch_connected_peers(peer).await { - Ok(connections) => { - // Count how many connections are to other peers (not gateways) - let p2p_connections: Vec<_> = connections - .iter() - .filter(|conn| !gateway_ids.contains(*conn)) - .collect(); + loop { + let mut peer_p2p_counts = Vec::new(); + let mut all_sufficient = true; + + for idx in 0..peer_count { + let peer = network.peer(idx); + let peer_id = peer.id().to_string(); + match fetch_connected_peers(peer).await { + Ok(connections) => { + let p2p_count = connections + .iter() + .filter(|conn| !gateway_ids.contains(*conn)) + .count(); + peer_p2p_counts.push((idx, peer_id, p2p_count)); + if p2p_count < min_p2p_per_peer { + all_sufficient = false; + } + } + Err(err) => { + println!("peer {} ({}) topology check ERROR: {}", idx, peer.id(), err); + peer_p2p_counts.push((idx, peer_id, 0)); + all_sufficient = false; + } + } + } - if !p2p_connections.is_empty() { - peers_with_p2p_connections += 1; - println!( - "peer {} ({}) has {} P2P connections: {:?}", - idx, - peer_id, - p2p_connections.len(), - p2p_connections - ); + // Print current state + for (idx, peer_id, count) in &peer_p2p_counts { + println!( + "peer {} ({}) has {} P2P connections{}", + idx, + peer_id, + count, + if *count < min_p2p_per_peer { + " (below minimum)" } else { - println!( - "peer {} ({}) has NO P2P connections (only gateway: {:?})", - idx, peer_id, connections - ); + "" } - } - Err(err) => { - println!("peer {} ({}) topology check ERROR: {}", idx, peer_id, err); - } + ); } - } - // At least some peers should have P2P connections for mesh topology. - // With 6 peers, we expect most to have at least one P2P connection. - let min_expected_p2p_peers = peer_count / 2; - if peers_with_p2p_connections < min_expected_p2p_peers { - dump_topology(network).await; - anyhow::bail!( - "Mesh topology assertion failed: only {}/{} peers have P2P connections \ - (expected at least {}). This indicates NAT hole punching may not be working. \ - The network has star topology (peers only connected to gateway) instead of mesh.", - peers_with_p2p_connections, - peer_count, - min_expected_p2p_peers + if all_sufficient { + println!( + "Mesh topology assertion PASSED: all {}/{} peers have >= {} P2P connections", + peer_count, peer_count, min_p2p_per_peer + ); + println!("--- End mesh topology assertion ---"); + return Ok(()); + } + + let elapsed = start.elapsed(); + if elapsed >= max_wait { + let peers_below: Vec<_> = peer_p2p_counts + .iter() + .filter(|(_, _, count)| *count < min_p2p_per_peer) + .collect(); + dump_topology(network).await; + anyhow::bail!( + "Mesh topology assertion failed after {:.0}s: {}/{} peers below {} P2P connections: {:?}. \ + Topology manager may need more time or Docker NAT is preventing connections.", + elapsed.as_secs_f64(), + peers_below.len(), + peer_count, + min_p2p_per_peer, + peers_below + .iter() + .map(|(idx, _, count)| format!("peer{}={}", idx, count)) + .collect::>() + ); + } + + println!( + "Waiting for topology formation ({:.0}s/{:.0}s)...", + elapsed.as_secs_f64(), + max_wait.as_secs_f64() ); + sleep(poll_interval).await; } - - println!( - "Mesh topology assertion PASSED: {}/{} peers have P2P connections", - peers_with_p2p_connections, peer_count - ); - println!("--- End mesh topology assertion ---"); - Ok(()) } async fn dump_subscriptions(network: &TestNetwork) { @@ -960,6 +989,8 @@ async fn run_message_flow_test(peer_count: usize, rounds: usize) -> Result<()> { let network = TestNetwork::builder() .gateways(1) .peers(scenario.peer_count) + .min_connections(4) + .max_connections(5) .binary(FreenetBinary::Workspace { path: freenet_core.clone(), profile: BuildProfile::Debug,