Skip to content

Commit 513b77e

Browse files
authored
Added count argument for better distribution. (#85)
1 parent 41da01e commit 513b77e

File tree

1 file changed

+4
-0
lines changed

1 file changed

+4
-0
lines changed

taskiq_redis/redis_broker.py

+4
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ def __init__(
168168
maxlen: Optional[int] = None,
169169
idle_timeout: int = 600000, # 10 minutes
170170
unacknowledged_batch_size: int = 100,
171+
xread_count: Optional[int] = 100,
171172
additional_streams: Optional[Dict[str, str]] = None,
172173
**connection_kwargs: Any,
173174
) -> None:
@@ -189,6 +190,7 @@ def __init__(
189190
Better to set it to a bigger value, to avoid unnecessary calls.
190191
:param maxlen: sets the maximum length of the stream
191192
trims (the old values of) the stream each time a new element is added
193+
:param xread_count: number of messages to fetch from the stream at once.
192194
:param additional_streams: additional streams to read from.
193195
Each key is a stream name, value is a consumer id.
194196
:param redeliver_timeout: time in ms to wait before redelivering a message.
@@ -211,6 +213,7 @@ def __init__(
211213
self.additional_streams = additional_streams or {}
212214
self.idle_timeout = idle_timeout
213215
self.unacknowledged_batch_size = unacknowledged_batch_size
216+
self.count = xread_count
214217

215218
async def _declare_consumer_group(self) -> None:
216219
"""
@@ -276,6 +279,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
276279
},
277280
block=self.block,
278281
noack=False,
282+
count=self.count,
279283
)
280284
for _, msg_list in fetched:
281285
for msg_id, msg in msg_list:

0 commit comments

Comments
 (0)