diff --git a/integration/indexes.py b/integration/indexes.py index c7ceb6f76..ec66f3ff4 100644 --- a/integration/indexes.py +++ b/integration/indexes.py @@ -158,8 +158,15 @@ def create(self, client: valkey.client): for f in self.fields: cmd += f.create(self.type) print(f"Creating Index: {cmd}") + client.execute_command("DEBUG LOG", f"Creating index {self.name}") client.execute_command(*cmd) + def drop(self, client: valkey.client): + cmd = ["FT.DROPINDEX", self.name] + print("Executing: ", cmd) + client.execute_command(*cmd) + client.execute_command("DEBUG LOG", f"Deleting index {self.name}") + def load_data(self, client: valkey.client, rows: int, start_index: int = 0): print("Loading data to ", client) for i in range(start_index, rows): @@ -194,3 +201,20 @@ def info(self, client: valkey.client) -> FTInfoParser: def backfill_complete(self, client: valkey.client) -> bool: res = self.info(client) return res.backfill_in_progress == 0 + + def query(self, client:valkey.client, query_string: str, *args) -> dict[bytes, dict[bytes, bytes]]: + assert self.type == KeyDataType.HASH, "JSON not supported yet" + query = ["ft.search", self.name, query_string] + list(args) + print("Execute Query Command: ", query) + result = client.execute_command(*query) + print("Result is ", result) + count = result[0] + dict_result = {} + for row in range(1, len(result)-1, 2): + key = result[row] + fields = result[row+1][0::2] + values = result[row+1][1::2] + print("Key", key, "Fields:", fields, " Values:", values) + dict_result[key] = {fields[i]:values[i] for i in range(len(fields))} + print("Final result", dict_result) + return dict_result diff --git a/integration/test_dbnum.py b/integration/test_dbnum.py new file mode 100644 index 000000000..fca775f12 --- /dev/null +++ b/integration/test_dbnum.py @@ -0,0 +1,95 @@ +from valkey_search_test_case import ValkeySearchClusterTestCaseDebugMode +from valkey.cluster import ValkeyCluster +from valkey.client import Valkey +from valkeytestframework.conftest import resource_port_tracker +from valkeytestframework.util import waiters +from valkey.exceptions import ResponseError, ConnectionError +import pytest, time, logging +from indexes import * + +class TestDBNum(ValkeySearchClusterTestCaseDebugMode): + def setup_connections(self): + self.client00 = self.get_primary(0).connect(); + self.client00.select(0) + self.client10 = self.get_primary(1).connect(); + self.client10.select(0) + self.client20 = self.get_primary(2).connect(); + self.client20.select(0) + self.client01 = self.get_primary(0).connect(); + self.client01.select(1) + self.client11 = self.get_primary(1).connect(); + self.client11.select(1) + self.client21 = self.get_primary(2).connect(); + self.client21.select(1) + self.clients = [[self.client00, self.client01], [self.client10, self.client11], [self.client20, self.client21]] + + def test_dbnum(self): + """ + Test for dbnumbers in cluster mode, a Valkey 9 feature. + """ + index0 = Index('index0', [Tag('t')]) + index1 = Index('index1', [Tag('t')]) + + self.setup_connections() + + def show(msg): + for i in range(3): + self.clients[i][0].execute_command("DEBUG LOG", f"{i}:{msg}") + self.clients[i][0].execute_command("FT._DEBUG SHOW_METADATA") + + def exec(dbnum, l): + for i in range(3): + l(self.clients[i][dbnum]) + + index1.create(self.client11) + show("After create index1") + index0.create(self.client10) + show("After create index0") + assert(self.client00.execute_command("FT._LIST") == [b'index0']) + assert(self.client10.execute_command("FT._LIST") == [b'index0']) + assert(self.client20.execute_command("FT._LIST") == [b'index0']) + assert(self.client01.execute_command("FT._LIST") == [b'index1']) + assert(self.client11.execute_command("FT._LIST") == [b'index1']) + assert(self.client21.execute_command("FT._LIST") == [b'index1']) + try: + self.client00.execute_command("debug restart") + except: + pass + self.setup_connections() + assert(self.client00.execute_command("FT._LIST") == [b'index0']) + assert(self.client10.execute_command("FT._LIST") == [b'index0']) + assert(self.client20.execute_command("FT._LIST") == [b'index0']) + assert(self.client01.execute_command("FT._LIST") == [b'index1']) + assert(self.client11.execute_command("FT._LIST") == [b'index1']) + assert(self.client21.execute_command("FT._LIST") == [b'index1']) + # + # Load some data and do some queries.... + # + #cluster0 = self.new_cluster_client() + #cluster0.select(0) + #cluster1 = self.new_cluster_client() + #cluster1.select(1) + self.client20.hset("0", mapping={"t":"tag0"}) + self.client21.hset("0", mapping={"t":"tag1"}) + answer0 = index0.query(self.client20,"@t:{tag*}") + assert answer0 == {b"0": {b"t":b"tag0"}} + self.client21.execute_command("DEBUG LOG", "Doing query 1") + answer1 = index1.query(self.client21,"@t:{tag*}") + assert answer1 == {b"0": {b"t":b"tag1"}} + + index0.drop(self.client00) + show("After drop index0") + assert(self.client00.execute_command("FT._LIST") == []) + assert(self.client10.execute_command("FT._LIST") == []) + assert(self.client20.execute_command("FT._LIST") == []) + assert(self.client01.execute_command("FT._LIST") == [b'index1']) + assert(self.client11.execute_command("FT._LIST") == [b'index1']) + assert(self.client21.execute_command("FT._LIST") == [b'index1']) + index1.drop(self.client01) + show("after drop index1") + assert(self.client00.execute_command("FT._LIST") == []) + assert(self.client10.execute_command("FT._LIST") == []) + assert(self.client20.execute_command("FT._LIST") == []) + assert(self.client01.execute_command("FT._LIST") == []) + assert(self.client11.execute_command("FT._LIST") == []) + assert(self.client21.execute_command("FT._LIST") == []) diff --git a/integration/test_versioning.py b/integration/test_versioning.py new file mode 100644 index 000000000..6c76a34ab --- /dev/null +++ b/integration/test_versioning.py @@ -0,0 +1,49 @@ +from valkey import ResponseError +from valkey.client import Valkey +from valkey_search_test_case import ValkeySearchTestCaseDebugMode, ValkeySearchClusterTestCaseDebugMode +from valkeytestframework.conftest import resource_port_tracker +from indexes import * +import logging, time +from typing import Any +from util import waiters +import pytest + +class TestVersioningCMD(ValkeySearchTestCaseDebugMode): + def test_versioningCMD(self): + """ + Test RDB Versioning logic + """ + client: Valkey = self.server.get_new_client() + client.execute_command("CONFIG SET search.info-developer-visible yes") + hnsw_index = Index("hnsw", [Vector("v", 3, type="HNSW", m=2, efc=1), Numeric("n")]) + + hnsw_index.create(client) + hnsw_index.load_data(client, 1000) + + client.execute_command("ft._debug controlled_variable set override_semantic_version", 10 << 16) + + client.execute_command("save") + with pytest.raises(ResponseError) as e: + client.execute_command("DEBUG RELOAD") + assert " Error trying to load the RDB dump" in str(e) + +class TestVersioningCME(ValkeySearchClusterTestCaseDebugMode): + def test_versioningCME(self): + """ + Test versioning logic of metadata on the wire. + """ + for c in self.get_all_primary_clients(): + c.execute_command("CONFIG SET search.info-developer-visible yes") + + client: Valkey = self.get_primary(0).get_new_client() + client.execute_command("ft._debug controlled_variable set override_semantic_version", 10 << 16) + + hnsw_index = Index("hnsw", [Vector("v", 3, type="HNSW", m=2, efc=1), Numeric("n")]) + + with pytest.raises(ResponseError) as e: + hnsw_index.create(client) + assert "Unable to contact all cluster members" in str(e) + + assert client.execute_command("ft._list") == [hnsw_index.name.encode()] + assert self.get_primary(1).get_new_client().execute_command("ft._list") == [] + assert self.get_primary(2).get_new_client().execute_command("ft._list") == [] diff --git a/integration/valkey_search_test_case.py b/integration/valkey_search_test_case.py index 99dc02391..e3e8c5ba7 100644 --- a/integration/valkey_search_test_case.py +++ b/integration/valkey_search_test_case.py @@ -434,6 +434,7 @@ def setup_test(self, request): def get_config_file_lines(self, testdir, port) -> List[str]: return [ "enable-debug-command yes", + "cluster-databases 16", f"loadmodule {os.getenv('JSON_MODULE_PATH')}", f"dir {testdir}", "cluster-enabled yes", diff --git a/scripts/common.rc b/scripts/common.rc index a89003f41..1230c45d5 100644 --- a/scripts/common.rc +++ b/scripts/common.rc @@ -166,7 +166,7 @@ function setup_valkey_server() { fi # Clone and build it - VALKEY_VERSION="${VALKEY_VERSION:=8.1.1}" + VALKEY_VERSION="${VALKEY_VERSION:=9.0.0}" export VALKEY_SERVER_HOME_DIR=$(get_third_party_build_dir)/valkey-server export VALKEY_SERVER_BUILD_DIR=${VALKEY_SERVER_HOME_DIR}/.build-release if [ ! -d ${VALKEY_SERVER_HOME_DIR} ]; then diff --git a/src/commands/ft_aggregate.cc b/src/commands/ft_aggregate.cc index 2daa8d190..5fee19ae3 100644 --- a/src/commands/ft_aggregate.cc +++ b/src/commands/ft_aggregate.cc @@ -114,7 +114,8 @@ absl::StatusOr> ParseCommand( index_schema_name)); RealIndexInterface index_interface(index_schema); auto params = std::make_unique( - options::GetDefaultTimeoutMs().GetValue(), &index_interface); + options::GetDefaultTimeoutMs().GetValue(), &index_interface, + ValkeyModule_GetSelectedDb(ctx)); DBG << "AggregateParameters created for index: " << index_schema_name << " @" << (void *)params.get() << "\n"; params->index_schema_name = std::move(index_schema_name); diff --git a/src/commands/ft_aggregate_parser.h b/src/commands/ft_aggregate_parser.h index 6e5962bae..12494d89a 100644 --- a/src/commands/ft_aggregate_parser.h +++ b/src/commands/ft_aggregate_parser.h @@ -117,8 +117,9 @@ struct AggregateParameters : public expr::Expression::CompileContext, parse_vars.ClearAtEndOfParse(); } - AggregateParameters(uint64_t timeout, IndexInterface* index_interface) - : query::SearchParameters(timeout, nullptr) { + AggregateParameters(uint64_t timeout, IndexInterface* index_interface, + uint32_t db_num) + : query::SearchParameters(timeout, nullptr, db_num) { parse_vars_.index_interface_ = index_interface; } diff --git a/src/commands/ft_debug.cc b/src/commands/ft_debug.cc index c4ad9a6a2..59e50420a 100644 --- a/src/commands/ft_debug.cc +++ b/src/commands/ft_debug.cc @@ -10,6 +10,7 @@ #include "module_config.h" #include "src/commands/commands.h" +#include "src/coordinator/metadata_manager.h" #include "vmsdk/src/command_parser.h" #include "vmsdk/src/debug.h" #include "vmsdk/src/info.h" @@ -144,6 +145,8 @@ absl::Status HelpCmd(ValkeyModuleCtx *ctx, vmsdk::ArgsIterator &itr) { "list all controlled variables and their values"}, {"FT._DEBUG PAUSEPOINT [ SET | RESET | TEST | LIST] ", "control pause points"}, + {"FT_DEBUG SHOW_METADATA", + "list internal metadata manager table namespace"}, }; ValkeyModule_ReplySetArrayLength(ctx, 2 * help_text.size()); for (auto &pair : help_text) { @@ -185,6 +188,9 @@ absl::Status FTDebugCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, return PausePointControlCmd(ctx, itr); } else if (keyword == "CONTROLLED_VARIABLE") { return ControlledCmd(ctx, itr); + } else if (keyword == "SHOW_METADATA") { + return valkey_search::coordinator::MetadataManager::Instance().ShowMetadata( + ctx, itr); } else if (keyword == "HELP") { return HelpCmd(ctx, itr); } else { diff --git a/src/commands/ft_search_parser.cc b/src/commands/ft_search_parser.cc index 613379d66..c8777a848 100644 --- a/src/commands/ft_search_parser.cc +++ b/src/commands/ft_search_parser.cc @@ -450,7 +450,8 @@ ParseVectorSearchParameters(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc, const SchemaManager &schema_manager) { vmsdk::ArgsIterator itr{argv, argc}; auto parameters = std::make_unique( - options::GetDefaultTimeoutMs().GetValue(), nullptr); + options::GetDefaultTimeoutMs().GetValue(), nullptr, + ValkeyModule_GetSelectedDb(ctx)); VMSDK_RETURN_IF_ERROR( vmsdk::ParseParamValue(itr, parameters->index_schema_name)); VMSDK_ASSIGN_OR_RETURN( diff --git a/src/coordinator/coordinator.proto b/src/coordinator/coordinator.proto index b4734d1ac..0ecfbaf3c 100644 --- a/src/coordinator/coordinator.proto +++ b/src/coordinator/coordinator.proto @@ -93,6 +93,7 @@ message SearchIndexPartitionRequest { bool no_content = 11; optional Predicate root_filter_predicate = 12; repeated ReturnParameter return_parameters = 13; + uint32 db_num = 14; } message NeighborEntry { @@ -172,4 +173,5 @@ message InfoIndexPartitionResponse { string error = 14; optional IndexFingerprintVersion index_fingerprint_version = 15; FanoutErrorType error_type = 16; + uint32 db_num = 17; } \ No newline at end of file diff --git a/src/coordinator/metadata_manager.cc b/src/coordinator/metadata_manager.cc index 6b6e64575..d3628e6c5 100644 --- a/src/coordinator/metadata_manager.cc +++ b/src/coordinator/metadata_manager.cc @@ -23,22 +23,44 @@ #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" +#include "command_parser.h" #include "google/protobuf/any.pb.h" +#include "google/protobuf/util/json_util.h" #include "grpcpp/support/status.h" #include "highwayhash/arch_specific.h" #include "highwayhash/highwayhash.h" +#include "module_config.h" #include "src/coordinator/client_pool.h" #include "src/coordinator/coordinator.pb.h" #include "src/coordinator/util.h" #include "src/metrics.h" #include "src/rdb_serialization.h" -#include "src/valkey_search_options.h" #include "vmsdk/src/debug.h" #include "vmsdk/src/log.h" #include "vmsdk/src/status/status_macros.h" #include "vmsdk/src/utils.h" #include "vmsdk/src/valkey_module_api/valkey_module.h" +/* + +The original metadata manager was designed to provide a 2-level hierarchy: +. This lead to the wire-format tied to: + + Map>. + +With Valkey 9, the introduction of DB num into CME creates a desire for a +three-level hierarchy: . This 3-level +internal namespace is mapped into an external two-level namespace to provide +backward and some degree of forward compatibility. + +The mapping is done by manipulating the object name. It is known that +pre-Valkey 9 object names cannot contain a valid hash-tag. This provides the +necessary information to distinguish between an object name and an encoded + pair. Objects with a db_num of 0 are stored the same +way as pre-Valkey 9 objects, providing backward compatibility. + +*/ + namespace valkey_search::coordinator { namespace { @@ -133,13 +155,13 @@ uint64_t MetadataManager::ComputeTopLevelFingerprint( } absl::Status MetadataManager::TriggerCallbacks( - absl::string_view type_name, absl::string_view id, + absl::string_view type_name, uint32_t db_num, absl::string_view id, const GlobalMetadataEntry &entry) { auto ®istered_types = registered_types_.Get(); auto it = registered_types.find(type_name); if (it != registered_types.end()) { return registered_types.at(type_name).update_callback( - id, entry.has_content() ? &entry.content() : nullptr); + db_num, id, entry.has_content() ? &entry.content() : nullptr); } VMSDK_LOG_EVERY_N_SEC(WARNING, detached_ctx_.get(), 10) << "No type registered for: " << type_name << ", skipping callback"; @@ -147,24 +169,30 @@ absl::Status MetadataManager::TriggerCallbacks( } absl::StatusOr MetadataManager::GetEntry( - absl::string_view type_name, absl::string_view id) { + absl::string_view type_name, uint32_t db_num, absl::string_view id) { + auto encoded_id = EncodeDbNum(db_num, id); auto &metadata = metadata_.Get(); if (!metadata.type_namespace_map().contains(type_name) || !metadata.type_namespace_map().at(type_name).entries().contains(id) || !metadata.type_namespace_map() .at(type_name) .entries() - .at(id) + .at(encoded_id) .has_content()) { return absl::NotFoundError( - absl::StrCat("Entry not found: ", type_name, " ", id)); + absl::StrCat("Entry not found: ", type_name, " ", db_num, " ", id)); } - return metadata.type_namespace_map().at(type_name).entries().at(id).content(); + return metadata.type_namespace_map() + .at(type_name) + .entries() + .at(encoded_id) + .content(); } absl::StatusOr MetadataManager::CreateEntry( - absl::string_view type_name, absl::string_view id, + absl::string_view type_name, uint32_t db_num, absl::string_view id, std::unique_ptr contents) { + auto encoded_id = EncodeDbNum(db_num, id); auto ®istered_types = registered_types_.Get(); auto rt_it = registered_types.find(type_name); if (rt_it == registered_types.end()) { @@ -175,7 +203,7 @@ absl::StatusOr MetadataManager::CreateEntry( auto &metadata = metadata_.Get(); auto it = metadata.type_namespace_map().find(type_name); if (it != metadata.type_namespace_map().end()) { - auto inner_it = it->second.entries().find(id); + auto inner_it = it->second.entries().find(encoded_id); if (inner_it != it->second.entries().end()) { version = inner_it->second.version() + 1; } @@ -183,21 +211,23 @@ absl::StatusOr MetadataManager::CreateEntry( VMSDK_ASSIGN_OR_RETURN( auto fingerprint, ComputeFingerprint(type_name, *contents, registered_types)); + VMSDK_ASSIGN_OR_RETURN(auto encoding_version, + rt_it->second.encoding_version_callback(*contents)); GlobalMetadataEntry new_entry; new_entry.set_version(version); new_entry.set_fingerprint(fingerprint); - new_entry.set_encoding_version(rt_it->second.encoding_version); + new_entry.set_encoding_version(encoding_version); new_entry.set_allocated_content(contents.release()); - auto callback_status = TriggerCallbacks(type_name, id, new_entry); + auto callback_status = TriggerCallbacks(type_name, db_num, id, new_entry); if (!callback_status.ok()) { return callback_status; } auto insert_result = metadata.mutable_type_namespace_map()->insert( {std::string(type_name), GlobalMetadataEntryMap()}); - (*insert_result.first->second.mutable_entries())[id] = new_entry; + (*insert_result.first->second.mutable_entries())[encoded_id] = new_entry; // NOLINTNEXTLINE metadata.mutable_version_header()->set_top_level_version( metadata.version_header().top_level_version() + 1); @@ -211,34 +241,36 @@ absl::StatusOr MetadataManager::CreateEntry( } absl::Status MetadataManager::DeleteEntry(absl::string_view type_name, + uint32_t db_num, absl::string_view id) { + auto encoded_id = EncodeDbNum(db_num, id); auto &metadata = metadata_.Get(); auto it = metadata.type_namespace_map().find(type_name); if (it == metadata.type_namespace_map().end()) { return absl::NotFoundError( absl::StrCat("Entry not found: ", type_name, " ", id)); } - auto inner_it = it->second.entries().find(id); + auto inner_it = it->second.entries().find(encoded_id); if (inner_it == it->second.entries().end()) { return absl::NotFoundError( - absl::StrCat("Entry not found: ", type_name, " ", id)); + absl::StrCat("Entry not found: ", type_name, " ", db_num, " ", id)); } if (!inner_it->second.has_content()) { return absl::NotFoundError( - absl::StrCat("Entry not found: ", type_name, " ", id)); + absl::StrCat("Entry not found: ", type_name, " ", db_num, " ", id)); } GlobalMetadataEntry new_entry; new_entry.set_version(inner_it->second.version() + 1); - // Note that fingerprint and encoding version are not set and will default to - // 0. + // Note that fingerprint and encoding version are not set and will default + // to 0. - auto callback_status = TriggerCallbacks(type_name, id, new_entry); + auto callback_status = TriggerCallbacks(type_name, db_num, id, new_entry); if (!callback_status.ok()) { return callback_status; } - (*(*metadata.mutable_type_namespace_map())[type_name].mutable_entries())[id] = - new_entry; + (*(*metadata.mutable_type_namespace_map())[type_name] + .mutable_entries())[encoded_id] = new_entry; metadata.mutable_version_header()->set_top_level_version( metadata.version_header().top_level_version() + 1); @@ -254,16 +286,39 @@ std::unique_ptr MetadataManager::GetGlobalMetadata() { return result; } -void MetadataManager::RegisterType(absl::string_view type_name, - uint32_t encoding_version, - FingerprintCallback fingerprint_callback, - MetadataUpdateCallback callback) { +absl::StatusOr +MetadataManager::GetFingerprintAndVersion(absl::string_view type_name, + uint32_t db_num, + absl::string_view id) { + IndexFingerprintVersion result; + auto encoded_id = EncodeDbNum(db_num, id); + auto &metadata = metadata_.Get(); + if (metadata.type_namespace_map().contains(type_name) && + metadata.type_namespace_map().at(type_name).entries().contains( + encoded_id)) { + const auto &entry = + metadata.type_namespace_map().at(type_name).entries().at(encoded_id); + IndexFingerprintVersion index_fingerprint_version; + index_fingerprint_version.set_fingerprint(entry.fingerprint()); + index_fingerprint_version.set_version(entry.version()); + return index_fingerprint_version; + } + return absl::NotFoundError( + absl::StrCat("Entry not found: ", type_name, " ", db_num, " ", id)); +} + +void MetadataManager::RegisterType( + absl::string_view type_name, uint32_t max_encoding_version, + FingerprintCallback fingerprint_callback, MetadataUpdateCallback callback, + EncodingVersionCallback encoding_version_callback) { auto insert_result = registered_types_.Get().insert(std::pair{ - type_name, RegisteredType{.encoding_version = encoding_version, - .fingerprint_callback = - std::move(fingerprint_callback), - .update_callback = std::move(callback)}}); + type_name, + RegisteredType{ + .max_encoding_version = max_encoding_version, + .encoding_version_callback = std::move(encoding_version_callback), + .fingerprint_callback = std::move(fingerprint_callback), + .update_callback = std::move(callback)}}); VMSDK_LOG_EVERY_N_SEC(WARNING, detached_ctx_.get(), 10) << "Type already registered for: " << type_name; DCHECK(insert_result.second); @@ -395,7 +450,7 @@ void MetadataManager::HandleBroadcastedMetadata( << "Got GlobalMetadata from " << address << ": " << schema->DebugString(); auto &metadata_manager = MetadataManager::Instance(); - auto status = metadata_manager.ReconcileMetadata(*schema); + auto status = metadata_manager.ReconcileMetadata(*schema, address); if (!status.ok()) { VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1) << "Failed to reconcile schemas: " << status.message(); @@ -409,6 +464,7 @@ void MetadataManager::HandleBroadcastedMetadata( } absl::Status MetadataManager::ReconcileMetadata(const GlobalMetadata &proposed, + absl::string_view source, bool trigger_callbacks, bool prefer_incoming) { // We synthesize the new version in a new variable, so that if we need to @@ -423,7 +479,21 @@ absl::Status MetadataManager::ReconcileMetadata(const GlobalMetadata &proposed, auto insert_result = result.mutable_type_namespace_map()->insert( {type_name, GlobalMetadataEntryMap()}); auto &existing_inner_map = insert_result.first->second; + auto ®istered_types = registered_types_.Get(); + auto rt_it = registered_types.find(type_name); for (const auto &[id, proposed_entry] : proposed_inner_map.entries()) { + if (rt_it != registered_types.end() && + rt_it->second.max_encoding_version < + proposed_entry.encoding_version()) { + VMSDK_LOG_EVERY_N_SEC(WARNING, detached_ctx_.get(), 10) + << "Invalid/Unknown encoding version (" + << proposed_entry.encoding_version() << ") for: " << type_name + << ", for entry " << id << " from " << source + << ", Cluster converge is prohibited."; + return absl::InvalidArgumentError(absl::StrCat( + "Invalid encoding version (", proposed_entry.encoding_version(), + ") for: ", type_name, ", entry ", id, " from ", source)); + } auto it = existing_inner_map.entries().find(id); if (it != existing_inner_map.entries().end() && !prefer_incoming) { auto &existing_entry = it->second; @@ -451,11 +521,10 @@ absl::Status MetadataManager::ReconcileMetadata(const GlobalMetadata &proposed, auto mutable_entries = existing_inner_map.mutable_entries(); (*mutable_entries)[id] = proposed_entry; - auto ®istered_types = registered_types_.Get(); - auto rt_it = registered_types.find(type_name); if (rt_it != registered_types.end() && proposed_entry.has_content() && - proposed_entry.encoding_version() < rt_it->second.encoding_version) { - // If the encoding version is less than the current version, we need + proposed_entry.encoding_version() < + rt_it->second.max_encoding_version) { + // If the encoding version is less than the current max version, we need // to re-fingerprint the entry. New fields being added may result in // unstable fingerprinting. // @@ -465,17 +534,22 @@ absl::Status MetadataManager::ReconcileMetadata(const GlobalMetadata &proposed, auto fingerprint, ComputeFingerprint(type_name, proposed_entry.content(), registered_types)); + VMSDK_ASSIGN_OR_RETURN( + auto encoding_version, + rt_it->second.encoding_version_callback(proposed_entry.content())); (*mutable_entries)[id].set_fingerprint(fingerprint); - (*mutable_entries)[id].set_encoding_version( - rt_it->second.encoding_version); + (*mutable_entries)[id].set_encoding_version(encoding_version); } if (trigger_callbacks) { - auto result = TriggerCallbacks(type_name, id, proposed_entry); + auto decoded = DecodeDbNum(id); + auto result = TriggerCallbacks(type_name, decoded.db_num, decoded.id, + proposed_entry); if (!result.ok()) { VMSDK_LOG(WARNING, detached_ctx_.get()) << "Failed during reconciliation callback: %s" - << result.message().data(); + << result.message().data() << " for type " << type_name << ", id " + << id << " from " << source; return result; } } @@ -489,8 +563,8 @@ absl::Status MetadataManager::ReconcileMetadata(const GlobalMetadata &proposed, ComputeTopLevelFingerprint(result.type_namespace_map()); result.mutable_version_header()->set_top_level_fingerprint(new_fingerprint); - // The new version is the max of the old version and the proposed version. We - // also bump the version if the fingerprint changed, as this indicates a + // The new version is the max of the old version and the proposed version. + // We also bump the version if the fingerprint changed, as this indicates a // distinct version. auto old_version = metadata.version_header().top_level_version(); auto new_version = @@ -582,6 +656,7 @@ absl::Status MetadataManager::LoadMetadata( // could happen if a module triggers a load after we have already been // running. VMSDK_RETURN_IF_ERROR(ReconcileMetadata(section->global_metadata_contents(), + "RDB Load", /*trigger_callbacks=*/false, /*prefer_incoming=*/true)); } @@ -616,10 +691,11 @@ void MetadataManager::OnServerCronCallback( [[maybe_unused]] uint64_t subevent, [[maybe_unused]] void *data) { static bool timer_started = false; if (!timer_started) { - // The first server cron tick after the FT.CREATE is run needs to kick start - // the timer. This can't be done during normal server event subscription - // because timers cannot be safely created in background threads (the GIL - // does not protect event loop code which uses the timers). + // The first server cron tick after the FT.CREATE is run needs to kick + // start the timer. This can't be done during normal server event + // subscription because timers cannot be safely created in background + // threads (the GIL does not protect event loop code which uses the + // timers). timer_started = true; ValkeyModule_CreateTimer( ctx, @@ -638,7 +714,7 @@ void MetadataManager::OnLoadingEnded(ValkeyModuleCtx *ctx) { // Clear the local metadata, then use ReconcileMetadata to recompute // fingerprints in case encoding has changed. metadata_ = GlobalMetadata(); - auto status = ReconcileMetadata(staged_metadata_.Get(), + auto status = ReconcileMetadata(staged_metadata_.Get(), "RDB Load Staged", /*trigger_callbacks=*/false, /*prefer_incoming=*/true); if (!status.ok()) { @@ -703,4 +779,88 @@ void MetadataManager::RegisterForClusterMessages(ValkeyModuleCtx *ctx) { ctx, coordinator::kMetadataBroadcastClusterMessageReceiverId, MetadataManagerOnClusterMessageCallback); } + +absl::Status MetadataManager::ShowMetadata( + ValkeyModuleCtx *ctx, [[maybe_unused]] vmsdk::ArgsIterator &itr) const { + auto metadata = metadata_.Get().DebugString(); + VMSDK_LOG(WARNING, ctx) << "Metadata: " << metadata; + google::protobuf::util::JsonPrintOptions options; + options.always_print_fields_with_no_presence = true; + [[maybe_unused]] auto status = google::protobuf::util::MessageToJsonString( + metadata_.Get(), &metadata, options); + ValkeyModule_ReplyWithStringBuffer(ctx, metadata.data(), metadata.size()); + return absl::OkStatus(); +} + +vmsdk::SemanticVersion MetadataManager::ComputeRDBVersion() const { + vmsdk::SemanticVersion max_encoding_version = 0; + for (auto &[type_name, inner_map] : metadata_.Get().type_namespace_map()) { + auto it = registered_types_.Get().find(type_name); + if (it == registered_types_.Get().end()) { + continue; + } + for (auto &[id, entry] : inner_map.entries()) { + max_encoding_version = + std::max(vmsdk::SemanticVersion(entry.encoding_version()), + max_encoding_version); + } + } + return max_encoding_version; +} + +/* +An 8/1.0(Valkey 8, Search 1.0) encoded string won't have a hashtag anywhere and +is always for db_num == 0 An 9/1.1 encoded string will always have a +false-hashtag at the START AND may have a real hashtag after that. + +Decoded strings that lack a hashtag and are for db_num == 0, are encoded with +the 8/1.0 rules All other decoded strings are encoded according to the 9/1.1 +rules. + +A pseudo-hashtag is of the format: {dddd} +dddd is the database number, i.e., ascii digits 0-9 ONLY. +Characters after the database number up to the trailing right brace are +explicitly ignored -- but preserved -- allowing for potential future +forward/reverse compatibility. + +*/ +MetadataManager::DecodedDbNum MetadataManager::DecodeDbNum( + absl::string_view encoded) { + auto hash_tag = vmsdk::ParseHashTag(encoded); + if (hash_tag) { + std::string_view front_tag = *hash_tag; + CHECK(encoded.size() >= 3); // hash_tag means >= 3 chars + if (front_tag.data() == (encoded.data() + 1)) { + std::string db_num_str; + while (!front_tag.empty() && std::isdigit(front_tag.front())) { + db_num_str += front_tag.front(); + front_tag.remove_prefix(1); + } + if (!front_tag.empty()) { + VMSDK_LOG_EVERY_N_SEC(NOTICE, nullptr, 10) + << "Ignoring extended index name metadata"; + } + if (!db_num_str.empty()) { + // Found valid 9/1.1 encoding. + return {.db_num = static_cast(std::stoul(db_num_str)), + .id = std::string(encoded.substr(hash_tag->size() + 2))}; + } + } + VMSDK_LOG_EVERY_N(WARNING, nullptr, 10) + << "Found invalid encoded index name: " << encoded; + } + // Assume 8/1.0 encoding. + return {.db_num = 0, .id = std::string(encoded)}; +} + +std::string MetadataManager::EncodeDbNum(uint32_t db_num, + absl::string_view id) { + if (db_num == 0) { + // 8/1.0 encoding. + return std::string(id); + } + // 9/1.1 encoding. + return absl::StrCat("{", db_num, "}", id); +} + } // namespace valkey_search::coordinator diff --git a/src/coordinator/metadata_manager.h b/src/coordinator/metadata_manager.h index 7585ff2c3..903d0199c 100644 --- a/src/coordinator/metadata_manager.h +++ b/src/coordinator/metadata_manager.h @@ -19,6 +19,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" +#include "command_parser.h" #include "google/protobuf/any.pb.h" #include "highwayhash/hh_types.h" #include "src/coordinator/client_pool.h" @@ -30,10 +31,13 @@ namespace valkey_search::coordinator { -using FingerprintCallback = absl::AnyInvocable( +using FingerprintCallback = + absl::AnyInvocable( + const google::protobuf::Any &metadata)>; +using EncodingVersionCallback = absl::AnyInvocable( const google::protobuf::Any &metadata)>; using MetadataUpdateCallback = absl::AnyInvocable; + uint32_t db_num, absl::string_view, const google::protobuf::Any *metadata)>; using AuxSaveCallback = void (*)(ValkeyModuleIO *rdb, int when); using AuxLoadCallback = int (*)(ValkeyModuleIO *rdb, int encver, int when); static constexpr int kEncodingVersion = 0; @@ -43,7 +47,6 @@ static constexpr uint8_t kMetadataBroadcastClusterMessageReceiverId = 0x00; static constexpr highwayhash::HHKey kHashKey{ 0x9736bad976c904ea, 0x08f963a1a52eece9, 0x1ea3f3f773f3b510, 0x9290a6b4e4db3d51}; - class MetadataManager { public: MetadataManager(ValkeyModuleCtx *ctx, ClientPool &client_pool) @@ -64,28 +67,34 @@ class MetadataManager { .section_count = [this](ValkeyModuleCtx *ctx, int when) -> int { return GetSectionsCount(); }, - .minimum_semantic_version = [](ValkeyModuleCtx *ctx, - int when) -> int { - return 0x010000; // Always use 1.0.0 for now - }}); + .minimum_semantic_version = [](ValkeyModuleCtx *ctx, int when) + -> int { return Instance().ComputeRDBVersion(); }}); } static uint64_t ComputeTopLevelFingerprint( const google::protobuf::Map &type_namespace_map); - absl::Status TriggerCallbacks(absl::string_view type_name, + vmsdk::SemanticVersion ComputeRDBVersion() const; + + absl::Status TriggerCallbacks(absl::string_view type_name, uint32_t db_num, absl::string_view id, const GlobalMetadataEntry &entry); absl::StatusOr GetEntry(absl::string_view type_name, + uint32_t db_num, absl::string_view id); absl::StatusOr CreateEntry( - absl::string_view type_name, absl::string_view id, + absl::string_view type_name, uint32_t db_num, absl::string_view id, std::unique_ptr contents); - absl::Status DeleteEntry(absl::string_view type_name, absl::string_view id); + absl::Status DeleteEntry(absl::string_view type_name, uint32_t db_num, + absl::string_view id); + + absl::StatusOr GetFingerprintAndVersion( + absl::string_view type_name, uint32_t db_num, + absl::string_view index_name); std::unique_ptr GetGlobalMetadata(); @@ -94,15 +103,23 @@ class MetadataManager { // accept updates to that type both locally and over the cluster bus. // // * type_name should be a unique string identifying the type. - // * encoding_version should be bumped any time the underlying metadata format - // is changed. // * fingerprint_callback should be a function for computing the fingerprint // of the metadata for the given encoding version. This function can only // change when the encoding version is bumped. // * update_callback will be called whenever the metadata is updated. - void RegisterType(absl::string_view type_name, uint32_t encoding_version, + // + // Each entry has an encoding version associated with it. When an entry + // is created or updated locally, the encoding version is computed using the + // encoding_version parameter passed to RegisterType. When an entry is updated + // from the cluster bus, the encoding version is read from the metadata entry + // itself. If the encoding version in the new metadata entry is greater than + // the max_encoding_version registered for the type, the update will be + // rejected. + // + void RegisterType(absl::string_view type_name, uint32_t max_encoding_version, FingerprintCallback fingerprint_callback, - MetadataUpdateCallback callback); + MetadataUpdateCallback callback, + EncodingVersionCallback encoding_version_callback); void BroadcastMetadata(ValkeyModuleCtx *ctx); @@ -122,6 +139,7 @@ class MetadataManager { std::unique_ptr header); absl::Status ReconcileMetadata(const GlobalMetadata &proposed, + absl::string_view source, bool trigger_callbacks = true, bool prefer_incoming = false); @@ -146,9 +164,19 @@ class MetadataManager { static void InitInstance(std::unique_ptr instance); static MetadataManager &Instance(); + absl::Status ShowMetadata(ValkeyModuleCtx *ctx, + vmsdk::ArgsIterator &iter) const; + static std::string EncodeDbNum(uint32_t db_num, absl::string_view id); + struct DecodedDbNum { + uint32_t db_num; + std::string id; + }; + static DecodedDbNum DecodeDbNum(absl::string_view encoded_id); + private: struct RegisteredType { - uint32_t encoding_version; + uint32_t max_encoding_version; + EncodingVersionCallback encoding_version_callback; FingerprintCallback fingerprint_callback; MetadataUpdateCallback update_callback; }; diff --git a/src/coordinator/search_converter.cc b/src/coordinator/search_converter.cc index afe5c63ce..f19e52969 100644 --- a/src/coordinator/search_converter.cc +++ b/src/coordinator/search_converter.cc @@ -119,13 +119,13 @@ absl::StatusOr> GRPCPredicateToPredicate( absl::StatusOr> GRPCSearchRequestToParameters(const SearchIndexPartitionRequest& request, grpc::CallbackServerContext* context) { - auto parameters = - std::make_unique(request.timeout_ms(), context); + auto parameters = std::make_unique( + request.timeout_ms(), context, request.db_num()); parameters->index_schema_name = request.index_schema_name(); parameters->attribute_alias = request.attribute_alias(); - VMSDK_ASSIGN_OR_RETURN( - parameters->index_schema, - SchemaManager::Instance().GetIndexSchema(0, request.index_schema_name())); + VMSDK_ASSIGN_OR_RETURN(parameters->index_schema, + SchemaManager::Instance().GetIndexSchema( + request.db_num(), request.index_schema_name())); if (request.has_score_as()) { parameters->score_as = vmsdk::MakeUniqueValkeyString(request.score_as()); } else { @@ -227,6 +227,7 @@ std::unique_ptr ParametersToGRPCSearchRequest( const query::SearchParameters& parameters) { auto request = std::make_unique(); request->set_index_schema_name(parameters.index_schema_name); + request->set_db_num(parameters.db_num_); request->set_attribute_alias(parameters.attribute_alias); request->set_score_as(vmsdk::ToStringView(parameters.score_as.get())); request->set_query(parameters.query); diff --git a/src/coordinator/server.cc b/src/coordinator/server.cc index f49dc21a6..a663eca01 100644 --- a/src/coordinator/server.cc +++ b/src/coordinator/server.cc @@ -189,6 +189,7 @@ grpc::ServerUnaryReactor* Service::SearchIndexPartition( std::pair Service::GenerateInfoResponse( const coordinator::InfoIndexPartitionRequest& request) { + vmsdk::VerifyMainThread(); uint32_t db_num = request.db_num(); std::string index_name = request.index_name(); coordinator::InfoIndexPartitionResponse response; @@ -221,20 +222,18 @@ Service::GenerateInfoResponse( std::optional index_fingerprint_version; - auto global_metadata = - coordinator::MetadataManager::Instance().GetGlobalMetadata(); - CHECK(global_metadata->type_namespace_map().contains( - kSchemaManagerMetadataTypeName)); - const auto& entry_map = - global_metadata->type_namespace_map().at(kSchemaManagerMetadataTypeName); - CHECK(entry_map.entries().contains(index_name)); - const auto& entry = entry_map.entries().at(index_name); + auto entry = + coordinator::MetadataManager::Instance().GetFingerprintAndVersion( + kSchemaManagerMetadataTypeName, db_num, index_name); + CHECK(entry.ok()); + index_fingerprint_version.emplace(); - index_fingerprint_version->set_fingerprint(entry.fingerprint()); - index_fingerprint_version->set_version(entry.version()); + index_fingerprint_version->set_fingerprint(entry->fingerprint()); + index_fingerprint_version->set_version(entry->version()); response.set_exists(true); response.set_index_name(index_name); + response.set_db_num(db_num); response.set_num_docs(data.num_docs); response.set_num_records(data.num_records); response.set_hash_indexing_failures(data.hash_indexing_failures); diff --git a/src/query/fanout_operation_base.h b/src/query/fanout_operation_base.h index 4592ed6e3..aa5e80797 100644 --- a/src/query/fanout_operation_base.h +++ b/src/query/fanout_operation_base.h @@ -270,6 +270,7 @@ class FanoutOperationBase { ++Metrics::GetStats().info_fanout_retry_cnt; ResetBaseForRetry(); ResetForRetry(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); StartFanoutRound(); } else { OnCompletion(); diff --git a/src/query/search.h b/src/query/search.h index 9679a3701..6a77810f9 100644 --- a/src/query/search.h +++ b/src/query/search.h @@ -74,6 +74,7 @@ struct SearchParameters { vmsdk::UniqueValkeyString score_as; std::string query; uint32_t dialect{kDialect}; + uint32_t db_num_; bool local_only{false}; int k{0}; std::optional ef; @@ -110,9 +111,11 @@ struct SearchParameters { } parse_vars; bool IsNonVectorQuery() const { return attribute_alias.empty(); } bool IsVectorQuery() const { return !IsNonVectorQuery(); } - SearchParameters(uint64_t timeout, grpc::CallbackServerContext* context) + SearchParameters(uint64_t timeout, grpc::CallbackServerContext* context, + uint32_t db_num) : timeout_ms(timeout), - cancellation_token(cancel::Make(timeout, context)) {} + cancellation_token(cancel::Make(timeout, context)), + db_num_(db_num) {} }; // Callback to be called when the search is done. diff --git a/src/rdb_serialization.cc b/src/rdb_serialization.cc index b35737b6d..bbc991832 100644 --- a/src/rdb_serialization.cc +++ b/src/rdb_serialization.cc @@ -146,13 +146,17 @@ absl::Status PerformRDBLoad(ValkeyModuleCtx *ctx, SafeRDB *rdb, int encver) { encver, kCurrentEncVer)); } VMSDK_ASSIGN_OR_RETURN( - auto min_semantic_version, rdb->LoadUnsigned(), + auto min_semantic_version_int, rdb->LoadUnsigned(), _ << "IO error reading semantic version from RDB. Failing RDB load."); + auto min_semantic_version = vmsdk::SemanticVersion(min_semantic_version_int); + VMSDK_LOG(DEBUG, ctx) << absl::StrFormat( + "RDB contains minimum semantic version %s", + min_semantic_version.ToString()); if (min_semantic_version > kCurrentSemanticVersion) { return absl::InternalError(absl::StrCat( "ValkeySearch RDB contents require minimum version ", - HumanReadableSemanticVersion(min_semantic_version), " and we are on ", - HumanReadableSemanticVersion(kCurrentSemanticVersion), + min_semantic_version.ToString(), " and we are on ", + kCurrentSemanticVersion.ToString(), ". If you are downgrading, ensure all feature usage on the new " "version of ValkeySearch is supported by this version and retry.")); } @@ -230,10 +234,17 @@ absl::Status PerformRDBSave(ValkeyModuleCtx *ctx, SafeRDB *rdb, int when) { registeredRDBSectionCallback.second.section_count(ctx, when); if (section_counts[rdb_section_type] > 0) { + auto this_semantic_version = + registeredRDBSectionCallback.second.minimum_semantic_version(ctx, + when); + VMSDK_LOG(DEBUG, ctx) + << "RDB section type " + << data_model::RDBSectionType_Name(rdb_section_type) + << " requires minimum semantic version " + << vmsdk::SemanticVersion(this_semantic_version).ToString(); + min_semantic_version = - std::max(min_semantic_version, - registeredRDBSectionCallback.second.minimum_semantic_version( - ctx, when)); + std::max(min_semantic_version, this_semantic_version); } rdb_section_count += section_counts[rdb_section_type]; } diff --git a/src/rdb_serialization.h b/src/rdb_serialization.h index 624436cd1..e719169a7 100644 --- a/src/rdb_serialization.h +++ b/src/rdb_serialization.h @@ -14,7 +14,6 @@ #include "absl/log/check.h" #include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "src/rdb_section.pb.h" #include "third_party/hnswlib/iostream.h" @@ -25,8 +24,38 @@ namespace valkey_search { constexpr uint32_t kCurrentEncVer = 1; -// Format is 0xMMmmpp (M=major, m=minor, p=patch) -constexpr uint64_t kCurrentSemanticVersion = 0x010000; +// +// Semantic Versioning +// +// RDB contents are written with a semantic version tag. This tag +// indicates the minimum version of ValkeySearch required to read the RDB +// contents. +// +// Before an RDB is written, each metadata object is queried for +// its minimum required semantic version (aka encoding_version). The maximum of +// these versions is written as the semantic version of the RDB. This +// enables forward compatibility, i.e., newer ValkeySearch versions can write +// RDBs that older versions can read, as long no new features are used that +// require the newer version. +// +// The 1.0 code line has this version in the RDB. +// It is compatible with Valkey 8. +// +constexpr vmsdk::SemanticVersion kSemanticVersion10(1, 0, 0); +// +// Valkey 9 introduced DB num into the CME metadata, requiring changes to the +// RDB serialization format. This is represented by semantic version 1.1.0. +// +constexpr vmsdk::SemanticVersion kSemanticVersion11(1, 1, 0); + +// +// This is the current semantic version which is also the maximum semantic +// version that this ValkeySearch version can read. When you introduce +// incompatible changes to the RDB serialization format, update this version +// too, or you won't be able to read RDBs written by your own code. +// +constexpr vmsdk::SemanticVersion kCurrentSemanticVersion = kSemanticVersion11; + constexpr absl::string_view kValkeySearchModuleTypeName{"Vk-Search"}; class SafeRDB; @@ -63,12 +92,6 @@ using RDBSectionCallbacks = struct RDBSectionCallbacks { extern absl::flat_hash_map kRegisteredRDBSectionCallbacks; -inline std::string HumanReadableSemanticVersion(uint64_t semantic_version) { - return absl::StrFormat("%d.%d.%d", (semantic_version >> 16) & 0xFF, - (semantic_version >> 8) & 0xFF, - semantic_version & 0xFF); -} - /* SafeRDB wraps a ValkeyModuleIO object and performs IO error checking, * returning absl::StatusOr to force error handling on the caller side. */ class SafeRDB { diff --git a/src/schema_manager.cc b/src/schema_manager.cc index 2e0d2fbd6..7eddafcbb 100644 --- a/src/schema_manager.cc +++ b/src/schema_manager.cc @@ -36,6 +36,7 @@ #include "src/rdb_serialization.h" #include "src/valkey_search.h" #include "src/vector_externalizer.h" +#include "vmsdk/src/debug.h" #include "vmsdk/src/info.h" #include "vmsdk/src/log.h" #include "vmsdk/src/managed_pointers.h" @@ -55,7 +56,7 @@ constexpr uint32_t kIndexSchemaBackfillBatchSize{10240}; namespace options { -/// Register the "--max-indexes" flag. Controls the max number of indexes we can +/// Register the "--max-indexes the max number of indexes we can /// have. static auto max_indexes = vmsdk::config::NumberBuilder(kMaxIndexesConfig, // name @@ -123,27 +124,30 @@ SchemaManager::SchemaManager( .section_count = [this](ValkeyModuleCtx *ctx, int when) -> int { return this->GetNumberOfIndexSchemas(); }, - .minimum_semantic_version = [](ValkeyModuleCtx *ctx, - int when) -> int { - return 0x010000; // Always use 1.0.0 for now - }}); + .minimum_semantic_version = [this](ValkeyModuleCtx *ctx, int when) + -> int { return this->ComputeSemanticVersionOfIndexes(); }}); if (coordinator_enabled) { coordinator::MetadataManager::Instance().RegisterType( - kSchemaManagerMetadataTypeName, kMetadataEncodingVersion, + kSchemaManagerMetadataTypeName, kCurrentSemanticVersion, ComputeFingerprint, - [this](absl::string_view id, const google::protobuf::Any *metadata) - -> absl::Status { return this->OnMetadataCallback(id, metadata); }); + [this](uint32_t db_num, absl::string_view id, + const google::protobuf::Any *metadata) -> absl::Status { + return this->OnMetadataCallback(db_num, id, metadata); + }, + ComputeSemanticVersion); } } -absl::Status GenerateIndexNotFoundError(absl::string_view name) { - return absl::NotFoundError( - absl::StrFormat("Index with name '%s' not found", name)); +absl::Status GenerateIndexNotFoundError(uint32_t db_num, + absl::string_view name) { + return absl::NotFoundError(absl::StrFormat( + "Index with name '%s' not found in database %d", name, db_num)); } -absl::Status GenerateIndexAlreadyExistsError(absl::string_view name) { +absl::Status GenerateIndexAlreadyExistsError(uint32_t db_num, + absl::string_view name) { return absl::AlreadyExistsError( - absl::StrFormat("Index %s already exists.", name)); + absl::StrFormat("Index %s in database %d already exists.", name, db_num)); } absl::StatusOr> SchemaManager::LookupInternal( @@ -174,7 +178,7 @@ absl::Status SchemaManager::ImportIndexSchema( const std::string &name = index_schema->GetName(); auto existing_entry = LookupInternal(db_num, name); if (existing_entry.ok()) { - return GenerateIndexAlreadyExistsError(name); + return GenerateIndexAlreadyExistsError(db_num, name); } db_to_index_schemas_[db_num][name] = std::move(index_schema); @@ -191,7 +195,7 @@ absl::Status SchemaManager::CreateIndexSchemaInternal( const std::string &name = index_schema_proto.name(); auto existing_entry = LookupInternal(db_num, name); if (existing_entry.ok()) { - return GenerateIndexAlreadyExistsError(index_schema_proto.name()); + return GenerateIndexAlreadyExistsError(db_num, index_schema_proto.name()); } VMSDK_ASSIGN_OR_RETURN( @@ -220,20 +224,20 @@ SchemaManager::CreateIndexSchema( << "). Cannot create additional indexes."; if (coordinator_enabled_) { - CHECK(index_schema_proto.db_num() == 0) - << "In cluster mode, we only support DB 0"; // In coordinated mode, use the metadata_manager as the source of truth. // It will callback into us with the update. if (coordinator::MetadataManager::Instance() - .GetEntry(kSchemaManagerMetadataTypeName, index_schema_proto.name()) + .GetEntry(kSchemaManagerMetadataTypeName, + index_schema_proto.db_num(), index_schema_proto.name()) .ok()) { - return GenerateIndexAlreadyExistsError(index_schema_proto.name()); + return GenerateIndexAlreadyExistsError(index_schema_proto.db_num(), + index_schema_proto.name()); } auto any_proto = std::make_unique(); any_proto->PackFrom(index_schema_proto); return coordinator::MetadataManager::Instance().CreateEntry( - kSchemaManagerMetadataTypeName, index_schema_proto.name(), - std::move(any_proto)); + kSchemaManagerMetadataTypeName, index_schema_proto.db_num(), + index_schema_proto.name(), std::move(any_proto)); } // In non-coordinated mode, apply the update inline. @@ -251,7 +255,7 @@ absl::StatusOr> SchemaManager::GetIndexSchema( absl::MutexLock lock(&db_to_index_schemas_mutex_); auto existing_entry = LookupInternal(db_num, name); if (!existing_entry.ok()) { - return GenerateIndexNotFoundError(name); + return GenerateIndexNotFoundError(db_num, name); } return existing_entry.value(); } @@ -261,7 +265,7 @@ SchemaManager::RemoveIndexSchemaInternal(uint32_t db_num, absl::string_view name) { auto existing_entry = LookupInternal(db_num, name); if (!existing_entry.ok()) { - return GenerateIndexNotFoundError(name); + return GenerateIndexNotFoundError(db_num, name); } auto result = std::move(db_to_index_schemas_[db_num][name]); db_to_index_schemas_[db_num].erase(name); @@ -278,15 +282,14 @@ SchemaManager::RemoveIndexSchemaInternal(uint32_t db_num, absl::Status SchemaManager::RemoveIndexSchema(uint32_t db_num, absl::string_view name) { if (coordinator_enabled_) { - CHECK(db_num == 0) << "In cluster mode, we only support DB 0"; // In coordinated mode, use the metadata_manager as the source of truth. // It will callback into us with the update. auto status = coordinator::MetadataManager::Instance().DeleteEntry( - kSchemaManagerMetadataTypeName, name); + kSchemaManagerMetadataTypeName, db_num, name); if (status.ok()) { return status; } else if (absl::IsNotFound(status)) { - return GenerateIndexNotFoundError(name); + return GenerateIndexNotFoundError(db_num, name); } else { return absl::InternalError(status.message()); } @@ -346,11 +349,39 @@ absl::StatusOr SchemaManager::ComputeFingerprint( return entry_fingerprint; } +// +// Determine the minimum encoding version required to interpret the metadata for +// this Schema +// +CONTROLLED_INT(override_semantic_version, 0); + +absl::StatusOr SchemaManager::ComputeSemanticVersion( + const google::protobuf::Any &metadata) { + if (override_semantic_version.GetValue() != 0) { + VMSDK_LOG(WARNING, nullptr) + << "Overriding index schema semantic version to " + << override_semantic_version.GetValue(); + return override_semantic_version.GetValue(); + } + auto unpacked = std::make_unique(); + if (!metadata.UnpackTo(unpacked.get())) { + return absl::InternalError( + "Unable to unpack metadata for index schema fingerprint " + "calculation"); + } + if (unpacked->has_db_num() && unpacked->db_num() != 0) { + return valkey_search::kSemanticVersion11; + } else { + return valkey_search::kSemanticVersion10; + } +} + absl::Status SchemaManager::OnMetadataCallback( - absl::string_view id, const google::protobuf::Any *metadata) { + uint32_t db_num, absl::string_view id, + const google::protobuf::Any *metadata) { absl::MutexLock lock(&db_to_index_schemas_mutex_); // Note that there is only DB 0 in cluster mode, so we can hardcode this. - auto status = RemoveIndexSchemaInternal(0, id); + auto status = RemoveIndexSchemaInternal(db_num, id); if (!status.ok() && !absl::IsNotFound(status.status())) { return status.status(); } @@ -381,6 +412,25 @@ uint64_t SchemaManager::GetNumberOfIndexSchemas() const { } return num_schemas; } +vmsdk::SemanticVersion SchemaManager::ComputeSemanticVersionOfIndexes() const { + absl::MutexLock lock(&db_to_index_schemas_mutex_); + auto max_version = vmsdk::SemanticVersion(0); + for (const auto &[db_num, schema_map] : db_to_index_schemas_) { + for (const auto &[name, schema] : schema_map) { + google::protobuf::Any any; + any.PackFrom(*schema->ToProto()); + auto semantic_version = ComputeSemanticVersion(any); + if (semantic_version.ok()) { + max_version = std::max(max_version, *semantic_version); + } else { + VMSDK_LOG(WARNING, nullptr) + << "Unable to compute semantic version for index schema " << name + << ": " << semantic_version.status().message(); + } + } + } + return max_version; +} uint64_t SchemaManager::GetNumberOfAttributes() const { absl::MutexLock lock(&db_to_index_schemas_mutex_); auto num_attributes = 0; diff --git a/src/schema_manager.h b/src/schema_manager.h index 6b8f99bec..6139a1252 100644 --- a/src/schema_manager.h +++ b/src/schema_manager.h @@ -22,7 +22,6 @@ #include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" #include "src/coordinator/coordinator.pb.h" -#include "src/coordinator/metadata_manager.h" #include "src/index_schema.h" #include "src/index_schema.pb.h" #include "vmsdk/src/managed_pointers.h" @@ -34,7 +33,6 @@ namespace valkey_search { constexpr absl::string_view kSchemaManagerMetadataTypeName{"vs_index_schema"}; -constexpr uint32_t kMetadataEncodingVersion = 1; namespace options { @@ -104,6 +102,8 @@ class SchemaManager { absl::Status SaveIndexes(ValkeyModuleCtx *ctx, SafeRDB *rdb, int when); static absl::StatusOr ComputeFingerprint( const google::protobuf::Any &metadata); + static absl::StatusOr ComputeSemanticVersion( + const google::protobuf::Any &metadata); private: absl::Status RemoveAll() @@ -113,7 +113,9 @@ class SchemaManager { vmsdk::ThreadPool *mutations_thread_pool_; vmsdk::UniqueValkeyDetachedThreadSafeContext detached_ctx_; - absl::Status OnMetadataCallback(absl::string_view id, + vmsdk::SemanticVersion ComputeSemanticVersionOfIndexes() const; + + absl::Status OnMetadataCallback(uint32_t db_num, absl::string_view id, const google::protobuf::Any *metadata) ABSL_LOCKS_EXCLUDED(db_to_index_schemas_mutex_); diff --git a/testing/coordinator/metadata_manager_test.cc b/testing/coordinator/metadata_manager_test.cc index 5ae7626cb..5488c5498 100644 --- a/testing/coordinator/metadata_manager_test.cc +++ b/testing/coordinator/metadata_manager_test.cc @@ -40,20 +40,22 @@ using ::testing::ValuesIn; struct TypeToRegister { std::string type_name; - uint64_t encoding_version{1}; + uint64_t max_encoding_version{1}; absl::Status status_to_return; absl::StatusOr fingerprint_to_return{ absl::UnimplementedError("Fingerprint not set")}; + absl::StatusOr encoding_version_to_return{1}; }; struct CallbackResult { std::string type_name; + uint32_t db_num; std::string id; bool has_content; bool operator==(const CallbackResult& other) const { return type_name == other.type_name && id == other.id && - has_content == other.has_content; + db_num == other.db_num && has_content == other.has_content; } }; @@ -66,6 +68,7 @@ struct EntryOperationTestParam { }; Operation operation_type; std::string type_name; + uint32_t db_num{0}; std::string id; std::string content; }; @@ -108,19 +111,22 @@ TEST_P(EntryOperationTest, TestEntryOperations) { std::vector callbacks_tracker; for (auto& type_to_register : test_case.types_to_register) { test_metadata_manager_->RegisterType( - type_to_register.type_name, type_to_register.encoding_version, + type_to_register.type_name, type_to_register.max_encoding_version, [&](const google::protobuf::Any& metadata) -> absl::StatusOr { return type_to_register.fingerprint_to_return; }, - [&](absl::string_view id, const google::protobuf::Any* metadata) { + [&](uint32_t db_num, absl::string_view id, + const google::protobuf::Any* metadata) { CallbackResult callback_result{ .type_name = type_to_register.type_name, + .db_num = db_num, .id = std::string(id), .has_content = metadata != nullptr, }; callbacks_tracker.push_back(std::move(callback_result)); return type_to_register.status_to_return; - }); + }, + [&](auto) { return type_to_register.encoding_version_to_return; }); } if (test_case.expect_num_broadcasts > 0) { EXPECT_CALL(*kMockValkeyModule, @@ -146,12 +152,14 @@ TEST_P(EntryOperationTest, TestEntryOperations) { content->set_type_url("type.googleapis.com/FakeType"); content->set_value(operation.content); auto result = test_metadata_manager_->CreateEntry( - operation.type_name, operation.id, std::move(content)); + operation.type_name, operation.db_num, operation.id, + std::move(content)); EXPECT_EQ(result.status().code(), test_case.expected_status_code); } else if (operation.operation_type == EntryOperationTestParam::EntryOperation::kDelete) { EXPECT_EQ( - test_metadata_manager_->DeleteEntry(operation.type_name, operation.id) + test_metadata_manager_ + ->DeleteEntry(operation.type_name, operation.db_num, operation.id) .code(), test_case.expected_status_code); } @@ -527,18 +535,21 @@ TEST_P(MetadataManagerReconciliationTest, TestReconciliation) { std::vector callbacks_tracker; for (const auto& type_to_register : test_case.types_to_register) { test_metadata_manager_->RegisterType( - type_to_register.type_name, type_to_register.encoding_version, + type_to_register.type_name, type_to_register.max_encoding_version, [&](const google::protobuf::Any& metadata) -> absl::StatusOr { return type_to_register.fingerprint_to_return; }, - [&](absl::string_view id, const google::protobuf::Any* metadata) { + [&](uint32_t db_num, absl::string_view id, + const google::protobuf::Any* metadata) { callbacks_tracker.push_back(CallbackResult{ .type_name = type_to_register.type_name, + .db_num = db_num, .id = std::string(id), .has_content = metadata != nullptr, }); return type_to_register.status_to_return; - }); + }, + [&](auto) { return type_to_register.encoding_version_to_return; }); } if (test_case.expect_broadcast) { @@ -613,6 +624,10 @@ TEST_P(MetadataManagerReconciliationTest, TestReconciliation) { test_case.expected_callbacks.end())); auto actual_metadata = test_metadata_manager_->GetGlobalMetadata(); + std::cout << "Actual Metadata: " << actual_metadata->DebugString() + << std::endl; + std::cout << "Expected Metadata: " << expected_metadata.DebugString() + << std::endl; EXPECT_TRUE(google::protobuf::util::MessageDifferencer::Equals( *actual_metadata, expected_metadata)); } @@ -830,9 +845,10 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 2, + .max_encoding_version = 2, .status_to_return = absl::OkStatus(), .fingerprint_to_return = 5678, + .encoding_version_to_return = 2, }, }, .get_global_metadata_status = absl::OkStatus(), @@ -878,7 +894,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 2, + .max_encoding_version = 2, .status_to_return = absl::OkStatus(), .fingerprint_to_return = absl::InternalError("Failed"), }, @@ -938,7 +954,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 1, + .max_encoding_version = 1, .status_to_return = absl::OkStatus(), }, }, @@ -1039,7 +1055,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 1, + .max_encoding_version = 1, .status_to_return = absl::OkStatus(), }, }, @@ -1127,7 +1143,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 1, + .max_encoding_version = 1, .status_to_return = absl::OkStatus(), }, }, @@ -1207,7 +1223,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 1, + .max_encoding_version = 2, .status_to_return = absl::OkStatus(), }, }, @@ -1295,7 +1311,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 2, + .max_encoding_version = 2, .status_to_return = absl::OkStatus(), }, }, @@ -1325,6 +1341,86 @@ INSTANTIATE_TEST_SUITE_P( } )", }, + { + .test_name = "EncodingVersionTooLarge", + .existing_metadata_pbtxt = R"( + version_header { + top_level_version: 1 + } + type_namespace_map { + key: "my_type" + value { + entries { + key: "my_id" + value { + version: 1 + fingerprint: 1111 + encoding_version: 1 + content { + type_url: "type.googleapis.com/FakeType" + value: "serialized_content_1" + } + } + } + } + } + )", + .proposed_metadata_pbtxt = R"( + version_header { + top_level_version: 1 + } + type_namespace_map { + key: "my_type" + value { + entries { + key: "my_id" + value { + version: 2 + fingerprint: 9999 + encoding_version: 2 + content { + type_url: "type.googleapis.com/FakeType" + value: "serialized_content_2" + } + } + } + } + } + )", + .types_to_register = + { + { + .type_name = "my_type", + .max_encoding_version = 1, + .status_to_return = absl::OkStatus(), + }, + }, + .get_global_metadata_status = absl::OkStatus(), + .expect_get_cluster_node_info = true, + .expect_reconcile = true, + .expected_metadata_pbtxt = R"( + version_header { + top_level_version: 1 + } + type_namespace_map { + key: "my_type" + value { + entries { + key: "my_id" + value { + version: 1 + fingerprint: 1111 + encoding_version: 1 + content { + type_url: "type.googleapis.com/FakeType" + value: "serialized_content_1" + } + } + } + } + } + )", + }, { .test_name = "EntryDeleted", .existing_metadata_pbtxt = R"( @@ -1371,7 +1467,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 1, + .max_encoding_version = 1, .status_to_return = absl::OkStatus(), }, }, @@ -1498,7 +1594,7 @@ INSTANTIATE_TEST_SUITE_P( { { .type_name = "my_type", - .encoding_version = 2, + .max_encoding_version = 2, .status_to_return = absl::OkStatus(), }, }, @@ -1825,7 +1921,8 @@ TEST_F(MetadataManagerTimestampTest, -1); // Reconcile metadata successfully - VMSDK_EXPECT_OK(test_metadata_manager_->ReconcileMetadata(proposed_metadata)); + VMSDK_EXPECT_OK( + test_metadata_manager_->ReconcileMetadata(proposed_metadata, "test")); // Now should return 0 (current time - current time = 0) EXPECT_EQ(test_metadata_manager_->GetMilliSecondsSinceLastHealthyMetadata(), @@ -1845,7 +1942,8 @@ TEST_F(MetadataManagerTimestampTest, TestTimestampCalculation) { kV1Metadata, &proposed_metadata)); // Reconcile metadata at time 1000000ms - VMSDK_EXPECT_OK(test_metadata_manager_->ReconcileMetadata(proposed_metadata)); + VMSDK_EXPECT_OK( + test_metadata_manager_->ReconcileMetadata(proposed_metadata, "test")); // Test various time differences struct TestCase { @@ -1882,7 +1980,8 @@ TEST_F(MetadataManagerTimestampTest, google::protobuf::TextFormat::ParseFromString(kV2Metadata, &metadata2)); // First reconciliation at time 1000000ms - VMSDK_EXPECT_OK(test_metadata_manager_->ReconcileMetadata(metadata1)); + VMSDK_EXPECT_OK( + test_metadata_manager_->ReconcileMetadata(metadata1, "test ")); EXPECT_EQ(test_metadata_manager_->GetMilliSecondsSinceLastHealthyMetadata(), 0); @@ -1892,7 +1991,7 @@ TEST_F(MetadataManagerTimestampTest, 10000); // Second reconciliation - should update timestamp to current time - VMSDK_EXPECT_OK(test_metadata_manager_->ReconcileMetadata(metadata2)); + VMSDK_EXPECT_OK(test_metadata_manager_->ReconcileMetadata(metadata2, "test")); EXPECT_EQ(test_metadata_manager_->GetMilliSecondsSinceLastHealthyMetadata(), 0); @@ -1917,12 +2016,15 @@ TEST_F(MetadataManagerTimestampTest, [](const google::protobuf::Any& metadata) -> absl::StatusOr { return 1234; }, - [](absl::string_view id, const google::protobuf::Any* metadata) { + [](uint32_t db_num, absl::string_view id, + const google::protobuf::Any* metadata) { return absl::InternalError("Callback failed"); - }); + }, + [](auto) { return 1; }); // Reconciliation should fail due to callback failure - auto status = test_metadata_manager_->ReconcileMetadata(proposed_metadata); + auto status = + test_metadata_manager_->ReconcileMetadata(proposed_metadata, "test"); EXPECT_FALSE(status.ok()); // But timestamp should not be updated since reconciliation failed @@ -1938,7 +2040,8 @@ TEST_F(MetadataManagerTimestampTest, TestConcurrentAccess) { kV1Metadata, &proposed_metadata)); // First, reconcile some metadata so we have a valid timestamp - VMSDK_EXPECT_OK(test_metadata_manager_->ReconcileMetadata(proposed_metadata)); + VMSDK_EXPECT_OK( + test_metadata_manager_->ReconcileMetadata(proposed_metadata, "test")); constexpr int kNumThreads = 8; constexpr int kCallsPerThread = 100; @@ -1991,7 +2094,8 @@ TEST_F(MetadataManagerTimestampTest, TestTimestampPersistsAcrossLoadMetadata) { kV1Metadata, &proposed_metadata)); // First reconciliation - VMSDK_EXPECT_OK(test_metadata_manager_->ReconcileMetadata(proposed_metadata)); + VMSDK_EXPECT_OK( + test_metadata_manager_->ReconcileMetadata(proposed_metadata, "test")); EXPECT_EQ(test_metadata_manager_->GetMilliSecondsSinceLastHealthyMetadata(), 0); @@ -2015,4 +2119,33 @@ TEST_F(MetadataManagerTimestampTest, TestTimestampPersistsAcrossLoadMetadata) { 0); } +TEST(IndexNameTest, IndexName) { + for (std::string prefix : {"", "a", "abc", "{", "}"}) { + for (std::string hash_tag : {"", "{a}", "{b}", "{}"}) { + for (std::string suffix : {"", "x", "xy", "{", "}", "{}"}) { + for (uint32_t db_num : {0, 1}) { + std::string id = prefix + hash_tag + suffix; + // + // Construct a IndexName + // + std::cout << "Doing test: DB:" << db_num << " name:'" << id << "'\n"; + std::string encoded = MetadataManager::EncodeDbNum(db_num, id); + // + // Now reverse it and compare equality + // + auto decoded = MetadataManager::DecodeDbNum(encoded); + EXPECT_EQ(id, decoded.id); + EXPECT_EQ(db_num, decoded.db_num); + // + // And re-forward it + // + auto re_forward = + MetadataManager::EncodeDbNum(decoded.db_num, decoded.id); + EXPECT_EQ(re_forward, encoded); + } + } + } + } +} + } // namespace valkey_search::coordinator diff --git a/testing/ft_info_test.cc b/testing/ft_info_test.cc index e54fa4fc1..43ceeaba1 100644 --- a/testing/ft_info_test.cc +++ b/testing/ft_info_test.cc @@ -363,7 +363,7 @@ INSTANTIATE_TEST_SUITE_P( .expect_return_failure = true, .expected_output = "-Index with name 'non_exist_test_name' not " - "found\r\n", + "found in database 0\r\n", }, }, }, @@ -398,7 +398,7 @@ INSTANTIATE_TEST_SUITE_P( .expect_return_failure = true, .expected_output = "-Index with name 'non_exist_test_name' not " - "found\r\n", + "found in database 0\r\n", }, }, }, diff --git a/testing/ft_search_test.cc b/testing/ft_search_test.cc index 901f2be06..6de1bd4ec 100644 --- a/testing/ft_search_test.cc +++ b/testing/ft_search_test.cc @@ -176,7 +176,8 @@ void SendReplyTest::DoSendReplyTest( for (const auto &neighbor : input.neighbors) { neighbors.push_back(ToIndexesNeighbor(neighbor)); } - auto parameters = std::make_unique(10000, nullptr); + auto parameters = + std::make_unique(10000, nullptr, 0); parameters->index_schema = test_index_schema; parameters->attribute_alias = attribute_alias; parameters->score_as = vmsdk::MakeUniqueValkeyString(score_as); diff --git a/testing/integration/vector_search_integration_test.py b/testing/integration/vector_search_integration_test.py index 3fd85721d..64cdbd2de 100644 --- a/testing/integration/vector_search_integration_test.py +++ b/testing/integration/vector_search_integration_test.py @@ -50,7 +50,7 @@ def generate_test_cases(): knn=3, score_as="score", returns=None, - expected_error="Index with name 'not_a_real_index' not found", + expected_error="Index with name 'not_a_real_index' not found in database 0", expected_result=None, no_content=True, ), @@ -84,7 +84,7 @@ def generate_test_cases(): knn=3, score_as="score", returns=None, - expected_error="Index with name 'not_a_real_index' not found", + expected_error="Index with name 'not_a_real_index' not found in database 0", expected_result=None, no_content=False, ), diff --git a/testing/query/response_generator_test.cc b/testing/query/response_generator_test.cc index 334fe251a..1fbbaa896 100644 --- a/testing/query/response_generator_test.cc +++ b/testing/query/response_generator_test.cc @@ -88,7 +88,7 @@ TEST_P(ResponseGeneratorTest, ProcessNeighborsForReply) { for (const auto &expected_content : params.expected_contents) { expected_contents.push_back(ToRecordsMap(expected_content)); } - query::SearchParameters parameters(100000, nullptr); + query::SearchParameters parameters(100000, nullptr, 0); for (const auto &return_attribute : params.return_attributes) { parameters.return_attributes.push_back( {.identifier = @@ -175,7 +175,7 @@ TEST_F(ResponseGeneratorTest, ProcessNeighborsForReplyContentLimits) { neighbors.push_back(indexes::Neighbor(many_fields_id, 0)); // Set up parameters - query::SearchParameters parameters(100000, nullptr); + query::SearchParameters parameters(100000, nullptr, 0); parameters.return_attributes.push_back( {.identifier = vmsdk::MakeUniqueValkeyString("content"), .alias = vmsdk::MakeUniqueValkeyString("content_alias")}); diff --git a/testing/schema_manager_test.cc b/testing/schema_manager_test.cc index df470b9b5..f38b8aedd 100644 --- a/testing/schema_manager_test.cc +++ b/testing/schema_manager_test.cc @@ -139,8 +139,9 @@ TEST_F(SchemaManagerTest, TestCreateIndexSchemaAlreadyExists) { .CreateIndexSchema(&fake_ctx_, test_index_schema_proto_) .status(); EXPECT_EQ(status.code(), absl::StatusCode::kAlreadyExists); - EXPECT_EQ(status.message(), - absl::StrFormat("Index %s already exists.", index_name_)); + EXPECT_EQ( + status.message(), + absl::StrFormat("Index %s in database 0 already exists.", index_name_)); EXPECT_EQ(callback_triggered, 1); } } diff --git a/testing/search_test.cc b/testing/search_test.cc index 3a78f3137..bdd46f960 100644 --- a/testing/search_test.cc +++ b/testing/search_test.cc @@ -399,7 +399,7 @@ class LocalSearchTest : public ValkeySearchTestWithParam { TEST_P(LocalSearchTest, LocalSearchTest) { auto index_schema = CreateIndexSchemaWithMultipleAttributes(); const LocalSearchTestCase &test_case = GetParam(); - query::SearchParameters params(100000, nullptr); + query::SearchParameters params(100000, nullptr, 0); params.index_schema_name = kIndexSchemaName; if (test_case.is_vector_search_query) { params.attribute_alias = kVectorAttributeAlias; @@ -504,7 +504,7 @@ TEST_P(FetchFilteredKeysTest, ParseParams) { auto vector_index = dynamic_cast( index_schema->GetIndex(kVectorAttributeAlias)->get()); const FetchFilteredKeysTestCase &test_case = GetParam(); - query::SearchParameters params(100000, nullptr); + query::SearchParameters params(100000, nullptr, 0); FilterParser parser(*index_schema, test_case.filter); params.filter_parse_results = std::move(parser.Parse().value()); params.k = 100; @@ -582,7 +582,7 @@ TEST_P(SearchTest, ParseParams) { const auto ¶m = GetParam(); IndexerType indexer_type = std::get<0>(param); SearchTestCase test_case = std::get<1>(param); - query::SearchParameters params(100000, nullptr); + query::SearchParameters params(100000, nullptr, 0); params.index_schema = CreateIndexSchemaWithMultipleAttributes(indexer_type); params.index_schema_name = kIndexSchemaName; params.attribute_alias = kVectorAttributeAlias; @@ -859,7 +859,7 @@ TEST_P(IndexedContentTest, MaybeAddIndexedContentTest) { } } - auto parameters = query::SearchParameters(100000, nullptr); + auto parameters = query::SearchParameters(100000, nullptr, 0); parameters.index_schema = index_schema; for (auto &attribute : test_case.return_attributes) { auto identifier = vmsdk::MakeUniqueValkeyString(attribute.identifier); diff --git a/vmsdk/src/debug.h b/vmsdk/src/debug.h index 6241c90e5..d3b8fc40f 100644 --- a/vmsdk/src/debug.h +++ b/vmsdk/src/debug.h @@ -85,7 +85,7 @@ struct ControlledBase { template struct Controlled : private ControlledBase { - bool GetValue() const { return value_; } + T GetValue() const { return value_.load(std::memory_order_relaxed); } std::string DisplayValue() const override { std::ostringstream os; os << value_.load(std::memory_order_relaxed); diff --git a/vmsdk/src/utils.cc b/vmsdk/src/utils.cc index c2f3c2aed..9085ee264 100644 --- a/vmsdk/src/utils.cc +++ b/vmsdk/src/utils.cc @@ -315,4 +315,36 @@ std::optional JsonUnquote(absl::string_view sv) { return result; } +static const char *hex_chars = "0123456789abcdef"; + +std::string PrintableBytes(absl::string_view sv) { + std::string result; + for (const auto &c : sv) { + if (std::isprint(c)) { + if (c == '\\') { + result += c; + } + result += c; + } else { + result += '\\'; + switch (c) { + case '\n': + result += 'n'; + break; + case '\r': + result += 'r'; + break; + case '\t': + result += 't'; + break; + default: + result += hex_chars[(c >> 4) & 0xf]; + result += hex_chars[c & 0xf]; + break; + } + } + } + return result; +} + } // namespace vmsdk diff --git a/vmsdk/src/utils.h b/vmsdk/src/utils.h index 2bc0ba34a..0221b5c4b 100644 --- a/vmsdk/src/utils.h +++ b/vmsdk/src/utils.h @@ -7,6 +7,8 @@ #ifndef VMSDK_SRC_UTILS_H_ #define VMSDK_SRC_UTILS_H_ +#include + #include #include #include @@ -107,6 +109,7 @@ inline int MakeValkeyVersion(int major, int minor, int patch) { CHECK(major < 256 && minor < 256 && patch < 256); return (major << 16) | (minor << 8) | patch; } +std::string PrintableBytes(absl::string_view sv); // Checks if a numeric value falls within an optional inclusive range [min, // max]. The range is inclusive: a value is considered valid if min <= value <= @@ -116,6 +119,34 @@ absl::Status VerifyRange(long long num_value, std::optional min, std::optional max); std::optional JsonUnquote(absl::string_view sv); +// +// Class for Semantic Version +// +class SemanticVersion { + public: + constexpr SemanticVersion(uint8_t major, uint8_t minor, uint8_t patch) + : version_((static_cast(major) << 16) | + (static_cast(minor) << 8) | + static_cast(patch)) {} + constexpr SemanticVersion(unsigned version) : version_(version) {} + unsigned Major() const { return (version_ >> 16) & 0xFF; } + unsigned Minor() const { return (version_ >> 8) & 0xFF; } + unsigned Patch() const { return (version_) & 0xFF; } + operator unsigned() const { return version_; } + std::string ToString() const { + return absl::StrFormat("%d.%d.%d", Major(), Minor(), Patch()); + } + + auto operator<=>(const SemanticVersion &other) const = default; + + private: + unsigned version_; +}; + +inline std::ostream &operator<<(std::ostream &os, const SemanticVersion &sv) { + return os << sv.ToString(); +} + struct JsonQuotedStringView { absl::string_view view_; friend std::ostream &operator<<(std::ostream &os,