Skip to content

Commit cbc0352

Browse files
committed
No Consumer Reader
1 parent d980334 commit cbc0352

File tree

6 files changed

+275
-13
lines changed

6 files changed

+275
-13
lines changed

tests/topics/test_topic_reader.py

+200
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,203 @@ async def wait(fut):
251251

252252
await reader0.close()
253253
await reader1.close()
254+
255+
256+
@pytest.mark.asyncio
class TestTopicNoConsumerReaderAsyncIO:
    """Tests for the asyncio topic reader created with ``consumer=None``.

    Every test that opens a reader closes it in a ``finally`` clause, so a
    failing assertion does not leave the reader open for the tests that follow.
    """

    async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
        """Creating a reader without a consumer and without partition ids must fail."""
        with pytest.raises(ydb.Error):
            driver.topic_client.reader(
                topic_with_messages,
                consumer=None,
            )

    async def test_reader_with_default_lambda(self, driver, topic_with_messages):
        """Without a start-offset callback the first received message has seqno 1."""
        reader = driver.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
        )
        try:
            msg = await reader.receive_message()

            assert msg.seqno == 1
        finally:
            await reader.close()

    async def test_reader_with_sync_lambda(self, driver, topic_with_messages):
        """A plain (synchronous) start-offset callback is honoured."""

        def sync_lambda(partition_id: int):
            assert partition_id == 0
            return 1

        reader = driver.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
            get_start_offset_lambda=sync_lambda,
        )
        try:
            msg = await reader.receive_message()

            # The callback returned offset 1; the test expects seqno 2 here
            # (presumably fixture seqnos are offset + 1 - confirm against the fixture).
            assert msg.seqno == 2
        finally:
            await reader.close()

    async def test_reader_with_async_lambda(self, driver, topic_with_messages):
        """A coroutine-function start-offset callback is awaited and honoured."""

        async def async_lambda(partition_id: int) -> int:
            assert partition_id == 0
            return 1

        reader = driver.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
            get_start_offset_lambda=async_lambda,
        )
        try:
            msg = await reader.receive_message()

            assert msg.seqno == 2
        finally:
            await reader.close()

    async def test_commit_not_allowed(self, driver, topic_with_messages):
        """Both commit flavours raise ``ydb.Error`` when the reader has no consumer."""
        reader = driver.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
        )
        try:
            batch = await reader.receive_batch()

            with pytest.raises(ydb.Error):
                reader.commit(batch)

            with pytest.raises(ydb.Error):
                await reader.commit_with_ack(batch)
        finally:
            await reader.close()

    async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages):
        """After a stream error, reading resumes from the callback's new offset."""
        current_offset = 0

        def get_start_offset_lambda(partition_id: int) -> int:
            # Read-only closure: it sees the value rebound by the test body below.
            return current_offset

        reader = driver.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
            get_start_offset_lambda=get_start_offset_lambda,
        )
        try:
            msg = await reader.receive_message()

            assert msg.seqno == current_offset + 1

            current_offset += 2
            # Inject an error into the current stream; the test expects the reader
            # to recover and ask the callback for the start offset again.
            reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

            # Yield to the event loop once so the injected error can be processed.
            await asyncio.sleep(0)

            msg = await reader.receive_message()

            assert msg.seqno == current_offset + 1
        finally:
            await reader.close()
