Skip to content

Commit 39a3fff

Browse files
committed
Add test to check that pub (peer) > router > sub (client) works as expected.
Test checks the following scenarios: * TCP Unicast * UDP Multicast * Pub starting first * Sub starting first
1 parent 1b655b3 commit 39a3fff

File tree

3 files changed

+211
-0
lines changed

3 files changed

+211
-0
lines changed

.github/workflows/ci.yml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,39 @@ jobs:
470470
RUST_LOG=debug sudo python3 ./build/tests/connection_restore.py ./zenoh-standalone/zenohd
471471
timeout-minutes: 20
472472

473+
routed_peer_client_test:
474+
needs: zenoh_build
475+
name: Test routed peer client communication
476+
runs-on: ubuntu-latest
477+
steps:
478+
- name: Checkout code
479+
uses: actions/checkout@v4
480+
481+
- name: Download Zenoh artifacts
482+
uses: actions/download-artifact@v4
483+
with:
484+
name: ${{ needs.zenoh_build.outputs.artifact-name }}
485+
486+
- name: Unzip Zenoh artifacts
487+
run: unzip ${{ needs.zenoh_build.outputs.artifact-name }} -d zenoh-standalone
488+
489+
- id: run-zenoh
490+
name: Run Zenoh router
491+
run: |
492+
RUST_LOG=debug ./zenoh-standalone/zenohd -l tcp/127.0.0.1:7447 -l udp/224.0.0.123:7447#iface=lo &
493+
echo "zenohd-pid=$!" >> $GITHUB_OUTPUT
494+
495+
- name: Build project and run test
496+
run: |
497+
sudo apt install -y ninja-build
498+
Z_FEATURE_UNSTABLE_API=1 BATCH_MULTICAST_SIZE=8192 CMAKE_GENERATOR=Ninja make
499+
python3 ./build/tests/routed_peer_client.py tcp/127.0.0.1:7447 udp/224.0.0.123:7447#iface=lo
500+
timeout-minutes: 15
501+
502+
- name: Kill Zenoh router
503+
if: always()
504+
run: kill ${{ steps.run-zenoh.outputs.zenohd-pid }}
505+
473506
unicast_peer_test:
474507
name: P2p unicast test
475508
runs-on: ubuntu-latest

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ if(UNIX OR MSVC)
656656
configure_file(${PROJECT_SOURCE_DIR}/tests/no_router.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/no_router.py COPYONLY)
657657
configure_file(${PROJECT_SOURCE_DIR}/tests/memory_leak.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/memory_leak.py COPYONLY)
658658
configure_file(${PROJECT_SOURCE_DIR}/tests/connection_restore.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/connection_restore.py COPYONLY)
659+
configure_file(${PROJECT_SOURCE_DIR}/tests/routed_peer_client.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/routed_peer_client.py COPYONLY)
659660

660661
enable_testing()
661662
add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test)

