Skip to content

No Consumer Reader #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions tests/topics/test_topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,203 @@ async def wait(fut):

await reader0.close()
await reader1.close()


@pytest.mark.asyncio
class TestTopicNoConsumerReaderAsyncIO:
async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
with pytest.raises(ydb.Error):
driver.topic_client.reader(
topic_with_messages,
consumer=None,
)

async def test_reader_with_default_lambda(self, driver, topic_with_messages):
reader = driver.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
)
msg = await reader.receive_message()

assert msg.seqno == 1

await reader.close()

async def test_reader_with_sync_lambda(self, driver, topic_with_messages):
def sync_lambda(partition_id: int):
assert partition_id == 0
return 1

reader = driver.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
get_start_offset_lambda=sync_lambda,
)
msg = await reader.receive_message()

assert msg.seqno == 2

await reader.close()

async def test_reader_with_async_lambda(self, driver, topic_with_messages):
async def async_lambda(partition_id: int) -> int:
assert partition_id == 0
return 1

reader = driver.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
get_start_offset_lambda=async_lambda,
)
msg = await reader.receive_message()

assert msg.seqno == 2

await reader.close()

async def test_commit_not_allowed(self, driver, topic_with_messages):
reader = driver.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
)
batch = await reader.receive_batch()

with pytest.raises(ydb.Error):
reader.commit(batch)

with pytest.raises(ydb.Error):
await reader.commit_with_ack(batch)

await reader.close()

async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages):
current_offset = 0

def get_start_offset_lambda(partition_id: int) -> int:
nonlocal current_offset
return current_offset

reader = driver.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
get_start_offset_lambda=get_start_offset_lambda,
)
msg = await reader.receive_message()

assert msg.seqno == current_offset + 1

current_offset += 2
reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

await asyncio.sleep(0)

msg = await reader.receive_message()

assert msg.seqno == current_offset + 1

await reader.close()


class TestTopicReaderWithoutConsumer:
def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messages):
with pytest.raises(ydb.Error):
driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
)

def test_reader_with_default_lambda(self, driver_sync, topic_with_messages):
reader = driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
)
msg = reader.receive_message()

assert msg.seqno == 1

reader.close()

def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages):
def sync_lambda(partition_id: int):
assert partition_id == 0
return 1

reader = driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
get_start_offset_lambda=sync_lambda,
)
msg = reader.receive_message()

assert msg.seqno == 2

reader.close()

def test_reader_with_async_lambda(self, driver_sync, topic_with_messages):
async def async_lambda(partition_id: int) -> int:
assert partition_id == 0
return 1

reader = driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
get_start_offset_lambda=async_lambda,
)
msg = reader.receive_message()

assert msg.seqno == 2

reader.close()

def test_commit_not_allowed(self, driver_sync, topic_with_messages):
reader = driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
)
batch = reader.receive_batch()

with pytest.raises(ydb.Error):
reader.commit(batch)

with pytest.raises(ydb.Error):
reader.commit_with_ack(batch)

with pytest.raises(ydb.Error):
reader.async_commit_with_ack(batch)

reader.close()

def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages):
current_offset = 0

def get_start_offset_lambda(partition_id: int) -> int:
nonlocal current_offset
return current_offset

reader = driver_sync.topic_client.reader(
topic_with_messages,
consumer=None,
partition_ids=[0],
get_start_offset_lambda=get_start_offset_lambda,
)
msg = reader.receive_message()

assert msg.seqno == current_offset + 1

current_offset += 2
reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))

msg = reader.receive_message()

assert msg.seqno == current_offset + 1

