|
1 | 1 | import logging |
2 | 2 | import threading |
3 | | -import time |
4 | 3 |
|
5 | 4 | import pytest |
| 5 | +from tenacity import retry, stop_after_delay, wait_fixed |
6 | 6 |
|
7 | 7 | from clients.status_backend import StatusBackend |
8 | 8 |
|
9 | | -DISCOVERY_TIMEOUT_SEC = 30 |
10 | | - |
11 | | - |
12 | | -def _all_nodes_discovered(nodes: dict[str, StatusBackend], known_peers: dict[str, str]) -> bool: |
13 | | - """Check if all nodes discovered each other""" |
14 | | - for peer_id, node in nodes.items(): |
15 | | - if node is None: |
16 | | - return False |
17 | | - peers = node.wakuext_service.peers() |
18 | | - |
19 | | - # Use shorter names for logging |
20 | | - peer_names = [known_peers.get(peer, peer[-5:]) for peer in peers] |
21 | | - logging.info(f"Node {known_peers[peer_id]} peers: {peer_names}") |
22 | | - |
23 | | - for known_node in known_peers.keys(): |
24 | | - if known_node == peer_id: |
25 | | - continue |
26 | | - if known_node not in peers: |
27 | | - return False |
28 | | - return True |
29 | | - |
30 | 9 |
|
31 | 10 | @pytest.mark.rpc |
32 | 11 | class TestDiscovery: |
33 | | - |
34 | | - # @pytest.mark.parametrize("waku_light_client", [False, True], indirect=True, |
35 | | - # ids=["wakuV2LightClient_False", "wakuV2LightClient_True"]) |
36 | 12 | def test_discovery(self, backend_new_profile): |
37 | | - nodes_count = 2 |
| 13 | + # Desired topology: 3 full nodes, 2 light nodes |
| 14 | + full_count = 3 |
| 15 | + light_count = 2 |
| 16 | + total_nodes = full_count + light_count |
| 17 | + |
38 | 18 | nodes: dict[str, StatusBackend] = {} |
| 19 | + full_ids: set[str] = set() |
39 | 20 |
|
40 | 21 | known_nodes = { |
| 22 | + # Boot nodes available in the fleet |
41 | 23 | "16Uiu2HAm3vFYHkGRURyJ6F7bwDyzMLtPEuCg4DU89T7km2u8Fjyb": "boot-1", |
42 | 24 | "16Uiu2HAmCDqxtfF1DwBqs7UJ4TgSnjoh6j1RtE1hhQxLLao84jLi": "store", |
43 | 25 | } |
44 | 26 |
|
45 | | - def create_node(node_index: int): |
46 | | - """Function to run in each thread - waits for wakuv2.peerstats signal""" |
47 | | - backend = backend_new_profile(f"node_{node_index}", waku_light_client=True) |
| 27 | + def create_node(node_index: int, waku_light_client: bool): |
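| | + """Create one backend in a worker thread and record its peer ID."""
| | + # The shared dicts below are written from several threads; each write is a
| | + # single item assignment, which is atomic under CPython's GIL, so the test skips locking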
| 28 | + backend = backend_new_profile(f"node_{node_index}", waku_light_client=waku_light_client) |
48 | 29 | peer_id = backend.wakuext_service.peer_id() |
49 | 30 | known_nodes[peer_id] = f"backend_{node_index}" |
| 31 | + if not waku_light_client: |
| 32 | + full_ids.add(peer_id) |
50 | 33 | nodes[peer_id] = backend |
51 | 34 | info = f"Peer ID: {peer_id}" |
52 | 35 | info += f", URL: {backend.url}" |
53 | 36 | if backend.container: |
54 | 37 | info += f", Container: {backend.container.short_id()}" |
| 38 | + info += f", Type: {'full' if not waku_light_client else 'light'}" |
55 | 39 | logging.info(f"Backend {node_index} ready. {info}") |
56 | 40 |
|
57 | | - # Run threads, each waiting for wakuv2.peerstats signal |
58 | | - logging.info("Starting threads to wait for wakuv2.peerstats signals...") |
| 41 | + # Start threads to create nodes |
| 42 | + logging.info("Starting threads to create full and light clients...") |
59 | 43 | threads = [] |
60 | 44 |
|
61 | | - for i in range(nodes_count): |
62 | | - thread = threading.Thread(target=create_node, args=(i,)) |
63 | | - thread.daemon = True |
64 | | - thread.start() |
65 | | - threads.append(thread) |
66 | | - |
67 | | - for thread in threads: |
68 | | - thread.join() |
69 | | - |
70 | | - assert len(nodes.keys()) == nodes_count, "Not all nodes created" |
71 | | - |
72 | | - # Wait for all nodes to discover each other |
73 | | - start_time = time.time() |
74 | | - while time.time() - start_time < DISCOVERY_TIMEOUT_SEC: |
75 | | - if _all_nodes_discovered(nodes, known_nodes): |
76 | | - return |
77 | | - time.sleep(0.5) |
78 | | - |
79 | | - assert False, f"Nodes failed to discover each other within {DISCOVERY_TIMEOUT_SEC} seconds" |
| 45 | + # First start full nodes |
| 46 | + for i in range(full_count): |
| 47 | + t = threading.Thread(target=create_node, args=(i, False)) |
| 48 | + t.daemon = True |
| 49 | + t.start() |
| 50 | + threads.append(t) |
| 51 | + |
| 52 | + # Then start light nodes |
| 53 | + for i in range(full_count, total_nodes): |
| 54 | + t = threading.Thread(target=create_node, args=(i, True)) |
| 55 | + t.daemon = True |
| 56 | + t.start() |
| 57 | + threads.append(t) |
| 58 | + |
| 59 | + for t in threads: |
| 60 | + t.join() |
| 61 | + |
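| | + # join() does not propagate exceptions from worker threads; a backend
| | + # that failed to start simply leaves no entry in `nodes`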
| 62 | + assert len(nodes) == total_nodes, "Not all nodes created"
| 63 | + |
| 64 | + # Build sets for expectations |
| 65 | + all_node_ids = set(nodes.keys()) |
| 66 | + boot_ids = {k for k, v in known_nodes.items() if v in ("boot-1", "store")} |
| 67 | + |
| 68 | + # Pre-build expected peers per node (static expectations) |
| 69 | + expected_by_node: dict[str, set[str]] = {} |
| 70 | + for pid in nodes.keys(): |
| 71 | + if pid in full_ids: |
| 72 | + # Full clients should discover all other clients and boot nodes |
| 73 | + expected = (all_node_ids | boot_ids) - {pid} |
| 74 | + else: |
| 75 | + # Light clients should discover only full clients and boot nodes |
| 76 | + expected = boot_ids | full_ids |
| 77 | + expected_by_node[pid] = expected |
| 78 | + |
| 79 | + # Poll with tenacity: retry every 1s for up to 120s; reraise=True surfaces the final AssertionError instead of a RetryError. Peers are logged on each attempt
| 80 | + @retry(stop=stop_after_delay(120), wait=wait_fixed(1), reraise=True) |
| 81 | + def assert_discovery(): |
| 82 | + all_ok = True |
| 83 | + |
| | + # Iterate over a snapshot: passing nodes are popped below, and mutating
| | + # a dict while iterating over it raises RuntimeError
| 84 | + for peer_id, node in list(nodes.items()):
| 85 | + # Fetch peers once per node per attempt |
| 86 | + peers = set(node.wakuext_service.peers()) |
| 87 | + |
| 88 | + # Log peer names |
| 89 | + peer_names = [known_nodes.get(peer, peer[-5:]) for peer in peers] |
| 90 | + logging.debug(f"Checking ode {known_nodes[peer_id]} peers: {peer_names}") |
| 91 | + |
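| | + # Strict equality: the test assumes each node sees exactly its expected
| | + # peers, i.e. no ambient peers beyond the fleet and the test backends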
| 92 | + if peers == expected_by_node[peer_id]: |
| 93 | + logging.info(f"Node {known_nodes[peer_id]} discovered peers as expected") |
| 94 | + nodes.pop(peer_id) |
| 95 | + continue |
| 96 | + |
| 97 | + all_ok = False |
| 98 | + |
| 99 | + assert all_ok, f"Nodes not discovered as expected: {[known_nodes[p] for p in nodes]}"
| 100 | + |
| 101 | + assert_discovery() |