forked from bitcoin-dev-project/warnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_base.py
163 lines (139 loc) · 5.74 KB
/
test_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import json
import logging
import logging.config
import os
import re
import threading
from pathlib import Path
from subprocess import run
from tempfile import mkdtemp
from time import sleep
from warnet import SRC_DIR
from warnet.k8s import get_pod_exit_status
from warnet.network import _connected as network_connected
from warnet.status import _get_deployed_scenarios as scenarios_deployed
from warnet.status import _get_tank_status as network_status
class TestBase:
def __init__(self):
self.setup_environment()
self.setup_logging()
self.log_expected_msgs: None | [str] = None
self.log_unexpected_msgs: None | [str] = None
self.log_msg_assertions_passed = False
self.log.info("Warnet test base initialized")
def setup_environment(self):
self.tmpdir = Path(mkdtemp(prefix="warnet-test-"))
os.environ["XDG_STATE_HOME"] = str(self.tmpdir)
self.logfilepath = self.tmpdir / "warnet.log"
self.stop_threads = threading.Event()
self.network = True
def setup_logging(self):
with open(SRC_DIR / "logging_config.json") as f:
logging_config = json.load(f)
logging_config["handlers"]["file"]["filename"] = str(self.logfilepath)
logging.config.dictConfig(logging_config)
self.log = logging.getLogger("test")
self.log.info("Logging started")
self.log.info(f"Testdir: {self.tmpdir}")
def cleanup(self, signum=None, frame=None):
try:
self.log.info("Stopping network")
if self.network:
self.warnet("down --force")
self.wait_for_all_tanks_status(target="stopped", timeout=60, interval=1)
except Exception as e:
self.log.error(f"Error bringing network down: {e}")
finally:
self.stop_threads.set()
def _print_and_assert_msgs(self, message):
print(message)
if (self.log_expected_msgs or self.log_unexpected_msgs) and assert_log(
message, self.log_expected_msgs, self.log_unexpected_msgs
):
self.log_msg_assertions_passed = True
def assert_log_msgs(self):
assert (
self.log_msg_assertions_passed
), f"Log assertion failed. Expected message not found: {self.log_expected_msgs}"
self.log_msg_assertions_passed = False
def warnet(self, cmd):
self.log.debug(f"Executing warnet command: {cmd}")
command = ["warnet"] + cmd.split()
proc = run(command, capture_output=True)
if proc.stderr:
raise Exception(proc.stderr.decode().strip())
return proc.stdout.decode().strip()
def output_reader(self, pipe, func):
while not self.stop_threads.is_set():
line = pipe.readline().strip()
if line:
func(line)
def wait_for_predicate(self, predicate, timeout=5 * 60, interval=5):
self.log.debug(f"Waiting for predicate with timeout {timeout}s and interval {interval}s")
while timeout > 0:
try:
if predicate():
return
except Exception:
pass
sleep(interval)
timeout -= interval
import inspect
raise Exception(
f"Timed out waiting for Truth from predicate: {inspect.getsource(predicate).strip()}"
)
def get_tank(self, index):
# TODO
return None
def wait_for_all_tanks_status(self, target="running", timeout=20 * 60, interval=5):
"""Poll the warnet server for container status
Block until all tanks are running
"""
def check_status():
tanks = network_status()
stats = {"total": 0}
# "Probably" means all tanks are stopped and deleted
if len(tanks) == 0:
return True
for tank in tanks:
status = tank["status"]
stats["total"] += 1
stats[status] = stats.get(status, 0) + 1
self.log.info(f"Waiting for all tanks to reach '{target}': {stats}")
return target in stats and stats[target] == stats["total"]
self.wait_for_predicate(check_status, timeout, interval)
def wait_for_all_edges(self, timeout=20 * 60, interval=5):
"""Ensure all tanks have all the connections they are supposed to have
Block until all success
"""
self.wait_for_predicate(network_connected, timeout, interval)
def wait_for_all_scenarios(self):
def check_scenarios():
scns = scenarios_deployed()
if len(scns) == 0:
return True
for s in scns:
exit_status = get_pod_exit_status(s["name"], s["namespace"])
self.log.debug(f"Scenario {s['name']} exited with code {exit_status}")
if exit_status != 0:
return False
return True
self.wait_for_predicate(check_scenarios)
def assert_equal(thing1, thing2, *args):
if thing1 != thing2 or any(thing1 != arg for arg in args):
raise AssertionError(
"not({})".format(" == ".join(str(arg) for arg in (thing1, thing2) + args))
)
def assert_log(log_message, expected_msgs, unexpected_msgs=None) -> bool:
if unexpected_msgs is None:
unexpected_msgs = []
assert_equal(type(expected_msgs), list)
assert_equal(type(unexpected_msgs), list)
found = True
for unexpected_msg in unexpected_msgs:
if re.search(re.escape(unexpected_msg), log_message, flags=re.MULTILINE):
raise AssertionError(f"Unexpected message found in log: {unexpected_msg}")
for expected_msg in expected_msgs:
if re.search(re.escape(expected_msg), log_message, flags=re.MULTILINE) is None:
found = False
return found