Skip to content

Library import 250425-1915 #17754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Apr 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/conf/compilers/gnu_compiler.conf
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ when ($ARCH_XTENSA == "yes") {

when ($OS_EMSCRIPTEN == "yes") {
FSTACK=-fno-stack-protector
CFLAGS+=-D__EMSCRIPTEN__
CFLAGS+=-D__EMSCRIPTEN__=1
CFLAGS+=-DSTANDALONE_WASM=1
}

Expand Down
4 changes: 2 additions & 2 deletions build/ymake.core.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5922,8 +5922,8 @@ macro LINK_EXCLUDE_LIBRARIES(Libs...) {
###
### GENERATE_IMPLIB(cuda $CUDA_TARGET_ROOT/lib64/stubs/libcuda.so SONAME libcuda.so.1)
###
macro GENERATE_IMPLIB(Lib, Path, SONAME="") {
.CMD=${tool:"contrib/tools/implib"} --target $HARDWARE_TYPE --outdir $BINDIR ${pre=--library-load-name :SONAME} $Path ${hide;output;suf=.init.c;nopath:Path} ${hide;output;suf=.tramp.S;nopath:Path}
macro GENERATE_IMPLIB(Lib, Path, SONAME="", DLOPEN_CALLBACK="_") {
.CMD=${tool:"contrib/tools/implib"} --target $HARDWARE_TYPE --dlopen-callback $DLOPEN_CALLBACK --outdir $BINDIR ${pre=--library-load-name :SONAME} $Path ${hide;output;suf=.init.c;nopath:Path} ${hide;output;suf=.tramp.S;nopath:Path}

LINK_EXCLUDE_LIBRARIES($Lib)
}
26 changes: 26 additions & 0 deletions contrib/libs/yajl/patch/fix_memory_leak.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
--- contrib/libs/yajl/yajl_tree.c (3b4a018b6b389390da3ee13f6b4ce0880cb71807)
+++ contrib/libs/yajl/yajl_tree.c (5fe0776c5f96630ddffb63e4ff0292037e12039e)
@@ -143,7 +143,7 @@ static yajl_val context_pop(context_t *ctx)
ctx->stack = stack->next;

v = stack->value;
-
+ free (stack->key);
free (stack);

return (v);
@@ -453,7 +453,14 @@ yajl_val yajl_tree_parse (const char *input,
(const unsigned char *) input,
strlen(input)));
}
+ while(ctx.stack != NULL) {
+ yajl_val v = context_pop(&ctx);
+ yajl_tree_free(v);
+ }
yajl_free (handle);
+ //If the requested memory is not released in time, it will cause memory leakage
+ if(ctx.root)
+ yajl_tree_free(ctx.root);
return NULL;
}

9 changes: 8 additions & 1 deletion contrib/libs/yajl/yajl_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ static yajl_val context_pop(context_t *ctx)
ctx->stack = stack->next;

v = stack->value;

free (stack->key);
free (stack);

return (v);
Expand Down Expand Up @@ -453,7 +453,14 @@ yajl_val yajl_tree_parse (const char *input,
(const unsigned char *) input,
strlen(input)));
}
while(ctx.stack != NULL) {
yajl_val v = context_pop(&ctx);
yajl_tree_free(v);
}
yajl_free (handle);
//If the requested memory is not released in time, it will cause memory leakage
if(ctx.root)
yajl_tree_free(ctx.root);
return NULL;
}

Expand Down
2 changes: 1 addition & 1 deletion contrib/python/ydb/py3/.dist-info/METADATA
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: ydb
Version: 3.20.1
Version: 3.21.0
Summary: YDB Python SDK
Home-page: http://github.com/ydb-platform/ydb-python-sdk
Author: Yandex LLC
Expand Down
3 changes: 2 additions & 1 deletion contrib/python/ydb/py3/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

PY3_LIBRARY()

VERSION(3.20.1)
VERSION(3.21.0)

LICENSE(Apache-2.0)