tests/routed_peer_client.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
"""Test suite for pub (peer) > router > sub (client) scenarios."""
2+
import os
3+
from signal import SIGINT
4+
import subprocess
5+
import sys
6+
import time
7+
import re
8+
9+
# Specify the directory for the binaries
10+
DIR_EXAMPLES = "build/examples"
11+
12+
13+
def pub_and_sub(pub_args, sub_args, pub_first=True):
14+
"""
15+
Run a publisher and subscriber test.
16+
17+
Parameters:
18+
pub_args (str): Arguments passed to the publisher binary.
19+
sub_args (str): Arguments passed to the subscriber binary.
20+
pub_first (bool): If True, start publisher first; otherwise start subscriber first.
21+
22+
Returns:
23+
int: 0 if the test passes, 1 if it fails.
24+
"""
25+
test_status = 0
26+
27+
# Expected z_pub status
28+
z_pub_expected_status = 0
29+
30+
# Expected z_sub output & status
31+
z_sub_expected_status = 0
32+
z_sub_expected_pattern = re.compile(
33+
r">> \[Subscriber\] Received \('demo/example/zenoh-pico-pub': '\[.*?\] Pub from Pico!'\)"
34+
)
35+
36+
z_pub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_pub {pub_args} -n 10"
37+
z_sub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_sub {sub_args} -n 1"
38+
39+
print("PUB CMD:", z_pub_command)
40+
print("SUB CMD:", z_sub_command)
41+
42+
if pub_first:
43+
print("Start publisher")
44+
# Start z_pub
45+
z_pub_process = subprocess.Popen(
46+
z_pub_command,
47+
shell=True,
48+
stdin=subprocess.PIPE,
49+
stdout=subprocess.PIPE,
50+
stderr=subprocess.PIPE,
51+
text=True,
52+
) # pylint: disable=consider-using-with
53+
54+
# Give publisher time to start
55+
time.sleep(2)
56+
57+
print("Start subscriber")
58+
# Start z_sub
59+
z_sub_process = subprocess.Popen(
60+
z_sub_command,
61+
shell=True,
62+
stdin=subprocess.PIPE,
63+
stdout=subprocess.PIPE,
64+
stderr=subprocess.PIPE,
65+
text=True,
66+
start_new_session=True,
67+
) # pylint: disable=consider-using-with
68+
else:
69+
print("Start subscriber")
70+
# Start z_sub first
71+
z_sub_process = subprocess.Popen(
72+
z_sub_command,
73+
shell=True,
74+
stdin=subprocess.PIPE,
75+
stdout=subprocess.PIPE,
76+
stderr=subprocess.PIPE,
77+
text=True,
78+
start_new_session=True,
79+
) # pylint: disable=consider-using-with
80+
81+
# Give subscriber time to start
82+
time.sleep(2)
83+
84+
print("Start publisher")
85+
# Then start z_pub
86+
z_pub_process = subprocess.Popen(
87+
z_pub_command,
88+
shell=True,
89+
stdin=subprocess.PIPE,
90+
stdout=subprocess.PIPE,
91+
stderr=subprocess.PIPE,
92+
text=True,
93+
)
94+
95+
# Wait for z_pub to finish
96+
z_pub_process.wait()
97+
98+
print("Stop subscriber")
99+
if z_sub_process.poll() is None:
100+
# send SIGINT to group (safe because of start_new_session=True)
101+
z_sub_process_gid = os.getpgid(z_sub_process.pid)
102+
os.killpg(z_sub_process_gid, SIGINT)
103+
104+
print("Check publisher status")
105+
# Check the exit status of z_pub
106+
z_pub_status = z_pub_process.returncode
107+
if z_pub_status == z_pub_expected_status:
108+
print("z_pub status valid")
109+
else:
110+
print(f"z_pub status invalid, expected: {z_pub_expected_status}, received: {z_pub_status}")
111+
test_status = 1
112+
113+
print("Check subscriber status & output")
114+
# Check the exit status of z_sub
115+
z_sub_status = z_sub_process.returncode
116+
if z_sub_status == z_sub_expected_status:
117+
print("z_sub status valid")
118+
else:
119+
print(f"z_sub status invalid, expected: {z_sub_expected_status}, received: {z_sub_status}")
120+
test_status = 1
121+
122+
# Check the output of z_sub
123+
if z_sub_expected_pattern.search(z_sub_process.stdout.read()):
124+
print("z_sub output valid")
125+
else:
126+
print("z_sub output invalid:")
127+
test_status = 1
128+
129+
# Return value
130+
return test_status
131+
132+
def test_tcp_unicast(locator, pub_first=True):
133+
"""Run TCP unicast pub/sub test."""
134+
print(f"*** TCP Unicast Test (pub_first={pub_first}) ***")
135+
pub_args = f"-m peer -e {locator}"
136+
sub_args = f"-m client -e {locator}"
137+
138+
return pub_and_sub(pub_args, sub_args, pub_first)
139+
140+
def test_udp_multicast(tcp_locator, udp_locator, pub_first=True):
141+
"""Run UDP multicast pub/sub test."""
142+
print(f"*** UDP Multicast Test (pub_first={pub_first}) ***")
143+
pub_args = f"-m peer -l {udp_locator}"
144+
sub_args = f"-m client -e {tcp_locator}"
145+
146+
return pub_and_sub(pub_args, sub_args, pub_first)
147+
148+
if __name__ == "__main__":
149+
EXIT_STATUS = 0
150+
151+
_tcp_unicast = None
152+
_udp_multicast = None
153+
154+
args = sys.argv[1:]
155+
156+
for arg in args:
157+
if arg.startswith("tcp/"):
158+
_tcp_unicast = arg
159+
elif arg.startswith("udp/"):
160+
_udp_multicast = arg
161+
162+
print("TCP unicast locator:", _tcp_unicast)
163+
print("UDP multicast locator:", _udp_multicast)
164+
165+
if _tcp_unicast is not None:
166+
test_tcp_unicast(_tcp_unicast, True)
167+
test_tcp_unicast(_tcp_unicast, False)
168+
else:
169+
print("No TCP unicast locator provided, skipping test_tcp_unicast.")
170+
171+
if _tcp_unicast is not None and _udp_multicast is not None:
172+
test_udp_multicast(_tcp_unicast, _udp_multicast, True)
173+
test_udp_multicast(_tcp_unicast, _udp_multicast, False)
174+
else:
175+
print("No TCP unicast or UDP multicast locators provided, skipping test_udp_multicast.")
176+
177+
sys.exit(EXIT_STATUS)

0 commit comments

Comments
 (0)