Skip to content

Commit 325a197

Browse files
committed
No Consumer Reader
1 parent d980334 commit 325a197

File tree

6 files changed

+227
-10
lines changed

6 files changed

+227
-10
lines changed

tests/topics/test_topic_reader.py

+124
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,127 @@ async def wait(fut):
251251

252252
await reader0.close()
253253
await reader1.close()
254+
255+
256+
@pytest.mark.asyncio
257+
class TestTopicNoConsumerReaderAsyncIO:
258+
async def test_reader_with_sync_lambda(self, driver, topic_with_messages):
259+
def sync_lambda(partition_id: int):
260+
assert partition_id == 0
261+
return 1
262+
263+
reader = driver.topic_client.no_consumer_reader(topic_with_messages, [0], sync_lambda)
264+
msg = await reader.receive_message()
265+
266+
assert msg.seqno == 2
267+
268+
await reader.close()
269+
270+
async def test_reader_with_async_lambda(self, driver, topic_with_messages):
271+
async def async_lambda(partition_id: int) -> int:
272+
assert partition_id == 0
273+
return 1
274+
275+
reader = driver.topic_client.no_consumer_reader(topic_with_messages, [0], async_lambda)
276+
msg = await reader.receive_message()
277+
278+
assert msg.seqno == 2
279+
280+
await reader.close()
281+
282+
async def test_commit_not_allowed(self, driver, topic_with_messages):
283+
reader = driver.topic_client.no_consumer_reader(topic_with_messages, [0], lambda x: None)
284+
batch = await reader.receive_batch()
285+
286+
with pytest.raises(ydb.Error):
287+
reader.commit(batch)
288+
289+
with pytest.raises(ydb.Error):
290+
await reader.commit_with_ack(batch)
291+
292+
await reader.close()
293+
294+
async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages):
295+
current_offset = 0
296+
297+
def get_start_offset_lambda(partition_id: int) -> int:
298+
nonlocal current_offset
299+
return current_offset
300+
301+
reader = driver.topic_client.no_consumer_reader(topic_with_messages, [0], get_start_offset_lambda)
302+
msg = await reader.receive_message()
303+
304+
assert msg.seqno == current_offset + 1
305+
306+
current_offset += 2
307+
reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))
308+
309+
await asyncio.sleep(0)
310+
311+
msg = await reader.receive_message()
312+
313+
assert msg.seqno == current_offset + 1
314+
315+
await reader.close()
316+
317+
318+
class TestTopicNoConsumerReader:
319+
def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages):
320+
def sync_lambda(partition_id: int):
321+
assert partition_id == 0
322+
return 1
323+
324+
reader = driver_sync.topic_client.no_consumer_reader(topic_with_messages, [0], sync_lambda)
325+
msg = reader.receive_message()
326+
327+
assert msg.seqno == 2
328+
329+
reader.close()
330+
331+
def test_reader_with_async_lambda(self, driver_sync, topic_with_messages):
332+
async def async_lambda(partition_id: int) -> int:
333+
assert partition_id == 0
334+
return 1
335+
336+
reader = driver_sync.topic_client.no_consumer_reader(topic_with_messages, [0], async_lambda)
337+
msg = reader.receive_message()
338+
339+
assert msg.seqno == 2
340+
341+
reader.close()
342+
343+
def test_commit_not_allowed(self, driver_sync, topic_with_messages):
344+
reader = driver_sync.topic_client.no_consumer_reader(topic_with_messages, [0], lambda x: None)
345+
batch = reader.receive_batch()
346+
347+
with pytest.raises(ydb.Error):
348+
reader.commit(batch)
349+
350+
with pytest.raises(ydb.Error):
351+
reader.commit_with_ack(batch)
352+
353+
with pytest.raises(ydb.Error):
354+
reader.async_commit_with_ack(batch)
355+
356+
reader.close()
357+
358+
def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages):
359+
current_offset = 0
360+
361+
def get_start_offset_lambda(partition_id: int) -> int:
362+
nonlocal current_offset
363+
return current_offset
364+
365+
reader = driver_sync.topic_client.no_consumer_reader(topic_with_messages, [0], get_start_offset_lambda)
366+
msg = reader.receive_message()
367+
368+
assert msg.seqno == current_offset + 1
369+
370+
current_offset += 2
371+
reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))
372+
373+
msg = reader.receive_message()
374+
375+
assert msg.seqno == current_offset + 1
376+
377+
reader.close()

