Skip to content

Commit 2b1b2cf

Browse files
authored
Merge pull request #396 from Limmen/connection
unit test connection_util
2 parents 02698c7 + 5a83cf7 commit 2b1b2cf

File tree

1 file changed

+319
-0
lines changed

1 file changed

+319
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
from unittest.mock import patch, MagicMock
2+
from csle_common.util.connection_util import ConnectionUtil
3+
from csle_common.dao.emulation_observation.common.emulation_connection_observation_state import (
4+
EmulationConnectionObservationState,
5+
)
6+
import csle_common.constants.constants as constants
7+
8+
9+
class TestConnectionUtilSuite:
10+
"""
11+
Test suite for connection_util
12+
"""
13+
14+
@patch("paramiko.SSHClient")
15+
def test_ssh_setup_connection(self, mock_SSHClient) -> None:
16+
"""
17+
Test the helper function for setting up a SSH connection
18+
19+
:param mock_SSHClient: mock_SSHClient
20+
21+
:return: None
22+
"""
23+
mock_ssh_client = MagicMock()
24+
mock_SSHClient.return_value = mock_ssh_client
25+
mock_transport = MagicMock()
26+
mock_relay_channel = MagicMock()
27+
mock_ssh_client.get_transport.return_value = mock_transport
28+
mock_transport.open_channel.return_value = mock_relay_channel
29+
30+
a = MagicMock()
31+
a.ips = ["192.168.1.2"]
32+
a.ips_match.side_effect = lambda ips: True
33+
34+
credentials = [MagicMock()]
35+
credentials[0].service = constants.SSH.SERVICE_NAME
36+
credentials[0].username = "user"
37+
credentials[0].pw = "password"
38+
credentials[0].port = 22
39+
40+
proxy_connections = [MagicMock()]
41+
proxy_connections[0].ip = "192.168.1.1"
42+
proxy_connections[0].conn.get_transport.return_value = mock_transport
43+
44+
s = MagicMock()
45+
s.emulation_env_config.containers_config.agent_ip = "192.168.1.3"
46+
s.attacker_obs_state.agent_reachable = ["192.168.1.1"]
47+
48+
result = ConnectionUtil._ssh_setup_connection(
49+
a=a, credentials=credentials, proxy_connections=proxy_connections, s=s
50+
)
51+
mock_SSHClient.assert_not_called()
52+
assert result
53+
54+
@patch("csle_common.util.emulation_util.EmulationUtil.execute_ssh_cmd")
55+
def test_ssh_finalize_connection(self, mock_execute_ssh_cmd) -> None:
56+
"""
57+
Test the helper function for finalizing a SSH connection and setting up the DTO
58+
59+
:param mock_execute_ssh_cmd: mock_execute_ssh_cmd
60+
61+
:return: None
62+
"""
63+
mock_execute_ssh_cmd.side_effect = [
64+
(b"output", b"error", 0.1),
65+
(b"output", b"error", 0.1),
66+
(b"output", b"error", 0.1),
67+
(b"output", b"error", 0.1),
68+
(b"(ALL) NOPASSWD: ALL", b"", 0.1),
69+
]
70+
71+
target_machine = MagicMock()
72+
connection_setup_dto = MagicMock()
73+
connection_setup_dto.target_connections = [MagicMock()]
74+
connection_setup_dto.credentials = [MagicMock()]
75+
connection_setup_dto.credentials[0].username = "user"
76+
connection_setup_dto.ports = [22]
77+
connection_setup_dto.proxies = [MagicMock()]
78+
connection_setup_dto.ip = "192.168.1.2"
79+
80+
root, total_time = ConnectionUtil._ssh_finalize_connection(
81+
target_machine=target_machine, connection_setup_dto=connection_setup_dto, i=0
82+
)
83+
assert not root
84+
assert total_time
85+
mock_execute_ssh_cmd.assert_called()
86+
87+
@patch("telnetlib.Telnet")
88+
def test_telnet_setup_connection(self, mock_telnet) -> None:
89+
"""
90+
Test the helper function for setting up a Telnet connection to a target machine
91+
92+
:param mock_telnet: mock_telnet
93+
94+
:return: None
95+
"""
96+
mock_telnet_conn = MagicMock()
97+
mock_telnet.return_value = mock_telnet_conn
98+
mock_telnet_conn.read_until.side_effect = [
99+
constants.TELNET.LOGIN_PROMPT,
100+
constants.TELNET.PASSWORD_PROMPT,
101+
constants.TELNET.PROMPT,
102+
]
103+
mock_telnet_conn.write.return_value = None
104+
mock_telnet_conn.read_until.return_value = b"$"
105+
106+
a = MagicMock()
107+
a.ips = ["192.168.1.2"]
108+
a.ips_match.side_effect = lambda ips: True
109+
110+
credentials = [MagicMock()]
111+
credentials[0].service = constants.TELNET.SERVICE_NAME
112+
credentials[0].username = "user"
113+
credentials[0].pw = "password"
114+
credentials[0].port = 23
115+
116+
proxy_connections = [MagicMock()]
117+
proxy_connections[0].ip = "192.168.1.1"
118+
proxy_connections[0].conn.get_transport.return_value = MagicMock()
119+
120+
s = MagicMock()
121+
s.emulation_env_config.containers_config.agent_ip = "192.168.1.3"
122+
s.attacker_obs_state.agent_reachable = ["192.168.1.1"]
123+
s.get_attacker_machine.return_value.reachable = ["192.168.1.2"]
124+
s.emulation_env_config.get_port_forward_port.return_value = 9999
125+
126+
result = ConnectionUtil._telnet_setup_connection(
127+
a=a, credentials=credentials, proxy_connections=proxy_connections, s=s
128+
)
129+
mock_telnet.assert_called()
130+
assert result
131+
132+
def test_telnet_finalize_connection(self) -> None:
133+
"""
134+
Test the helper function for finalizing a Telnet connection to a target machine and creating the DTO
135+
136+
:return: None
137+
"""
138+
target_machine = MagicMock()
139+
connection_setup_dto = MagicMock()
140+
mock_telnet_conn = [MagicMock() for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CHECK_ROOT)]
141+
connection_setup_dto.target_connections = mock_telnet_conn
142+
connection_setup_dto.credentials = [
143+
MagicMock() for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CHECK_ROOT)
144+
]
145+
for credential in connection_setup_dto.credentials:
146+
credential.username = "user"
147+
connection_setup_dto.tunnel_threads = [
148+
MagicMock() for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CHECK_ROOT)
149+
]
150+
connection_setup_dto.forward_ports = [9999 for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CHECK_ROOT)]
151+
connection_setup_dto.ports = [23 for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CHECK_ROOT)]
152+
connection_setup_dto.proxies = [MagicMock() for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CHECK_ROOT)]
153+
connection_setup_dto.ip = "192.168.1.2"
154+
155+
for i, conn in enumerate(mock_telnet_conn):
156+
conn.read_until.side_effect = [b"user may not run sudo" if i < 4 else b"(ALL) NOPASSWD: ALL"]
157+
158+
root, total_time = ConnectionUtil._telnet_finalize_connection(
159+
target_machine=target_machine, i=0, connection_setup_dto=connection_setup_dto
160+
)
161+
assert not root
162+
assert total_time
163+
164+
@patch("ftplib.FTP")
165+
def test_ftp_setup_connection(self, mock_ftp) -> None:
166+
"""
167+
Test the helper function for setting up a FTP connection
168+
169+
:param mock_ftp: mock_ftp
170+
171+
:return: None
172+
"""
173+
mock_ftp_conn = MagicMock()
174+
mock_ftp.return_value = mock_ftp_conn
175+
mock_ftp_conn.login.return_value = "230 Login successful."
176+
177+
a = MagicMock()
178+
a.ips = ["192.168.1.2"]
179+
a.ips_match.side_effect = lambda ips: True
180+
181+
credentials = [MagicMock()]
182+
credentials[0].service = constants.FTP.SERVICE_NAME
183+
credentials[0].username = "user"
184+
credentials[0].pw = "password"
185+
credentials[0].port = 21
186+
187+
proxy_connections = [MagicMock()]
188+
proxy_connections[0].ip = "192.168.1.1"
189+
proxy_connections[0].conn.get_transport.return_value = MagicMock()
190+
proxy_connections[0].conn.invoke_shell.return_value = MagicMock()
191+
192+
s = MagicMock()
193+
s.emulation_env_config.containers_config.agent_ip = "192.168.1.3"
194+
s.attacker_obs_state.agent_reachable = ["192.168.1.1"]
195+
s.get_attacker_machine.return_value.reachable = ["192.168.1.2"]
196+
s.emulation_env_config.get_port_forward_port.return_value = 9999
197+
198+
result = ConnectionUtil._ftp_setup_connection(
199+
a=a, credentials=credentials, proxy_connections=proxy_connections, s=s
200+
)
201+
mock_ftp.assert_not_called()
202+
assert result
203+
204+
def test_ftp_finalize_connection(self) -> None:
205+
"""
206+
Test helper function for creating the connection DTO for FTP
207+
208+
:return: None
209+
"""
210+
target_machine = MagicMock()
211+
target_machine.ftp_connections = []
212+
213+
connection_setup_dto = MagicMock()
214+
connection_setup_dto.target_connections = [MagicMock()]
215+
connection_setup_dto.credentials = [MagicMock()]
216+
connection_setup_dto.tunnel_threads = [MagicMock()]
217+
connection_setup_dto.forward_ports = [9999]
218+
connection_setup_dto.ports = [21]
219+
connection_setup_dto.interactive_shells = [MagicMock()]
220+
connection_setup_dto.proxies = [MagicMock()]
221+
connection_setup_dto.ip = "192.168.1.2"
222+
223+
root, cost = ConnectionUtil._ftp_finalize_connection(
224+
target_machine=target_machine, i=0, connection_setup_dto=connection_setup_dto
225+
)
226+
assert not root
227+
assert cost == 0
228+
229+
def test__find_jump_host_connection(self) -> None:
230+
"""
231+
Test utility function for finding a jump-host from the set of compromised machines to reach a target IP
232+
233+
:return: None
234+
"""
235+
ip = "192.168.1.100"
236+
s = MagicMock()
237+
s.attacker_obs_state = MagicMock()
238+
s.attacker_obs_state.agent_reachable = ["192.168.1.100"]
239+
s.emulation_env_config.containers_config.agent_ip = "192.168.1.1"
240+
s.emulation_env_config.get_hacker_connection.return_value = MagicMock()
241+
242+
result = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
243+
assert result.ip == "192.168.1.1"
244+
assert result.root
245+
246+
@patch("csle_common.util.emulation_util.EmulationUtil.execute_ssh_cmd")
247+
def test_test_connection(self, mock_execute_ssh_cmd) -> None:
248+
"""
249+
Test utility function for testing if a connection is alive or not
250+
251+
:param mock_execute_ssh_cmd: mock_execute_ssh_cmd
252+
253+
:return: None
254+
"""
255+
mock_conn = MagicMock()
256+
mock_execute_ssh_cmd.return_value = (b"user", b"", 0.1)
257+
c = EmulationConnectionObservationState(
258+
conn=mock_conn, credential=MagicMock(), root=False, service="ssh", port=22
259+
)
260+
result = ConnectionUtil.test_connection(c=c)
261+
assert result
262+
263+
@patch("paramiko.SSHClient")
264+
def test_reconnect_ssh(self, mock_SSHClient) -> None:
265+
"""
266+
Test the method that reconnects the given SSH connection if it has died for some reason
267+
268+
:param mock_SSHClient: mock_SSHClient
269+
270+
:return: None
271+
"""
272+
mock_credential = MagicMock()
273+
mock_credential.username = "user"
274+
mock_credential.pw = "password"
275+
mock_credential.port = 22
276+
277+
c = EmulationConnectionObservationState(
278+
conn=MagicMock(), credential=mock_credential, root=False, service="ssh", port=22, ip="192.168.1.2"
279+
)
280+
mock_ssh_client = mock_SSHClient.return_value
281+
mock_transport = MagicMock()
282+
mock_ssh_client.get_transport.return_value = mock_transport
283+
284+
result = ConnectionUtil.reconnect_ssh(c)
285+
assert result.conn == mock_ssh_client
286+
287+
@patch("csle_common.util.connection_util.ConnectionUtil.reconnect_ssh")
288+
@patch("telnetlib.Telnet")
289+
def test_reconnect_telnet(self, mock_telnet, mock_reconnect_ssh) -> None:
290+
"""
291+
Test the method that reconnects the given Telnet connection if it has died for some reason
292+
293+
:param mock_telnet: mock_telnet
294+
:param mock_reconnect_ssh: mock_reconnect_ssh
295+
296+
:return: None
297+
"""
298+
mock_credential = MagicMock()
299+
mock_credential.username = "user"
300+
mock_credential.pw = "password"
301+
302+
mock_proxy_conn = MagicMock()
303+
proxy = EmulationConnectionObservationState(
304+
conn=mock_proxy_conn, credential=mock_credential, root=False, service="ssh", port=22, ip="192.168.1.1"
305+
)
306+
c = EmulationConnectionObservationState(
307+
conn=None, credential=mock_credential, root=False, service="telnet", port=23, ip="192.168.1.2", proxy=proxy
308+
)
309+
310+
mock_telnet_conn = mock_telnet.return_value
311+
mock_telnet_conn.read_until.side_effect = [
312+
constants.TELNET.LOGIN_PROMPT,
313+
constants.TELNET.PASSWORD_PROMPT,
314+
constants.TELNET.PROMPT,
315+
]
316+
317+
mock_reconnect_ssh.return_value = proxy
318+
result = ConnectionUtil.reconnect_telnet(c)
319+
assert result.conn == mock_telnet_conn

0 commit comments

Comments
 (0)