diff --git a/.github/import_generation.txt b/.github/import_generation.txt index bb95160cb6e..a7873645902 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -33 +34 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 3522bfc6d07..9d86ee1c558 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -303019a794dd98ca44a77440af08bbeecf56d727 +060a9aa97e1c45608a90c5c39979ede58f0951fa diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index b1a9d0ada90..72aa008ccca 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -32,6 +32,9 @@ class TDriverConfig { //! client will connect to others nodes according to client loadbalancing TDriverConfig& SetEndpoint(const std::string& endpoint); + //! Get endpoint, returns the endpoint set via connection string or SetEndpoint() + const std::string& GetEndpoint() const; + //! Set number of network threads, default: 2 TDriverConfig& SetNetworkThreadsNum(size_t sz); diff --git a/include/ydb-cpp-sdk/client/scheme/scheme.h b/include/ydb-cpp-sdk/client/scheme/scheme.h index ec0291f1e4e..df6d0b01332 100644 --- a/include/ydb-cpp-sdk/client/scheme/scheme.h +++ b/include/ydb-cpp-sdk/client/scheme/scheme.h @@ -53,6 +53,7 @@ enum class ESchemeEntryType : i32 { SysView = 22, Transfer = 23, StreamingQuery = 24, + BackupCollection = 25, }; struct TVirtualTimestamp { diff --git a/include/ydb-cpp-sdk/client/table/table_enum.h b/include/ydb-cpp-sdk/client/table/table_enum.h index ff6b141d68b..a53e9e1e86e 100644 --- a/include/ydb-cpp-sdk/client/table/table_enum.h +++ b/include/ydb-cpp-sdk/client/table/table_enum.h @@ -35,7 +35,8 @@ enum class EIndexType { GlobalAsync, GlobalUnique, GlobalVectorKMeansTree, - GlobalFulltext, + GlobalFulltextPlain, + GlobalFulltextRelevance, Unknown = std::numeric_limits::max() }; diff --git a/include/ydb-cpp-sdk/client/topic/read_events.h b/include/ydb-cpp-sdk/client/topic/read_events.h index 7dc550e49c8..6e1f841ce17 100644 --- a/include/ydb-cpp-sdk/client/topic/read_events.h +++ b/include/ydb-cpp-sdk/client/topic/read_events.h @@ -57,6 +57,21 @@ struct TPartitionSession: public TThrRefBase, public TPrintable void TPrintable::DebugString(TStringBuilder& res, bool) const; +struct TPartitionSessionControl: public TPartitionSession { + //! Commit offsets range. + //! Can be used from TDataReceivedEvent or TDeferredCommit. + virtual void Commit(uint64_t startOffset, uint64_t endOffset) = 0; + + //! Confirm partition session creation from TStartPartitionSessionEvent. + virtual void ConfirmCreate(std::optional readOffset, std::optional commitOffset) = 0; + + //! Confirm partition session destruction from TStopPartitionSessionEvent. + virtual void ConfirmDestroy() = 0; + + //! Confirm partition session end from TEndPartitionSessionEvent. + virtual void ConfirmEnd(std::span childIds) = 0; +}; + //! Events for read session. struct TReadSessionEvent { class TPartitionSessionAccessor { @@ -73,7 +88,7 @@ struct TReadSessionEvent { //! Event with new data. //! Contains batch of messages from single partition session. - struct TDataReceivedEvent : public TPartitionSessionAccessor, public TPrintable { + struct TDataReceivedEvent: public TPartitionSessionAccessor, public TPrintable { struct TMessageInformation { TMessageInformation(uint64_t offset, std::string producerId, @@ -95,7 +110,7 @@ struct TReadSessionEvent { std::string MessageGroupId; }; - class TMessageBase : public TPrintable { + class TMessageBase: public TPrintable { public: TMessageBase(const std::string& data, TMessageInformation info); @@ -249,7 +264,7 @@ struct TReadSessionEvent { //! This means that from now the first available //! message offset in current partition //! for current consumer is this offset. - //! All messages before are committed and futher never be available. + //! All messages before are committed and further never be available. uint64_t GetCommittedOffset() const { return CommittedOffset; } @@ -452,4 +467,4 @@ void TPrintable::DebugString(TStringBuilder& ret, bool prin std::string DebugString(const TReadSessionEvent::TEvent& event); -} +} // namespace NYdb::NTopic diff --git a/include/ydb-cpp-sdk/client/topic/read_session.h b/include/ydb-cpp-sdk/client/topic/read_session.h index b7d4e698411..1a7d4be36cc 100644 --- a/include/ydb-cpp-sdk/client/topic/read_session.h +++ b/include/ydb-cpp-sdk/client/topic/read_session.h @@ -193,8 +193,8 @@ struct TReadSessionSettings: public TRequestSettings { //! AutoPartitioningSupport. FLUENT_SETTING_DEFAULT(bool, AutoPartitioningSupport, false); - // TODO(qyryq) Uncomment when direct read is ready. - // FLUENT_SETTING_DEFAULT(bool, DirectRead, false); + //! Direct read from partition nodes. Experimental setting — not recommended for use. + FLUENT_SETTING_DEFAULT(bool, DirectRead, false); //! Log. FLUENT_SETTING_OPTIONAL(TLog, Log); diff --git a/include/ydb-cpp-sdk/client/topic/write_session.h b/include/ydb-cpp-sdk/client/topic/write_session.h index 68525b87fef..e0c4d4618e2 100644 --- a/include/ydb-cpp-sdk/client/topic/write_session.h +++ b/include/ydb-cpp-sdk/client/topic/write_session.h @@ -219,7 +219,7 @@ class ISimpleBlockingWriteSession : public TThrRefBase { virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; - //! Returns true if write session is alive and acitve. False if session was closed. + //! Returns true if write session is alive and active. False if session was closed. virtual bool IsAlive() const = 0; virtual TWriterCounters::TPtr GetCounters() = 0; @@ -269,11 +269,11 @@ class IWriteSession { //! Return true if all writes were completed and acked, false if timeout was reached and some writes were aborted. virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; - //! Writer counters with different stats (see TWriterConuters). + //! Writer counters with different stats (see TWriterCounters). virtual TWriterCounters::TPtr GetCounters() = 0; //! Close() with timeout = 0 and destroy everything instantly. virtual ~IWriteSession() = default; }; -} +} // namespace NYdb::NTopic diff --git a/src/api/grpc/draft/ydb_nbs_v1.proto b/src/api/grpc/draft/ydb_nbs_v1.proto new file mode 100644 index 00000000000..9e4e964a20e --- /dev/null +++ b/src/api/grpc/draft/ydb_nbs_v1.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package Ydb.Nbs.V1; + +option java_package = "com.yandex.ydb.nbs.v1"; +option java_outer_classname = "NbsGrpc"; +option java_multiple_files = true; + +import "src/api/protos/draft/ydb_nbs.proto"; + +// Service for managing nbs 2.0 +service NbsService { + // Create NBS partition + rpc CreatePartition(NYdb.NBS.NProto.CreatePartitionRequest) returns (NYdb.NBS.NProto.CreatePartitionResponse); + + // Delete NBS partition + rpc DeletePartition(NYdb.NBS.NProto.DeletePartitionRequest) returns (NYdb.NBS.NProto.DeletePartitionResponse); + + // List NBS partitions + rpc ListPartitions(NYdb.NBS.NProto.ListPartitionsRequest) returns (NYdb.NBS.NProto.ListPartitionsResponse); + + + // + // Block I/O. + // + + rpc ReadBlocks(NYdb.NBS.NProto.ReadBlocksRequest) returns (NYdb.NBS.NProto.ReadBlocksResponse); + + rpc WriteBlocks(NYdb.NBS.NProto.WriteBlocksRequest) returns (NYdb.NBS.NProto.WriteBlocksResponse); +} diff --git a/src/api/protos/draft/ydb_nbs.proto b/src/api/protos/draft/ydb_nbs.proto new file mode 100644 index 00000000000..e96f551c600 --- /dev/null +++ b/src/api/protos/draft/ydb_nbs.proto @@ -0,0 +1,99 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package NYdb.NBS.NProto; + +option java_package = "com.yandex.ydb.nbs.proto"; +option java_outer_classname = "NbsProtos"; +option java_multiple_files = true; + +import "ydb/core/nbs/cloud/blockstore/public/api/protos/io.proto"; +import "src/api/protos/ydb_operation.proto"; + + +//////////////////////////////////////////////////////////////////////////////// +// Partition create request/response. + +message CreatePartitionRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Storage pool name to use. + string StoragePoolName = 2; + + // Minimum addressable block size (smallest unit of I/O operations). + uint32 BlockSize = 3; + + // Maximum number of blocks stored in partition. + uint64 BlocksCount = 4; +} + +message CreatePartitionResponse { + Ydb.Operations.Operation operation = 1; +} + +message CreatePartitionResult { + string TabletId = 1; +} + +//////////////////////////////////////////////////////////////////////////////// +// Partition delete request/response. + +message DeletePartitionRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Partition tablet id to delete. + string TabletId = 2; +} + +message DeletePartitionResponse { + Ydb.Operations.Operation operation = 1; +} + +message DeletePartitionResult { + // Deleted partition tablet id. + string TabletId = 1; +} + +//////////////////////////////////////////////////////////////////////////////// +// Partition list request/response. + +message ListPartitionsRequest { + Ydb.Operations.OperationParams operation_params = 1; +} + +message ListPartitionsResponse { + Ydb.Operations.Operation operation = 1; +} + +message ListPartitionsResult { + repeated string TabletId = 1; +} + +//////////////////////////////////////////////////////////////////////////////// +// Blocks read request/response. + +message ReadBlocksRequest +{ + Ydb.Operations.OperationParams operation_params = 1; + + TReadBlocksRequest request = 2; +} + +message ReadBlocksResponse +{ + Ydb.Operations.Operation operation = 1; +} + +//////////////////////////////////////////////////////////////////////////////// +// Blocks write request/response. + +message WriteBlocksRequest +{ + Ydb.Operations.OperationParams operation_params = 1; + + TWriteBlocksRequest request = 2; +} + +message WriteBlocksResponse +{ + Ydb.Operations.Operation operation = 1; +} diff --git a/src/api/protos/ydb_scheme.proto b/src/api/protos/ydb_scheme.proto index d4c98313654..c3a93c33565 100644 --- a/src/api/protos/ydb_scheme.proto +++ b/src/api/protos/ydb_scheme.proto @@ -66,6 +66,7 @@ message Entry { EXTERNAL_DATA_SOURCE = 19; VIEW = 20; RESOURCE_POOL = 21; + BACKUP_COLLECTION = 22; TRANSFER = 23; SYS_VIEW = 24; STREAMING_QUERY = 26; diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index de83c9c57f2..2b0deca192a 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -90,7 +90,7 @@ message KMeansTreeSettings { VectorIndexSettings settings = 1; optional uint32 clusters = 2; - + optional uint32 levels = 3; optional uint32 overlap_clusters = 4; @@ -228,59 +228,59 @@ message FulltextIndexSettings { message Analyzers { // See Tokenizer enum optional Tokenizer tokenizer = 1; - + // Language used for language-sensitive operations like stopword filtering and stemming // Example: language = "english" // By default is not specified and no language-specific logic is applied optional string language = 2; - + // Whether to convert tokens to lowercase // Example: // Token: "Quick" // Output: "quick" optional bool use_filter_lowercase = 100; - + // Whether to remove common stopwords like "the", "a", "is" // Example: language = "english" // Tokens: ["the", "quick", "brown"] // Output: ["quick", "brown"] optional bool use_filter_stopwords = 110; - + // Whether to apply character n-gram indexing to each token // Must be used with filter_ngram_min_length and filter_ngram_max_length // Example: filter_ngram_min_length = 3, filter_ngram_max_length = 4 // Token: "search" // Output: ["sea", "ear", "arc", "rch", "sear", "earc", "arch"] optional bool use_filter_ngram = 120; - + // Whether to apply edge n-gram indexing (prefix-based) to each token // Used with filter_ngram_min_length and filter_ngram_max_length // Example: filter_ngram_min_length = 3, filter_ngram_max_length = 4 // Token: "search" // Output: ["sea", "sear"] optional bool use_filter_edge_ngram = 121; - + // Minimum length of n-grams to generate (inclusive) // Must be used with use_filter_ngram or use_filter_edge_ngram // Default value is 3 optional int32 filter_ngram_min_length = 122 [(Ydb.value) = ">= 0"]; - + // Maximum length of n-grams to generate (inclusive) // Must be used with use_filter_ngram or use_filter_edge_ngram // Default value is 4 optional int32 filter_ngram_max_length = 123 [(Ydb.value) = ">= 0"]; - + // Whether to filter tokens by their length // Must be used with filter_length_min or filter_length_max // Example: filter_length_min = 4, filter_length_max = 6 // Tokens: ["foo", "fooba", "foobar", "foobarbaz"] // Output: ["fooba", "foobar"] optional bool use_filter_length = 130; - + // Minimum token length to keep (inclusive) // Must be used with use_filter_length optional int32 filter_length_min = 131 [(Ydb.value) = ">= 0"]; - + // Maximum token length to keep (inclusive) // Must be used with use_filter_length optional int32 filter_length_max = 132 [(Ydb.value) = ">= 0"]; @@ -312,7 +312,12 @@ message FulltextIndexSettings { repeated ColumnAnalyzers columns = 2; } -message GlobalFulltextIndex { +message GlobalFulltextPlainIndex { + GlobalIndexSettings settings = 1; + FulltextIndexSettings fulltext_settings = 2; +} + +message GlobalFulltextRelevanceIndex { GlobalIndexSettings settings = 1; FulltextIndexSettings fulltext_settings = 2; } @@ -329,7 +334,8 @@ message TableIndex { GlobalAsyncIndex global_async_index = 4; GlobalUniqueIndex global_unique_index = 6; GlobalVectorKMeansTreeIndex global_vector_kmeans_tree_index = 7; - GlobalFulltextIndex global_fulltext_index = 8; + GlobalFulltextPlainIndex global_fulltext_plain_index = 8; + GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 9; } // list of columns content to be copied in to index table repeated string data_columns = 5; @@ -354,7 +360,8 @@ message TableIndexDescription { GlobalAsyncIndex global_async_index = 5; GlobalUniqueIndex global_unique_index = 8; GlobalVectorKMeansTreeIndex global_vector_kmeans_tree_index = 9; - GlobalFulltextIndex global_fulltext_index = 10; + GlobalFulltextPlainIndex global_fulltext_plain_index = 10; + GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 11; } Status status = 4; // list of columns content to be copied in to index table @@ -637,6 +644,22 @@ message SequenceDescription { optional Type data_type = 9; // data type of the sequence } +// TODO: merge it with Column Family Compression +message ColumnCompression { + enum Algorithm { + ALGORITHM_UNSPECIFIED = 0; + ALGORITHM_OFF = 1; + ALGORITHM_LZ4 = 2; + ALGORITHM_ZSTD = 3; + } + + optional Algorithm algorithm = 1; + + // Set the compression level for selected compression type. If no value is specified, default value will be chosen. + // For ZSTD compression level must be in range [-131072:22] + optional int32 compression_level = 2; +} + message ColumnMeta { // Name of column string name = 1; @@ -652,6 +675,7 @@ message ColumnMeta { SequenceDescription from_sequence = 6; google.protobuf.NullValue empty_default = 7; } + optional ColumnCompression compression = 8; } message EvictionToExternalStorageSettings { diff --git a/src/client/common_client/settings.cpp b/src/client/common_client/settings.cpp index 0a6ddc76a4a..88e322e5cdc 100644 --- a/src/client/common_client/settings.cpp +++ b/src/client/common_client/settings.cpp @@ -1,6 +1,8 @@ #include +#define INCLUDE_YDB_INTERNAL_H #include +#undef INCLUDE_YDB_INTERNAL_H namespace NYdb::inline V3 { diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index d8b41783e06..207c67b6d5f 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -97,6 +97,10 @@ TDriverConfig& TDriverConfig::SetEndpoint(const std::string& endpoint) { return *this; } +const std::string& TDriverConfig::GetEndpoint() const { + return Impl_->Endpoint; +} + TDriverConfig& TDriverConfig::SetNetworkThreadsNum(size_t sz) { Impl_->NetworkThreadsNum = sz; return *this; diff --git a/src/client/federated_topic/impl/federated_deferred_commit.cpp b/src/client/federated_topic/impl/federated_deferred_commit.cpp index eb0d711338e..dc402c05f55 100644 --- a/src/client/federated_topic/impl/federated_deferred_commit.cpp +++ b/src/client/federated_topic/impl/federated_deferred_commit.cpp @@ -129,10 +129,10 @@ void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& da void TDeferredCommit::TImpl::Commit() { for (auto&& [partitionStream, offsetRanges] : Offsets) { for (auto&& [startOffset, endOffset] : offsetRanges) { - static_cast*>(partitionStream.Get()->PartitionSession.Get())->Commit(startOffset, endOffset); + static_cast(partitionStream.Get()->PartitionSession.Get())->Commit(startOffset, endOffset); } } Offsets.clear(); } -} +} // namespace NYdb::NFederatedTopic diff --git a/src/client/federated_topic/impl/federated_read_session_event.cpp b/src/client/federated_topic/impl/federated_read_session_event.cpp index 00cb5127b73..658ae75d859 100644 --- a/src/client/federated_topic/impl/federated_read_session_event.cpp +++ b/src/client/federated_topic/impl/federated_read_session_event.cpp @@ -157,7 +157,7 @@ TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEv void TReadSessionEvent::TDataReceivedEvent::Commit() { for (auto [from, to] : OffsetRanges) { - static_cast*>(PartitionSession.Get())->Commit(from, to); + static_cast(PartitionSession.Get())->Commit(from, to); } } @@ -165,4 +165,4 @@ std::string DebugString(const TReadSessionEvent::TEvent& event) { return std::visit([](const auto& ev) { return ev.DebugString(); }, event); } -} // namespace NYdb::Dev::NFederatedTopic +} // namespace NYdb::NFederatedTopic diff --git a/src/client/helpers/helpers.cpp b/src/client/helpers/helpers.cpp index 6bd4fe8f106..c679ca010ad 100644 --- a/src/client/helpers/helpers.cpp +++ b/src/client/helpers/helpers.cpp @@ -4,8 +4,10 @@ #include #include +#define INCLUDE_YDB_INTERNAL_H #include #include +#undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/impl/internal/common/CMakeLists.txt b/src/client/impl/internal/common/CMakeLists.txt index d64f631c517..79e21c42fa9 100644 --- a/src/client/impl/internal/common/CMakeLists.txt +++ b/src/client/impl/internal/common/CMakeLists.txt @@ -2,6 +2,9 @@ _ydb_sdk_add_library(impl-internal-common) target_link_libraries(impl-internal-common PUBLIC yutil + cgiparam + uri + client-types-exceptions grpc-client yql-public-issue ) diff --git a/src/client/impl/internal/common/balancing_policies.h b/src/client/impl/internal/common/balancing_policies.h index f1180f37ede..ea4c8b70aa2 100644 --- a/src/client/impl/internal/common/balancing_policies.h +++ b/src/client/impl/internal/common/balancing_policies.h @@ -4,8 +4,8 @@ #include -#include -#include +#include +#include namespace NYdb::inline V3 { diff --git a/src/client/impl/internal/common/client_pid.cpp b/src/client/impl/internal/common/client_pid.cpp index ec82d6b613f..cb5cc8eb0a5 100644 --- a/src/client/impl/internal/common/client_pid.cpp +++ b/src/client/impl/internal/common/client_pid.cpp @@ -1,3 +1,4 @@ +#define INCLUDE_YDB_INTERNAL_H #include "client_pid.h" #include @@ -16,7 +17,7 @@ namespace NYdb::inline V3 { namespace { -ui32 GetProcessId() { +std::uint32_t GetProcessId() { #ifdef _win_ return GetCurrentProcessId(); #else diff --git a/src/client/impl/internal/common/client_pid.h b/src/client/impl/internal/common/client_pid.h index b8136e16c04..5998b4179ce 100644 --- a/src/client/impl/internal/common/client_pid.h +++ b/src/client/impl/internal/common/client_pid.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NYdb::inline V3 { diff --git a/src/client/impl/internal/common/getenv.cpp b/src/client/impl/internal/common/getenv.cpp index d6d136dbd73..6037eaf20b8 100644 --- a/src/client/impl/internal/common/getenv.cpp +++ b/src/client/impl/internal/common/getenv.cpp @@ -1,3 +1,4 @@ +#define INCLUDE_YDB_INTERNAL_H #include "getenv.h" #include @@ -9,4 +10,4 @@ std::string GetStrFromEnv(const char* envVarName, const std::string& defaultValu return envVarPointer ? std::string(envVarPointer) : defaultValue; } -} // namespace NYdb \ No newline at end of file +} // namespace NYdb diff --git a/src/client/impl/internal/common/getenv.h b/src/client/impl/internal/common/getenv.h index b105bf5e948..ec50c737fe5 100644 --- a/src/client/impl/internal/common/getenv.h +++ b/src/client/impl/internal/common/getenv.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NYdb::inline V3 { @@ -7,4 +9,3 @@ namespace NYdb::inline V3 { std::string GetStrFromEnv(const char* envVarName, const std::string& defaultValue = ""); } // namespace NYdb - diff --git a/src/client/impl/internal/common/parser.cpp b/src/client/impl/internal/common/parser.cpp index 879048743cb..2e72e444339 100644 --- a/src/client/impl/internal/common/parser.cpp +++ b/src/client/impl/internal/common/parser.cpp @@ -1,51 +1,92 @@ +#define INCLUDE_YDB_INTERNAL_H #include "parser.h" #include +#include +#include + +#include + + namespace NYdb::inline V3 { +namespace { + void ThrowContractViolation(const std::string& connectionString, const std::string& message) { + ythrow TContractViolation("Failed to parse connection string: \"" + connectionString + "\", error: " + message + "\n"); + } +} + TConnectionInfo ParseConnectionString(const std::string& connectionString) { - if (connectionString.length() == 0) { - ythrow TContractViolation("Empty connection string"); + if (connectionString.empty()) { + ThrowContractViolation(connectionString, "empty connection string"); } - const std::string databaseFlag = "/?database="; - const std::string grpcProtocol = "grpc://"; - const std::string grpcsProtocol = "grpcs://"; - const std::string localhostDomain = "localhost:"; + std::string connectionStringWithScheme = connectionString; + + if (connectionString.find("://") == std::string::npos) { + if (connectionString.starts_with("localhost:")) { + connectionStringWithScheme = "grpc://" + connectionString; + } else { + connectionStringWithScheme = "grpcs://" + connectionString; + } + } TConnectionInfo connectionInfo; - std::string endpoint; - size_t pathIndex = connectionString.find(databaseFlag); - if (pathIndex == std::string::npos){ - pathIndex = connectionString.length(); + NUri::TUri uri; + NUri::TUri::TState::EParsed parseStatus = uri.Parse( + connectionStringWithScheme, + NUri::TFeature::FeaturesDefault | NUri::TFeature::FeatureSchemeFlexible + ); + + if (parseStatus != NUri::TUri::TState::EParsed::ParsedOK) { + ThrowContractViolation(connectionString, "failure during URI parsing with status: " + std::string(NUri::ParsedStateToString(parseStatus))); } - if (pathIndex != connectionString.length()) { - connectionInfo.Database = connectionString.substr(pathIndex + databaseFlag.length()); - endpoint = connectionString.substr(0, pathIndex); - } else { - endpoint = connectionString; + + std::string_view host = uri.GetHost(); + if (host.empty()) { + ThrowContractViolation(connectionString, "connection string must contain a host"); } - if (!std::string_view{endpoint}.starts_with(grpcProtocol) && !std::string_view{endpoint}.starts_with(grpcsProtocol) && - !std::string_view{endpoint}.starts_with(localhostDomain)) - { - connectionInfo.Endpoint = endpoint; - connectionInfo.EnableSsl = true; - } else if (std::string_view{endpoint}.starts_with(grpcProtocol)) { - connectionInfo.Endpoint = endpoint.substr(grpcProtocol.length()); + // Validate and extract scheme + std::string_view scheme = uri.GetField(NUri::TUri::FieldScheme); + if (scheme == "grpc") { connectionInfo.EnableSsl = false; - } else if (std::string_view{endpoint}.starts_with(grpcsProtocol)) { - connectionInfo.Endpoint = endpoint.substr(grpcsProtocol.length()); + } else if (scheme == "grpcs") { connectionInfo.EnableSsl = true; } else { - connectionInfo.Endpoint = endpoint; - connectionInfo.EnableSsl = false; + ThrowContractViolation(connectionString, "invalid scheme in connection string: only 'grpc' and 'grpcs' are allowed"); + } + + std::uint16_t port = uri.GetPort(); + if (port == 0) { + connectionInfo.Endpoint = std::string(host); + } else { + connectionInfo.Endpoint = std::string(host) + ":" + std::to_string(port); + } + + // Extract database from path or query parameter + std::string_view path = uri.GetField(NUri::TUri::FieldPath); + std::string_view query = uri.GetField(NUri::TUri::FieldQuery); + + bool hasQueryDatabase = false; + if (!query.empty()) { + TCgiParameters queryParams(query); + if (queryParams.Has("database")) { + connectionInfo.Database = queryParams.Get("database"); + hasQueryDatabase = true; + } + } + + if (!path.empty() && path != "/") { + if (hasQueryDatabase) { + ThrowContractViolation(connectionString, "database cannot be specified in both path and query parameter"); + } + connectionInfo.Database = path; } return connectionInfo; } } // namespace NYdb - diff --git a/src/client/impl/internal/common/parser.h b/src/client/impl/internal/common/parser.h index 13df795e6ae..cbb46c1f426 100644 --- a/src/client/impl/internal/common/parser.h +++ b/src/client/impl/internal/common/parser.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NYdb::inline V3 { @@ -13,4 +15,3 @@ struct TConnectionInfo { TConnectionInfo ParseConnectionString(const std::string& connectionString); } // namespace NYdb - diff --git a/src/client/impl/session/kqp_session_common.cpp b/src/client/impl/session/kqp_session_common.cpp index fedc32358f5..b2c5f38f818 100644 --- a/src/client/impl/session/kqp_session_common.cpp +++ b/src/client/impl/session/kqp_session_common.cpp @@ -6,9 +6,7 @@ namespace NYdb::inline V3 { -using std::string; - -ui64 GetNodeIdFromSession(const std::string& sessionId) { +std::uint64_t GetNodeIdFromSession(const std::string& sessionId) { if (sessionId.empty()) { return 0; } @@ -20,8 +18,7 @@ ui64 GetNodeIdFromSession(const std::string& sessionId) { return 0; } - return FromStringWithDefault(*nodeIds[0], 0); - + return FromStringWithDefault(*nodeIds[0], 0); } catch (...) { return 0; } @@ -50,7 +47,7 @@ const std::string& TKqpSessionCommon::GetId() const { return SessionId_; } -const string& TKqpSessionCommon::GetEndpoint() const { +const std::string& TKqpSessionCommon::GetEndpoint() const { return EndpointKey_.GetEndpoint(); } @@ -111,9 +108,10 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast) { auto now = TInstant::Now(); + std::lock_guard guard(Lock_); if (updateTimeInPast) { - TimeInPast_ = now; + TimeInPast_ = now; } TimeToTouch_.store(now + interval, std::memory_order_relaxed); } @@ -136,15 +134,6 @@ TInstant TKqpSessionCommon::GetTimeInPastFast() const { return TimeInPast_; } -// SetTimeInterval/GetTimeInterval, are not atomic! -void TKqpSessionCommon::SetTimeInterval(TDuration interval) { - TimeInterval_ = interval; -} - -TDuration TKqpSessionCommon::GetTimeInterval() const { - return TimeInterval_; -} - void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) { CloseHandler_.store(handler); } diff --git a/src/client/impl/session/kqp_session_common.h b/src/client/impl/session/kqp_session_common.h index 225d8646df3..3cdff81a1e0 100644 --- a/src/client/impl/session/kqp_session_common.h +++ b/src/client/impl/session/kqp_session_common.h @@ -12,7 +12,7 @@ namespace NYdb::inline V3 { //////////////////////////////////////////////////////////////////////////////// -ui64 GetNodeIdFromSession(const std::string& sessionId); +std::uint64_t GetNodeIdFromSession(const std::string& sessionId); class TKqpSessionCommon; @@ -57,10 +57,6 @@ class TKqpSessionCommon : public TEndpointObj { TInstant GetTimeToTouchFast() const; TInstant GetTimeInPastFast() const; - // SetTimeInterval/GetTimeInterval, are not atomic! - void SetTimeInterval(TDuration interval); - TDuration GetTimeInterval() const; - static std::function GetSmartDeleter(std::shared_ptr client); // Shoult be called under session pool lock @@ -85,8 +81,6 @@ class TKqpSessionCommon : public TEndpointObj { // so we need to be able to read this value atomicaly std::atomic TimeToTouch_; TInstant TimeInPast_; - // Is used to implement progressive timeout for settler keep alive call - TDuration TimeInterval_; std::atomic CloseHandler_; // Indicate session was in active state, but state was changed diff --git a/src/client/impl/session/session_pool.cpp b/src/client/impl/session/session_pool.cpp index 0984685f8da..877566a34fe 100644 --- a/src/client/impl/session/session_pool.cpp +++ b/src/client/impl/session/session_pool.cpp @@ -114,7 +114,6 @@ void TSessionPool::ReplySessionToUser( std::unique_ptr ctx) { Y_ABORT_UNLESS(session->GetState() == TKqpSessionCommon::S_IDLE); - Y_ABORT_UNLESS(!session->GetTimeInterval()); session->MarkActive(); session->SetNeedUpdateActiveCounter(true); ctx->ReplySessionToUser(session); diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 9ce7b4343f6..ccf90f1175c 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -349,7 +349,6 @@ class TQueryClient::TImpl: public TClientImplCommon, public bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); // Also removes NeedUpdateActiveCounter flag sessionImpl->MarkIdle(); - sessionImpl->SetTimeInterval(TDuration::Zero()); if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); return false; diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index 25794e5bf63..3377486f962 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -109,6 +109,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry) return ESchemeEntryType::View; case ::Ydb::Scheme::Entry::RESOURCE_POOL: return ESchemeEntryType::ResourcePool; + case ::Ydb::Scheme::Entry::BACKUP_COLLECTION: + return ESchemeEntryType::BackupCollection; case ::Ydb::Scheme::Entry::SYS_VIEW: return ESchemeEntryType::SysView; case ::Ydb::Scheme::Entry::TRANSFER: diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 18c536aff95..4df9e91e24e 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -85,7 +85,6 @@ void TTableClient::TImpl::ScheduleTaskUnsafe(std::function&& fn, TDeadli void TTableClient::TImpl::StartPeriodicSessionPoolTask() { // Session pool guarantees than client is alive during call callbacks auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) { - const auto& sessionPoolSettings = Settings_.SessionPoolSettings_; const auto spentTime = s->GetTimeToTouchFast() - s->GetTimeInPastFast(); @@ -125,7 +124,6 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() { }; if (spentTime >= sessionPoolSettings.KeepAliveIdleThreshold_) { - // Handle of session status will be done inside InjectSessionStatusInterception routine. // We just need to reschedule time to next call because InjectSessionStatusInterception doesn't // update timeInPast for calls from internal keep alive routine @@ -1062,7 +1060,6 @@ bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) { bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter(); // Also removes NeedUpdateActiveCounter flag sessionImpl->MarkIdle(); - sessionImpl->SetTimeInterval(TDuration::Zero()); if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) { sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter); return false; diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index c67eba0975f..8ed110c0147 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -793,11 +793,19 @@ void TTableDescription::AddVectorKMeansTreeIndex(const std::string& indexName, c } void TTableDescription::AddFulltextIndex(const std::string& indexName, const std::vector& indexColumns, const TFulltextIndexSettings& indexSettings) { - Impl_->AddFulltextIndex(indexName, EIndexType::GlobalFulltext, indexColumns, indexSettings); + EIndexType indexType = EIndexType::GlobalFulltextPlain; + if (indexSettings.Layout.has_value() && indexSettings.Layout.value() == TFulltextIndexSettings::ELayout::FlatRelevance) { + indexType = EIndexType::GlobalFulltextRelevance; + } + Impl_->AddFulltextIndex(indexName, indexType, indexColumns, indexSettings); } void TTableDescription::AddFulltextIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns, const TFulltextIndexSettings& indexSettings) { - Impl_->AddFulltextIndex(indexName, EIndexType::GlobalFulltext, indexColumns, dataColumns, indexSettings); + EIndexType indexType = EIndexType::GlobalFulltextPlain; + if (indexSettings.Layout.has_value() && indexSettings.Layout.value() == TFulltextIndexSettings::ELayout::FlatRelevance) { + indexType = EIndexType::GlobalFulltextRelevance; + } + Impl_->AddFulltextIndex(indexName, indexType, indexColumns, dataColumns, indexSettings); } void TTableDescription::AddSecondaryIndex(const std::string& indexName, const std::vector& indexColumns) { @@ -2594,7 +2602,7 @@ TFulltextIndexSettings::TAnalyzers FromProto(const Ydb::Table::FulltextIndexSett TAnalyzers result; result.Tokenizer = convertTokenizer(); - + if (proto.has_language()) { result.Language = proto.language(); } @@ -2625,13 +2633,13 @@ TFulltextIndexSettings::TAnalyzers FromProto(const Ydb::Table::FulltextIndexSett if (proto.has_filter_length_max()) { result.FilterLengthMax = proto.filter_length_max(); } - + return result; } Ydb::Table::FulltextIndexSettings::Analyzers ToProto(const TFulltextIndexSettings::TAnalyzers& analyzers) { using ETokenizer = TFulltextIndexSettings::ETokenizer; - + auto convertTokenizer = [&] { switch (*analyzers.Tokenizer) { case ETokenizer::Whitespace: @@ -2743,7 +2751,7 @@ void TFulltextIndexSettings::SerializeTo(Ydb::Table::FulltextIndexSettings& sett if (Layout.has_value()) { settings.set_layout(convertLayout()); } - + for (const auto& column : Columns) { *settings.add_columns() = ToProto(column); } @@ -2789,9 +2797,16 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { specializedIndexSettings = TKMeansTreeSettings::FromProto(vectorProto.vector_settings()); break; } - case TProto::kGlobalFulltextIndex: { - type = EIndexType::GlobalFulltext; - const auto& fulltextProto = proto.global_fulltext_index(); + case TProto::kGlobalFulltextPlainIndex: { + type = EIndexType::GlobalFulltextPlain; + const auto& fulltextProto = proto.global_fulltext_plain_index(); + globalIndexSettings.emplace_back(TGlobalIndexSettings::FromProto(fulltextProto.settings())); + specializedIndexSettings = TFulltextIndexSettings::FromProto(fulltextProto.fulltext_settings()); + break; + } + case TProto::kGlobalFulltextRelevanceIndex: { + type = EIndexType::GlobalFulltextRelevance; + const auto& fulltextProto = proto.global_fulltext_relevance_index(); globalIndexSettings.emplace_back(TGlobalIndexSettings::FromProto(fulltextProto.settings())); specializedIndexSettings = TFulltextIndexSettings::FromProto(fulltextProto.fulltext_settings()); break; @@ -2851,8 +2866,20 @@ void TIndexDescription::SerializeTo(Ydb::Table::TableIndex& proto) const { } break; } - case EIndexType::GlobalFulltext: { - auto* global_fulltext_index = proto.mutable_global_fulltext_index(); + case EIndexType::GlobalFulltextPlain: { + auto* global_fulltext_index = proto.mutable_global_fulltext_plain_index(); + auto& settings = *global_fulltext_index->mutable_settings(); + auto& fulltext_settings = *global_fulltext_index->mutable_fulltext_settings(); + if (GlobalIndexSettings_.size() == 1) { + GlobalIndexSettings_[0].SerializeTo(settings); + } + if (const auto* ftSettings = std::get_if(&SpecializedIndexSettings_)) { + ftSettings->SerializeTo(fulltext_settings); + } + break; + } + case EIndexType::GlobalFulltextRelevance: { + auto* global_fulltext_index = proto.mutable_global_fulltext_relevance_index(); auto& settings = *global_fulltext_index->mutable_settings(); auto& fulltext_settings = *global_fulltext_index->mutable_fulltext_settings(); if (GlobalIndexSettings_.size() == 1) { @@ -2895,7 +2922,8 @@ void TIndexDescription::Out(IOutputStream& o) const { o << ", vector_settings: " << *settings; } break; - case EIndexType::GlobalFulltext: + case EIndexType::GlobalFulltextPlain: + case EIndexType::GlobalFulltextRelevance: if (auto settings = std::get_if(&SpecializedIndexSettings_)) { o << ", fulltext_settings: " << *settings; } diff --git a/src/client/topic/impl/deferred_commit.cpp b/src/client/topic/impl/deferred_commit.cpp index 53e10bfd156..a59e50961c1 100644 --- a/src/client/topic/impl/deferred_commit.cpp +++ b/src/client/topic/impl/deferred_commit.cpp @@ -122,10 +122,10 @@ void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& da void TDeferredCommit::TImpl::Commit() { for (auto&& [partitionStream, offsetRanges] : Offsets) { for (auto&& [startOffset, endOffset] : offsetRanges) { - static_cast*>(partitionStream.Get())->Commit(startOffset, endOffset); + static_cast(partitionStream.Get())->Commit(startOffset, endOffset); } } Offsets.clear(); } -} +} // namespace NYdb::NTopic diff --git a/src/client/topic/impl/read_session_event.cpp b/src/client/topic/impl/read_session_event.cpp index cf58500d08f..7cf6feca11c 100644 --- a/src/client/topic/impl/read_session_event.cpp +++ b/src/client/topic/impl/read_session_event.cpp @@ -191,7 +191,7 @@ bool TMessage::HasException() const { } void TMessage::Commit() { - static_cast*>(PartitionSession.Get()) + static_cast(PartitionSession.Get()) ->Commit(Information.Offset, Information.Offset + 1); } @@ -225,7 +225,7 @@ uint64_t TCompressedMessage::GetUncompressedSize() const { } void TCompressedMessage::Commit() { - static_cast*>(PartitionSession.Get()) + static_cast(PartitionSession.Get()) ->Commit(Information.Offset, Information.Offset + 1); } @@ -264,7 +264,7 @@ void TDataReceivedEvent::Commit() { } for (auto [from, to] : OffsetRanges) { - static_cast*>(PartitionSession.Get())->Commit(from, to); + static_cast(PartitionSession.Get())->Commit(from, to); } } @@ -317,7 +317,7 @@ TStartPartitionSessionEvent::TStartPartitionSessionEvent(TPartitionSession::TPtr void TStartPartitionSessionEvent::Confirm(std::optional readOffset, std::optional commitOffset) { if (PartitionSession) { - static_cast*>(PartitionSession.Get()) + static_cast(PartitionSession.Get()) ->ConfirmCreate(readOffset, commitOffset); } } @@ -342,7 +342,7 @@ TStopPartitionSessionEvent::TStopPartitionSessionEvent(TPartitionSession::TPtr p void TStopPartitionSessionEvent::Confirm() { if (PartitionSession) { - static_cast*>(PartitionSession.Get())->ConfirmDestroy(); + static_cast(PartitionSession.Get())->ConfirmDestroy(); } } @@ -366,7 +366,7 @@ TEndPartitionSessionEvent::TEndPartitionSessionEvent(TPartitionSession::TPtr par void TEndPartitionSessionEvent::Confirm() { if (PartitionSession) { - static_cast*>(PartitionSession.Get())->ConfirmEnd(GetChildPartitionIds()); + static_cast(PartitionSession.Get())->ConfirmEnd(GetChildPartitionIds()); } } @@ -454,4 +454,4 @@ std::string DebugString(const TReadSessionEvent::TEvent& event) { return std::visit([](const auto& ev) { return ev.DebugString(); }, event); } -} // namespace NYdb::NPersQueue +} // namespace NYdb::NTopic diff --git a/src/client/topic/impl/read_session_impl.h b/src/client/topic/impl/read_session_impl.h index a05ff699300..ea06593aecc 100644 --- a/src/client/topic/impl/read_session_impl.h +++ b/src/client/topic/impl/read_session_impl.h @@ -70,10 +70,17 @@ using TASessionClosedEvent = std::conditional_t; +struct TMigrationPartitionStream: public NYdb::NPersQueue::TPartitionStream { + virtual void Commit(uint64_t startOffset, uint64_t endOffset) = 0; + virtual void ConfirmCreate(std::optional readOffset, std::optional commitOffset) = 0; + virtual void ConfirmDestroy() = 0; + virtual void ConfirmEnd(std::span childIds) = 0; +}; + template using TAPartitionStream = std::conditional_t; + TMigrationPartitionStream, + NYdb::NTopic::TPartitionSessionControl>; template using TAReadSessionEvent = std::conditional_t { TAPartitionStream::LastDirectReadId = id; } - void Commit(ui64 startOffset, ui64 endOffset) /*override*/; + void Commit(uint64_t startOffset, uint64_t endOffset) override; void RequestStatus() override; - void ConfirmCreate(std::optional readOffset, std::optional commitOffset); - void ConfirmDestroy(); - void ConfirmEnd(const std::vector& childIds); + void ConfirmCreate(std::optional readOffset, std::optional commitOffset) override; + void ConfirmDestroy() override; + void ConfirmEnd(std::span childIds) override; void StopReading() /*override*/; void ResumeReading() /*override*/; @@ -1163,7 +1170,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext* partitionStream, std::optional readOffset, std::optional commitOffset); void ConfirmPartitionStreamDestroy(TPartitionStreamImpl* partitionStream); - void ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, const std::vector& childIds); + void ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, std::span childIds); void RequestPartitionStreamStatus(const TPartitionStreamImpl* partitionStream); void Commit(const TPartitionStreamImpl* partitionStream, ui64 startOffset, ui64 endOffset); diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 99133975632..27278018435 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -48,7 +48,7 @@ TLog TPartitionStreamImpl::GetLog() const { } template -void TPartitionStreamImpl::Commit(ui64 startOffset, ui64 endOffset) { +void TPartitionStreamImpl::Commit(uint64_t startOffset, uint64_t endOffset) { std::vector> toCommit; if (auto sessionShared = CbContext->LockShared()) { Y_ABORT_UNLESS(endOffset > startOffset); @@ -78,7 +78,7 @@ void TPartitionStreamImpl::RequestStatus() { } template -void TPartitionStreamImpl::ConfirmCreate(std::optional readOffset, std::optional commitOffset) { +void TPartitionStreamImpl::ConfirmCreate(std::optional readOffset, std::optional commitOffset) { if (auto sessionShared = CbContext->LockShared()) { if (commitOffset.has_value()) { SetFirstNotReadOffset(commitOffset.value()); @@ -95,7 +95,7 @@ void TPartitionStreamImpl::ConfirmDestroy() { } template -void TPartitionStreamImpl::ConfirmEnd(const std::vector& childIds) { +void TPartitionStreamImpl::ConfirmEnd(std::span childIds) { if (auto sessionShared = CbContext->LockShared()) { sessionShared->ConfirmPartitionStreamEnd(this, childIds); } @@ -244,10 +244,6 @@ template TSingleClusterReadSessionImpl::~TSingleClusterReadSessionImpl() { std::lock_guard guard(Lock); - for (auto&& [_, partitionStream] : PartitionStreams) { - partitionStream->ClearQueue(); - } - for (auto& e : DecompressionQueue) { e.OnDestroyReadSession(); } @@ -516,9 +512,7 @@ inline void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions template<> inline bool TSingleClusterReadSessionImpl::IsDirectRead() { - // TODO(qyryq) Replace this return with the next one when direct read is ready for production. - return ExperimentalDirectRead; - // return Settings.DirectRead_; + return ExperimentalDirectRead && Settings.DirectRead_; } template<> @@ -531,7 +525,10 @@ inline void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } + bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, TReadSessionEvent::TPartitionSessionStatusEvent( partitionStreamIt->second, msg.committed_offset(), @@ -2103,8 +2101,6 @@ void TSingleClusterReadSessionImpl::UnregisterPartition(ui template std::vector TSingleClusterReadSessionImpl::GetParentPartitionSessions(ui32 partitionId, ui64 partitionSessionId) { - std::lock_guard guard(HierarchyDataLock); - auto it = HierarchyData.find(partitionId); if (it == HierarchyData.end()) { return {}; @@ -2131,6 +2127,7 @@ std::vector TSingleClusterReadSessionImpl::GetParent template bool TSingleClusterReadSessionImpl::AllParentSessionsHasBeenRead(ui32 partitionId, ui64 partitionSessionId) { + std::lock_guard guard(HierarchyDataLock); for (auto id : GetParentPartitionSessions(partitionId, partitionSessionId)) { if (!ReadingFinishedData.contains(id)) { return false; @@ -2140,9 +2137,12 @@ bool TSingleClusterReadSessionImpl::AllParentSessionsHasBe return true; } -template -void TSingleClusterReadSessionImpl::ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, const std::vector& childIds) { - ReadingFinishedData.insert(partitionStream->GetPartitionSessionId()); +template <> +inline void TSingleClusterReadSessionImpl::ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, std::span childIds) { + { + std::lock_guard guard(HierarchyDataLock); + ReadingFinishedData.insert(partitionStream->GetPartitionSessionId()); + } for (auto& [_, s] : PartitionStreams) { for (auto partitionId : childIds) { if (s->GetPartitionId() == partitionId) { @@ -2153,6 +2153,12 @@ void TSingleClusterReadSessionImpl::ConfirmPartitionStream } } +template <> +inline void TSingleClusterReadSessionImpl::ConfirmPartitionStreamEnd(TPartitionStreamImpl* partitionStream, std::span childIds) { + Y_UNUSED(partitionStream, childIds); + Y_ABORT("Not implemented"); +} + template void TSingleClusterReadSessionImpl::CollectOffsets(TTransactionBase& tx, const std::vector& events, diff --git a/src/client/topic/ut/local_partition_ut.cpp b/src/client/topic/ut/local_partition_ut.cpp index 19809f111c6..b0f94be2c8b 100644 --- a/src/client/topic/ut/local_partition_ut.cpp +++ b/src/client/topic/ut/local_partition_ut.cpp @@ -35,7 +35,6 @@ namespace NYdb::inline V3::NTopic::NTests { TTopicSdkTestSetup CreateSetupForSplitMerge(const std::string& testCaseName) { NKikimrConfig::TFeatureFlags ff; ff.SetEnableTopicSplitMerge(true); - ff.SetEnablePQConfigTransactionsAtSchemeShard(true); auto settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetFeatureFlags(ff); auto setup = TTopicSdkTestSetup(testCaseName, settings, false); diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 448a56bbfce..e1974e1074c 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -182,7 +182,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_43_Query, TFixtureQuery) Y_UNIT_TEST_F(ReadRuleGeneration, TFixtureNoClient) { // There was a server - NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false}); + NotifySchemeShard({}); // Users have created their own topic on it CreateTopic(TEST_TOPIC); @@ -200,7 +200,7 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixtureNoClient) CloseTopicReadSession(TEST_TOPIC, "consumer-1"); // And then the Logbroker team turned on the feature flag - NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true}); + NotifySchemeShard({}); // Users continued to write to the topic WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4"); diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 092a2fde644..4272e5defa0 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -81,7 +81,7 @@ void TTopicSdkTestSetup::Write(const std::string& topic, const std::string& mess TTopicSdkTestSetup::TReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function handler, - std::optional partition, const TDuration timeout) { + std::optional partition, const TDuration timeout, bool autoPartitioningSupport) { TTopicClient client(MakeDriver()); auto topicSettings = TTopicReadSettings(topic); @@ -90,7 +90,7 @@ TTopicSdkTestSetup::TReadResult TTopicSdkTestSetup::Read(const std::string& topi } auto settings = TReadSessionSettings() - .AutoPartitioningSupport(true) + .AutoPartitioningSupport(autoPartitioningSupport) .AppendTopics(topicSettings) .ConsumerName(consumer); diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index bef129a69a8..e3fe5d48a0b 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -46,7 +46,8 @@ class TTopicSdkTestSetup : public ITopicTestSetup { TReadResult Read(const std::string& topic, const std::string& consumer, std::function handler, std::optional partition = std::nullopt, - const TDuration timeout = TDuration::Seconds(5)); + const TDuration timeout = TDuration::Seconds(5), + bool autoPartitioningSupport = true); TStatus Commit(const std::string& path, const std::string& consumerName, std::size_t partitionId, std::size_t offset, diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.cpp b/src/client/topic/ut/ut_utils/txusage_fixture.cpp index 9ff1610dae8..3b33e5f5064 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.cpp +++ b/src/client/topic/ut/ut_utils/txusage_fixture.cpp @@ -38,7 +38,6 @@ void TFixture::SetUp(NUnitTest::TTestContext&) NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetEnableTopicServiceTx(true); settings.SetEnableTopicSplitMerge(true); - settings.SetEnablePQConfigTransactionsAtSchemeShard(true); settings.SetEnableOltpSink(GetEnableOltpSink()); settings.SetEnableOlapSink(GetEnableOlapSink()); settings.SetEnableHtapTx(GetEnableHtapTx()); @@ -61,9 +60,10 @@ void TFixture::SetUp(NUnitTest::TTestContext&) void TFixture::NotifySchemeShard(const TFeatureFlags& flags) { + Y_UNUSED(flags); + auto request = std::make_unique(); *request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig; - request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard); auto& runtime = Setup->GetRuntime(); auto actorId = runtime.AllocateEdgeActor(); diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.h b/src/client/topic/ut/ut_utils/txusage_fixture.h index 28354a4db57..5ee506491b4 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.h +++ b/src/client/topic/ut/ut_utils/txusage_fixture.h @@ -37,7 +37,6 @@ class TFixture : public NUnitTest::TBaseFixture { }; struct TFeatureFlags { - bool EnablePQConfigTransactionsAtSchemeShard = true; }; class ISession { diff --git a/src/library/grpc/client/grpc_client_low.h b/src/library/grpc/client/grpc_client_low.h index 37a09993368..ac865d29b00 100644 --- a/src/library/grpc/client/grpc_client_low.h +++ b/src/library/grpc/client/grpc_client_low.h @@ -1274,7 +1274,7 @@ class TServiceConnection { } /* - * Start bidirectional streamming + * Start bidirectional streaming */ template void DoStreamRequest(TStreamConnectedCallback callback, @@ -1412,7 +1412,7 @@ class TGRpcClientLow std::mutex JoinMutex_; }; -} +} // namespace NYdbGrpc template <> class grpc::TimePoint { diff --git a/tests/integration/sessions_pool/main.cpp b/tests/integration/sessions_pool/main.cpp index fbc206201ce..cc350a11310 100644 --- a/tests/integration/sessions_pool/main.cpp +++ b/tests/integration/sessions_pool/main.cpp @@ -83,7 +83,6 @@ void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) { if (requestedSessions > client.GetActiveSessionsLimit()) { ASSERT_EQ(client.GetActiveSessionCount(), client.GetActiveSessionsLimit()); } - ASSERT_FALSE(sessionFutures.at(sessionId).HasValue()); break; } case EAction::ExtractValue: { @@ -266,7 +265,6 @@ TEST_P(YdbSdkSessionsPool, StressTestSync) { std::this_thread::sleep_for(std::chrono::milliseconds(10000)); ASSERT_EQ(Client->GetActiveSessionCount(), 0); - ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); } std::uint32_t RunStressTestAsync(std::uint32_t n, std::uint32_t nThreads, NYdb::NTable::TTableClient& client) { @@ -305,7 +303,6 @@ TEST_P(YdbSdkSessionsPool, StressTestAsync) { std::this_thread::sleep_for(std::chrono::milliseconds(10000)); ASSERT_EQ(Client->GetActiveSessionCount(), 0); - ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit); } void TestPeriodicTask(std::uint32_t activeSessionsLimit, NYdb::NTable::TTableClient& client) { diff --git a/tests/integration/topic/direct_read_it.cpp b/tests/integration/topic/direct_read_it.cpp index 646487c36c4..af60431c792 100644 --- a/tests/integration/topic/direct_read_it.cpp +++ b/tests/integration/topic/direct_read_it.cpp @@ -724,7 +724,7 @@ class TDirectReadSessionImplTestSetup { TDirectReadSessionImplTestSetup::TDirectReadSessionImplTestSetup() { ReadSessionSettings - // .DirectRead(true) + .DirectRead(true) .AppendTopics({"TestTopic"}) .ConsumerName("TestConsumer") .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10))) @@ -888,7 +888,7 @@ TEST_F(DirectReadWithClient, OneMessage) { auto settings = TReadSessionSettings() .ConsumerName(GetConsumerName()) .AppendTopics(GetTopicPath()) - // .DirectRead(true) + .DirectRead(true) ; auto reader = client.CreateReadSession(settings); @@ -973,7 +973,7 @@ TEST_F(DirectReadWithClient, ManyMessages) { .ConsumerName(GetConsumerName()) .AppendTopics(GetTopicPath()) .MaxMemoryUsageBytes(1_MB) - // .DirectRead(GetEnv("DIRECT", "0") == "1") + .DirectRead(true) ; std::shared_ptr reader; @@ -1396,7 +1396,7 @@ TEST_F(DirectReadWithControlSession, EmptyDirectReadResponse) { // Sometimes the server might send a DirectReadResponse with no data, but with bytes_size value > 0. // It can happen, if the server tried to send DirectReadResponse, but did not succeed, // and in the meantime the messages that should had been sent have been rotated by retention period, - // and do not exist anymore. To keep ReadSizeBudget bookkeeping correct, the server still sends the an DirectReadResponse, + // and do not exist anymore. To keep ReadSizeBudget bookkeeping correct, the server still sends DirectReadResponse, // and SDK should process it correctly: basically it should immediately send a ReadRequest(bytes_size=DirectReadResponse.bytes_size). auto const startPartitionSessionRequest = TStartPartitionSessionRequest{ @@ -2048,7 +2048,7 @@ TEST_F(DirectReadWithServer, Devslice) { auto settings = TReadSessionSettings() .AppendTopics(TTopicReadSettings("t1").AppendPartitionIds({0})) .ConsumerName("c1") - // .DirectRead(true) + .DirectRead(true) ; settings.EventHandlers_ diff --git a/tests/unit/client/CMakeLists.txt b/tests/unit/client/CMakeLists.txt index 8c3b142ee74..e697e7ed0e3 100644 --- a/tests/unit/client/CMakeLists.txt +++ b/tests/unit/client/CMakeLists.txt @@ -1,5 +1,14 @@ add_subdirectory(oauth2_token_exchange/helpers) +add_ydb_test(NAME client-connection_string_ut + SOURCES + connection_string/connection_string_ut.cpp + LINK_LIBRARIES + impl-internal-common + LABELS + unit +) + add_ydb_test(NAME client-ydb_coordination_ut SOURCES coordination/coordination_ut.cpp diff --git a/tests/unit/client/connection_string/connection_string_ut.cpp b/tests/unit/client/connection_string/connection_string_ut.cpp new file mode 100644 index 00000000000..597edb259a6 --- /dev/null +++ b/tests/unit/client/connection_string/connection_string_ut.cpp @@ -0,0 +1,100 @@ +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include + +using namespace NYdb; + +TEST(ConnectionStringParser, ParseWithQueryParameter) { + auto connInfo = ParseConnectionString("grpc://localhost:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseWithQueryParameterNoSlash) { + auto connInfo = ParseConnectionString("grpc://localhost:2135?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseWithPathAsDatabase) { + auto connInfo = ParseConnectionString("grpc://localhost:2135/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseSecureConnection) { + auto connInfo = ParseConnectionString("grpcs://localhost:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseSecureConnectionNoSlash) { + auto connInfo = ParseConnectionString("grpcs://localhost:2135?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseSecureConnectionWithPath) { + auto connInfo = ParseConnectionString("grpcs://localhost:2135/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseNoSchemeLocalhost) { + auto connInfo = ParseConnectionString("localhost:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseNoSchemeNonLocalhost) { + auto connInfo = ParseConnectionString("example.com:2135/?database=/local"); + EXPECT_EQ(connInfo.Endpoint, "example.com:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, true); +} + +TEST(ConnectionStringParser, ParseNoSchemeWithPath) { + auto connInfo = ParseConnectionString("localhost:2135/local"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, "/local"); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, ParseNoDatabaseInQuery) { + auto connInfo = ParseConnectionString("grpc://localhost:2135"); + EXPECT_EQ(connInfo.Endpoint, "localhost:2135"); + EXPECT_EQ(connInfo.Database, ""); + EXPECT_EQ(connInfo.EnableSsl, false); +} + +TEST(ConnectionStringParser, InvalidScheme) { + EXPECT_THROW( + ParseConnectionString("http://localhost:2135/?database=/local"), + TContractViolation + ); +} + +TEST(ConnectionStringParser, EmptyConnectionString) { + EXPECT_THROW( + ParseConnectionString(""), + TContractViolation + ); +} + +TEST(ConnectionStringParser, DatabaseInBothPathAndQuery) { + EXPECT_THROW( + ParseConnectionString("grpc://localhost:2135/local?database=/other"), + TContractViolation + ); +}