From c0f68d5d0c818884a41140a33395b7c8f12d9dbe Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 21 Apr 2025 16:56:15 +0300 Subject: [PATCH 1/2] CommitOffset feature --- tests/topics/test_topic_reader.py | 40 ++++++++++++++++++++++++++++++ ydb/_apis.py | 1 + ydb/_grpc/grpcwrapper/ydb_topic.py | 16 ++++++++++++ ydb/_topic_reader/datatypes.py | 4 +++ ydb/topic.py | 30 ++++++++++++++++++++++ 5 files changed, 91 insertions(+) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 1836d2e7..e6426660 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -64,6 +64,26 @@ async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic assert message != batch.messages[0] + async def test_commit_offset_works(self, driver, topic_with_messages, topic_consumer): + for out in ["123", "456", "789", "0"]: + async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader: + message = await reader.receive_message() + assert message.data.decode() == out + + await driver.topic_client.commit_offset( + topic_with_messages, topic_consumer, message.partition_id, message.offset + 1 + ) + + async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer): + async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader: + for out in ["123", "456", "789", "0"]: + message = await reader.receive_message() + assert message.data.decode() == out + + await driver.topic_client.commit_offset( + topic_with_messages, topic_consumer, message.partition_id, message.offset + 1 + ) + async def test_read_compressed_messages(self, driver, topic_path, topic_consumer): async with driver.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer: await writer.write("123") @@ -183,6 +203,26 @@ def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_ assert message != batch.messages[0] + def test_commit_offset_works(self, driver_sync, topic_with_messages, topic_consumer): + for out in ["123", "456", "789", "0"]: + with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader: + message = reader.receive_message() + assert message.data.decode() == out + + driver_sync.topic_client.commit_offset( + topic_with_messages, topic_consumer, message.partition_id, message.offset + 1 + ) + + def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer): + with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader: + for out in ["123", "456", "789", "0"]: + message = reader.receive_message() + assert message.data.decode() == out + + driver_sync.topic_client.commit_offset( + topic_with_messages, topic_consumer, message.partition_id, message.offset + 1 + ) + def test_read_compressed_messages(self, driver_sync, topic_path, topic_consumer): with driver_sync.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer: writer.write("123") diff --git a/ydb/_apis.py b/ydb/_apis.py index e54f25d2..827a71a4 100644 --- a/ydb/_apis.py +++ b/ydb/_apis.py @@ -117,6 +117,7 @@ class TopicService(object): StreamRead = "StreamRead" StreamWrite = "StreamWrite" UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction" + CommitOffset = "CommitOffset" class QueryService(object): diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index a4cdf407..e70de150 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -137,6 +137,22 @@ def from_proto(msg: ydb_topic_pb2.UpdateTokenResponse) -> typing.Any: return UpdateTokenResponse() +@dataclass +class CommitOffsetRequest(IToProto): + path: str + consumer: str + partition_id: int + offset: int + + def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest: + return ydb_topic_pb2.CommitOffsetRequest( + path=self.path, + consumer=self.consumer, + partition_id=self.partition_id, + offset=self.offset, + ) + + ######################################################################################################################## # StreamWrite ######################################################################################################################## diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 74f06a08..737fa414 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -56,6 +56,10 @@ def _commit_get_offsets_range(self) -> OffsetsRange: def alive(self) -> bool: return not self._partition_session.closed + @property + def partition_id(self) -> int: + return self._partition_session.partition_id + @dataclass class PartitionSession: diff --git a/ydb/topic.py b/ydb/topic.py index 1f839ba7..ceb82efb 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -340,6 +340,21 @@ def tx_writer( return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self) + async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None: + req = _ydb_topic.CommitOffsetRequest( + path=path, + consumer=consumer, + partition_id=partition_id, + offset=offset, + ) + + await self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.CommitOffset, + _wrap_operation, + ) + def close(self): if self._closed: return @@ -603,6 +618,21 @@ def tx_writer( return TopicTxWriter(tx, self._driver, settings, _parent=self) + def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None: + req = _ydb_topic.CommitOffsetRequest( + path=path, + consumer=consumer, + partition_id=partition_id, + offset=offset, + ) + + self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.CommitOffset, + _wrap_operation, + ) + def close(self): if self._closed: return From 0c32132bd1f378b35ef2d17963a52817130bdd4a Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 22 Apr 2025 11:45:26 +0300 Subject: [PATCH 2/2] update docker image from trunk to latest --- docker-compose-tls.yml | 2 +- docker-compose.yml | 2 +- tests/query/test_query_parameters.py | 24 +++++++++++++++++++++++- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/docker-compose-tls.yml b/docker-compose-tls.yml index f0a4b328..80b09eb9 100644 --- a/docker-compose-tls.yml +++ b/docker-compose-tls.yml @@ -1,7 +1,7 @@ version: "3.9" services: ydb: - image: ydbplatform/local-ydb:trunk + image: ydbplatform/local-ydb:latest restart: always ports: - 2136:2136 diff --git a/docker-compose.yml b/docker-compose.yml index 1a466fab..aafa938a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.3" services: ydb: - image: ydbplatform/local-ydb:trunk + image: ydbplatform/local-ydb:latest restart: always ports: - 2136:2136 diff --git a/tests/query/test_query_parameters.py b/tests/query/test_query_parameters.py index 4171f5eb..0367b6cd 100644 --- a/tests/query/test_query_parameters.py +++ b/tests/query/test_query_parameters.py @@ -4,10 +4,11 @@ import ydb -query = """SELECT $a AS value""" +query_template = "DECLARE $a as %s; SELECT $a AS value" def test_select_implicit_int(pool: ydb.QuerySessionPool): + query = query_template % "Int64" expected_value = 111 res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -15,6 +16,7 @@ def test_select_implicit_int(pool: ydb.QuerySessionPool): def test_select_implicit_float(pool: ydb.QuerySessionPool): + query = query_template % "Double" expected_value = 11.1 res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -22,6 +24,7 @@ def test_select_implicit_float(pool: ydb.QuerySessionPool): def test_select_implicit_bool(pool: ydb.QuerySessionPool): + query = query_template % "Bool" expected_value = False res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -29,6 +32,7 @@ def test_select_implicit_bool(pool: ydb.QuerySessionPool): def test_select_implicit_str(pool: ydb.QuerySessionPool): + query = query_template % "Utf8" expected_value = "text" res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -36,6 +40,7 @@ def test_select_implicit_str(pool: ydb.QuerySessionPool): def test_select_implicit_bytes(pool: ydb.QuerySessionPool): + query = query_template % "String" expected_value = b"text" res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -43,6 +48,7 @@ def test_select_implicit_bytes(pool: ydb.QuerySessionPool): def test_select_implicit_list(pool: ydb.QuerySessionPool): + query = query_template % "List" expected_value = [1, 2, 3] res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -50,6 +56,7 @@ def test_select_implicit_list(pool: ydb.QuerySessionPool): def test_select_implicit_dict(pool: ydb.QuerySessionPool): + query = query_template % "Dict" expected_value = {"a": 1, "b": 2} res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -57,6 +64,7 @@ def test_select_implicit_dict(pool: ydb.QuerySessionPool): def test_select_implicit_list_nested(pool: ydb.QuerySessionPool): + query = query_template % "List>" expected_value = [{"a": 1}, {"b": 2}] res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -64,6 +72,7 @@ def test_select_implicit_list_nested(pool: ydb.QuerySessionPool): def test_select_implicit_dict_nested(pool: ydb.QuerySessionPool): + query = query_template % "Dict>" expected_value = {"a": [1, 2, 3], "b": [4, 5]} res = pool.execute_with_retries(query, parameters={"$a": expected_value}) actual_value = res[0].rows[0]["value"] @@ -71,6 +80,8 @@ def test_select_implicit_dict_nested(pool: ydb.QuerySessionPool): def test_select_implicit_custom_type_raises(pool: ydb.QuerySessionPool): + query = query_template % "Struct" + class CustomClass: pass @@ -80,18 +91,21 @@ class CustomClass: def test_select_implicit_empty_list_raises(pool: ydb.QuerySessionPool): + query = query_template % "List" expected_value = [] with pytest.raises(ValueError): pool.execute_with_retries(query, parameters={"$a": expected_value}) def test_select_implicit_empty_dict_raises(pool: ydb.QuerySessionPool): + query = query_template % "Dict" expected_value = {} with pytest.raises(ValueError): pool.execute_with_retries(query, parameters={"$a": expected_value}) def test_select_explicit_primitive(pool: ydb.QuerySessionPool): + query = query_template % "Int64" expected_value = 111 res = pool.execute_with_retries(query, parameters={"$a": (expected_value, ydb.PrimitiveType.Int64)}) actual_value = res[0].rows[0]["value"] @@ -99,6 +113,7 @@ def test_select_explicit_primitive(pool: ydb.QuerySessionPool): def test_select_explicit_list(pool: ydb.QuerySessionPool): + query = query_template % "List" expected_value = [1, 2, 3] type_ = ydb.ListType(ydb.PrimitiveType.Int64) res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)}) @@ -107,6 +122,7 @@ def test_select_explicit_list(pool: ydb.QuerySessionPool): def test_select_explicit_dict(pool: ydb.QuerySessionPool): + query = query_template % "Dict" expected_value = {"key": "value"} type_ = ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Utf8) res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)}) @@ -115,6 +131,7 @@ def test_select_explicit_dict(pool: ydb.QuerySessionPool): def test_select_explicit_empty_list_not_raises(pool: ydb.QuerySessionPool): + query = query_template % "List" expected_value = [] type_ = ydb.ListType(ydb.PrimitiveType.Int64) res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)}) @@ -123,6 +140,7 @@ def test_select_explicit_empty_list_not_raises(pool: ydb.QuerySessionPool): def test_select_explicit_empty_dict_not_raises(pool: ydb.QuerySessionPool): + query = query_template % "Dict" expected_value = {} type_ = ydb.DictType(ydb.PrimitiveType.Utf8, ydb.PrimitiveType.Utf8) res = pool.execute_with_retries(query, parameters={"$a": (expected_value, type_)}) @@ -131,6 +149,7 @@ def test_select_explicit_empty_dict_not_raises(pool: ydb.QuerySessionPool): def test_select_typedvalue_full_primitive(pool: ydb.QuerySessionPool): + query = query_template % "Int64" expected_value = 111 typed_value = ydb.TypedValue(expected_value, ydb.PrimitiveType.Int64) res = pool.execute_with_retries(query, parameters={"$a": typed_value}) @@ -139,6 +158,7 @@ def test_select_typedvalue_full_primitive(pool: ydb.QuerySessionPool): def test_select_typedvalue_implicit_primitive(pool: ydb.QuerySessionPool): + query = query_template % "Int64" expected_value = 111 typed_value = ydb.TypedValue(expected_value) res = pool.execute_with_retries(query, parameters={"$a": typed_value}) @@ -147,6 +167,8 @@ def test_select_typedvalue_implicit_primitive(pool: ydb.QuerySessionPool): def test_select_typevalue_custom_type_raises(pool: ydb.QuerySessionPool): + query = query_template % "Struct" + class CustomClass: pass