ydb/_grpc/grpcwrapper/ydb_topic.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,13 @@ def from_proto(
439439
@dataclass
440440
class InitRequest(IToProto):
441441
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
442-
consumer: str
442+
consumer: Optional[str]
443443
auto_partitioning_support: bool
444444

445445
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
446446
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
447-
res.consumer = self.consumer
447+
if self.consumer is not None:
448+
res.consumer = self.consumer
448449
for settings in self.topics_read_settings:
449450
res.topics_read_settings.append(settings.to_proto())
450451
res.auto_partitioning_support = self.auto_partitioning_support

ydb/_topic_reader/topic_reader.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datetime
44
from dataclasses import dataclass
55
from typing import (
6+
Awaitable,
67
Union,
78
Optional,
89
List,
@@ -42,7 +43,7 @@ def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSett
4243

4344
@dataclass
4445
class PublicReaderSettings:
45-
consumer: str
46+
consumer: Optional[str]
4647
topic: TopicSelectorTypes
4748
buffer_size_bytes: int = 50 * 1024 * 1024
4849
auto_partitioning_support: bool = True
@@ -54,12 +55,15 @@ class PublicReaderSettings:
5455
decoder_executor: Optional[concurrent.futures.Executor] = None
5556
update_token_interval: Union[int, float] = 3600
5657

58+
partition_ids: Optional[List[int]] = None
59+
get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None
60+
5761
def __post_init__(self):
5862
# check possible create init message
5963
_ = self._init_message()
6064

6165
def _init_message(self) -> StreamReadMessage.InitRequest:
62-
if not isinstance(self.consumer, str):
66+
if self.consumer is not None and not isinstance(self.consumer, str):
6367
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))
6468

6569
if isinstance(self.topic, list):
@@ -69,7 +73,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
6973

7074
for index, selector in enumerate(selectors):
7175
if isinstance(selector, str):
72-
selectors[index] = PublicTopicSelector(path=selector)
76+
selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids)
7377
elif isinstance(selector, PublicTopicSelector):
7478
pass
7579
else:

ydb/_topic_reader/topic_reader_asyncio.py

+22-3
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,14 @@ async def close(self, flush: bool = True):
182182
await self._reconnector.close(flush)
183183

184184

185+
class PublicAsyncIONoConsumerReader(PublicAsyncIOReader):
186+
def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
187+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
188+
189+
async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
190+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
191+
192+
185193
class ReaderReconnector:
186194
_static_reader_reconnector_counter = AtomicCounter()
187195

@@ -393,6 +401,7 @@ class ReaderStream:
393401
_update_token_interval: Union[int, float]
394402
_update_token_event: asyncio.Event
395403
_get_token_function: Callable[[], str]
404+
_settings: topic_reader.PublicReaderSettings
396405

