|
| 1 | +import pytest |
| 2 | +import docker |
| 3 | +import logging |
| 4 | +import grpc |
| 5 | +from unittest.mock import MagicMock |
| 6 | +from docker.types import IPAMConfig, IPAMPool |
| 7 | +import time |
| 8 | +from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig |
| 9 | +from csle_common.controllers.host_controller import HostController |
| 10 | +import csle_common.constants.constants as constants |
| 11 | +import csle_collector.host_manager.host_manager_pb2_grpc |
| 12 | +import csle_collector.host_manager.host_manager_pb2 |
| 13 | +import csle_collector.host_manager.query_host_manager |
| 14 | +from csle_common.metastore.metastore_facade import MetastoreFacade |
| 15 | +from typing import List |
| 16 | +from typing import Generator |
| 17 | + |
| 18 | + |
| 19 | +@pytest.fixture(scope="module") |
| 20 | +def docker_client() -> None: |
| 21 | + """ |
| 22 | + Initialize and Provide a Docker client instance for the test |
| 23 | +
|
| 24 | + :return: None |
| 25 | + """ |
| 26 | + return docker.from_env() |
| 27 | + |
| 28 | + |
| 29 | +@pytest.fixture(scope="module") |
| 30 | +def network(docker_client) -> Generator: |
| 31 | + """ |
| 32 | + Create a custom network with a specific subnet |
| 33 | +
|
| 34 | + :param docker_client: docker_client |
| 35 | + :yield: network |
| 36 | +
|
| 37 | + :return: None |
| 38 | + """ |
| 39 | + subnet = "15.15.15.0/24" |
| 40 | + ipam_pool = IPAMPool(subnet=subnet) |
| 41 | + ipam_config = IPAMConfig(pool_configs=[ipam_pool]) |
| 42 | + logging.info(f"Creating virtual network with subnet: {subnet}") |
| 43 | + network = docker_client.networks.create("test_network", driver="bridge", ipam=ipam_config) |
| 44 | + yield network |
| 45 | + network.remove() |
| 46 | + |
| 47 | + |
| 48 | +def get_derived_containers(docker_client, excluded_tag="blank") -> List: |
| 49 | + """ |
| 50 | + Get all the containers except the blank ones |
| 51 | +
|
| 52 | + :param docker_client: docker_client |
| 53 | +
|
| 54 | + :return: List of Images |
| 55 | + """ |
| 56 | + # Get all images except those with the excluded tag |
| 57 | + config = MetastoreFacade.get_config(id=1) |
| 58 | + match_tag = config.version |
| 59 | + all_images = docker_client.images.list() |
| 60 | + derived_images = [image for image in all_images |
| 61 | + if (any(match_tag in tag for tag in image.tags) |
| 62 | + and all(constants.CONTAINER_IMAGES.BASE not in tag for tag in image.tags) |
| 63 | + and all(excluded_tag not in tag for tag in image.tags))] |
| 64 | + return derived_images |
| 65 | + |
| 66 | + |
| 67 | +@pytest.fixture(scope="module", params=get_derived_containers(docker.from_env())) |
| 68 | +def container_setup(request, docker_client, network) -> Generator: |
| 69 | + """ |
| 70 | + Starts a Docker container before running tests and ensures its stopped and removed after tests complete. |
| 71 | +
|
| 72 | + :param request: request |
| 73 | + :param docker_client: docker_client |
| 74 | + :yield: container |
| 75 | +
|
| 76 | + :return: None |
| 77 | + """ |
| 78 | + # Create and start each derived container |
| 79 | + image = request.param |
| 80 | + container = docker_client.containers.create(image.tags[0], command="sh -c 'while true; do sleep 3600; done'", |
| 81 | + detach=True) |
| 82 | + network.connect(container) |
| 83 | + logging.info(f"Starting container: {container.id} with image: {container.image.tags}") |
| 84 | + container.start() |
| 85 | + yield container |
| 86 | + logging.info(f"Stopping and removing container: {container.id} with image: {container.image.tags}") |
| 87 | + container.stop() |
| 88 | + container.remove() |
| 89 | + |
| 90 | + |
| 91 | +def test_start_host_manager(container_setup) -> None: |
| 92 | + """ |
| 93 | + Start host_manager in a container |
| 94 | +
|
| 95 | + :param container_setup: container_setup |
| 96 | + :return: None |
| 97 | + """ |
| 98 | + failed_containers = [] |
| 99 | + containers_info = [] |
| 100 | + container_setup.reload() |
| 101 | + assert container_setup.status == "running" |
| 102 | + # Mock emulation_env_config |
| 103 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 104 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 105 | + emulation_env_config.host_manager_config = MagicMock() |
| 106 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 107 | + emulation_env_config.host_manager_config.host_manager_log_dir = "/var/log/host_manager" |
| 108 | + emulation_env_config.host_manager_config.host_manager_log_file = "host_manager.log" |
| 109 | + emulation_env_config.host_manager_config.host_manager_max_workers = 4 |
| 110 | + |
| 111 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 112 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 113 | + try: |
| 114 | + # Start host_manager command |
| 115 | + cmd = ( |
| 116 | + f"/root/miniconda3/bin/python3 /host_manager.py " |
| 117 | + f"--port {emulation_env_config.host_manager_config.host_manager_port} " |
| 118 | + f"--logdir {emulation_env_config.host_manager_config.host_manager_log_dir} " |
| 119 | + f"--logfile {emulation_env_config.host_manager_config.host_manager_log_file} " |
| 120 | + f"--maxworkers {emulation_env_config.host_manager_config.host_manager_max_workers}" |
| 121 | + ) |
| 122 | + |
| 123 | + # Run cmd in the container |
| 124 | + logging.info(f"Starting host manager in container: {container_setup.id} " |
| 125 | + f"with image: {container_setup.image.tags}") |
| 126 | + container_setup.exec_run(cmd, detach=True) |
| 127 | + |
| 128 | + # Check if host_manager starts |
| 129 | + cmd = ( |
| 130 | + f"sh -c '{constants.COMMANDS.PS_AUX} | {constants.COMMANDS.GREP} " |
| 131 | + f"{constants.COMMANDS.SPACE_DELIM}{constants.TRAFFIC_COMMANDS.HOST_MANAGER_FILE_NAME}'" |
| 132 | + ) |
| 133 | + logging.info(f"Verifying that host manager is running in container: {container_setup.id} " |
| 134 | + f"with image: {container_setup.image.tags}") |
| 135 | + result = container_setup.exec_run(cmd) |
| 136 | + output = result.output.decode("utf-8") |
| 137 | + assert constants.COMMANDS.SEARCH_HOST_MANAGER in output, "Host manager is not running in the container" |
| 138 | + time.sleep(5) |
| 139 | + # Call grpc |
| 140 | + with grpc.insecure_channel(f"{ip}:{port}", options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 141 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 142 | + status = csle_collector.host_manager.query_host_manager.get_host_status(stub=stub) |
| 143 | + assert status |
| 144 | + except Exception as e: |
| 145 | + logging.info(f"Error occurred in container {container_setup.name}: {e}") |
| 146 | + failed_containers.append(container_setup.name) |
| 147 | + containers_info.append( |
| 148 | + { |
| 149 | + "container_status": container_setup.status, |
| 150 | + "container_image": container_setup.image.tags, |
| 151 | + "name": container_setup.name, |
| 152 | + "error": str(e) |
| 153 | + } |
| 154 | + ) |
| 155 | + if failed_containers: |
| 156 | + logging.info("Containers that failed to start the host manager:") |
| 157 | + logging.info(containers_info) |
| 158 | + assert not failed_containers, f"T{failed_containers} failed" |
| 159 | + |
| 160 | + |
| 161 | +def test_start_filebeat(container_setup) -> None: |
| 162 | + """ |
| 163 | + Start filebeat in a container |
| 164 | +
|
| 165 | + :param container_setup: container_setup |
| 166 | + :return: None |
| 167 | + """ |
| 168 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 169 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 170 | + emulation_env_config.host_manager_config = MagicMock() |
| 171 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 172 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 173 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 174 | + logger = logging.getLogger("test_logger") |
| 175 | + try: |
| 176 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 177 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 178 | + csle_collector.host_manager.query_host_manager.start_filebeat(stub=stub) |
| 179 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 180 | + assert host_monitor_dto.filebeat_running, f"Failed to start Filebeat on {ip}." |
| 181 | + logger.info(f"Filebeat has been successfully started on {ip}.") |
| 182 | + except grpc.RpcError as e: |
| 183 | + logger.error(f"gRPC Error: {e}") |
| 184 | + assert False, f"gRPC call failed with error: {e}" |
| 185 | + |
| 186 | + |
| 187 | +def test_stop_filebeat(container_setup) -> None: |
| 188 | + """ |
| 189 | + Stop filebeat in a container |
| 190 | +
|
| 191 | + :param container_setup: container_setup |
| 192 | + :return: None |
| 193 | + """ |
| 194 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 195 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 196 | + emulation_env_config.host_manager_config = MagicMock() |
| 197 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 198 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 199 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 200 | + logger = logging.getLogger("test_logger") |
| 201 | + try: |
| 202 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 203 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 204 | + csle_collector.host_manager.query_host_manager.stop_filebeat(stub=stub) |
| 205 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 206 | + assert not host_monitor_dto.filebeat_running, f"Failed to stop Filebeat on {ip}." |
| 207 | + logger.info(f"Filebeat has been successfully stopped on {ip}.") |
| 208 | + except grpc.RpcError as e: |
| 209 | + logger.error(f"gRPC Error: {e}") |
| 210 | + assert False, f"gRPC call failed with error: {e}" |
| 211 | + |
| 212 | + |
| 213 | +def test_start_packetbeat(container_setup) -> None: |
| 214 | + """ |
| 215 | + Start packetbeat in a container |
| 216 | +
|
| 217 | + :param container_setup: container_setup |
| 218 | + :return: None |
| 219 | + """ |
| 220 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 221 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 222 | + emulation_env_config.host_manager_config = MagicMock() |
| 223 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 224 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 225 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 226 | + logger = logging.getLogger("test_logger") |
| 227 | + try: |
| 228 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 229 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 230 | + csle_collector.host_manager.query_host_manager.start_packetbeat(stub=stub) |
| 231 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 232 | + assert host_monitor_dto.packetbeat_running, f"Failed to start packetbeat on {ip}." |
| 233 | + logger.info(f"packetbeat has been successfully started on {ip}.") |
| 234 | + except grpc.RpcError as e: |
| 235 | + logger.error(f"gRPC Error: {e}") |
| 236 | + assert False, f"gRPC call failed with error: {e}" |
| 237 | + |
| 238 | + |
| 239 | +def test_stop_packetbeat(container_setup) -> None: |
| 240 | + """ |
| 241 | + Stop packetbeat in a container |
| 242 | +
|
| 243 | + :param container_setup: container_setup |
| 244 | + :return: None |
| 245 | + """ |
| 246 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 247 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 248 | + emulation_env_config.host_manager_config = MagicMock() |
| 249 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 250 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 251 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 252 | + logger = logging.getLogger("test_logger") |
| 253 | + try: |
| 254 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 255 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 256 | + csle_collector.host_manager.query_host_manager.stop_packetbeat(stub=stub) |
| 257 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 258 | + assert not host_monitor_dto.packetbeat_running, f"Failed to stop packetbeat on {ip}." |
| 259 | + logger.info(f"packetbeat has been successfully stopped on {ip}.") |
| 260 | + except grpc.RpcError as e: |
| 261 | + logger.error(f"gRPC Error: {e}") |
| 262 | + assert False, f"gRPC call failed with error: {e}" |
| 263 | + |
| 264 | + |
| 265 | +def test_start_metricbeat(container_setup) -> None: |
| 266 | + """ |
| 267 | + Start metricbeat in a container |
| 268 | +
|
| 269 | + :param container_setup: container_setup |
| 270 | + :return: None |
| 271 | + """ |
| 272 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 273 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 274 | + emulation_env_config.host_manager_config = MagicMock() |
| 275 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 276 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 277 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 278 | + logger = logging.getLogger("test_logger") |
| 279 | + try: |
| 280 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 281 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 282 | + csle_collector.host_manager.query_host_manager.start_metricbeat(stub=stub) |
| 283 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 284 | + assert host_monitor_dto.metricbeat_running, f"Failed to start metricbeat on {ip}." |
| 285 | + logger.info(f"metricbeat has been successfully started on {ip}.") |
| 286 | + except grpc.RpcError as e: |
| 287 | + logger.error(f"gRPC Error: {e}") |
| 288 | + assert False, f"gRPC call failed with error: {e}" |
| 289 | + |
| 290 | + |
| 291 | +def test_stop_metricbeat(container_setup) -> None: |
| 292 | + """ |
| 293 | + Stop metricbeat in a container |
| 294 | +
|
| 295 | + :param container_setup: container_setup |
| 296 | + :return: None |
| 297 | + """ |
| 298 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 299 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 300 | + emulation_env_config.host_manager_config = MagicMock() |
| 301 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 302 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 303 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 304 | + logger = logging.getLogger("test_logger") |
| 305 | + try: |
| 306 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 307 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 308 | + csle_collector.host_manager.query_host_manager.stop_metricbeat(stub=stub) |
| 309 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 310 | + assert not host_monitor_dto.metricbeat_running, f"Failed to stop metricbeat on {ip}." |
| 311 | + logger.info(f"metricbeat has been successfully stopped on {ip}.") |
| 312 | + except grpc.RpcError as e: |
| 313 | + logger.error(f"gRPC Error: {e}") |
| 314 | + assert False, f"gRPC call failed with error: {e}" |
| 315 | + |
| 316 | + |
| 317 | +def test_start_heartbeat(container_setup) -> None: |
| 318 | + """ |
| 319 | + Start heartbeat in a container |
| 320 | +
|
| 321 | + :param container_setup: container_setup |
| 322 | + :return: None |
| 323 | + """ |
| 324 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 325 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 326 | + emulation_env_config.host_manager_config = MagicMock() |
| 327 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 328 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 329 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 330 | + logger = logging.getLogger("test_logger") |
| 331 | + try: |
| 332 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 333 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 334 | + csle_collector.host_manager.query_host_manager.start_heartbeat(stub=stub) |
| 335 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 336 | + assert host_monitor_dto.heartbeat_running, f"Failed to start heartbeat on {ip}." |
| 337 | + logger.info(f"heartbeat has been successfully started on {ip}.") |
| 338 | + except grpc.RpcError as e: |
| 339 | + logger.error(f"gRPC Error: {e}") |
| 340 | + assert False, f"gRPC call failed with error: {e}" |
| 341 | + |
| 342 | + |
| 343 | +def test_stop_heartbeat(container_setup) -> None: |
| 344 | + """ |
| 345 | + Stop heartbeat in a container |
| 346 | +
|
| 347 | + :param container_setup: container_setup |
| 348 | + :return: None |
| 349 | + """ |
| 350 | + emulation_env_config = MagicMock(spec=EmulationEnvConfig) |
| 351 | + emulation_env_config.get_connection.return_value = MagicMock() |
| 352 | + emulation_env_config.host_manager_config = MagicMock() |
| 353 | + emulation_env_config.host_manager_config.host_manager_port = 8080 |
| 354 | + ip = container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO] |
| 355 | + port = emulation_env_config.host_manager_config.host_manager_port |
| 356 | + logger = logging.getLogger("test_logger") |
| 357 | + try: |
| 358 | + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: |
| 359 | + stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel) |
| 360 | + csle_collector.host_manager.query_host_manager.stop_heartbeat(stub=stub) |
| 361 | + host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(ip=ip, port=port) |
| 362 | + assert not host_monitor_dto.heartbeat_running, f"Failed to stop heartbeat on {ip}." |
| 363 | + logger.info(f"heartbeat has been successfully stopped on {ip}.") |
| 364 | + except grpc.RpcError as e: |
| 365 | + logger.error(f"gRPC Error: {e}") |
| 366 | + assert False, f"gRPC call failed with error: {e}" |
0 commit comments