Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ target_link_libraries(valkey_search PUBLIC utils)
target_link_libraries(valkey_search PUBLIC status_macros)
target_link_libraries(valkey_search PUBLIC valkey_module)
target_link_libraries(valkey_search PUBLIC acl)
target_link_libraries(valkey_search PUBLIC cluster_map)

set(SRCS_KEYSPACE_EVENT_MANAGER
${CMAKE_CURRENT_LIST_DIR}/keyspace_event_manager.cc
Expand Down
18 changes: 11 additions & 7 deletions src/commands/ft_dropindex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,36 @@ class DropConsistencyCheckFanoutOperation
: public query::fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
query::fanout::FanoutTargetMode::kAll> {
vmsdk::cluster_map::FanoutTargetMode::kAll> {
public:
DropConsistencyCheckFanoutOperation(uint32_t db_num,
const std::string& index_name,
unsigned timeout_ms)
: query::fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
query::fanout::FanoutTargetMode::kAll>(),
vmsdk::cluster_map::FanoutTargetMode::kAll>(),
db_num_(db_num),
index_name_(index_name),
timeout_ms_(timeout_ms){};

std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const {
return ValkeySearch::Instance().GetClusterMap()->GetAllTargets();
}

unsigned GetTimeoutMs() const override { return timeout_ms_; }

coordinator::InfoIndexPartitionRequest GenerateRequest(
const query::fanout::FanoutSearchTarget&) override {
const vmsdk::cluster_map::NodeInfo&) override {
coordinator::InfoIndexPartitionRequest req;
req.set_db_num(db_num_);
req.set_index_name(index_name_);
return req;
}

void OnResponse(const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const query::fanout::FanoutSearchTarget&
target) override {
void OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) override {
// if the index exist on some node and returns a valid response, treat it as
// inconsistent error
absl::MutexLock lock(&mutex_);
Expand All @@ -59,7 +63,7 @@ class DropConsistencyCheckFanoutOperation
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const query::fanout::FanoutSearchTarget&) override {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override {
return coordinator::Service::GenerateInfoResponse(request);
}

Expand Down
14 changes: 10 additions & 4 deletions src/query/cluster_info_fanout_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ClusterInfoFanoutOperation::ClusterInfoFanoutOperation(
uint32_t db_num, const std::string& index_name, unsigned timeout_ms)
: fanout::FanoutOperationBase<coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
fanout::FanoutTargetMode::kAll>(),
vmsdk::cluster_map::FanoutTargetMode::kAll>(),
db_num_(db_num),
index_name_(index_name),
timeout_ms_(timeout_ms),
Expand All @@ -25,12 +25,18 @@ ClusterInfoFanoutOperation::ClusterInfoFanoutOperation(
backfill_complete_percent_min_(0.0f),
backfill_in_progress_(false) {}

std::vector<vmsdk::cluster_map::NodeInfo>
ClusterInfoFanoutOperation::GetTargets() const {
return ValkeySearch::Instance().GetClusterMap()->GetAllTargets();
}

unsigned ClusterInfoFanoutOperation::GetTimeoutMs() const {
return timeout_ms_;
}

coordinator::InfoIndexPartitionRequest
ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
ClusterInfoFanoutOperation::GenerateRequest(
const vmsdk::cluster_map::NodeInfo&) {
coordinator::InfoIndexPartitionRequest req;
req.set_db_num(db_num_);
req.set_index_name(index_name_);
Expand All @@ -39,7 +45,7 @@ ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {

void ClusterInfoFanoutOperation::OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
if (!resp.error().empty()) {
grpc::Status status =
grpc::Status(grpc::StatusCode::INTERNAL, resp.error());
Expand Down Expand Up @@ -111,7 +117,7 @@ void ClusterInfoFanoutOperation::OnResponse(
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
ClusterInfoFanoutOperation::GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
return coordinator::Service::GenerateInfoResponse(request);
}

Expand Down
23 changes: 14 additions & 9 deletions src/query/cluster_info_fanout_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@

namespace valkey_search::query::cluster_info_fanout {

class ClusterInfoFanoutOperation : public fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
fanout::FanoutTargetMode::kAll> {
class ClusterInfoFanoutOperation
: public fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
vmsdk::cluster_map::FanoutTargetMode::kAll> {
public:
ClusterInfoFanoutOperation(uint32_t db_num, const std::string& index_name,
unsigned timeout_ms);

std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const;

unsigned GetTimeoutMs() const override;

coordinator::InfoIndexPartitionRequest GenerateRequest(
const fanout::FanoutSearchTarget&) override;
const vmsdk::cluster_map::NodeInfo&) override;

void OnResponse(const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const fanout::FanoutSearchTarget&) override;
void OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override;

std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
GetLocalResponse(const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const fanout::FanoutSearchTarget&) override;
GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override;

void InvokeRemoteRpc(
coordinator::Client* client,
Expand Down
82 changes: 52 additions & 30 deletions src/query/fanout_operation_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <thread>

#include "absl/synchronization/mutex.h"
#include "cluster_map.h"
#include "grpcpp/support/status.h"
#include "src/coordinator/client_pool.h"
#include "src/metrics.h"
Expand All @@ -30,7 +31,8 @@ namespace valkey_search::query::fanout {

constexpr unsigned kNoValkeyTimeout = 86400000;

template <typename Request, typename Response, FanoutTargetMode kTargetMode>
template <typename Request, typename Response,
vmsdk::cluster_map::FanoutTargetMode kTargetMode>
class FanoutOperationBase {
public:
explicit FanoutOperationBase() = default;
Expand All @@ -43,7 +45,14 @@ class FanoutOperationBase {
blocked_client_->MeasureTimeStart();
deadline_tp_ = std::chrono::steady_clock::now() +
std::chrono::milliseconds(GetTimeoutMs());
targets_ = GetTargets(ctx);
// if current cluster map is not complete or the current cluster map
// expires, refresh cluster map
if (!ValkeySearch::Instance().GetClusterMap()->GetIsClusterMapFull() ||
std::chrono::steady_clock::now() >
ValkeySearch::Instance().GetClusterMap()->GetExpirationTime()) {
ValkeySearch::Instance().RefreshClusterMap(ctx);
}
targets_ = GetTargets();
StartFanoutRound();
}

Expand Down Expand Up @@ -85,16 +94,14 @@ class FanoutOperationBase {
}
}

std::vector<FanoutSearchTarget> GetTargets(ValkeyModuleCtx* ctx) const {
return query::fanout::FanoutTemplate::GetTargets(ctx, kTargetMode);
}
virtual std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const = 0;

void IssueRpc(const FanoutSearchTarget& target, const Request& request,
unsigned timeout_ms) {
void IssueRpc(const vmsdk::cluster_map::NodeInfo& target,
const Request& request, unsigned timeout_ms) {
coordinator::ClientPool* client_pool_ =
ValkeySearch::Instance().GetCoordinatorClientPool();

if (target.type == FanoutSearchTarget::Type::kLocal) {
if (target.location == vmsdk::cluster_map::NodeInfo::NodeLocation::kLocal) {
vmsdk::RunByMain([this, target, request]() {
auto [status, resp] = this->GetLocalResponse(request, target);
if (status.ok()) {
Expand All @@ -110,12 +117,14 @@ class FanoutOperationBase {
this->RpcDone();
});
} else {
auto client = client_pool_->GetClient(target.address);
std::string client_ip_port = absl::StrCat(
target.ip, ":", coordinator::GetCoordinatorPort(target.port));
auto client = client_pool_->GetClient(client_ip_port);
if (!client) {
++Metrics::GetStats().info_fanout_fail_cnt;
VMSDK_LOG_EVERY_N_SEC(WARNING, nullptr, 1)
<< "FANOUT_DEBUG: Found invalid client on target "
<< target.address;
<< client_ip_port;
this->OnError(grpc::Status(grpc::StatusCode::INTERNAL, ""),
coordinator::FanoutErrorType::COMMUNICATION_ERROR,
target);
Expand All @@ -124,14 +133,14 @@ class FanoutOperationBase {
}
this->InvokeRemoteRpc(
client.get(), request,
[this, target](grpc::Status status, Response& resp) {
[this, target, client_ip_port](grpc::Status status, Response& resp) {
if (status.ok()) {
this->OnResponse(resp, target);
} else {
++Metrics::GetStats().info_fanout_fail_cnt;
VMSDK_LOG_EVERY_N_SEC(WARNING, nullptr, 1)
<< "FANOUT_DEBUG: InvokeRemoteRpc error on target "
<< target.address << ", status code: " << status.error_code()
<< client_ip_port << ", status code: " << status.error_code()
<< ", error message: " << status.error_message();
// if grpc failed, the response is invalid, so we need to manually
// set the error type
Expand All @@ -151,7 +160,7 @@ class FanoutOperationBase {
}

virtual std::pair<grpc::Status, Response> GetLocalResponse(
const Request&, [[maybe_unused]] const FanoutSearchTarget&) = 0;
const Request&, [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;

virtual void InvokeRemoteRpc(coordinator::Client*, const Request&,
std::function<void(grpc::Status, Response&)>,
Expand All @@ -160,14 +169,15 @@ class FanoutOperationBase {
virtual unsigned GetTimeoutMs() const = 0;

virtual Request GenerateRequest(
[[maybe_unused]] const FanoutSearchTarget&) = 0;
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;

virtual void OnResponse(const Response&,
[[maybe_unused]] const FanoutSearchTarget&) = 0;
virtual void OnResponse(
const Response&,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;

virtual void OnError(grpc::Status status,
coordinator::FanoutErrorType error_type,
const FanoutSearchTarget& target) {
const vmsdk::cluster_map::NodeInfo& target) {
absl::MutexLock lock(&mutex_);
if (error_type == coordinator::FanoutErrorType::INDEX_NAME_ERROR) {
index_name_error_nodes.push_back(target);
Expand Down Expand Up @@ -205,39 +215,51 @@ class FanoutOperationBase {
// Log index name errors
if (!index_name_error_nodes.empty()) {
error_message = "Index name not found.";
for (const FanoutSearchTarget& target : index_name_error_nodes) {
if (target.type == FanoutSearchTarget::Type::kLocal) {
for (const vmsdk::cluster_map::NodeInfo& target :
index_name_error_nodes) {
if (target.location ==
vmsdk::cluster_map::NodeInfo::NodeLocation::kLocal) {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INDEX_NAME_ERROR_LOG_PREFIX << "LOCAL NODE";
} else {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INDEX_NAME_ERROR_LOG_PREFIX << target.address;
<< INDEX_NAME_ERROR_LOG_PREFIX
<< absl::StrCat(target.ip, ":",
coordinator::GetCoordinatorPort(target.port));
}
}
}
// Log communication errors
if (!communication_error_nodes.empty()) {
error_message = "Communication error between nodes found.";
for (const FanoutSearchTarget& target : communication_error_nodes) {
if (target.type == FanoutSearchTarget::Type::kLocal) {
for (const vmsdk::cluster_map::NodeInfo& target :
communication_error_nodes) {
if (target.location ==
vmsdk::cluster_map::NodeInfo::NodeLocation::kLocal) {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< COMMUNICATION_ERROR_LOG_PREFIX << "LOCAL NODE";
} else {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< COMMUNICATION_ERROR_LOG_PREFIX << target.address;
<< COMMUNICATION_ERROR_LOG_PREFIX
<< absl::StrCat(target.ip, ":",
coordinator::GetCoordinatorPort(target.port));
}
}
}
// Log inconsistent state errors
if (!inconsistent_state_error_nodes.empty()) {
error_message = "Inconsistent index state error found.";
for (const FanoutSearchTarget& target : inconsistent_state_error_nodes) {
if (target.type == FanoutSearchTarget::Type::kLocal) {
for (const vmsdk::cluster_map::NodeInfo& target :
inconsistent_state_error_nodes) {
if (target.location ==
vmsdk::cluster_map::NodeInfo::NodeLocation::kLocal) {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INCONSISTENT_STATE_ERROR_LOG_PREFIX << "LOCAL NODE";
} else {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INCONSISTENT_STATE_ERROR_LOG_PREFIX << target.address;
<< INCONSISTENT_STATE_ERROR_LOG_PREFIX
<< absl::StrCat(target.ip, ":",
coordinator::GetCoordinatorPort(target.port));
}
}
}
Expand Down Expand Up @@ -286,10 +308,10 @@ class FanoutOperationBase {
unsigned outstanding_{0};
absl::Mutex mutex_;
std::unique_ptr<vmsdk::BlockedClient> blocked_client_;
std::vector<FanoutSearchTarget> index_name_error_nodes;
std::vector<FanoutSearchTarget> inconsistent_state_error_nodes;
std::vector<FanoutSearchTarget> communication_error_nodes;
std::vector<FanoutSearchTarget> targets_;
std::vector<vmsdk::cluster_map::NodeInfo> index_name_error_nodes;
std::vector<vmsdk::cluster_map::NodeInfo> inconsistent_state_error_nodes;
std::vector<vmsdk::cluster_map::NodeInfo> communication_error_nodes;
std::vector<vmsdk::cluster_map::NodeInfo> targets_;
std::chrono::steady_clock::time_point deadline_tp_;
bool timeout_occurred_ = false;
};
Expand Down
19 changes: 13 additions & 6 deletions src/query/primary_info_fanout_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ namespace valkey_search::query::primary_info_fanout {

PrimaryInfoFanoutOperation::PrimaryInfoFanoutOperation(
uint32_t db_num, const std::string& index_name, unsigned timeout_ms)
: fanout::FanoutOperationBase<coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
fanout::FanoutTargetMode::kPrimary>(),
: fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
vmsdk::cluster_map::FanoutTargetMode::kPrimary>(),
db_num_(db_num),
index_name_(index_name),
timeout_ms_(timeout_ms),
Expand All @@ -25,12 +26,18 @@ PrimaryInfoFanoutOperation::PrimaryInfoFanoutOperation(
num_records_(0),
hash_indexing_failures_(0) {}

std::vector<vmsdk::cluster_map::NodeInfo>
PrimaryInfoFanoutOperation::GetTargets() const {
return ValkeySearch::Instance().GetClusterMap()->GetPrimaryTargets();
}

unsigned PrimaryInfoFanoutOperation::GetTimeoutMs() const {
return timeout_ms_;
}

coordinator::InfoIndexPartitionRequest
PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
PrimaryInfoFanoutOperation::GenerateRequest(
const vmsdk::cluster_map::NodeInfo&) {
coordinator::InfoIndexPartitionRequest req;
req.set_db_num(db_num_);
req.set_index_name(index_name_);
Expand All @@ -39,7 +46,7 @@ PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {

void PrimaryInfoFanoutOperation::OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
if (!resp.error().empty()) {
grpc::Status status =
grpc::Status(grpc::StatusCode::INTERNAL, resp.error());
Expand Down Expand Up @@ -96,7 +103,7 @@ void PrimaryInfoFanoutOperation::OnResponse(
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
PrimaryInfoFanoutOperation::GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
return coordinator::Service::GenerateInfoResponse(request);
}

Expand Down
Loading
Loading