reader.close()
5 changes: 3 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,13 @@ def from_proto(
@dataclass
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
consumer: str
consumer: Optional[str]
auto_partitioning_support: bool

def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
res.consumer = self.consumer
if self.consumer is not None:
res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
res.auto_partitioning_support = self.auto_partitioning_support
Expand Down
10 changes: 7 additions & 3 deletions ydb/_topic_reader/topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime
from dataclasses import dataclass
from typing import (
Awaitable,
Union,
Optional,
List,
Expand Down Expand Up @@ -42,7 +43,7 @@ def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSett

@dataclass
class PublicReaderSettings:
consumer: str
consumer: Optional[str]
topic: TopicSelectorTypes
buffer_size_bytes: int = 50 * 1024 * 1024
auto_partitioning_support: bool = True
Expand All @@ -54,12 +55,15 @@ class PublicReaderSettings:
decoder_executor: Optional[concurrent.futures.Executor] = None
update_token_interval: Union[int, float] = 3600

partition_ids: Optional[List[int]] = None
get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None

def __post_init__(self):
# check possible create init message
_ = self._init_message()

def _init_message(self) -> StreamReadMessage.InitRequest:
if not isinstance(self.consumer, str):
if self.consumer is not None and not isinstance(self.consumer, str):
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))

if isinstance(self.topic, list):
Expand All @@ -69,7 +73,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:

for index, selector in enumerate(selectors):
if isinstance(selector, str):
selectors[index] = PublicTopicSelector(path=selector)
selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids)
elif isinstance(selector, PublicTopicSelector):
pass
else:
Expand Down
25 changes: 22 additions & 3 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self):
class PublicAsyncIOReader:
_loop: asyncio.AbstractEventLoop
_closed: bool
_settings: topic_reader.PublicReaderSettings
_reconnector: ReaderReconnector
_parent: typing.Any # need for prevent close parent client by GC

Expand All @@ -84,6 +85,7 @@ def __init__(
):
self._loop = asyncio.get_running_loop()
self._closed = False
self._settings = settings
self._reconnector = ReaderReconnector(driver, settings, self._loop)
self._parent = _parent

Expand Down Expand Up @@ -156,6 +158,9 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
For the method no way check the commit result
(for example if lost connection - commits will not re-send and committed messages will receive again).
"""
if self._settings.consumer is None:
raise issues.Error("Commit operations are not supported for topic reader without consumer.")

try:
self._reconnector.commit(batch)
except PublicTopicReaderPartitionExpiredError:
Expand All @@ -171,6 +176,9 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat
before receive commit ack. Message may be acked or not (if not - it will send in other read session,
to this or other reader).
"""
if self._settings.consumer is None:
raise issues.Error("Commit operations are not supported for topic reader without consumer.")

waiter = self._reconnector.commit(batch)
await waiter.future

Expand Down Expand Up @@ -393,6 +401,7 @@ class ReaderStream:
_update_token_interval: Union[int, float]
_update_token_event: asyncio.Event
_get_token_function: Callable[[], str]
_settings: topic_reader.PublicReaderSettings

def __init__(
self,
Expand Down Expand Up @@ -425,6 +434,8 @@ def __init__(
self._get_token_function = get_token_function
self._update_token_event = asyncio.Event()

self._settings = settings

@staticmethod
async def create(
reader_reconnector_id: int,
Expand Down Expand Up @@ -615,7 +626,7 @@ async def _read_messages_loop(self):
message.server_message,
StreamReadMessage.StartPartitionSessionRequest,
):
self._on_start_partition_session(message.server_message)
await self._on_start_partition_session(message.server_message)

elif isinstance(
message.server_message,
Expand Down Expand Up @@ -660,7 +671,7 @@ async def _update_token(self, token: str):
finally:
self._update_token_event.clear()

def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
async def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest):
try:
if message.partition_session.partition_session_id in self._partition_sessions:
raise TopicReaderError(
Expand All @@ -676,11 +687,19 @@ def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionS
reader_reconnector_id=self._reader_reconnector_id,
reader_stream_id=self._id,
)

read_offset = None
callee = self._settings.get_start_offset_lambda
if callee is not None:
read_offset = callee(message.partition_session.partition_id)
if asyncio.iscoroutinefunction(callee):
read_offset = await read_offset

self._stream.write(
StreamReadMessage.FromClient(
client_message=StreamReadMessage.StartPartitionSessionResponse(
partition_session_id=message.partition_session.partition_session_id,
read_offset=None,
read_offset=read_offset,
commit_offset=None,
)
),
Expand Down
Loading
Loading