Skip to content

Commit e2bbade

Browse files
authored
Merge pull request #89 from taskiq-python/develop
release
2 parents 60ad259 + 11bcebd commit e2bbade

File tree

4 files changed

+58
-13
lines changed

4 files changed

+58
-13
lines changed

pyproject.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "taskiq-faststream"
3-
version = "0.2.1"
3+
version = "0.2.2"
44
description = "FastStream - taskiq integration to schedule FastStream tasks"
55
readme = "README.md"
66
authors = [
@@ -73,12 +73,13 @@ test = [
7373

7474
"coverage[toml]>=7.2.0,<8.0.0",
7575
"pytest>=7.4.0,<9",
76+
"freezegun>=1.2.2"
7677
]
7778

7879
dev = [
7980
"taskiq-faststream[test]",
80-
"mypy==1.11.2",
81-
"ruff==0.11.8",
81+
"mypy==1.15.0",
82+
"ruff==0.11.10",
8283
"pre-commit >=3.6.0,<5.0.0",
8384
]
8485

taskiq_faststream/broker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ def task( # type: ignore[override]
7373
SendableMessage,
7474
typing.Callable[[], SendableMessage],
7575
typing.Callable[[], typing.Awaitable[SendableMessage]],
76+
typing.Callable[[], typing.Generator[SendableMessage, None, None]],
77+
typing.Callable[[], typing.AsyncGenerator[SendableMessage, None]],
7678
] = None,
7779
*,
7880
schedule: list[ScheduledTask],

taskiq_faststream/kicker.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
1-
from typing import Any
2-
31
from taskiq.kicker import AsyncKicker, _FuncParams, _ReturnType
4-
from taskiq.message import TaskiqMessage
52

63

74
class LabelRespectKicker(AsyncKicker[_FuncParams, _ReturnType]):
85
"""Patched kicker doesn't cast labels to str."""
9-
10-
def _prepare_message(self, *args: Any, **kwargs: Any) -> TaskiqMessage:
11-
msg = super()._prepare_message(*args, **kwargs)
12-
msg.labels = self.labels
13-
return msg

tests/testcase.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import asyncio
2-
from datetime import datetime, timezone
2+
from datetime import datetime, timedelta, timezone
33
from typing import Any
44
from unittest.mock import MagicMock
55

66
import pytest
77
from faststream.utils.functions import timeout_scope
8+
from freezegun import freeze_time
89
from taskiq import AsyncBroker, TaskiqScheduler
910
from taskiq.cli.scheduler.args import SchedulerArgs
1011
from taskiq.cli.scheduler.run import run_scheduler
1112
from taskiq.schedule_sources import LabelScheduleSource
1213

13-
from taskiq_faststream import BrokerWrapper
14+
from taskiq_faststream import BrokerWrapper, StreamScheduler
1415

1516

1617
@pytest.mark.anyio
@@ -67,3 +68,52 @@ async def handler(msg: str) -> None:
6768

6869
mock.assert_called_once_with("Hi!")
6970
task.cancel()
71+
72+
async def test_task_multiple_schedules_by_cron(
73+
self,
74+
subject: str,
75+
broker: Any,
76+
event: asyncio.Event,
77+
) -> None:
78+
"""Test cron runs twice via StreamScheduler."""
79+
received_message = []
80+
81+
@broker.subscriber(subject)
82+
async def handler(msg: str) -> None:
83+
received_message.append(msg)
84+
event.set()
85+
86+
taskiq_broker = self.build_taskiq_broker(broker)
87+
88+
taskiq_broker.task(
89+
"Hi!",
90+
**{self.subj_name: subject},
91+
schedule=[
92+
{
93+
"cron": "* * * * *",
94+
},
95+
],
96+
)
97+
98+
async with self.test_class(broker):
99+
with freeze_time("00:00:00", tick=True) as frozen_datetime:
100+
task = asyncio.create_task(
101+
run_scheduler(
102+
SchedulerArgs(
103+
scheduler=StreamScheduler(
104+
broker=taskiq_broker,
105+
sources=[LabelScheduleSource(taskiq_broker)],
106+
),
107+
modules=[],
108+
),
109+
),
110+
)
111+
112+
await asyncio.wait_for(event.wait(), 2.0)
113+
event.clear()
114+
frozen_datetime.tick(timedelta(minutes=2))
115+
await asyncio.wait_for(event.wait(), 2.0)
116+
117+
task.cancel()
118+
119+
assert received_message == ["Hi!", "Hi!"], received_message

0 commit comments

Comments
 (0)