From db6e819a6153b7b66aa1c6751213cbd6d256f806 Mon Sep 17 00:00:00 2001 From: qyryq Date: Fri, 23 Jan 2026 14:09:48 +0000 Subject: [PATCH 01/17] Don't buffer PartitionSessionStatusResponse (#31406) --- .github/last_commit.txt | 2 +- src/client/topic/impl/read_session_impl.ipp | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 3522bfc6d0..99aa78dc8b 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -303019a794dd98ca44a77440af08bbeecf56d727 +1dc03cfd8cf9fa4d2013372c06c33502ed6d2985 diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 9913397563..0ad0023694 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -1698,6 +1698,13 @@ inline void TSingleClusterReadSessionImpl::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } + + // We should never get an old status: + Y_ABORT_UNLESS( + partitionStreamIt->second->GetFirstNotReadOffset() <= static_cast(msg.read_offset()) && + partitionStreamIt->second->GetFirstNotReadOffset() <= static_cast(msg.partition_offsets().end()) + ); + bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, TReadSessionEvent::TPartitionSessionStatusEvent( partitionStreamIt->second, msg.committed_offset(), From 459996059b63021b24b77c04368fd1aea280a41a Mon Sep 17 00:00:00 2001 From: qyryq Date: Fri, 23 Jan 2026 14:09:54 +0000 Subject: [PATCH 02/17] Topic SDK: Add direct read option (#29540) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/topic/read_session.h | 4 ++-- src/client/topic/impl/read_session_impl.ipp | 9 +++++---- tests/integration/topic/direct_read_it.cpp | 10 +++++----- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 99aa78dc8b..1ef366aa08 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -1dc03cfd8cf9fa4d2013372c06c33502ed6d2985 +9f0dfacf557dd9ff1de5ba799864177aab11cc5a diff --git a/include/ydb-cpp-sdk/client/topic/read_session.h b/include/ydb-cpp-sdk/client/topic/read_session.h index b7d4e69841..1a7d4be36c 100644 --- a/include/ydb-cpp-sdk/client/topic/read_session.h +++ b/include/ydb-cpp-sdk/client/topic/read_session.h @@ -193,8 +193,8 @@ struct TReadSessionSettings: public TRequestSettings { //! AutoPartitioningSupport. FLUENT_SETTING_DEFAULT(bool, AutoPartitioningSupport, false); - // TODO(qyryq) Uncomment when direct read is ready. - // FLUENT_SETTING_DEFAULT(bool, DirectRead, false); + //! Direct read from partition nodes. Experimental setting — not recommended for use. + FLUENT_SETTING_DEFAULT(bool, DirectRead, false); //! Log. FLUENT_SETTING_OPTIONAL(TLog, Log); diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 0ad0023694..6477f3adfb 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -516,9 +516,7 @@ inline void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions template<> inline bool TSingleClusterReadSessionImpl::IsDirectRead() { - // TODO(qyryq) Replace this return with the next one when direct read is ready for production. - return ExperimentalDirectRead; - // return Settings.DirectRead_; + return ExperimentalDirectRead && Settings.DirectRead_; } template<> @@ -531,7 +529,10 @@ inline void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions reader; @@ -1396,7 +1396,7 @@ TEST_F(DirectReadWithControlSession, EmptyDirectReadResponse) { // Sometimes the server might send a DirectReadResponse with no data, but with bytes_size value > 0. // It can happen, if the server tried to send DirectReadResponse, but did not succeed, // and in the meantime the messages that should had been sent have been rotated by retention period, - // and do not exist anymore. To keep ReadSizeBudget bookkeeping correct, the server still sends the an DirectReadResponse, + // and do not exist anymore. To keep ReadSizeBudget bookkeeping correct, the server still sends DirectReadResponse, // and SDK should process it correctly: basically it should immediately send a ReadRequest(bytes_size=DirectReadResponse.bytes_size). auto const startPartitionSessionRequest = TStartPartitionSessionRequest{ @@ -2048,7 +2048,7 @@ TEST_F(DirectReadWithServer, Devslice) { auto settings = TReadSessionSettings() .AppendTopics(TTopicReadSettings("t1").AppendPartitionIds({0})) .ConsumerName("c1") - // .DirectRead(true) + .DirectRead(true) ; settings.EventHandlers_ From cc23c48f2145743f867b1dd83142e2a9a34b1c0d Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 23 Jan 2026 14:10:01 +0000 Subject: [PATCH 03/17] Fixed sanitizer error (#31901) --- .github/last_commit.txt | 2 +- src/client/topic/impl/read_session_impl.ipp | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 1ef366aa08..ef5ecda2b7 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -9f0dfacf557dd9ff1de5ba799864177aab11cc5a +90ada5ef7c22ed64c5ff4a306a4fd3779c17bf4b diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 6477f3adfb..37e299dc8b 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -2111,8 +2111,6 @@ void TSingleClusterReadSessionImpl::UnregisterPartition(ui template std::vector TSingleClusterReadSessionImpl::GetParentPartitionSessions(ui32 partitionId, ui64 partitionSessionId) { - std::lock_guard guard(HierarchyDataLock); - auto it = HierarchyData.find(partitionId); if (it == HierarchyData.end()) { return {}; @@ -2139,6 +2137,7 @@ std::vector TSingleClusterReadSessionImpl::GetParent template bool TSingleClusterReadSessionImpl::AllParentSessionsHasBeenRead(ui32 partitionId, ui64 partitionSessionId) { + std::lock_guard guard(HierarchyDataLock); for (auto id : GetParentPartitionSessions(partitionId, partitionSessionId)) { if (!ReadingFinishedData.contains(id)) { return false; @@ -2150,7 +2149,10 @@ bool TSingleClusterReadSessionImpl::AllParentSessionsHasBe template void TSingleClusterReadSessionImpl::ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, const std::vector& childIds) { - ReadingFinishedData.insert(partitionStream->GetPartitionSessionId()); + { + std::lock_guard guard(HierarchyDataLock); + ReadingFinishedData.insert(partitionStream->GetPartitionSessionId()); + } for (auto& [_, s] : PartitionStreams) { for (auto partitionId : childIds) { if (s->GetPartitionId() == partitionId) { From ffd3e0a2ac7581af7e395a4b081f2833a13a181f Mon Sep 17 00:00:00 2001 From: qyryq Date: Fri, 23 Jan 2026 14:10:07 +0000 Subject: [PATCH 04/17] Delete optimistic VERIFY (#31955) --- .github/last_commit.txt | 2 +- src/client/topic/impl/read_session_impl.ipp | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index ef5ecda2b7..e68d33ed6d 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -90ada5ef7c22ed64c5ff4a306a4fd3779c17bf4b +e8b29deeaea8e9f8790ba672a6d7b8a8b309d571 diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 37e299dc8b..afacec5ee2 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -1700,12 +1700,6 @@ inline void TSingleClusterReadSessionImpl::OnReadDoneImpl( return; } - // We should never get an old status: - Y_ABORT_UNLESS( - partitionStreamIt->second->GetFirstNotReadOffset() <= static_cast(msg.read_offset()) && - partitionStreamIt->second->GetFirstNotReadOffset() <= static_cast(msg.partition_offsets().end()) - ); - bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, TReadSessionEvent::TPartitionSessionStatusEvent( partitionStreamIt->second, msg.committed_offset(), From 4d8f5fa6f6f1b184c3c4c713ba688ef6836865ae Mon Sep 17 00:00:00 2001 From: Stanislav Yablonskiy Date: Fri, 23 Jan 2026 14:10:14 +0000 Subject: [PATCH 05/17] Column Shard column COMPRESSION (#29523) --- .github/last_commit.txt | 2 +- src/api/protos/ydb_table.proto | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index e68d33ed6d..657af6912d 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -e8b29deeaea8e9f8790ba672a6d7b8a8b309d571 +dc19370a47f99419af5ff09d9ea6e630fac19635 diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index de83c9c57f..adff527863 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -637,6 +637,22 @@ message SequenceDescription { optional Type data_type = 9; // data type of the sequence } +// TODO: merge it with Column Family Compression +message ColumnCompression { + enum Algorithm { + ALGORITHM_UNSPECIFIED = 0; + ALGORITHM_OFF = 1; + ALGORITHM_LZ4 = 2; + ALGORITHM_ZSTD = 3; + } + + optional Algorithm algorithm = 1; + + // Set the compression level for selected compression type. If no value is specified, default value will be chosen. + // For ZSTD compression level must be in range [-131072:22] + optional int32 compression_level = 2; +} + message ColumnMeta { // Name of column string name = 1; @@ -652,6 +668,7 @@ message ColumnMeta { SequenceDescription from_sequence = 6; google.protobuf.NullValue empty_default = 7; } + optional ColumnCompression compression = 8; } message EvictionToExternalStorageSettings { From 19c137f4e64d98332c3f83d11375b669e17164e1 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 23 Jan 2026 14:10:20 +0000 Subject: [PATCH 06/17] Add proper handling for backup collection as directory-like (#31767) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/scheme/scheme.h | 1 + src/api/protos/ydb_scheme.proto | 1 + src/client/scheme/scheme.cpp | 2 ++ 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 657af6912d..4f4d0ba542 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -dc19370a47f99419af5ff09d9ea6e630fac19635 +9d754573e89b623f6686ccc6465071e902dbf9bb diff --git a/include/ydb-cpp-sdk/client/scheme/scheme.h b/include/ydb-cpp-sdk/client/scheme/scheme.h index ec0291f1e4..df6d0b0133 100644 --- a/include/ydb-cpp-sdk/client/scheme/scheme.h +++ b/include/ydb-cpp-sdk/client/scheme/scheme.h @@ -53,6 +53,7 @@ enum class ESchemeEntryType : i32 { SysView = 22, Transfer = 23, StreamingQuery = 24, + BackupCollection = 25, }; struct TVirtualTimestamp { diff --git a/src/api/protos/ydb_scheme.proto b/src/api/protos/ydb_scheme.proto index d4c9831365..c3a93c3356 100644 --- a/src/api/protos/ydb_scheme.proto +++ b/src/api/protos/ydb_scheme.proto @@ -66,6 +66,7 @@ message Entry { EXTERNAL_DATA_SOURCE = 19; VIEW = 20; RESOURCE_POOL = 21; + BACKUP_COLLECTION = 22; TRANSFER = 23; SYS_VIEW = 24; STREAMING_QUERY = 26; diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index 25794e5bf6..3377486f96 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -109,6 +109,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry) return ESchemeEntryType::View; case ::Ydb::Scheme::Entry::RESOURCE_POOL: return ESchemeEntryType::ResourcePool; + case ::Ydb::Scheme::Entry::BACKUP_COLLECTION: + return ESchemeEntryType::BackupCollection; case ::Ydb::Scheme::Entry::SYS_VIEW: return ESchemeEntryType::SysView; case ::Ydb::Scheme::Entry::TRANSFER: From 591885b8143d0b6773cf75e479bba2c9da1cda0d Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 23 Jan 2026 14:10:26 +0000 Subject: [PATCH 07/17] Removed using EnablePQConfigTransactionsAtSchemeShard feature flag (#31710) --- .github/last_commit.txt | 2 +- src/client/topic/ut/local_partition_ut.cpp | 1 - src/client/topic/ut/topic_to_table_ut.cpp | 4 ++-- src/client/topic/ut/ut_utils/txusage_fixture.cpp | 4 ++-- src/client/topic/ut/ut_utils/txusage_fixture.h | 1 - 5 files changed, 5 insertions(+), 7 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 4f4d0ba542..e42048e7f2 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -9d754573e89b623f6686ccc6465071e902dbf9bb +f2ad3aa38cd6b231a7c38450869e7791f94913ce diff --git a/src/client/topic/ut/local_partition_ut.cpp b/src/client/topic/ut/local_partition_ut.cpp index 19809f111c..b0f94be2c8 100644 --- a/src/client/topic/ut/local_partition_ut.cpp +++ b/src/client/topic/ut/local_partition_ut.cpp @@ -35,7 +35,6 @@ namespace NYdb::inline V3::NTopic::NTests { TTopicSdkTestSetup CreateSetupForSplitMerge(const std::string& testCaseName) { NKikimrConfig::TFeatureFlags ff; ff.SetEnableTopicSplitMerge(true); - ff.SetEnablePQConfigTransactionsAtSchemeShard(true); auto settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetFeatureFlags(ff); auto setup = TTopicSdkTestSetup(testCaseName, settings, false); diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 448a56bbfc..e1974e1074 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -182,7 +182,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_43_Query, TFixtureQuery) Y_UNIT_TEST_F(ReadRuleGeneration, TFixtureNoClient) { // There was a server - NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false}); + NotifySchemeShard({}); // Users have created their own topic on it CreateTopic(TEST_TOPIC); @@ -200,7 +200,7 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixtureNoClient) CloseTopicReadSession(TEST_TOPIC, "consumer-1"); // And then the Logbroker team turned on the feature flag - NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true}); + NotifySchemeShard({}); // Users continued to write to the topic WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4"); diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.cpp b/src/client/topic/ut/ut_utils/txusage_fixture.cpp index 9ff1610dae..3b33e5f506 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.cpp +++ b/src/client/topic/ut/ut_utils/txusage_fixture.cpp @@ -38,7 +38,6 @@ void TFixture::SetUp(NUnitTest::TTestContext&) NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetEnableTopicServiceTx(true); settings.SetEnableTopicSplitMerge(true); - settings.SetEnablePQConfigTransactionsAtSchemeShard(true); settings.SetEnableOltpSink(GetEnableOltpSink()); settings.SetEnableOlapSink(GetEnableOlapSink()); settings.SetEnableHtapTx(GetEnableHtapTx()); @@ -61,9 +60,10 @@ void TFixture::SetUp(NUnitTest::TTestContext&) void TFixture::NotifySchemeShard(const TFeatureFlags& flags) { + Y_UNUSED(flags); + auto request = std::make_unique(); *request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig; - request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard); auto& runtime = Setup->GetRuntime(); auto actorId = runtime.AllocateEdgeActor(); diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.h b/src/client/topic/ut/ut_utils/txusage_fixture.h index 28354a4db5..5ee506491b 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.h +++ b/src/client/topic/ut/ut_utils/txusage_fixture.h @@ -37,7 +37,6 @@ class TFixture : public NUnitTest::TBaseFixture { }; struct TFeatureFlags { - bool EnablePQConfigTransactionsAtSchemeShard = true; }; class ISession { From 0eec78cf128bf3228d97697adddafb1263243081 Mon Sep 17 00:00:00 2001 From: Maksim Date: Fri, 23 Jan 2026 14:10:33 +0000 Subject: [PATCH 08/17] [NBS-6758]: nbs2 grpc api to start nbs partition (#32327) --- .github/last_commit.txt | 2 +- src/api/grpc/draft/ydb_nbs_v1.proto | 21 +++++++++ src/api/protos/draft/ydb_nbs.proto | 68 +++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 src/api/grpc/draft/ydb_nbs_v1.proto create mode 100644 src/api/protos/draft/ydb_nbs.proto diff --git a/.github/last_commit.txt b/.github/last_commit.txt index e42048e7f2..9c0afe77fa 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -f2ad3aa38cd6b231a7c38450869e7791f94913ce +115b46e5ea833d10d06db6316596755ff9e8f411 diff --git a/src/api/grpc/draft/ydb_nbs_v1.proto b/src/api/grpc/draft/ydb_nbs_v1.proto new file mode 100644 index 0000000000..b2f5351908 --- /dev/null +++ b/src/api/grpc/draft/ydb_nbs_v1.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package Ydb.Nbs.V1; + +option java_package = "com.yandex.ydb.nbs.v1"; +option java_outer_classname = "NbsGrpc"; +option java_multiple_files = true; + +import "src/api/protos/draft/ydb_nbs.proto"; + +// Service for managing nbs 2.0 +service NbsService { + // Create NBS partition + rpc CreatePartition(Ydb.Nbs.CreatePartitionRequest) returns (Ydb.Nbs.CreatePartitionResponse); + + // Delete NBS partition + rpc DeletePartition(Ydb.Nbs.DeletePartitionRequest) returns (Ydb.Nbs.DeletePartitionResponse); + + // List NBS partitions + rpc ListPartitions(Ydb.Nbs.ListPartitionsRequest) returns (Ydb.Nbs.ListPartitionsResponse); +} diff --git a/src/api/protos/draft/ydb_nbs.proto b/src/api/protos/draft/ydb_nbs.proto new file mode 100644 index 0000000000..dd5afb5a0c --- /dev/null +++ b/src/api/protos/draft/ydb_nbs.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Ydb.Nbs; + +option java_package = "com.yandex.ydb.nbs.proto"; +option java_outer_classname = "NbsProtos"; +option java_multiple_files = true; + +import "src/api/protos/ydb_operation.proto"; + + +//////////////////////////////////////////////////////////////////////////////// +// Partition create request/response. + +message CreatePartitionRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Storage pool name to use. + string StoragePoolName = 2; + + // Minimum addressable block size (smallest unit of I/O operations). + uint32 BlockSize = 3; + + // Maximum number of blocks stored in partition. + uint64 BlocksCount = 4; +} + +message CreatePartitionResponse { + Ydb.Operations.Operation operation = 1; +} + +message CreatePartitionResult { + string TabletId = 1; +} + +//////////////////////////////////////////////////////////////////////////////// +// Partition delete request/response. + +message DeletePartitionRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Partition tablet id to delete. + string TabletId = 2; +} + +message DeletePartitionResponse { + Ydb.Operations.Operation operation = 1; +} + +message DeletePartitionResult { + // Deleted partition tablet id. + string TabletId = 1; +} + +//////////////////////////////////////////////////////////////////////////////// +// Partition list request/response. + +message ListPartitionsRequest { + Ydb.Operations.OperationParams operation_params = 1; +} + +message ListPartitionsResponse { + Ydb.Operations.Operation operation = 1; +} + +message ListPartitionsResult { + repeated string TabletId = 1; +} From f5639d38c19de84960e00ae97d7def92173616bd Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 23 Jan 2026 14:10:39 +0000 Subject: [PATCH 09/17] fixed crash in topic SDK (#32375) --- .github/last_commit.txt | 2 +- src/client/topic/impl/read_session_impl.ipp | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 9c0afe77fa..53a37965c1 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -115b46e5ea833d10d06db6316596755ff9e8f411 +a68dddd1d7a3fc0eb7670b9bf6d25d548151e280 diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index afacec5ee2..ef7482c4e7 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -244,10 +244,6 @@ template TSingleClusterReadSessionImpl::~TSingleClusterReadSessionImpl() { std::lock_guard guard(Lock); - for (auto&& [_, partitionStream] : PartitionStreams) { - partitionStream->ClearQueue(); - } - for (auto& e : DecompressionQueue) { e.OnDestroyReadSession(); } From 68545d583a2803d0ba0d190552f349ace3745860 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Fri, 23 Jan 2026 14:10:45 +0000 Subject: [PATCH 10/17] YQ-4474 add public interface to partition session control in topic sdk (#31808) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/driver/driver.h | 3 +++ .../ydb-cpp-sdk/client/topic/read_events.h | 23 +++++++++++++++---- .../ydb-cpp-sdk/client/topic/write_session.h | 6 ++--- src/client/driver/driver.cpp | 4 ++++ .../impl/federated_deferred_commit.cpp | 4 ++-- .../impl/federated_read_session_event.cpp | 4 ++-- src/client/topic/impl/deferred_commit.cpp | 4 ++-- src/client/topic/impl/read_session_event.cpp | 14 +++++------ src/client/topic/impl/read_session_impl.h | 21 +++++++++++------ src/client/topic/impl/read_session_impl.ipp | 16 +++++++++---- src/library/grpc/client/grpc_client_low.h | 4 ++-- 12 files changed, 70 insertions(+), 35 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 53a37965c1..03e2bfcda9 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -a68dddd1d7a3fc0eb7670b9bf6d25d548151e280 +33da4eb6e53776e8778f362016b0d2b61ada8fe1 diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index b1a9d0ada9..72aa008ccc 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -32,6 +32,9 @@ class TDriverConfig { //! client will connect to others nodes according to client loadbalancing TDriverConfig& SetEndpoint(const std::string& endpoint); + //! Get endpoint, returns the endpoint set via connection string or SetEndpoint() + const std::string& GetEndpoint() const; + //! Set number of network threads, default: 2 TDriverConfig& SetNetworkThreadsNum(size_t sz); diff --git a/include/ydb-cpp-sdk/client/topic/read_events.h b/include/ydb-cpp-sdk/client/topic/read_events.h index 7dc550e49c..6e1f841ce1 100644 --- a/include/ydb-cpp-sdk/client/topic/read_events.h +++ b/include/ydb-cpp-sdk/client/topic/read_events.h @@ -57,6 +57,21 @@ struct TPartitionSession: public TThrRefBase, public TPrintable void TPrintable::DebugString(TStringBuilder& res, bool) const; +struct TPartitionSessionControl: public TPartitionSession { + //! Commit offsets range. + //! Can be used from TDataReceivedEvent or TDeferredCommit. + virtual void Commit(uint64_t startOffset, uint64_t endOffset) = 0; + + //! Confirm partition session creation from TStartPartitionSessionEvent. + virtual void ConfirmCreate(std::optional readOffset, std::optional commitOffset) = 0; + + //! Confirm partition session destruction from TStopPartitionSessionEvent. + virtual void ConfirmDestroy() = 0; + + //! Confirm partition session end from TEndPartitionSessionEvent. + virtual void ConfirmEnd(std::span childIds) = 0; +}; + //! Events for read session. struct TReadSessionEvent { class TPartitionSessionAccessor { @@ -73,7 +88,7 @@ struct TReadSessionEvent { //! Event with new data. //! Contains batch of messages from single partition session. - struct TDataReceivedEvent : public TPartitionSessionAccessor, public TPrintable { + struct TDataReceivedEvent: public TPartitionSessionAccessor, public TPrintable { struct TMessageInformation { TMessageInformation(uint64_t offset, std::string producerId, @@ -95,7 +110,7 @@ struct TReadSessionEvent { std::string MessageGroupId; }; - class TMessageBase : public TPrintable { + class TMessageBase: public TPrintable { public: TMessageBase(const std::string& data, TMessageInformation info); @@ -249,7 +264,7 @@ struct TReadSessionEvent { //! This means that from now the first available //! message offset in current partition //! for current consumer is this offset. - //! All messages before are committed and futher never be available. + //! All messages before are committed and further never be available. uint64_t GetCommittedOffset() const { return CommittedOffset; } @@ -452,4 +467,4 @@ void TPrintable::DebugString(TStringBuilder& ret, bool prin std::string DebugString(const TReadSessionEvent::TEvent& event); -} +} // namespace NYdb::NTopic diff --git a/include/ydb-cpp-sdk/client/topic/write_session.h b/include/ydb-cpp-sdk/client/topic/write_session.h index 68525b87fe..e0c4d4618e 100644 --- a/include/ydb-cpp-sdk/client/topic/write_session.h +++ b/include/ydb-cpp-sdk/client/topic/write_session.h @@ -219,7 +219,7 @@ class ISimpleBlockingWriteSession : public TThrRefBase { virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; - //! Returns true if write session is alive and acitve. False if session was closed. + //! Returns true if write session is alive and active. False if session was closed. virtual bool IsAlive() const = 0; virtual TWriterCounters::TPtr GetCounters() = 0; @@ -269,11 +269,11 @@ class IWriteSession { //! Return true if all writes were completed and acked, false if timeout was reached and some writes were aborted. virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; - //! Writer counters with different stats (see TWriterConuters). + //! Writer counters with different stats (see TWriterCounters). virtual TWriterCounters::TPtr GetCounters() = 0; //! Close() with timeout = 0 and destroy everything instantly. virtual ~IWriteSession() = default; }; -} +} // namespace NYdb::NTopic diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index d8b41783e0..207c67b6d5 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -97,6 +97,10 @@ TDriverConfig& TDriverConfig::SetEndpoint(const std::string& endpoint) { return *this; } +const std::string& TDriverConfig::GetEndpoint() const { + return Impl_->Endpoint; +} + TDriverConfig& TDriverConfig::SetNetworkThreadsNum(size_t sz) { Impl_->NetworkThreadsNum = sz; return *this; diff --git a/src/client/federated_topic/impl/federated_deferred_commit.cpp b/src/client/federated_topic/impl/federated_deferred_commit.cpp index eb0d711338..dc402c05f5 100644 --- a/src/client/federated_topic/impl/federated_deferred_commit.cpp +++ b/src/client/federated_topic/impl/federated_deferred_commit.cpp @@ -129,10 +129,10 @@ void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& da void TDeferredCommit::TImpl::Commit() { for (auto&& [partitionStream, offsetRanges] : Offsets) { for (auto&& [startOffset, endOffset] : offsetRanges) { - static_cast*>(partitionStream.Get()->PartitionSession.Get())->Commit(startOffset, endOffset); + static_cast(partitionStream.Get()->PartitionSession.Get())->Commit(startOffset, endOffset); } } Offsets.clear(); } -} +} // namespace NYdb::NFederatedTopic diff --git a/src/client/federated_topic/impl/federated_read_session_event.cpp b/src/client/federated_topic/impl/federated_read_session_event.cpp index 00cb5127b7..658ae75d85 100644 --- a/src/client/federated_topic/impl/federated_read_session_event.cpp +++ b/src/client/federated_topic/impl/federated_read_session_event.cpp @@ -157,7 +157,7 @@ TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEv void TReadSessionEvent::TDataReceivedEvent::Commit() { for (auto [from, to] : OffsetRanges) { - static_cast*>(PartitionSession.Get())->Commit(from, to); + static_cast(PartitionSession.Get())->Commit(from, to); } } @@ -165,4 +165,4 @@ std::string DebugString(const TReadSessionEvent::TEvent& event) { return std::visit([](const auto& ev) { return ev.DebugString(); }, event); } -} // namespace NYdb::Dev::NFederatedTopic +} // namespace NYdb::NFederatedTopic diff --git a/src/client/topic/impl/deferred_commit.cpp b/src/client/topic/impl/deferred_commit.cpp index 53e10bfd15..a59e50961c 100644 --- a/src/client/topic/impl/deferred_commit.cpp +++ b/src/client/topic/impl/deferred_commit.cpp @@ -122,10 +122,10 @@ void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& da void TDeferredCommit::TImpl::Commit() { for (auto&& [partitionStream, offsetRanges] : Offsets) { for (auto&& [startOffset, endOffset] : offsetRanges) { - static_cast*>(partitionStream.Get())->Commit(startOffset, endOffset); + static_cast(partitionStream.Get())->Commit(startOffset, endOffset); } } Offsets.clear(); } -} +} // namespace NYdb::NTopic diff --git a/src/client/topic/impl/read_session_event.cpp b/src/client/topic/impl/read_session_event.cpp index cf58500d08..7cf6feca11 100644 --- a/src/client/topic/impl/read_session_event.cpp +++ b/src/client/topic/impl/read_session_event.cpp @@ -191,7 +191,7 @@ bool TMessage::HasException() const { } void TMessage::Commit() { - static_cast*>(PartitionSession.Get()) + static_cast(PartitionSession.Get()) ->Commit(Information.Offset, Information.Offset + 1); } @@ -225,7 +225,7 @@ uint64_t TCompressedMessage::GetUncompressedSize() const { } void TCompressedMessage::Commit() { - static_cast*>(PartitionSession.Get()) + static_cast(PartitionSession.Get()) ->Commit(Information.Offset, Information.Offset + 1); } @@ -264,7 +264,7 @@ void TDataReceivedEvent::Commit() { } for (auto [from, to] : OffsetRanges) { - static_cast*>(PartitionSession.Get())->Commit(from, to); + static_cast(PartitionSession.Get())->Commit(from, to); } } @@ -317,7 +317,7 @@ TStartPartitionSessionEvent::TStartPartitionSessionEvent(TPartitionSession::TPtr void TStartPartitionSessionEvent::Confirm(std::optional readOffset, std::optional commitOffset) { if (PartitionSession) { - static_cast*>(PartitionSession.Get()) + static_cast(PartitionSession.Get()) ->ConfirmCreate(readOffset, commitOffset); } } @@ -342,7 +342,7 @@ TStopPartitionSessionEvent::TStopPartitionSessionEvent(TPartitionSession::TPtr p void TStopPartitionSessionEvent::Confirm() { if (PartitionSession) { - static_cast*>(PartitionSession.Get())->ConfirmDestroy(); + static_cast(PartitionSession.Get())->ConfirmDestroy(); } } @@ -366,7 +366,7 @@ TEndPartitionSessionEvent::TEndPartitionSessionEvent(TPartitionSession::TPtr par void TEndPartitionSessionEvent::Confirm() { if (PartitionSession) { - static_cast*>(PartitionSession.Get())->ConfirmEnd(GetChildPartitionIds()); + static_cast(PartitionSession.Get())->ConfirmEnd(GetChildPartitionIds()); } } @@ -454,4 +454,4 @@ std::string DebugString(const TReadSessionEvent::TEvent& event) { return std::visit([](const auto& ev) { return ev.DebugString(); }, event); } -} // namespace NYdb::NPersQueue +} // namespace NYdb::NTopic diff --git a/src/client/topic/impl/read_session_impl.h b/src/client/topic/impl/read_session_impl.h index a05ff69930..ea06593aec 100644 --- a/src/client/topic/impl/read_session_impl.h +++ b/src/client/topic/impl/read_session_impl.h @@ -70,10 +70,17 @@ using TASessionClosedEvent = std::conditional_t; +struct TMigrationPartitionStream: public NYdb::NPersQueue::TPartitionStream { + virtual void Commit(uint64_t startOffset, uint64_t endOffset) = 0; + virtual void ConfirmCreate(std::optional readOffset, std::optional commitOffset) = 0; + virtual void ConfirmDestroy() = 0; + virtual void ConfirmEnd(std::span childIds) = 0; +}; + template using TAPartitionStream = std::conditional_t; + TMigrationPartitionStream, + NYdb::NTopic::TPartitionSessionControl>; template using TAReadSessionEvent = std::conditional_t { TAPartitionStream::LastDirectReadId = id; } - void Commit(ui64 startOffset, ui64 endOffset) /*override*/; + void Commit(uint64_t startOffset, uint64_t endOffset) override; void RequestStatus() override; - void ConfirmCreate(std::optional readOffset, std::optional commitOffset); - void ConfirmDestroy(); - void ConfirmEnd(const std::vector& childIds); + void ConfirmCreate(std::optional readOffset, std::optional commitOffset) override; + void ConfirmDestroy() override; + void ConfirmEnd(std::span childIds) override; void StopReading() /*override*/; void ResumeReading() /*override*/; @@ -1163,7 +1170,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext* partitionStream, std::optional readOffset, std::optional commitOffset); void ConfirmPartitionStreamDestroy(TPartitionStreamImpl* partitionStream); - void ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, const std::vector& childIds); + void ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, std::span childIds); void RequestPartitionStreamStatus(const TPartitionStreamImpl* partitionStream); void Commit(const TPartitionStreamImpl* partitionStream, ui64 startOffset, ui64 endOffset); diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index ef7482c4e7..2727801843 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -48,7 +48,7 @@ TLog TPartitionStreamImpl::GetLog() const { } template -void TPartitionStreamImpl::Commit(ui64 startOffset, ui64 endOffset) { +void TPartitionStreamImpl::Commit(uint64_t startOffset, uint64_t endOffset) { std::vector> toCommit; if (auto sessionShared = CbContext->LockShared()) { Y_ABORT_UNLESS(endOffset > startOffset); @@ -78,7 +78,7 @@ void TPartitionStreamImpl::RequestStatus() { } template -void TPartitionStreamImpl::ConfirmCreate(std::optional readOffset, std::optional commitOffset) { +void TPartitionStreamImpl::ConfirmCreate(std::optional readOffset, std::optional commitOffset) { if (auto sessionShared = CbContext->LockShared()) { if (commitOffset.has_value()) { SetFirstNotReadOffset(commitOffset.value()); @@ -95,7 +95,7 @@ void TPartitionStreamImpl::ConfirmDestroy() { } template -void TPartitionStreamImpl::ConfirmEnd(const std::vector& childIds) { +void TPartitionStreamImpl::ConfirmEnd(std::span childIds) { if (auto sessionShared = CbContext->LockShared()) { sessionShared->ConfirmPartitionStreamEnd(this, childIds); } @@ -2137,8 +2137,8 @@ bool TSingleClusterReadSessionImpl::AllParentSessionsHasBe return true; } -template -void TSingleClusterReadSessionImpl::ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, const std::vector& childIds) { +template <> +inline void TSingleClusterReadSessionImpl::ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, std::span childIds) { { std::lock_guard guard(HierarchyDataLock); ReadingFinishedData.insert(partitionStream->GetPartitionSessionId()); @@ -2153,6 +2153,12 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStream } } +template <> +inline void TSingleClusterReadSessionImpl::ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, std::span childIds) { + Y_UNUSED(partitionStream, childIds); + Y_ABORT("Not implemented"); +} + template void TSingleClusterReadSessionImpl::CollectOffsets(TTransactionBase& tx, const std::vector& events, diff --git a/src/library/grpc/client/grpc_client_low.h b/src/library/grpc/client/grpc_client_low.h index 37a0999336..ac865d29b0 100644 --- a/src/library/grpc/client/grpc_client_low.h +++ b/src/library/grpc/client/grpc_client_low.h @@ -1274,7 +1274,7 @@ class TServiceConnection { } /* - * Start bidirectional streamming + * Start bidirectional streaming */ template void DoStreamRequest(TStreamConnectedCallback callback, @@ -1412,7 +1412,7 @@ class TGRpcClientLow std::mutex JoinMutex_; }; -} +} // namespace NYdbGrpc template <> class grpc::TimePoint { From b5119d6814d6d31243bd77b682839d41681ee746 Mon Sep 17 00:00:00 2001 From: Bulat Date: Fri, 23 Jan 2026 14:10:52 +0000 Subject: [PATCH 11/17] [C++ SDK] Fixed session_pool flaky tests (#32478) --- .github/last_commit.txt | 2 +- .../impl/session/kqp_session_common.cpp | 21 +++++-------------- src/client/impl/session/kqp_session_common.h | 8 +------ src/client/impl/session/session_pool.cpp | 1 - src/client/query/client.cpp | 1 - src/client/table/impl/table_client.cpp | 3 --- tests/integration/sessions_pool/main.cpp | 3 --- 7 files changed, 7 insertions(+), 32 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 03e2bfcda9..b1e0716a2f 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -33da4eb6e53776e8778f362016b0d2b61ada8fe1 +fe42d31fb0a36bc10265d985e8288bbf172397f3 diff --git a/src/client/impl/session/kqp_session_common.cpp b/src/client/impl/session/kqp_session_common.cpp index fedc32358f..b2c5f38f81 100644 --- a/src/client/impl/session/kqp_session_common.cpp +++ b/src/client/impl/session/kqp_session_common.cpp @@ -6,9 +6,7 @@ namespace NYdb::inline V3 { -using std::string; - -ui64 GetNodeIdFromSession(const std::string& sessionId) { +std::uint64_t GetNodeIdFromSession(const std::string& sessionId) { if (sessionId.empty()) { return 0; } @@ -20,8 +18,7 @@ ui64 GetNodeIdFromSession(const std::string& sessionId) { return 0; } - return FromStringWithDefault(*nodeIds[0], 0); - + return FromStringWithDefault(*nodeIds[0], 0); } catch (...) { return 0; } @@ -50,7 +47,7 @@ const std::string& TKqpSessionCommon::GetId() const { return SessionId_; } -const string& TKqpSessionCommon::GetEndpoint() const { +const std::string& TKqpSessionCommon::GetEndpoint() const { return EndpointKey_.GetEndpoint(); } @@ -111,9 +108,10 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast) { auto now = TInstant::Now(); + std::lock_guard guard(Lock_); if (updateTimeInPast) { - TimeInPast_ = now; + TimeInPast_ = now; } TimeToTouch_.store(now + interval, std::memory_order_relaxed); } @@ -136,15 +134,6 @@ TInstant TKqpSessionCommon::GetTimeInPastFast() const { return TimeInPast_; } -// SetTimeInterval/GetTimeInterval, are not atomic! -void TKqpSessionCommon::SetTimeInterval(TDuration interval) { - TimeInterval_ = interval; -} - -TDuration TKqpSessionCommon::GetTimeInterval() const { - return TimeInterval_; -} - void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) { CloseHandler_.store(handler); } diff --git a/src/client/impl/session/kqp_session_common.h b/src/client/impl/session/kqp_session_common.h index 225d8646df..3cdff81a1e 100644 --- a/src/client/impl/session/kqp_session_common.h +++ b/src/client/impl/session/kqp_session_common.h @@ -12,7 +12,7 @@ namespace NYdb::inline V3 { //////////////////////////////////////////////////////////////////////////////// -ui64 GetNodeIdFromSession(const std::string& sessionId); +std::uint64_t GetNodeIdFromSession(const std::string& sessionId); class TKqpSessionCommon; @@ -57,10 +57,6 @@ class TKqpSessionCommon : public TEndpointObj { TInstant GetTimeToTouchFast() const; TInstant GetTimeInPastFast() const; - // SetTimeInterval/GetTimeInterval, are not atomic! - void SetTimeInterval(TDuration interval); - TDuration GetTimeInterval() const; - static std::function GetSmartDeleter(std::shared_ptr client); // Shoult be called under session pool lock @@ -85,8 +81,6 @@ class TKqpSessionCommon : public TEndpointObj { // so we need to be able to read this value atomicaly std::atomic TimeToTouch_; TInstant TimeInPast_; - // Is used to implement progressive timeout for settler keep alive call - TDuration TimeInterval_; std::atomic CloseHandler_; // Indicate session was in active state, but state was changed diff --git a/src/client/impl/session/session_pool.cpp b/src/client/impl/session/session_pool.cpp index 0984685f8d..877566a34f 100644 --- a/src/client/impl/session/session_pool.cpp +++ b/src/client/impl/session/session_pool.cpp @@ -114,7 +114,6 @@ void TSessionPool::ReplySessionToUser( std::unique_ptr ctx) { Y_ABORT_UNLESS(session->GetState() == TKqpSessionCommon::S_IDLE); - Y_ABORT_UNLESS(!session->GetTimeInterval()); session->MarkActive(); session->SetNeedUpdateActiveCounter(true); ctx->ReplySessionToUser(session); diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 9ce7b4343f..ccf90f1175 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -349,7 +349,6 @@ class TQueryClient::TImpl: public TClientImplCommon, public bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); // Also removes NeedUpdateActiveCounter flag sessionImpl->MarkIdle(); - sessionImpl->SetTimeInterval(TDuration::Zero()); if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); return false; diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 18c536aff9..4df9e91e24 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -85,7 +85,6 @@ void TTableClient::TImpl::ScheduleTaskUnsafe(std::function&& fn, TDeadli void TTableClient::TImpl::StartPeriodicSessionPoolTask() { // Session pool guarantees than client is alive during call callbacks auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) { - const auto& sessionPoolSettings = Settings_.SessionPoolSettings_; const auto spentTime = s->GetTimeToTouchFast() - s->GetTimeInPastFast(); @@ -125,7 +124,6 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() { }; if (spentTime >= sessionPoolSettings.KeepAliveIdleThreshold_) { - // Handle of session status will be done inside InjectSessionStatusInterception routine. // We just need to reschedule time to next call because InjectSessionStatusInterception doesn't // update timeInPast for calls from internal keep alive routine @@ -1062,7 +1060,6 @@ bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) { bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); // Also removes NeedUpdateActiveCounter flag sessionImpl->MarkIdle(); - sessionImpl->SetTimeInterval(TDuration::Zero()); if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); return false; diff --git a/tests/integration/sessions_pool/main.cpp b/tests/integration/sessions_pool/main.cpp index fbc206201c..cc350a1131 100644 --- a/tests/integration/sessions_pool/main.cpp +++ b/tests/integration/sessions_pool/main.cpp @@ -83,7 +83,6 @@ void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) { if (requestedSessions > client.GetActiveSessionsLimit()) { ASSERT_EQ(client.GetActiveSessionCount(), client.GetActiveSessionsLimit()); } - ASSERT_FALSE(sessionFutures.at(sessionId).HasValue()); break; } case EAction::ExtractValue: { @@ -266,7 +265,6 @@ TEST_P(YdbSdkSessionsPool, StressTestSync) { std::this_thread::sleep_for(std::chrono::milliseconds(10000)); ASSERT_EQ(Client->GetActiveSessionCount(), 0); - ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); } std::uint32_t RunStressTestAsync(std::uint32_t n, std::uint32_t nThreads, NYdb::NTable::TTableClient& client) { @@ -305,7 +303,6 @@ TEST_P(YdbSdkSessionsPool, StressTestAsync) { std::this_thread::sleep_for(std::chrono::milliseconds(10000)); ASSERT_EQ(Client->GetActiveSessionCount(), 0); - ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); } void TestPeriodicTask(std::uint32_t activeSessionsLimit, NYdb::NTable::TTableClient& client) { From eb161c32e814c0ff6e7e505ec6d9b90c46240899 Mon Sep 17 00:00:00 2001 From: Bulat Date: Fri, 23 Jan 2026 14:10:58 +0000 Subject: [PATCH 12/17] [C++ SDK] Improved connection string parsing (#32358) --- .github/last_commit.txt | 2 +- src/client/common_client/settings.cpp | 2 + src/client/helpers/helpers.cpp | 2 + .../impl/internal/common/balancing_policies.h | 4 +- .../impl/internal/common/client_pid.cpp | 3 +- src/client/impl/internal/common/client_pid.h | 2 + src/client/impl/internal/common/getenv.cpp | 3 +- src/client/impl/internal/common/getenv.h | 3 +- src/client/impl/internal/common/parser.cpp | 95 ++++++++++++----- src/client/impl/internal/common/parser.h | 3 +- .../connection_string_ut.cpp | 100 ++++++++++++++++++ 11 files changed, 185 insertions(+), 34 deletions(-) create mode 100644 tests/unit/client/connection_string/connection_string_ut.cpp diff --git a/.github/last_commit.txt b/.github/last_commit.txt index b1e0716a2f..1edfef2377 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -fe42d31fb0a36bc10265d985e8288bbf172397f3 +a3714944824904cb16d36851135b2b12e23cd5ad diff --git a/src/client/common_client/settings.cpp b/src/client/common_client/settings.cpp index 0a6ddc76a4..88e322e5cd 100644 --- a/src/client/common_client/settings.cpp +++ b/src/client/common_client/settings.cpp @@ -1,6 +1,8 @@ #include +#define INCLUDE_YDB_INTERNAL_H #include +#undef INCLUDE_YDB_INTERNAL_H namespace NYdb::inline V3 { diff --git a/src/client/helpers/helpers.cpp b/src/client/helpers/helpers.cpp index 6bd4fe8f10..c679ca010a 100644 --- a/src/client/helpers/helpers.cpp +++ b/src/client/helpers/helpers.cpp @@ -4,8 +4,10 @@ #include #include +#define INCLUDE_YDB_INTERNAL_H #include #include +#undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/impl/internal/common/balancing_policies.h b/src/client/impl/internal/common/balancing_policies.h index f1180f37ed..ea4c8b70aa 100644 --- a/src/client/impl/internal/common/balancing_policies.h +++ b/src/client/impl/internal/common/balancing_policies.h @@ -4,8 +4,8 @@ #include -#include -#include +#include +#include namespace NYdb::inline V3 { diff --git a/src/client/impl/internal/common/client_pid.cpp b/src/client/impl/internal/common/client_pid.cpp index ec82d6b613..cb5cc8eb0a 100644 --- a/src/client/impl/internal/common/client_pid.cpp +++ b/src/client/impl/internal/common/client_pid.cpp @@ -1,3 +1,4 @@ +#define INCLUDE_YDB_INTERNAL_H #include "client_pid.h" #include @@ -16,7 +17,7 @@ namespace NYdb::inline V3 { namespace { -ui32 GetProcessId() { +std::uint32_t GetProcessId() { #ifdef _win_ return GetCurrentProcessId(); #else diff --git a/src/client/impl/internal/common/client_pid.h b/src/client/impl/internal/common/client_pid.h index b8136e16c0..5998b4179c 100644 --- a/src/client/impl/internal/common/client_pid.h +++ b/src/client/impl/internal/common/client_pid.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NYdb::inline V3 { diff --git a/src/client/impl/internal/common/getenv.cpp b/src/client/impl/internal/common/getenv.cpp index d6d136dbd7..6037eaf20b 100644 --- a/src/client/impl/internal/common/getenv.cpp +++ b/src/client/impl/internal/common/getenv.cpp @@ -1,3 +1,4 @@ +#define INCLUDE_YDB_INTERNAL_H #include "getenv.h" #include @@ -9,4 +10,4 @@ std::string GetStrFromEnv(const char* envVarName, const std::string& defaultValu return envVarPointer ? std::string(envVarPointer) : defaultValue; } -} // namespace NYdb \ No newline at end of file +} // namespace NYdb diff --git a/src/client/impl/internal/common/getenv.h b/src/client/impl/internal/common/getenv.h index b105bf5e94..ec50c737fe 100644 --- a/src/client/impl/internal/common/getenv.h +++ b/src/client/impl/internal/common/getenv.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NYdb::inline V3 { @@ -7,4 +9,3 @@ namespace NYdb::inline V3 { std::string GetStrFromEnv(const char* envVarName, const std::string& defaultValue = ""); } // namespace NYdb - diff --git a/src/client/impl/internal/common/parser.cpp b/src/client/impl/internal/common/parser.cpp index 879048743c..2e72e44433 100644 --- a/src/client/impl/internal/common/parser.cpp +++ b/src/client/impl/internal/common/parser.cpp @@ -1,51 +1,92 @@ +#define INCLUDE_YDB_INTERNAL_H #include "parser.h" #include +#include +#include + +#include + + namespace NYdb::inline V3 { +namespace { + void ThrowContractViolation(const std::string& connectionString, const std::string& message) { + ythrow TContractViolation("Failed to parse connection string: \"" + connectionString + "\", error: " + message + "\n"); + } +} + TConnectionInfo ParseConnectionString(const std::string& connectionString) { - if (connectionString.length() == 0) { - ythrow TContractViolation("Empty connection string"); + if (connectionString.empty()) { + ThrowContractViolation(connectionString, "empty connection string"); } - const std::string databaseFlag = "/?database="; - const std::string grpcProtocol = "grpc://"; - const std::string grpcsProtocol = "grpcs://"; - const std::string localhostDomain = "localhost:"; + std::string connectionStringWithScheme = connectionString; + + if (connectionString.find("://") == std::string::npos) { + if (connectionString.starts_with("localhost:")) { + connectionStringWithScheme = "grpc://" + connectionString; + } else { + connectionStringWithScheme = "grpcs://" + connectionString; + } + } TConnectionInfo connectionInfo; - std::string endpoint; - size_t pathIndex = connectionString.find(databaseFlag); - if (pathIndex == std::string::npos){ - pathIndex = connectionString.length(); + NUri::TUri uri; + NUri::TUri::TState::EParsed parseStatus = uri.Parse( + connectionStringWithScheme, + NUri::TFeature::FeaturesDefault | NUri::TFeature::FeatureSchemeFlexible + ); + + if (parseStatus != NUri::TUri::TState::EParsed::ParsedOK) { + ThrowContractViolation(connectionString, "failure during URI parsing with status: " + std::string(NUri::ParsedStateToString(parseStatus))); } - if (pathIndex != connectionString.length()) { - connectionInfo.Database = connectionString.substr(pathIndex + databaseFlag.length()); - endpoint = connectionString.substr(0, pathIndex); - } else { - endpoint = connectionString; + + std::string_view host = uri.GetHost(); + if (host.empty()) { + ThrowContractViolation(connectionString, "connection string must contain a host"); } - if (!std::string_view{endpoint}.starts_with(grpcProtocol) && !std::string_view{endpoint}.starts_with(grpcsProtocol) && - !std::string_view{endpoint}.starts_with(localhostDomain)) - { - connectionInfo.Endpoint = endpoint; - connectionInfo.EnableSsl = true; - } else if (std::string_view{endpoint}.starts_with(grpcProtocol)) { - connectionInfo.Endpoint = endpoint.substr(grpcProtocol.length()); + // Validate and extract scheme + std::string_view scheme = uri.GetField(NUri::TUri::FieldScheme); + if (scheme == "grpc") { connectionInfo.EnableSsl = false; - } else if (std::string_view{endpoint}.starts_with(grpcsProtocol)) { - connectionInfo.Endpoint = endpoint.substr(grpcsProtocol.length()); + } else if (scheme == "grpcs") { connectionInfo.EnableSsl = true; } else { - connectionInfo.Endpoint = endpoint; - connectionInfo.EnableSsl = false; + ThrowContractViolation(connectionString, "invalid scheme in connection string: only 'grpc' and 'grpcs' are allowed"); + } + + std::uint16_t port = uri.GetPort(); + if (port == 0) { + connectionInfo.Endpoint = std::string(host); + } else { + connectionInfo.Endpoint = std::string(host) + ":" + std::to_string(port); + } + + // Extract database from path or query parameter + std::string_view path = uri.GetField(NUri::TUri::FieldPath); + std::string_view query = uri.GetField(NUri::TUri::FieldQuery); + + bool hasQueryDatabase = false; + if (!query.empty()) { + TCgiParameters queryParams(query); + if (queryParams.Has("database")) { + connectionInfo.Database = queryParams.Get("database"); + hasQueryDatabase = true; + } + } + + if (!path.empty() && path != "/") { + if (hasQueryDatabase) { + ThrowContractViolation(connectionString, "database cannot be specified in both path and query parameter"); + } + connectionInfo.Database = path; } return connectionInfo; } } // namespace NYdb - diff --git a/src/client/impl/internal/common/parser.h b/src/client/impl/internal/common/parser.h index 13df795e6a..cbb46c1f42 100644 --- a/src/client/impl/internal/common/parser.h +++ b/src/client/impl/internal/common/parser.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NYdb::inline V3 { @@ -13,4 +15,3 @@ struct TConnectionInfo { TConnectionInfo ParseConnectionString(const std::string& connectionString); } // namespace NYdb - diff --git a/tests/unit/client/connection_string/connection_string_ut.cpp b/tests/unit/client/connection_string/connection_string_ut.cpp new file mode 100644 index 0000000000..597edb259a --- /dev/null +++ b/tests/unit/client/connection_string/connection_string_ut.cpp @@ -0,0 +1,100 @@ +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include + +using namespace NYdb; + +TEST(ConnectionStringParser, ParseWithQueryParameter) { + auto connInfo = ParseConnectionString("grpc://localhost:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseWithQueryParameterNoSlash) { + auto connInfo = ParseConnectionString("grpc://localhost:2135?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseWithPathAsDatabase) { + auto connInfo = ParseConnectionString("grpc://localhost:2135/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseSecureConnection) { + auto connInfo = ParseConnectionString("grpcs://localhost:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseSecureConnectionNoSlash) { + auto connInfo = ParseConnectionString("grpcs://localhost:2135?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseSecureConnectionWithPath) { + auto connInfo = ParseConnectionString("grpcs://localhost:2135/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseNoSchemeLocalhost) { + auto connInfo = ParseConnectionString("localhost:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseNoSchemeNonLocalhost) { + auto connInfo = ParseConnectionString("example.com:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "example.com:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseNoSchemeWithPath) { + auto connInfo = ParseConnectionString("localhost:2135/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseNoDatabaseInQuery) { + auto connInfo = ParseConnectionString("grpc://localhost:2135"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, ""); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, InvalidScheme) { + EXPECT_THROW( + ParseConnectionString("http://localhost:2135/?database=/local"), + TContractViolation + ); +} + +TEST(ConnectionStringParser, EmptyConnectionString) { + EXPECT_THROW( + ParseConnectionString(""), + TContractViolation + ); +} + +TEST(ConnectionStringParser, DatabaseInBothPathAndQuery) { + EXPECT_THROW( + ParseConnectionString("grpc://localhost:2135/local?database=/other"), + TContractViolation + ); +} From a466a4f56537603a495372650c99e27a8ffec6f0 Mon Sep 17 00:00:00 2001 From: Maksim Date: Fri, 23 Jan 2026 14:11:04 +0000 Subject: [PATCH 13/17] [NBS-6758]: read, write events for NBS 2, grpc draft api (#32477) --- .github/last_commit.txt | 2 +- src/api/grpc/draft/ydb_nbs_v1.proto | 15 ++++++++++--- src/api/protos/draft/ydb_nbs.proto | 33 ++++++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 1edfef2377..e60f0f97d9 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -a3714944824904cb16d36851135b2b12e23cd5ad +4665e1acf46f60b54295d353b14630b18f4d5e24 diff --git a/src/api/grpc/draft/ydb_nbs_v1.proto b/src/api/grpc/draft/ydb_nbs_v1.proto index b2f5351908..9e4e964a20 100644 --- a/src/api/grpc/draft/ydb_nbs_v1.proto +++ b/src/api/grpc/draft/ydb_nbs_v1.proto @@ -11,11 +11,20 @@ import "src/api/protos/draft/ydb_nbs.proto"; // Service for managing nbs 2.0 service NbsService { // Create NBS partition - rpc CreatePartition(Ydb.Nbs.CreatePartitionRequest) returns (Ydb.Nbs.CreatePartitionResponse); + rpc CreatePartition(NYdb.NBS.NProto.CreatePartitionRequest) returns (NYdb.NBS.NProto.CreatePartitionResponse); // Delete NBS partition - rpc DeletePartition(Ydb.Nbs.DeletePartitionRequest) returns (Ydb.Nbs.DeletePartitionResponse); + rpc DeletePartition(NYdb.NBS.NProto.DeletePartitionRequest) returns (NYdb.NBS.NProto.DeletePartitionResponse); // List NBS partitions - rpc ListPartitions(Ydb.Nbs.ListPartitionsRequest) returns (Ydb.Nbs.ListPartitionsResponse); + rpc ListPartitions(NYdb.NBS.NProto.ListPartitionsRequest) returns (NYdb.NBS.NProto.ListPartitionsResponse); + + + // + // Block I/O. + // + + rpc ReadBlocks(NYdb.NBS.NProto.ReadBlocksRequest) returns (NYdb.NBS.NProto.ReadBlocksResponse); + + rpc WriteBlocks(NYdb.NBS.NProto.WriteBlocksRequest) returns (NYdb.NBS.NProto.WriteBlocksResponse); } diff --git a/src/api/protos/draft/ydb_nbs.proto b/src/api/protos/draft/ydb_nbs.proto index dd5afb5a0c..e96f551c60 100644 --- a/src/api/protos/draft/ydb_nbs.proto +++ b/src/api/protos/draft/ydb_nbs.proto @@ -1,12 +1,13 @@ syntax = "proto3"; option cc_enable_arenas = true; -package Ydb.Nbs; +package NYdb.NBS.NProto; option java_package = "com.yandex.ydb.nbs.proto"; option java_outer_classname = "NbsProtos"; option java_multiple_files = true; +import "ydb/core/nbs/cloud/blockstore/public/api/protos/io.proto"; import "src/api/protos/ydb_operation.proto"; @@ -66,3 +67,33 @@ message ListPartitionsResponse { message ListPartitionsResult { repeated string TabletId = 1; } + +//////////////////////////////////////////////////////////////////////////////// +// Blocks read request/response. + +message ReadBlocksRequest +{ + Ydb.Operations.OperationParams operation_params = 1; + + TReadBlocksRequest request = 2; +} + +message ReadBlocksResponse +{ + Ydb.Operations.Operation operation = 1; +} + +//////////////////////////////////////////////////////////////////////////////// +// Blocks write request/response. + +message WriteBlocksRequest +{ + Ydb.Operations.OperationParams operation_params = 1; + + TWriteBlocksRequest request = 2; +} + +message WriteBlocksResponse +{ + Ydb.Operations.Operation operation = 1; +} From cd148f7612fd5dea5b8a666a5c2a88b727aa3985 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 23 Jan 2026 14:11:10 +0000 Subject: [PATCH 14/17] More tests for commit offsets (#32502) --- .github/last_commit.txt | 2 +- src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp | 4 ++-- src/client/topic/ut/ut_utils/topic_sdk_test_setup.h | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index e60f0f97d9..1b79b568ca 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -4665e1acf46f60b54295d353b14630b18f4d5e24 +29e1db4edf7ba107c8ce694775efcc6a1a3a6b56 diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 092a2fde64..4272e5defa 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -81,7 +81,7 @@ void TTopicSdkTestSetup::Write(const std::string& topic, const std::string& mess TTopicSdkTestSetup::TReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function handler, - std::optional partition, const TDuration timeout) { + std::optional partition, const TDuration timeout, bool autoPartitioningSupport) { TTopicClient client(MakeDriver()); auto topicSettings = TTopicReadSettings(topic); @@ -90,7 +90,7 @@ TTopicSdkTestSetup::TReadResult TTopicSdkTestSetup::Read(const std::string& topi } auto settings = TReadSessionSettings() - .AutoPartitioningSupport(true) + .AutoPartitioningSupport(autoPartitioningSupport) .AppendTopics(topicSettings) .ConsumerName(consumer); diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index bef129a69a..e3fe5d48a0 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -46,7 +46,8 @@ class TTopicSdkTestSetup : public ITopicTestSetup { TReadResult Read(const std::string& topic, const std::string& consumer, std::function handler, std::optional partition = std::nullopt, - const TDuration timeout = TDuration::Seconds(5)); + const TDuration timeout = TDuration::Seconds(5), + bool autoPartitioningSupport = true); TStatus Commit(const std::string& path, const std::string& consumerName, std::size_t partitionId, std::size_t offset, From e3de75c462687a200d8c2c78ab4f4843dc073c4f Mon Sep 17 00:00:00 2001 From: azevaykin <145343289+azevaykin@users.noreply.github.com> Date: Fri, 23 Jan 2026 14:11:17 +0000 Subject: [PATCH 15/17] Remove fulltext index layout (#32368) --- .github/last_commit.txt | 2 +- include/ydb-cpp-sdk/client/table/table_enum.h | 3 +- src/api/protos/ydb_table.proto | 35 ++++++++----- src/client/table/table.cpp | 52 ++++++++++++++----- 4 files changed, 64 insertions(+), 28 deletions(-) diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 1b79b568ca..9d86ee1c55 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -29e1db4edf7ba107c8ce694775efcc6a1a3a6b56 +060a9aa97e1c45608a90c5c39979ede58f0951fa diff --git a/include/ydb-cpp-sdk/client/table/table_enum.h b/include/ydb-cpp-sdk/client/table/table_enum.h index ff6b141d68..a53e9e1e86 100644 --- a/include/ydb-cpp-sdk/client/table/table_enum.h +++ b/include/ydb-cpp-sdk/client/table/table_enum.h @@ -35,7 +35,8 @@ enum class EIndexType { GlobalAsync, GlobalUnique, GlobalVectorKMeansTree, - GlobalFulltext, + GlobalFulltextPlain, + GlobalFulltextRelevance, Unknown = std::numeric_limits::max() }; diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index adff527863..2b0deca192 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -90,7 +90,7 @@ message KMeansTreeSettings { VectorIndexSettings settings = 1; optional uint32 clusters = 2; - + optional uint32 levels = 3; optional uint32 overlap_clusters = 4; @@ -228,59 +228,59 @@ message FulltextIndexSettings { message Analyzers { // See Tokenizer enum optional Tokenizer tokenizer = 1; - + // Language used for language-sensitive operations like stopword filtering and stemming // Example: language = "english" // By default is not specified and no language-specific logic is applied optional string language = 2; - + // Whether to convert tokens to lowercase // Example: // Token: "Quick" // Output: "quick" optional bool use_filter_lowercase = 100; - + // Whether to remove common stopwords like "the", "a", "is" // Example: language = "english" // Tokens: ["the", "quick", "brown"] // Output: ["quick", "brown"] optional bool use_filter_stopwords = 110; - + // Whether to apply character n-gram indexing to each token // Must be used with filter_ngram_min_length and filter_ngram_max_length // Example: filter_ngram_min_length = 3, filter_ngram_max_length = 4 // Token: "search" // Output: ["sea", "ear", "arc", "rch", "sear", "earc", "arch"] optional bool use_filter_ngram = 120; - + // Whether to apply edge n-gram indexing (prefix-based) to each token // Used with filter_ngram_min_length and filter_ngram_max_length // Example: filter_ngram_min_length = 3, filter_ngram_max_length = 4 // Token: "search" // Output: ["sea", "sear"] optional bool use_filter_edge_ngram = 121; - + // Minimum length of n-grams to generate (inclusive) // Must be used with use_filter_ngram or use_filter_edge_ngram // Default value is 3 optional int32 filter_ngram_min_length = 122 [(Ydb.value) = ">= 0"]; - + // Maximum length of n-grams to generate (inclusive) // Must be used with use_filter_ngram or use_filter_edge_ngram // Default value is 4 optional int32 filter_ngram_max_length = 123 [(Ydb.value) = ">= 0"]; - + // Whether to filter tokens by their length // Must be used with filter_length_min or filter_length_max // Example: filter_length_min = 4, filter_length_max = 6 // Tokens: ["foo", "fooba", "foobar", "foobarbaz"] // Output: ["fooba", "foobar"] optional bool use_filter_length = 130; - + // Minimum token length to keep (inclusive) // Must be used with use_filter_length optional int32 filter_length_min = 131 [(Ydb.value) = ">= 0"]; - + // Maximum token length to keep (inclusive) // Must be used with use_filter_length optional int32 filter_length_max = 132 [(Ydb.value) = ">= 0"]; @@ -312,7 +312,12 @@ message FulltextIndexSettings { repeated ColumnAnalyzers columns = 2; } -message GlobalFulltextIndex { +message GlobalFulltextPlainIndex { + GlobalIndexSettings settings = 1; + FulltextIndexSettings fulltext_settings = 2; +} + +message GlobalFulltextRelevanceIndex { GlobalIndexSettings settings = 1; FulltextIndexSettings fulltext_settings = 2; } @@ -329,7 +334,8 @@ message TableIndex { GlobalAsyncIndex global_async_index = 4; GlobalUniqueIndex global_unique_index = 6; GlobalVectorKMeansTreeIndex global_vector_kmeans_tree_index = 7; - GlobalFulltextIndex global_fulltext_index = 8; + GlobalFulltextPlainIndex global_fulltext_plain_index = 8; + GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 9; } // list of columns content to be copied in to index table repeated string data_columns = 5; @@ -354,7 +360,8 @@ message TableIndexDescription { GlobalAsyncIndex global_async_index = 5; GlobalUniqueIndex global_unique_index = 8; GlobalVectorKMeansTreeIndex global_vector_kmeans_tree_index = 9; - GlobalFulltextIndex global_fulltext_index = 10; + GlobalFulltextPlainIndex global_fulltext_plain_index = 10; + GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 11; } Status status = 4; // list of columns content to be copied in to index table diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index c67eba0975..8ed110c014 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -793,11 +793,19 @@ void TTableDescription::AddVectorKMeansTreeIndex(const std::string& indexName, c } void TTableDescription::AddFulltextIndex(const std::string& indexName, const std::vector& indexColumns, const TFulltextIndexSettings& indexSettings) { - Impl_->AddFulltextIndex(indexName, EIndexType::GlobalFulltext, indexColumns, indexSettings); + EIndexType indexType = EIndexType::GlobalFulltextPlain; + if (indexSettings.Layout.has_value() && indexSettings.Layout.value() == TFulltextIndexSettings::ELayout::FlatRelevance) { + indexType = EIndexType::GlobalFulltextRelevance; + } + Impl_->AddFulltextIndex(indexName, indexType, indexColumns, indexSettings); } void TTableDescription::AddFulltextIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TFulltextIndexSettings& indexSettings) { - Impl_->AddFulltextIndex(indexName, EIndexType::GlobalFulltext, indexColumns, dataColumns, indexSettings); + EIndexType indexType = EIndexType::GlobalFulltextPlain; + if (indexSettings.Layout.has_value() && indexSettings.Layout.value() == TFulltextIndexSettings::ELayout::FlatRelevance) { + indexType = EIndexType::GlobalFulltextRelevance; + } + Impl_->AddFulltextIndex(indexName, indexType, indexColumns, dataColumns, indexSettings); } void TTableDescription::AddSecondaryIndex(const std::string& indexName, const std::vector& indexColumns) { @@ -2594,7 +2602,7 @@ TFulltextIndexSettings::TAnalyzers FromProto(const Ydb::Table::FulltextIndexSett TAnalyzers result; result.Tokenizer = convertTokenizer(); - + if (proto.has_language()) { result.Language = proto.language(); } @@ -2625,13 +2633,13 @@ TFulltextIndexSettings::TAnalyzers FromProto(const Ydb::Table::FulltextIndexSett if (proto.has_filter_length_max()) { result.FilterLengthMax = proto.filter_length_max(); } - + return result; } Ydb::Table::FulltextIndexSettings::Analyzers ToProto(const TFulltextIndexSettings::TAnalyzers& analyzers) { using ETokenizer = TFulltextIndexSettings::ETokenizer; - + auto convertTokenizer = [&] { switch (*analyzers.Tokenizer) { case ETokenizer::Whitespace: @@ -2743,7 +2751,7 @@ void TFulltextIndexSettings::SerializeTo(Ydb::Table::FulltextIndexSettings& sett if (Layout.has_value()) { settings.set_layout(convertLayout()); } - + for (const auto& column : Columns) { *settings.add_columns() = ToProto(column); } @@ -2789,9 +2797,16 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { specializedIndexSettings = TKMeansTreeSettings::FromProto(vectorProto.vector_settings()); break; } - case TProto::kGlobalFulltextIndex: { - type = EIndexType::GlobalFulltext; - const auto& fulltextProto = proto.global_fulltext_index(); + case TProto::kGlobalFulltextPlainIndex: { + type = EIndexType::GlobalFulltextPlain; + const auto& fulltextProto = proto.global_fulltext_plain_index(); + globalIndexSettings.emplace_back(TGlobalIndexSettings::FromProto(fulltextProto.settings())); + specializedIndexSettings = TFulltextIndexSettings::FromProto(fulltextProto.fulltext_settings()); + break; + } + case TProto::kGlobalFulltextRelevanceIndex: { + type = EIndexType::GlobalFulltextRelevance; + const auto& fulltextProto = proto.global_fulltext_relevance_index(); globalIndexSettings.emplace_back(TGlobalIndexSettings::FromProto(fulltextProto.settings())); specializedIndexSettings = TFulltextIndexSettings::FromProto(fulltextProto.fulltext_settings()); break; @@ -2851,8 +2866,20 @@ void TIndexDescription::SerializeTo(Ydb::Table::TableIndex& proto) const { } break; } - case EIndexType::GlobalFulltext: { - auto* global_fulltext_index = proto.mutable_global_fulltext_index(); + case EIndexType::GlobalFulltextPlain: { + auto* global_fulltext_index = proto.mutable_global_fulltext_plain_index(); + auto& settings = *global_fulltext_index->mutable_settings(); + auto& fulltext_settings = *global_fulltext_index->mutable_fulltext_settings(); + if (GlobalIndexSettings_.size() == 1) { + GlobalIndexSettings_[0].SerializeTo(settings); + } + if (const auto* ftSettings = std::get_if(&SpecializedIndexSettings_)) { + ftSettings->SerializeTo(fulltext_settings); + } + break; + } + case EIndexType::GlobalFulltextRelevance: { + auto* global_fulltext_index = proto.mutable_global_fulltext_relevance_index(); auto& settings = *global_fulltext_index->mutable_settings(); auto& fulltext_settings = *global_fulltext_index->mutable_fulltext_settings(); if (GlobalIndexSettings_.size() == 1) { @@ -2895,7 +2922,8 @@ void TIndexDescription::Out(IOutputStream& o) const { o << ", vector_settings: " << *settings; } break; - case EIndexType::GlobalFulltext: + case EIndexType::GlobalFulltextPlain: + case EIndexType::GlobalFulltextRelevance: if (auto settings = std::get_if(&SpecializedIndexSettings_)) { o << ", fulltext_settings: " << *settings; } From a1cff99795afa4e1fb43a41705f76d43cae4b274 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 14:11:17 +0000 Subject: [PATCH 16/17] Update import generation: 33 --- .github/import_generation.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/import_generation.txt b/.github/import_generation.txt index bb95160cb6..a787364590 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -33 +34 From 0fd76dbfd6bc247e44a92cc019c0ef6a412c2104 Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Fri, 23 Jan 2026 14:33:42 +0000 Subject: [PATCH 17/17] Fixed CMakeLists for import 33 --- src/client/impl/internal/common/CMakeLists.txt | 3 +++ tests/unit/client/CMakeLists.txt | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/src/client/impl/internal/common/CMakeLists.txt b/src/client/impl/internal/common/CMakeLists.txt index d64f631c51..79e21c42fa 100644 --- a/src/client/impl/internal/common/CMakeLists.txt +++ b/src/client/impl/internal/common/CMakeLists.txt @@ -2,6 +2,9 @@ _ydb_sdk_add_library(impl-internal-common) target_link_libraries(impl-internal-common PUBLIC yutil + cgiparam + uri + client-types-exceptions grpc-client yql-public-issue ) diff --git a/tests/unit/client/CMakeLists.txt b/tests/unit/client/CMakeLists.txt index 8c3b142ee7..e697e7ed0e 100644 --- a/tests/unit/client/CMakeLists.txt +++ b/tests/unit/client/CMakeLists.txt @@ -1,5 +1,14 @@ add_subdirectory(oauth2_token_exchange/helpers) +add_ydb_test(NAME client-connection_string_ut + SOURCES + connection_string/connection_string_ut.cpp + LINK_LIBRARIES + impl-internal-common + LABELS + unit +) + add_ydb_test(NAME client-ydb_coordination_ut SOURCES coordination/coordination_ut.cpp