Expand Down Expand Up @@ -40,6 +40,7 @@ PY_SRCS(
ydb/_topic_common/common.py
ydb/_topic_reader/__init__.py
ydb/_topic_reader/datatypes.py
ydb/_topic_reader/events.py
ydb/_topic_reader/topic_reader.py
ydb/_topic_reader/topic_reader_asyncio.py
ydb/_topic_reader/topic_reader_sync.py
Expand Down
1 change: 1 addition & 0 deletions contrib/python/ydb/py3/ydb/_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class TopicService(object):
StreamRead = "StreamRead"
StreamWrite = "StreamWrite"
UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction"
CommitOffset = "CommitOffset"


class QueryService(object):
Expand Down
21 changes: 19 additions & 2 deletions contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ def from_proto(msg: ydb_topic_pb2.UpdateTokenResponse) -> typing.Any:
return UpdateTokenResponse()


@dataclass
class CommitOffsetRequest(IToProto):
path: str
consumer: str
partition_id: int
offset: int

def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
return ydb_topic_pb2.CommitOffsetRequest(
path=self.path,
consumer=self.consumer,
partition_id=self.partition_id,
offset=self.offset,
)


########################################################################################################################
# StreamWrite
########################################################################################################################
Expand Down Expand Up @@ -438,12 +454,13 @@ def from_proto(
@dataclass
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
consumer: str
consumer: Optional[str]
auto_partitioning_support: bool

def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
res.consumer = self.consumer
if self.consumer is not None:
res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
res.auto_partitioning_support = self.auto_partitioning_support
Expand Down
4 changes: 4 additions & 0 deletions contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def _commit_get_offsets_range(self) -> OffsetsRange:
def alive(self) -> bool:
return not self._partition_session.closed

@property
def partition_id(self) -> int:
return self._partition_session.partition_id


@dataclass
class PartitionSession:
Expand Down
81 changes: 81 additions & 0 deletions contrib/python/ydb/py3/ydb/_topic_reader/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Union

from ..issues import ClientInternalError

__all__ = [
"OnCommit",
"OnPartitionGetStartOffsetRequest",
"OnPartitionGetStartOffsetResponse",
"OnInitPartition",
"OnShutdownPartition",
"EventHandler",
]


class BaseReaderEvent:
pass


@dataclass
class OnCommit(BaseReaderEvent):
topic: str
offset: int


@dataclass
class OnPartitionGetStartOffsetRequest(BaseReaderEvent):
topic: str
partition_id: int


@dataclass
class OnPartitionGetStartOffsetResponse:
start_offset: int


class OnInitPartition(BaseReaderEvent):
pass


class OnShutdownPartition:
pass


TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]


class EventHandler:
def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
pass

def on_partition_get_start_offset(
self,
event: OnPartitionGetStartOffsetRequest,
) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
pass

def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
pass

def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
pass

async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]:
f = None
if isinstance(event, OnCommit):
f = self.on_commit
elif isinstance(event, OnPartitionGetStartOffsetRequest):
f = self.on_partition_get_start_offset
elif isinstance(event, OnInitPartition):
f = self.on_init_partition
elif isinstance(event, OnShutdownPartition):
f = self.on_shutdown_partition
else:
raise ClientInternalError("Unsupported topic reader event")

if asyncio.iscoroutinefunction(f):
return await f(event)

return f(event)
26 changes: 5 additions & 21 deletions contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Callable,
)

from .events import EventHandler
from ..retries import RetrySettings
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange

Expand All @@ -20,6 +21,7 @@ class PublicTopicSelector:
partitions: Optional[Union[int, List[int]]] = None
read_from: Optional[datetime.datetime] = None
max_lag: Optional[datetime.timedelta] = None
read_offset: Optional[int] = None

def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
partitions = self.partitions
Expand All @@ -42,7 +44,7 @@ def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSett

@dataclass
class PublicReaderSettings:
consumer: str
consumer: Optional[str]
topic: TopicSelectorTypes
buffer_size_bytes: int = 50 * 1024 * 1024
auto_partitioning_support: bool = True
Expand All @@ -53,13 +55,14 @@ class PublicReaderSettings:
# decoder_executor, must be set for handle non raw messages
decoder_executor: Optional[concurrent.futures.Executor] = None
update_token_interval: Union[int, float] = 3600
event_handler: Optional[EventHandler] = None

def __post_init__(self):
# check possible create init message
_ = self._init_message()

def _init_message(self) -> StreamReadMessage.InitRequest:
if not isinstance(self.consumer, str):
if self.consumer is not None and not isinstance(self.consumer, str):
raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer))

if isinstance(self.topic, list):
Expand All @@ -85,25 +88,6 @@ def _retry_settings(self) -> RetrySettings:
return RetrySettings(idempotent=True)


class Events:
class OnCommit:
topic: str
offset: int

class OnPartitionGetStartOffsetRequest:
topic: str
partition_id: int

class OnPartitionGetStartOffsetResponse:
start_offset: int

class OnInitPartition:
pass

class OnShutdownPatition:
pass


class RetryPolicy:
connection_timeout_sec: float
overload_timeout_sec: float
Expand Down
Loading
Loading