354+
355+
356+
class TestTopicReaderWithoutConsumer:
    """Tests for the synchronous topic reader created with ``consumer=None``.

    Every test that opens a reader closes it in a ``finally`` clause, so a
    failing assertion does not leave the reader open for the tests that follow.
    """

    def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messages):
        """Creating a reader without a consumer and without partition ids must fail."""
        with pytest.raises(ydb.Error):
            driver_sync.topic_client.reader(
                topic_with_messages,
                consumer=None,
            )

    def test_reader_with_default_lambda(self, driver_sync, topic_with_messages):
        """Without a start-offset callback the first received message has seqno 1."""
        reader = driver_sync.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
        )
        try:
            msg = reader.receive_message()

            assert msg.seqno == 1
        finally:
            reader.close()

    def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages):
        """A plain (synchronous) start-offset callback is honoured."""

        def sync_lambda(partition_id: int):
            assert partition_id == 0
            return 1

        reader = driver_sync.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
            get_start_offset_lambda=sync_lambda,
        )
        try:
            msg = reader.receive_message()

            # The callback returned offset 1; the test expects seqno 2 here
            # (presumably fixture seqnos are offset + 1 - confirm against the fixture).
            assert msg.seqno == 2
        finally:
            reader.close()

    def test_reader_with_async_lambda(self, driver_sync, topic_with_messages):
        """A coroutine-function start-offset callback also works with the sync reader."""

        async def async_lambda(partition_id: int) -> int:
            assert partition_id == 0
            return 1

        reader = driver_sync.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
            get_start_offset_lambda=async_lambda,
        )
        try:
            msg = reader.receive_message()

            assert msg.seqno == 2
        finally:
            reader.close()

    def test_commit_not_allowed(self, driver_sync, topic_with_messages):
        """All commit flavours raise ``ydb.Error`` when the reader has no consumer."""
        reader = driver_sync.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
        )
        try:
            batch = reader.receive_batch()

            with pytest.raises(ydb.Error):
                reader.commit(batch)

            with pytest.raises(ydb.Error):
                reader.commit_with_ack(batch)

            with pytest.raises(ydb.Error):
                reader.async_commit_with_ack(batch)
        finally:
            reader.close()

    def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages):
        """After a stream error, reading resumes from the callback's new offset."""
        current_offset = 0

        def get_start_offset_lambda(partition_id: int) -> int:
            # Read-only closure: it sees the value rebound by the test body below.
            return current_offset

        reader = driver_sync.topic_client.reader(
            topic_with_messages,
            consumer=None,
            partition_ids=[0],
            get_start_offset_lambda=get_start_offset_lambda,
        )
        try:
            msg = reader.receive_message()

            assert msg.seqno == current_offset + 1

            current_offset += 2
            # Inject an error into the current stream; the test expects the reader
            # to recover and ask the callback for the start offset again.
            reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

            msg = reader.receive_message()

            assert msg.seqno == current_offset + 1
        finally:
            reader.close()

ydb/_grpc/grpcwrapper/ydb_topic.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,13 @@ def from_proto(
439439
@dataclass
440440
class InitRequest(IToProto):
441441
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
442-
consumer: str
442+
consumer: Optional[str]
443443
auto_partitioning_support: bool
444444

445445
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
446446
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
447-
res.consumer = self.consumer
447+
if self.consumer is not None:
448+
res.consumer = self.consumer
448449
for settings in self.topics_read_settings:
449450
res.topics_read_settings.append(settings.to_proto())
450451
res.auto_partitioning_support = self.auto_partitioning_support

ydb/_topic_reader/topic_reader.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datetime
44
from dataclasses import dataclass
55
from typing import (
6+
Awaitable,
67
Union,
78
Optional,
89
List,
@@ -42,7 +43,7 @@ def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSett
4243

4344
@dataclass
4445
class PublicReaderSettings:
45-
consumer: str
46+
consumer: Optional[str]
4647
topic: TopicSelectorTypes
4748
buffer_size_bytes: int = 50 * 1024 * 1024
4849
auto_partitioning_support: bool = True
@@ -54,12 +55,15 @@ class PublicReaderSettings:
5455
decoder_executor: Optional[concurrent.futures.Executor] = None
5556
update_token_interval: Union[int, float] = 3600
5657

58+
partition_ids: Optional[List[int]] = None
59+
get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None
60+
5761
def __post_init__(self):
5862
# check possible create init message
5963
_ = self._init_message()
6064

6165
def _init_message(self) -> StreamReadMessage.InitRequest:
62-
if not isinstance(self.consumer, str):
66+
if self.consumer is not None and not isinstance(self.consumer, str):
6367
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))
6468