397406
def __init__(
398407
self,
@@ -425,6 +434,8 @@ def __init__(
425434
self._get_token_function = get_token_function
426435
self._update_token_event = asyncio.Event()
427436

437+
self._settings = settings
438+
428439
@staticmethod
429440
async def create(
430441
reader_reconnector_id: int,
@@ -615,7 +626,7 @@ async def _read_messages_loop(self):
615626
message.server_message,
616627
StreamReadMessage.StartPartitionSessionRequest,
617628
):
618-
self._on_start_partition_session(message.server_message)
629+
await self._on_start_partition_session(message.server_message)
619630

620631
elif isinstance(
621632
message.server_message,
@@ -660,7 +671,7 @@ async def _update_token(self, token: str):
660671
finally:
661672
self._update_token_event.clear()
662673

663-
def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
674+
async def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
664675
try:
665676
if message.partition_session.partition_session_id in self._partition_sessions:
666677
raise TopicReaderError(
@@ -676,11 +687,19 @@ def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionS
676687
reader_reconnector_id=self._reader_reconnector_id,
677688
reader_stream_id=self._id,
678689
)
690+
691+
read_offset = None
692+
callee = self._settings.get_start_offset_lambda
693+
if callee is not None:
694+
read_offset = callee(message.partition_session.partition_id)
695+
if asyncio.iscoroutinefunction(callee):
696+
read_offset = await read_offset
697+
679698
self._stream.write(
680699
StreamReadMessage.FromClient(
681700
client_message=StreamReadMessage.StartPartitionSessionResponse(
682701
partition_session_id=message.partition_session.partition_session_id,
683-
read_offset=None,
702+
read_offset=read_offset,
684703
commit_offset=None,
685704
)
686705
),

ydb/_topic_reader/topic_reader_sync.py

+14
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import typing
55
from typing import List, Union, Optional
66

7+
from ydb import issues
78
from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType
89
from ydb._topic_common.common import (
910
_get_shared_event_loop,
@@ -191,3 +192,16 @@ def close(self, *, flush: bool = True, timeout: TimeoutType = None):
191192
def _check_closed(self):
192193
if self._closed:
193194
raise TopicReaderClosedError()
195+
196+
197+
class TopicNoConsumerReaderSync(TopicReaderSync):
198+
def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
199+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
200+
201+
def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
202+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")
203+
204+
def async_commit_with_ack(
205+
self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
206+
) -> concurrent.futures.Future:
207+
raise issues.Error("Commit operations are not supported for topic reader without consumer.")

ydb/topic.py

+57-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"TopicError",
1515
"TopicMeteringMode",
1616
"TopicReader",
17+
"TopicNoConsumerReaderAsyncIO",
1718
"TopicReaderAsyncIO",
1819
"TopicReaderBatch",
1920
"TopicReaderMessage",
@@ -36,7 +37,7 @@
3637
import datetime
3738
from dataclasses import dataclass
3839
import logging
39-
from typing import List, Union, Mapping, Optional, Dict, Callable
40+
from typing import Awaitable, List, Union, Mapping, Optional, Dict, Callable
4041

4142
from . import aio, Credentials, _apis, issues
4243

@@ -52,10 +53,14 @@
5253
PublicTopicSelector as TopicReaderSelector,
5354
)
5455

55-
from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader
56+
from ._topic_reader.topic_reader_sync import (
57+
TopicReaderSync as TopicReader,
58+
TopicNoConsumerReaderSync as TopicNoConsumerReader,
59+
)
5660

5761
from ._topic_reader.topic_reader_asyncio import (
5862
PublicAsyncIOReader as TopicReaderAsyncIO,
63+
PublicAsyncIONoConsumerReader as TopicNoConsumerReaderAsyncIO,
5964
PublicTopicReaderPartitionExpiredError as TopicReaderPartitionExpiredError,
6065
PublicTopicReaderUnexpectedCodecError as TopicReaderUnexpectedCodecError,
6166
)
@@ -261,6 +266,31 @@ def reader(
261266

262267
return TopicReaderAsyncIO(self._driver, settings, _parent=self)
263268

269+
def no_consumer_reader(
270+
self,
271+
topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]],
272+
partition_ids: List[int],
273+
get_start_offset_lambda: Union[Callable[[int], int], Callable[[int], Awaitable[int]]],
274+
buffer_size_bytes: int = 50 * 1024 * 1024,
275+
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
276+
# the func will be called from multiply threads in parallel
277+
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
278+
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
279+
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
280+
decoder_executor: Optional[concurrent.futures.Executor] = None,
281+
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
282+
) -> TopicNoConsumerReaderAsyncIO:
283+
if not decoder_executor:
284+
decoder_executor = self._executor
285+
286+
args = locals().copy()
287+
del args["self"]
288+
args["consumer"] = None
289+
290+
settings = TopicReaderSettings(**args)
291+
292+
return TopicNoConsumerReaderAsyncIO(self._driver, settings, _parent=self)
293+
264294
def writer(
265295
self,
266296
topic,
@@ -505,6 +535,31 @@ def reader(
505535

506536
return TopicReader(self._driver, settings, _parent=self)
507537

538+
def no_consumer_reader(
539+
self,
540+
topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]],
541+
partition_ids: List[int],
542+
get_start_offset_lambda: Union[Callable[[int], int], Callable[[int], Awaitable[int]]],
543+
buffer_size_bytes: int = 50 * 1024 * 1024,
544+
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
545+
# the func will be called from multiply threads in parallel
546+
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
547+
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
548+
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
549+
decoder_executor: Optional[concurrent.futures.Executor] = None,
550+
auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True.
551+
) -> TopicNoConsumerReader:
552+
if not decoder_executor:
553+
decoder_executor = self._executor
554+
555+
args = locals().copy()
556+
del args["self"]
557+
args["consumer"] = None
558+
559+
settings = TopicReaderSettings(**args)
560+
561+
return TopicNoConsumerReader(self._driver, settings, _parent=self)
562+
508563
def writer(
509564
self,
510565
topic,

0 commit comments

Comments
 (0)