Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _assets/scripts/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/sh

set -e

# Define $SCAN_WAKU_FLEET to run a command before running the actual app (CMD).
# This is expected to be used in functional tests to run the `scan_waku_fleet.py` script before starting the app.
$SCAN_WAKU_FLEET
Expand Down
35 changes: 9 additions & 26 deletions messaging/waku/gowaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,32 +626,15 @@ func (w *Waku) runPeerExchangeLoop() {
w.logger.Debug("Peer exchange loop stopped")
return
case <-ticker.C:
w.logger.Info("Running peer exchange loop")

// We select only the nodes discovered via DNS Discovery that support peer exchange
// We assume that those peers are running peer exchange according to infra config,
// If not, the peer selection process in go-waku will filter them out anyway
w.dnsAddressCacheLock.RLock()
var peers peer.IDSlice
for _, record := range w.dnsAddressCache {
for _, discoveredNode := range record {
if len(discoveredNode.PeerInfo.Addrs) == 0 {
continue
}
// Attempt to connect to the peers.
// Peers will be added to the libp2p peer store thanks to identify
go w.connect(discoveredNode.PeerInfo, discoveredNode.ENR, wps.DNSDiscovery)
peers = append(peers, discoveredNode.PeerID)
}
}
w.dnsAddressCacheLock.RUnlock()

if len(peers) != 0 {
err := w.node.PeerExchange().Request(w.ctx, w.cfg.DiscoveryLimit, peer_exchange.WithAutomaticPeerSelection(peers...),
peer_exchange.FilterByShard(int(w.defaultShardInfo.ClusterID), int(w.defaultShardInfo.ShardIDs[0])))
if err != nil {
w.logger.Error("couldnt request peers via peer exchange", zap.Error(err))
}
w.logger.Debug("Running peer exchange loop")
err := w.node.PeerExchange().Request(
w.ctx,
w.cfg.DiscoveryLimit,
peer_exchange.WithAutomaticPeerSelection(),
peer_exchange.FilterByShard(int(w.defaultShardInfo.ClusterID), int(w.defaultShardInfo.ShardIDs[0])),
)
if err != nil {
w.logger.Error("could not request peers via peer exchange", zap.Error(err))
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion tests-functional/README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,13 @@ If that is so, force Python to use the same Docker socket (from `docker context
```shell
export DOCKER_HOST=unix:///var/run/docker.sock
```
Add this before launching your test runner
Add this before launching your test runner

## `/usr/status-user/wakufleetconfig.json: no such file or directory`

When running tests against `status-backend` Docker containers, you might see this error:
```
DEBUG:root:Got response: b'{"error":"failed to open fleets json file: open /usr/status-user/wakufleetconfig.json: no such file or directory"}'
```

If running with the default arguments, this most likely means that you're not running a local Waku fleet as described in [Prerequisites](#prerequisites).
2 changes: 1 addition & 1 deletion tests-functional/clients/statusgo_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def __init__(self, privileged=False, ipv6=False, **kwargs):

super().__init__(entrypoint, ports, privileged, container_name_suffix=f"-status-backend-{host_port}")

bridge_network = kwargs.get("bridge_network", False)
bridge_network = kwargs.get("bridge_network", True)
if bridge_network:
self.connect_to_bridge_network()

Expand Down
2 changes: 1 addition & 1 deletion tests-functional/docker-compose.waku.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ services:
"--log-level=INFO",
"--max-connections=18000",
"--nodekey=3190bc9b55b18dbc171997a7a67abcd5bbf0c81002ad9617b1cb67f2f15daa64",
"--peer-exchange=false",
"--peer-exchange=true",
"--relay=true",
"--rest-address=0.0.0.0",
"--rest-admin",
Expand Down
140 changes: 89 additions & 51 deletions tests-functional/tests/test_discovery.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,110 @@
import logging
import time
import pytest
import threading

from clients.status_backend import StatusBackend

DISCOVERY_TIMEOUT_SEC = 30


def _all_nodes_discovered(nodes: dict[str, StatusBackend], known_peers: dict[str, str]) -> bool:
"""Check if all nodes discovered each other"""
for peer_id, node in nodes.items():
if node is None:
return False
peers = node.wakuext_service.peers()

# Use shorter names for logging
peer_names = [known_peers.get(peer, peer[-5:]) for peer in peers]
logging.info(f"Node {known_peers[peer_id]} peers: {peer_names}")
import pytest
from tenacity import retry, stop_after_delay, wait_fixed

for known_node in known_peers.keys():
if known_node == peer_id:
continue
if known_node not in peers:
return False
return True
from clients.status_backend import StatusBackend


@pytest.mark.rpc
class TestDiscovery:

def test_discovery(self, backend_new_profile):
nodes_count = 3
# Desired topology: 3 full nodes, 2 light nodes
full_count = 3
light_count = 2
total_nodes = full_count + light_count

nodes: dict[str, StatusBackend] = {}
full_ids: set[str] = set()

known_nodes = {
# Boot nodes available in the fleet
"16Uiu2HAm3vFYHkGRURyJ6F7bwDyzMLtPEuCg4DU89T7km2u8Fjyb": "boot-1",
"16Uiu2HAmCDqxtfF1DwBqs7UJ4TgSnjoh6j1RtE1hhQxLLao84jLi": "store",
}

def create_node(node_index: int):
"""Function to run in each thread - waits for wakuv2.peerstats signal"""
backend = backend_new_profile(f"node_{node_index}")
def create_node(node_index: int, waku_light_client: bool):
backend = backend_new_profile(
f"node_{node_index}",
# We mix full and light clients in a single test.
# Light clients (edge nodes) will not discover each other,
# but they must discover full clients.
waku_light_client=waku_light_client,
# Force the container to only have the docker compose network.
# Otherwise, advertised ENRs may contain the wrong IP address.
bridge_network=False,
)
peer_id = backend.wakuext_service.peer_id()
known_nodes[peer_id] = f"backend_{node_index}"
if not waku_light_client:
full_ids.add(peer_id)
nodes[peer_id] = backend
logging.info(f"Backend {node_index} ready. Peer ID: {peer_id}")

# Run threads, each waiting for wakuv2.peerstats signal
logging.info("Starting threads to wait for wakuv2.peerstats signals...")
info = f"Peer ID: {peer_id}"
info += f", URL: {backend.url}"
if backend.container:
info += f", Container: {backend.container.short_id()}"
info += f", Type: {'full' if not waku_light_client else 'light'}"
logging.info(f"Backend {node_index} ready. {info}")

# Start threads to create nodes
logging.info("Starting threads to create full and light clients...")
threads = []

for i in range(nodes_count):
thread = threading.Thread(target=create_node, args=(i,))
thread.daemon = True
thread.start()
threads.append(thread)

for thread in threads:
thread.join()

assert len(nodes.keys()) == nodes_count, "Not all nodes created"

# Wait for all nodes to discover each other
start_time = time.time()
while time.time() - start_time < DISCOVERY_TIMEOUT_SEC:
if _all_nodes_discovered(nodes, known_nodes):
return
time.sleep(0.5)

assert False, f"Nodes failed to discover each other within {DISCOVERY_TIMEOUT_SEC} seconds"
# First start full nodes
for i in range(full_count):
t = threading.Thread(target=create_node, args=(i, False))
t.daemon = True
t.start()
threads.append(t)

# Then start light nodes
for i in range(full_count, total_nodes):
t = threading.Thread(target=create_node, args=(i, True))
t.daemon = True
t.start()
threads.append(t)

for t in threads:
t.join()

assert len(nodes.keys()) == total_nodes, "Not all nodes created"

# Build sets for expectations
all_node_ids = set(nodes.keys())
boot_ids = {k for k, v in known_nodes.items() if v in ("boot-1", "store")}

# Pre-build expected peers per node (static expectations)
expected_by_node: dict[str, set[str]] = {}
for pid in nodes.keys():
if pid in full_ids:
# Full clients should discover all other clients and boot nodes
expected = (all_node_ids | boot_ids) - {pid}
else:
# Light clients should discover only full clients and boot nodes
expected = boot_ids | full_ids
expected_by_node[pid] = expected

# Check using tenacity.retry, logs peers on each iteration
@retry(stop=stop_after_delay(120), wait=wait_fixed(1), reraise=True)
def assert_discovery():
    """Assert that every still-pending node sees exactly its expected peers.

    Retried by tenacity once per second for up to 120 seconds. Nodes whose
    peer set already matched are removed from ``nodes`` so that later
    attempts only re-check the nodes that have not converged yet.

    Raises:
        AssertionError: if at least one pending node's peer set differs
            from ``expected_by_node`` on this attempt.
    """
    all_ok = True

    # Iterate over a snapshot: confirmed nodes are popped from ``nodes``
    # below, and mutating a dict while iterating it directly raises
    # RuntimeError ("dictionary changed size during iteration"), which
    # would abort the attempt after the first confirmed node.
    for peer_id, node in list(nodes.items()):
        # Fetch peers once per node per attempt
        peers = set(node.wakuext_service.peers())

        # Log peer names (known name, or the last 5 characters of the ID)
        peer_names = [known_nodes.get(peer, peer[-5:]) for peer in peers]
        logging.debug(f"Checking node {known_nodes[peer_id]} peers: {peer_names}")

        if peers == expected_by_node[peer_id]:
            logging.info(f"Node {known_nodes[peer_id]} discovered peers as expected")
            nodes.pop(peer_id)
            continue

        all_ok = False

    assert all_ok, "Not all nodes discovered as expected"

assert_discovery()
Loading