diff --git a/modules/kafka/testcontainers/kafka/__init__.py b/modules/kafka/testcontainers/kafka/__init__.py index ccd7f5b77..74af0f8b9 100644 --- a/modules/kafka/testcontainers/kafka/__init__.py +++ b/modules/kafka/testcontainers/kafka/__init__.py @@ -1,3 +1,4 @@ +import re import tarfile import time from dataclasses import dataclass, field @@ -10,7 +11,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.utils import raise_for_deprecated_parameter from testcontainers.core.version import ComparableVersion -from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.core.wait_strategies import LogMessageWaitStrategy from testcontainers.kafka._redpanda import RedpandaContainer __all__ = [ @@ -59,7 +60,7 @@ def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093, super().__init__(image, **kwargs) self.port = port self.kraft_enabled = False - self.wait_for = r".*\[KafkaServer id=\d+\] started.*" + self.wait_for = re.compile(r".*\[KafkaServer id=\d+\] started.*") self.boot_command = "" self.cluster_id = "MkU3OEVBNTcwNTJENDM2Qk" self.listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092" @@ -179,7 +180,8 @@ def start(self, timeout=30) -> "KafkaContainer": self.with_command(command) super().start() self.tc_start() - wait_for_logs(self, self.wait_for, timeout=timeout) + wait_strategy = LogMessageWaitStrategy(self.wait_for) + wait_strategy.wait_until_ready(self) return self def create_file(self, content: bytes, path: str) -> None: diff --git a/modules/kafka/testcontainers/kafka/_redpanda.py b/modules/kafka/testcontainers/kafka/_redpanda.py index b49957d03..36fffbf05 100644 --- a/modules/kafka/testcontainers/kafka/_redpanda.py +++ b/modules/kafka/testcontainers/kafka/_redpanda.py @@ -1,11 +1,12 @@ import os.path +import re import tarfile import time from io import BytesIO from textwrap import dedent from testcontainers.core.container import DockerContainer -from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.core.wait_strategies import LogMessageWaitStrategy class RedpandaContainer(DockerContainer): @@ -34,6 +35,7 @@ def __init__( self.redpanda_port = 9092 self.schema_registry_port = 8081 self.with_exposed_ports(self.redpanda_port, self.schema_registry_port) + self.wait_for = re.compile(r".*Started Kafka API server.*") def get_bootstrap_server(self) -> str: host = self.get_container_host_ip() @@ -64,13 +66,14 @@ def tc_start(self) -> None: self.create_file(data, RedpandaContainer.TC_START_SCRIPT) - def start(self, timeout=10) -> "RedpandaContainer": + def start(self) -> "RedpandaContainer": script = RedpandaContainer.TC_START_SCRIPT command = f'-c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"' self.with_command(command) super().start() self.tc_start() - wait_for_logs(self, r".*Started Kafka API server.*", timeout=timeout) + wait_strategy = LogMessageWaitStrategy(self.wait_for) + wait_strategy.wait_until_ready(self) return self def create_file(self, content: bytes, path: str) -> None: diff --git a/modules/kafka/tests/test_redpanda.py b/modules/kafka/tests/test_redpanda.py index 7cee9fa8a..13db32e59 100644 --- a/modules/kafka/tests/test_redpanda.py +++ b/modules/kafka/tests/test_redpanda.py @@ -13,7 +13,7 @@ def test_redpanda_producer_consumer(): produce_and_consume_message(container) -@pytest.mark.parametrize("version", ["v23.1.13", "v23.3.10"]) +@pytest.mark.parametrize("version", ["v23.1.13", "v25.2.9"]) def test_redpanda_confluent_version(version): with RedpandaContainer(image=f"docker.redpanda.com/redpandadata/redpanda:{version}") as container: produce_and_consume_message(container)