forked from bitcoin-dev-project/warnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscenarios_test.py
executable file
·129 lines (105 loc) · 5.11 KB
/
scenarios_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
import os
import re
from pathlib import Path
from test_base import TestBase
from warnet.control import stop_scenario
from warnet.process import run_command
from warnet.status import _get_deployed_scenarios as scenarios_deployed
class ScenariosTest(TestBase):
def __init__(self):
super().__init__()
self.network_dir = Path(os.path.dirname(__file__)) / "data" / "12_node_ring"
self.scen_dir = Path(os.path.dirname(__file__)).parent / "resources" / "scenarios"
def run_test(self):
try:
self.setup_network()
self.run_and_check_miner_scenario_from_file()
self.run_and_check_scenario_from_file()
self.run_and_check_scenario_from_file_debug()
self.check_regtest_recon()
self.check_active_count()
finally:
self.cleanup()
def setup_network(self):
self.log.info("Setting up network")
self.log.info(self.warnet(f"deploy {self.network_dir}"))
self.wait_for_all_tanks_status(target="running")
self.wait_for_all_edges()
def scenario_running(self, scenario_name: str):
"""Check that we are only running a single scenario of the correct name"""
deployed = scenarios_deployed()
assert len(deployed) == 1
return scenario_name in deployed[0]["name"]
def check_scenario_stopped(self):
running = scenarios_deployed()
self.log.debug(f"Checking if scenario stopped. Running scenarios: {len(running)}")
return len(running) == 0
def check_scenario_clean_exit(self):
deployed = scenarios_deployed()
return all(scenario["status"] == "succeeded" for scenario in deployed)
def stop_scenario(self):
self.log.info("Stopping running scenario")
running = scenarios_deployed()
assert len(running) == 1, f"Expected one running scenario, got {len(running)}"
assert running[0]["status"] == "running", "Scenario should be running"
stop_scenario(running[0]["name"])
self.wait_for_predicate(self.check_scenario_stopped)
def check_blocks(self, target_blocks, start: int = 0):
count = int(self.warnet("bitcoin rpc tank-0000 getblockcount"))
self.log.debug(f"Current block count: {count}, target: {start + target_blocks}")
try:
deployed = scenarios_deployed()
commander = deployed[0]["commander"]
command = f"kubectl logs {commander}"
print("\ncommander output:")
print(run_command(command))
print("\n")
except Exception:
pass
return count >= start + target_blocks
def run_and_check_miner_scenario_from_file(self):
scenario_file = self.scen_dir / "miner_std.py"
self.log.info(f"Running scenario from file: {scenario_file}")
self.warnet(f"run {scenario_file} --allnodes --interval=1")
start = int(self.warnet("bitcoin rpc tank-0000 getblockcount"))
self.wait_for_predicate(lambda: self.scenario_running("commander-minerstd"))
self.wait_for_predicate(lambda: self.check_blocks(2, start=start))
table = self.warnet("status")
assert "Active Scenarios: 1" in table
self.stop_scenario()
def run_and_check_scenario_from_file_debug(self):
scenario_file = self.scen_dir / "test_scenarios" / "p2p_interface.py"
self.log.info(f"Running scenario from: {scenario_file}")
output = self.warnet(f"run {scenario_file} --source_dir={self.scen_dir} --debug")
self.check_for_pod_deletion_message(output)
def run_and_check_scenario_from_file(self):
scenario_file = self.scen_dir / "test_scenarios" / "p2p_interface.py"
self.log.info(f"Running scenario from: {scenario_file}")
self.warnet(f"run {scenario_file} --source_dir={self.scen_dir}")
self.wait_for_predicate(self.check_scenario_clean_exit)
def check_regtest_recon(self):
scenario_file = self.scen_dir / "reconnaissance.py"
self.log.info(f"Running scenario from file: {scenario_file}")
self.warnet(f"run {scenario_file}")
self.wait_for_predicate(self.check_scenario_clean_exit)
def check_active_count(self):
scenario_file = self.scen_dir / "test_scenarios" / "buggy_failure.py"
self.log.info(f"Running scenario from: {scenario_file}")
self.warnet(f"run {scenario_file} --source_dir={self.scen_dir}")
def two_pass_one_fail():
deployed = scenarios_deployed()
if len([s for s in deployed if s["status"] == "succeeded"]) != 2:
return False
return len([s for s in deployed if s["status"] == "failed"]) == 1
self.wait_for_predicate(two_pass_one_fail)
table = self.warnet("status")
assert "Active Scenarios: 0" in table
def check_for_pod_deletion_message(self, input):
message = "Deleting pod..."
self.log.info(f"Checking for message: '{message}'")
assert re.search(re.escape(message), input, flags=re.MULTILINE)
self.log.info(f"Found message: '{message}'")
if __name__ == "__main__":
test = ScenariosTest()
test.run_test()