From cbc035223d3e7548fc3fa38075341d7df2d07192 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 9 Apr 2025 20:00:30 +0300 Subject: [PATCH 1/2] No Consumer Reader --- tests/topics/test_topic_reader.py | 200 ++++++++++++++++++++++ ydb/_grpc/grpcwrapper/ydb_topic.py | 5 +- ydb/_topic_reader/topic_reader.py | 10 +- ydb/_topic_reader/topic_reader_asyncio.py | 25 ++- ydb/_topic_reader/topic_reader_sync.py | 13 ++ ydb/topic.py | 35 +++- 6 files changed, 275 insertions(+), 13 deletions(-) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index dee5ab49..fc041980 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -251,3 +251,203 @@ async def wait(fut): await reader0.close() await reader1.close() + + +@pytest.mark.asyncio +class TestTopicNoConsumerReaderAsyncIO: + async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages): + with pytest.raises(ydb.Error): + driver.topic_client.reader( + topic_with_messages, + consumer=None, + ) + + async def test_reader_with_default_lambda(self, driver, topic_with_messages): + reader = driver.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + ) + msg = await reader.receive_message() + + assert msg.seqno == 1 + + await reader.close() + + async def test_reader_with_sync_lambda(self, driver, topic_with_messages): + def sync_lambda(partition_id: int): + assert partition_id == 0 + return 1 + + reader = driver.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + get_start_offset_lambda=sync_lambda, + ) + msg = await reader.receive_message() + + assert msg.seqno == 2 + + await reader.close() + + async def test_reader_with_async_lambda(self, driver, topic_with_messages): + async def async_lambda(partition_id: int) -> int: + assert partition_id == 0 + return 1 + + reader = driver.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + get_start_offset_lambda=async_lambda, + ) + msg = await reader.receive_message() + + assert msg.seqno == 2 + + await reader.close() + + async def test_commit_not_allowed(self, driver, topic_with_messages): + reader = driver.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + ) + batch = await reader.receive_batch() + + with pytest.raises(ydb.Error): + reader.commit(batch) + + with pytest.raises(ydb.Error): + await reader.commit_with_ack(batch) + + await reader.close() + + async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages): + current_offset = 0 + + def get_start_offset_lambda(partition_id: int) -> int: + nonlocal current_offset + return current_offset + + reader = driver.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + get_start_offset_lambda=get_start_offset_lambda, + ) + msg = await reader.receive_message() + + assert msg.seqno == current_offset + 1 + + current_offset += 2 + reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error")) + + await asyncio.sleep(0) + + msg = await reader.receive_message() + + assert msg.seqno == current_offset + 1 + + await reader.close() + + +class TestTopicReaderWithoutConsumer: + def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messages): + with pytest.raises(ydb.Error): + driver_sync.topic_client.reader( + topic_with_messages, + consumer=None, + ) + + def test_reader_with_default_lambda(self, driver_sync, topic_with_messages): + reader = driver_sync.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + ) + msg = reader.receive_message() + + assert msg.seqno == 1 + + reader.close() + + def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages): + def sync_lambda(partition_id: int): + assert partition_id == 0 + return 1 + + reader = driver_sync.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + get_start_offset_lambda=sync_lambda, + ) + msg = reader.receive_message() + + assert msg.seqno == 2 + + reader.close() + + def test_reader_with_async_lambda(self, driver_sync, topic_with_messages): + async def async_lambda(partition_id: int) -> int: + assert partition_id == 0 + return 1 + + reader = driver_sync.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + get_start_offset_lambda=async_lambda, + ) + msg = reader.receive_message() + + assert msg.seqno == 2 + + reader.close() + + def test_commit_not_allowed(self, driver_sync, topic_with_messages): + reader = driver_sync.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + ) + batch = reader.receive_batch() + + with pytest.raises(ydb.Error): + reader.commit(batch) + + with pytest.raises(ydb.Error): + reader.commit_with_ack(batch) + + with pytest.raises(ydb.Error): + reader.async_commit_with_ack(batch) + + reader.close() + + def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages): + current_offset = 0 + + def get_start_offset_lambda(partition_id: int) -> int: + nonlocal current_offset + return current_offset + + reader = driver_sync.topic_client.reader( + topic_with_messages, + consumer=None, + partition_ids=[0], + get_start_offset_lambda=get_start_offset_lambda, + ) + msg = reader.receive_message() + + assert msg.seqno == current_offset + 1 + + current_offset += 2 + reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error")) + + msg = reader.receive_message() + + assert msg.seqno == current_offset + 1 + + reader.close() diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 0ab78e03..a4cdf407 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -439,12 +439,13 @@ def from_proto( @dataclass class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] - consumer: str + consumer: Optional[str] auto_partitioning_support: bool def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() - res.consumer = self.consumer + if self.consumer is not None: + res.consumer = self.consumer for settings in self.topics_read_settings: res.topics_read_settings.append(settings.to_proto()) res.auto_partitioning_support = self.auto_partitioning_support diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index 8bc12cc0..015e0bf6 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -3,6 +3,7 @@ import datetime from dataclasses import dataclass from typing import ( + Awaitable, Union, Optional, List, @@ -42,7 +43,7 @@ def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSett @dataclass class PublicReaderSettings: - consumer: str + consumer: Optional[str] topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 auto_partitioning_support: bool = True @@ -54,12 +55,15 @@ class PublicReaderSettings: decoder_executor: Optional[concurrent.futures.Executor] = None update_token_interval: Union[int, float] = 3600 + partition_ids: Optional[List[int]] = None + get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None + def __post_init__(self): # check possible create init message _ = self._init_message() def _init_message(self) -> StreamReadMessage.InitRequest: - if not isinstance(self.consumer, str): + if self.consumer is not None and not isinstance(self.consumer, str): raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer)) if isinstance(self.topic, list): @@ -69,7 +73,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest: for index, selector in enumerate(selectors): if isinstance(selector, str): - selectors[index] = PublicTopicSelector(path=selector) + selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids) elif isinstance(selector, PublicTopicSelector): pass else: diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 87012554..7ba05321 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -72,6 +72,7 @@ def __init__(self): class PublicAsyncIOReader: _loop: asyncio.AbstractEventLoop _closed: bool + _settings: topic_reader.PublicReaderSettings _reconnector: ReaderReconnector _parent: typing.Any # need for prevent close parent client by GC @@ -84,6 +85,7 @@ def __init__( ): self._loop = asyncio.get_running_loop() self._closed = False + self._settings = settings self._reconnector = ReaderReconnector(driver, settings, self._loop) self._parent = _parent @@ -156,6 +158,9 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa For the method no way check the commit result (for example if lost connection - commits will not re-send and committed messages will receive again). """ + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + try: self._reconnector.commit(batch) except PublicTopicReaderPartitionExpiredError: @@ -171,6 +176,9 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat before receive commit ack. Message may be acked or not (if not - it will send in other read session, to this or other reader). """ + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + waiter = self._reconnector.commit(batch) await waiter.future @@ -393,6 +401,7 @@ class ReaderStream: _update_token_interval: Union[int, float] _update_token_event: asyncio.Event _get_token_function: Callable[[], str] + _settings: topic_reader.PublicReaderSettings def __init__( self, @@ -425,6 +434,8 @@ def __init__( self._get_token_function = get_token_function self._update_token_event = asyncio.Event() + self._settings = settings + @staticmethod async def create( reader_reconnector_id: int, @@ -615,7 +626,7 @@ async def _read_messages_loop(self): message.server_message, StreamReadMessage.StartPartitionSessionRequest, ): - self._on_start_partition_session(message.server_message) + await self._on_start_partition_session(message.server_message) elif isinstance( message.server_message, @@ -660,7 +671,7 @@ async def _update_token(self, token: str): finally: self._update_token_event.clear() - def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest): + async def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest): try: if message.partition_session.partition_session_id in self._partition_sessions: raise TopicReaderError( @@ -676,11 +687,19 @@ def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionS reader_reconnector_id=self._reader_reconnector_id, reader_stream_id=self._id, ) + + read_offset = None + callee = self._settings.get_start_offset_lambda + if callee is not None: + read_offset = callee(message.partition_session.partition_id) + if asyncio.iscoroutinefunction(callee): + read_offset = await read_offset + self._stream.write( StreamReadMessage.FromClient( client_message=StreamReadMessage.StartPartitionSessionResponse( partition_session_id=message.partition_session.partition_session_id, - read_offset=None, + read_offset=read_offset, commit_offset=None, ) ), diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index 31f28899..bb2fc2a3 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -4,6 +4,7 @@ import typing from typing import List, Union, Optional +from ydb import issues from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType from ydb._topic_common.common import ( _get_shared_event_loop, @@ -31,6 +32,7 @@ class TopicReaderSync: _caller: CallFromSyncToAsync _async_reader: PublicAsyncIOReader _closed: bool + _settings: PublicReaderSettings _parent: typing.Any # need for prevent stop the client by GC def __init__( @@ -55,6 +57,8 @@ async def create_reader(): self._async_reader = asyncio.run_coroutine_threadsafe(create_reader(), loop).result() + self._settings = settings + self._parent = _parent def __del__(self): @@ -154,6 +158,9 @@ def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBat """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + self._caller.call_sync(lambda: self._async_reader.commit(mess)) def commit_with_ack( @@ -168,6 +175,9 @@ def commit_with_ack( """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout) def async_commit_with_ack( @@ -178,6 +188,9 @@ def async_commit_with_ack( """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + return self._caller.unsafe_call_with_future(self._async_reader.commit_with_ack(mess)) def close(self, *, flush: bool = True, timeout: TimeoutType = None): diff --git a/ydb/topic.py b/ydb/topic.py index a501f9d2..586a0491 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -36,7 +36,7 @@ import datetime from dataclasses import dataclass import logging -from typing import List, Union, Mapping, Optional, Dict, Callable +from typing import Awaitable, List, Union, Mapping, Optional, Dict, Callable from . import aio, Credentials, _apis, issues @@ -52,7 +52,9 @@ PublicTopicSelector as TopicReaderSelector, ) -from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader +from ._topic_reader.topic_reader_sync import ( + TopicReaderSync as TopicReader, +) from ._topic_reader.topic_reader_asyncio import ( PublicAsyncIOReader as TopicReaderAsyncIO, @@ -240,7 +242,7 @@ async def drop_topic(self, path: str): def reader( self, topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]], - consumer: str, + consumer: Optional[str], buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes # the func will be called from multiply threads in parallel @@ -249,6 +251,8 @@ def reader( # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. + partition_ids: Optional[List[int]] = None, + get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None, ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -257,6 +261,16 @@ def reader( args = locals().copy() del args["self"] + if consumer is None: + if partition_ids is None: + raise issues.Error("To use reader without consumer it is required to specify partition_ids.") + if get_start_offset_lambda is None: + + def callee(partition_id: int): + return None + + args["get_start_offset_lambda"] = callee + settings = TopicReaderSettings(**args) return TopicReaderAsyncIO(self._driver, settings, _parent=self) @@ -484,7 +498,7 @@ def drop_topic(self, path: str): def reader( self, topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]], - consumer: str, + consumer: Optional[str], buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes # the func will be called from multiply threads in parallel @@ -493,13 +507,24 @@ def reader( # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. + partition_ids: Optional[List[int]] = None, + get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None, ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor args = locals().copy() del args["self"] - self._check_closed() + + if consumer is None: + if partition_ids is None: + raise issues.Error("To use reader without consumer it is required to specify partition_ids.") + if get_start_offset_lambda is None: + + def callee(partition_id: int): + return None + + args["get_start_offset_lambda"] = callee settings = TopicReaderSettings(**args) From 1bd2b2887d84e773efaccff04392e9b260deaa39 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 16 Apr 2025 13:52:30 +0300 Subject: [PATCH 2/2] another approach --- tests/topics/test_topic_reader.py | 160 ++++++++++++++-------- ydb/_topic_reader/events.py | 81 +++++++++++ ydb/_topic_reader/topic_reader.py | 28 +--- ydb/_topic_reader/topic_reader_asyncio.py | 15 +- ydb/topic.py | 55 +++++--- 5 files changed, 233 insertions(+), 106 deletions(-) create mode 100644 ydb/_topic_reader/events.py diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index fc041980..1836d2e7 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -253,20 +253,43 @@ async def wait(fut): await reader1.close() +@pytest.fixture() +def topic_selector(topic_with_messages): + return ydb.TopicReaderSelector(path=topic_with_messages, partitions=[0]) + + @pytest.mark.asyncio class TestTopicNoConsumerReaderAsyncIO: async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages): + with pytest.raises(ydb.Error): + driver.topic_client.reader( + topic_with_messages, + consumer=None, + event_handler=ydb.TopicReaderEvents.EventHandler(), + ) + + async def test_reader_with_no_event_handler_raises(self, driver, topic_with_messages): with pytest.raises(ydb.Error): driver.topic_client.reader( topic_with_messages, consumer=None, ) - async def test_reader_with_default_lambda(self, driver, topic_with_messages): + async def test_reader_with_no_partition_ids_selector_raises(self, driver, topic_selector): + topic_selector.partitions = None + + with pytest.raises(ydb.Error): + driver.topic_client.reader( + topic_selector, + consumer=None, + event_handler=ydb.TopicReaderEvents.EventHandler(), + ) + + async def test_reader_with_default_lambda(self, driver, topic_selector): reader = driver.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], + event_handler=ydb.TopicReaderEvents.EventHandler(), ) msg = await reader.receive_message() @@ -274,45 +297,49 @@ async def test_reader_with_default_lambda(self, driver, topic_with_messages): await reader.close() - async def test_reader_with_sync_lambda(self, driver, topic_with_messages): - def sync_lambda(partition_id: int): - assert partition_id == 0 - return 1 + async def test_reader_with_sync_lambda(self, driver, topic_selector): + class CustomEventHandler(ydb.TopicReaderEvents.EventHandler): + def on_partition_get_start_offset(self, event): + assert topic_selector.path.endswith(event.topic) + assert event.partition_id == 0 + return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1) reader = driver.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], - get_start_offset_lambda=sync_lambda, + event_handler=CustomEventHandler(), ) + msg = await reader.receive_message() assert msg.seqno == 2 await reader.close() - async def test_reader_with_async_lambda(self, driver, topic_with_messages): - async def async_lambda(partition_id: int) -> int: - assert partition_id == 0 - return 1 + async def test_reader_with_async_lambda(self, driver, topic_selector): + class CustomEventHandler(ydb.TopicReaderEvents.EventHandler): + async def on_partition_get_start_offset(self, event): + assert topic_selector.path.endswith(event.topic) + assert event.partition_id == 0 + return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1) reader = driver.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], - get_start_offset_lambda=async_lambda, + event_handler=CustomEventHandler(), ) + msg = await reader.receive_message() assert msg.seqno == 2 await reader.close() - async def test_commit_not_allowed(self, driver, topic_with_messages): + async def test_commit_not_allowed(self, driver, topic_selector): reader = driver.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], + event_handler=ydb.TopicReaderEvents.EventHandler(), ) batch = await reader.receive_batch() @@ -324,18 +351,18 @@ async def test_commit_not_allowed(self, driver, topic_with_messages): await reader.close() - async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages): + async def test_offsets_updated_after_reconnect(self, driver, topic_selector): current_offset = 0 - def get_start_offset_lambda(partition_id: int) -> int: - nonlocal current_offset - return current_offset + class CustomEventHandler(ydb.TopicReaderEvents.EventHandler): + def on_partition_get_start_offset(self, event): + nonlocal current_offset + return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset) reader = driver.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], - get_start_offset_lambda=get_start_offset_lambda, + event_handler=CustomEventHandler(), ) msg = await reader.receive_message() @@ -359,13 +386,31 @@ def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messa driver_sync.topic_client.reader( topic_with_messages, consumer=None, + event_handler=ydb.TopicReaderEvents.EventHandler(), ) - def test_reader_with_default_lambda(self, driver_sync, topic_with_messages): + def test_reader_with_no_event_handler_raises(self, driver_sync, topic_with_messages): + with pytest.raises(ydb.Error): + driver_sync.topic_client.reader( + topic_with_messages, + consumer=None, + ) + + def test_reader_with_no_partition_ids_selector_raises(self, driver_sync, topic_selector): + topic_selector.partitions = None + + with pytest.raises(ydb.Error): + driver_sync.topic_client.reader( + topic_selector, + consumer=None, + event_handler=ydb.TopicReaderEvents.EventHandler(), + ) + + def test_reader_with_default_lambda(self, driver_sync, topic_selector): reader = driver_sync.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], + event_handler=ydb.TopicReaderEvents.EventHandler(), ) msg = reader.receive_message() @@ -373,45 +418,49 @@ def test_reader_with_default_lambda(self, driver_sync, topic_with_messages): reader.close() - def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages): - def sync_lambda(partition_id: int): - assert partition_id == 0 - return 1 + def test_reader_with_sync_lambda(self, driver_sync, topic_selector): + class CustomEventHandler(ydb.TopicReaderEvents.EventHandler): + def on_partition_get_start_offset(self, event): + assert topic_selector.path.endswith(event.topic) + assert event.partition_id == 0 + return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1) reader = driver_sync.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], - get_start_offset_lambda=sync_lambda, + event_handler=CustomEventHandler(), ) + msg = reader.receive_message() assert msg.seqno == 2 reader.close() - def test_reader_with_async_lambda(self, driver_sync, topic_with_messages): - async def async_lambda(partition_id: int) -> int: - assert partition_id == 0 - return 1 + def test_reader_with_async_lambda(self, driver_sync, topic_selector): + class CustomEventHandler(ydb.TopicReaderEvents.EventHandler): + async def on_partition_get_start_offset(self, event): + assert topic_selector.path.endswith(event.topic) + assert event.partition_id == 0 + return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1) reader = driver_sync.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], - get_start_offset_lambda=async_lambda, + event_handler=CustomEventHandler(), ) + msg = reader.receive_message() assert msg.seqno == 2 reader.close() - def test_commit_not_allowed(self, driver_sync, topic_with_messages): + def test_commit_not_allowed(self, driver_sync, topic_selector): reader = driver_sync.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], + event_handler=ydb.TopicReaderEvents.EventHandler(), ) batch = reader.receive_batch() @@ -421,23 +470,20 @@ def test_commit_not_allowed(self, driver_sync, topic_with_messages): with pytest.raises(ydb.Error): reader.commit_with_ack(batch) - with pytest.raises(ydb.Error): - reader.async_commit_with_ack(batch) - reader.close() - def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages): + def test_offsets_updated_after_reconnect(self, driver_sync, topic_selector): current_offset = 0 - def get_start_offset_lambda(partition_id: int) -> int: - nonlocal current_offset - return current_offset + class CustomEventHandler(ydb.TopicReaderEvents.EventHandler): + def on_partition_get_start_offset(self, event): + nonlocal current_offset + return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset) reader = driver_sync.topic_client.reader( - topic_with_messages, + topic_selector, consumer=None, - partition_ids=[0], - get_start_offset_lambda=get_start_offset_lambda, + event_handler=CustomEventHandler(), ) msg = reader.receive_message() diff --git a/ydb/_topic_reader/events.py b/ydb/_topic_reader/events.py new file mode 100644 index 00000000..b229713c --- /dev/null +++ b/ydb/_topic_reader/events.py @@ -0,0 +1,81 @@ +import asyncio +from dataclasses import dataclass +from typing import Awaitable, Union + +from ..issues import ClientInternalError + +__all__ = [ + "OnCommit", + "OnPartitionGetStartOffsetRequest", + "OnPartitionGetStartOffsetResponse", + "OnInitPartition", + "OnShutdownPartition", + "EventHandler", +] + + +class BaseReaderEvent: + pass + + +@dataclass +class OnCommit(BaseReaderEvent): + topic: str + offset: int + + +@dataclass +class OnPartitionGetStartOffsetRequest(BaseReaderEvent): + topic: str + partition_id: int + + +@dataclass +class OnPartitionGetStartOffsetResponse: + start_offset: int + + +class OnInitPartition(BaseReaderEvent): + pass + + +class OnShutdownPartition: + pass + + +TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None] + + +class EventHandler: + def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]: + pass + + def on_partition_get_start_offset( + self, + event: OnPartitionGetStartOffsetRequest, + ) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]: + pass + + def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]: + pass + + def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]: + pass + + async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]: + f = None + if isinstance(event, OnCommit): + f = self.on_commit + elif isinstance(event, OnPartitionGetStartOffsetRequest): + f = self.on_partition_get_start_offset + elif isinstance(event, OnInitPartition): + f = self.on_init_partition + elif isinstance(event, OnShutdownPartition): + f = self.on_shutdown_partition + else: + raise ClientInternalError("Unsupported topic reader event") + + if asyncio.iscoroutinefunction(f): + return await f(event) + + return f(event) diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index 015e0bf6..d477c9ca 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -3,7 +3,6 @@ import datetime from dataclasses import dataclass from typing import ( - Awaitable, Union, Optional, List, @@ -11,6 +10,7 @@ Callable, ) +from .events import EventHandler from ..retries import RetrySettings from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange @@ -21,6 +21,7 @@ class PublicTopicSelector: partitions: Optional[Union[int, List[int]]] = None read_from: Optional[datetime.datetime] = None max_lag: Optional[datetime.timedelta] = None + read_offset: Optional[int] = None def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings: partitions = self.partitions @@ -54,9 +55,7 @@ class PublicReaderSettings: # decoder_executor, must be set for handle non raw messages decoder_executor: Optional[concurrent.futures.Executor] = None update_token_interval: Union[int, float] = 3600 - - partition_ids: Optional[List[int]] = None - get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None + event_handler: Optional[EventHandler] = None def __post_init__(self): # check possible create init message @@ -73,7 +72,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest: for index, selector in enumerate(selectors): if isinstance(selector, str): - selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids) + selectors[index] = PublicTopicSelector(path=selector) elif isinstance(selector, PublicTopicSelector): pass else: @@ -89,25 +88,6 @@ def _retry_settings(self) -> RetrySettings: return RetrySettings(idempotent=True) -class Events: - class OnCommit: - topic: str - offset: int - - class OnPartitionGetStartOffsetRequest: - topic: str - partition_id: int - - class OnPartitionGetStartOffsetResponse: - start_offset: int - - class OnInitPartition: - pass - - class OnShutdownPatition: - pass - - class RetryPolicy: connection_timeout_sec: float overload_timeout_sec: float diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 7ba05321..34c52108 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -15,6 +15,7 @@ from ..aio import Driver from ..issues import Error as YdbError, _process_response from . import datatypes +from . import events from . import topic_reader from .._grpc.grpcwrapper.common_utils import ( IGrpcWrapperAsyncIO, @@ -689,11 +690,15 @@ async def _on_start_partition_session(self, message: StreamReadMessage.StartPart ) read_offset = None - callee = self._settings.get_start_offset_lambda - if callee is not None: - read_offset = callee(message.partition_session.partition_id) - if asyncio.iscoroutinefunction(callee): - read_offset = await read_offset + + if self._settings.event_handler is not None: + resp = await self._settings.event_handler._dispatch( + events.OnPartitionGetStartOffsetRequest( + message.partition_session.path, + message.partition_session.partition_id, + ) + ) + read_offset = None if resp is None else resp.start_offset self._stream.write( StreamReadMessage.FromClient( diff --git a/ydb/topic.py b/ydb/topic.py index 586a0491..1f839ba7 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -16,6 +16,7 @@ "TopicReader", "TopicReaderAsyncIO", "TopicReaderBatch", + "TopicReaderEvents", "TopicReaderMessage", "TopicReaderSelector", "TopicReaderSettings", @@ -36,12 +37,14 @@ import datetime from dataclasses import dataclass import logging -from typing import Awaitable, List, Union, Mapping, Optional, Dict, Callable +from typing import List, Union, Mapping, Optional, Dict, Callable from . import aio, Credentials, _apis, issues from . import driver +from ._topic_reader import events as TopicReaderEvents + from ._topic_reader.datatypes import ( PublicBatch as TopicReaderBatch, PublicMessage as TopicReaderMessage, @@ -251,8 +254,7 @@ def reader( # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. - partition_ids: Optional[List[int]] = None, - get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None, + event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -261,15 +263,22 @@ def reader( args = locals().copy() del args["self"] - if consumer is None: - if partition_ids is None: - raise issues.Error("To use reader without consumer it is required to specify partition_ids.") - if get_start_offset_lambda is None: + if consumer == "": + raise issues.Error( + "Consumer name could not be empty! To use reader without consumer specify consumer as None." + ) - def callee(partition_id: int): - return None + if consumer is None: + if not isinstance(topic, TopicReaderSelector) or topic.partitions is None: + raise issues.Error( + "To use reader without consumer it is required to specify partition_ids in topic selector." + ) - args["get_start_offset_lambda"] = callee + if event_handler is None: + raise issues.Error( + "To use reader without consumer it is required to specify event_handler with " + "on_partition_get_start_offset method." + ) settings = TopicReaderSettings(**args) @@ -507,8 +516,7 @@ def reader( # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. - partition_ids: Optional[List[int]] = None, - get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None, + event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor @@ -516,15 +524,22 @@ def reader( args = locals().copy() del args["self"] - if consumer is None: - if partition_ids is None: - raise issues.Error("To use reader without consumer it is required to specify partition_ids.") - if get_start_offset_lambda is None: - - def callee(partition_id: int): - return None + if consumer == "": + raise issues.Error( + "Consumer name could not be empty! To use reader without consumer specify consumer as None." + ) - args["get_start_offset_lambda"] = callee + if consumer is None: + if not isinstance(topic, TopicReaderSelector) or topic.partitions is None: + raise issues.Error( + "To use reader without consumer it is required to specify partition_ids in topic selector." + ) + + if event_handler is None: + raise issues.Error( + "To use reader without consumer it is required to specify event_handler with " + "on_partition_get_start_offset method." + ) settings = TopicReaderSettings(**args)