diff --git a/pyproject.toml b/pyproject.toml index 5cb62d0..1b1ef56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ test = [ "coverage[toml]>=7.2.0,<8.0.0", "pytest>=7.4.0,<9", + "freezegun>=1.2.2" ] dev = [ diff --git a/taskiq_faststream/kicker.py b/taskiq_faststream/kicker.py index f2a96eb..2226e5d 100644 --- a/taskiq_faststream/kicker.py +++ b/taskiq_faststream/kicker.py @@ -1,13 +1,5 @@ -from typing import Any - from taskiq.kicker import AsyncKicker, _FuncParams, _ReturnType -from taskiq.message import TaskiqMessage class LabelRespectKicker(AsyncKicker[_FuncParams, _ReturnType]): """Patched kicker doesn't cast labels to str.""" - - def _prepare_message(self, *args: Any, **kwargs: Any) -> TaskiqMessage: - msg = super()._prepare_message(*args, **kwargs) - msg.labels = self.labels - return msg diff --git a/tests/testcase.py b/tests/testcase.py index c22b7ff..d1e62a0 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -1,16 +1,17 @@ import asyncio -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Any from unittest.mock import MagicMock import pytest from faststream.utils.functions import timeout_scope +from freezegun import freeze_time from taskiq import AsyncBroker, TaskiqScheduler from taskiq.cli.scheduler.args import SchedulerArgs from taskiq.cli.scheduler.run import run_scheduler from taskiq.schedule_sources import LabelScheduleSource -from taskiq_faststream import BrokerWrapper +from taskiq_faststream import BrokerWrapper, StreamScheduler @pytest.mark.anyio @@ -67,3 +68,52 @@ async def handler(msg: str) -> None: mock.assert_called_once_with("Hi!") task.cancel() + + async def test_task_multiple_schedules_by_cron( + self, + subject: str, + broker: Any, + event: asyncio.Event, + ) -> None: + """Test cron runs twice via StreamScheduler.""" + received_message = [] + + @broker.subscriber(subject) + async def handler(msg: str) -> None: + received_message.append(msg) + event.set() + + taskiq_broker = self.build_taskiq_broker(broker) + + taskiq_broker.task( + "Hi!", + **{self.subj_name: subject}, + schedule=[ + { + "cron": "* * * * *", + }, + ], + ) + + async with self.test_class(broker): + with freeze_time("00:00:00", tick=True) as frozen_datetime: + task = asyncio.create_task( + run_scheduler( + SchedulerArgs( + scheduler=StreamScheduler( + broker=taskiq_broker, + sources=[LabelScheduleSource(taskiq_broker)], + ), + modules=[], + ), + ), + ) + + await asyncio.wait_for(event.wait(), 2.0) + event.clear() + frozen_datetime.tick(timedelta(minutes=2)) + await asyncio.wait_for(event.wait(), 2.0) + + task.cancel() + + assert received_message == ["Hi!", "Hi!"], received_message