6569
if isinstance(self.topic, list):
@@ -69,7 +73,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
6973

7074
for index, selector in enumerate(selectors):
7175
if isinstance(selector, str):
72-
selectors[index] = PublicTopicSelector(path=selector)
76+
selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids)
7377
elif isinstance(selector, PublicTopicSelector):
7478
pass
7579
else:

ydb/_topic_reader/topic_reader_asyncio.py

+22-3
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def __init__(self):
7272
class PublicAsyncIOReader:
7373
_loop: asyncio.AbstractEventLoop
7474
_closed: bool
75+
_settings: topic_reader.PublicReaderSettings
7576
_reconnector: ReaderReconnector
7677
_parent: typing.Any # need for prevent close parent client by GC
7778

@@ -84,6 +85,7 @@ def __init__(
8485
):
8586
self._loop = asyncio.get_running_loop()
8687
self._closed = False
88+
self._settings = settings
8789
self._reconnector = ReaderReconnector(driver, settings, self._loop)
8890
self._parent = _parent
8991

@@ -156,6 +158,9 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
156158
For the method no way check the commit result
157159
(for example if lost connection - commits will not re-send and committed messages will receive again).
158160
"""
161+
if self._settings.consumer is None:
162+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
163+
159164
try:
160165
self._reconnector.commit(batch)
161166
except PublicTopicReaderPartitionExpiredError:
@@ -171,6 +176,9 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat
171176
before receive commit ack. Message may be acked or not (if not - it will send in other read session,
172177
to this or other reader).
173178
"""
179+
if self._settings.consumer is None:
180+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
181+
174182
waiter = self._reconnector.commit(batch)
175183
await waiter.future
176184

@@ -393,6 +401,7 @@ class ReaderStream:
393401
_update_token_interval: Union[int, float]
394402
_update_token_event: asyncio.Event
395403
_get_token_function: Callable[[], str]
404+
_settings: topic_reader.PublicReaderSettings
396405

397406
def __init__(
398407
self,
@@ -425,6 +434,8 @@ def __init__(
425434
self._get_token_function = get_token_function
426435
self._update_token_event = asyncio.Event()
427436

437+
self._settings = settings
438+
428439
@staticmethod
429440
async def create(
430441
reader_reconnector_id: int,
@@ -615,7 +626,7 @@ async def _read_messages_loop(self):
615626
message.server_message,
616627
StreamReadMessage.StartPartitionSessionRequest,
617628
):
618-
self._on_start_partition_session(message.server_message)
629+
await self._on_start_partition_session(message.server_message)
619630

620631
elif isinstance(
621632
message.server_message,
@@ -660,7 +671,7 @@ async def _update_token(self, token: str):
660671
finally:
661672
self._update_token_event.clear()
662673

663-
def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
674+
async def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
664675
try:
665676
if message.partition_session.partition_session_id in self._partition_sessions:
666677
raise TopicReaderError(
@@ -676,11 +687,19 @@ def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionS
676687
reader_reconnector_id=self._reader_reconnector_id,
677688
reader_stream_id=self._id,
678689
)
690+
691+
read_offset = None
692+
callee = self._settings.get_start_offset_lambda
693+
if callee is not None:
694+
read_offset = callee(message.partition_session.partition_id)
695+
if asyncio.iscoroutinefunction(callee):
696+
read_offset = await read_offset
697+
679698
self._stream.write(
680699
StreamReadMessage.FromClient(
681700
client_message=StreamReadMessage.StartPartitionSessionResponse(
682701
partition_session_id=message.partition_session.partition_session_id,
683-
read_offset=None,
702+
read_offset=read_offset,
684703
commit_offset=None,
685704
)
686705
),

0 commit comments

Comments
 (0)