Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/commands/ft_dropindex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,36 @@ class DropConsistencyCheckFanoutOperation
: public query::fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
query::fanout::FanoutTargetMode::kAll> {
vmsdk::cluster_map::FanoutTargetMode::kAll> {
public:
DropConsistencyCheckFanoutOperation(uint32_t db_num,
const std::string& index_name,
unsigned timeout_ms)
: query::fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
query::fanout::FanoutTargetMode::kAll>(),
vmsdk::cluster_map::FanoutTargetMode::kAll>(),
db_num_(db_num),
index_name_(index_name),
timeout_ms_(timeout_ms){};

std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const {
return ValkeySearch::Instance().GetClusterMap()->GetAllTargets();
}

unsigned GetTimeoutMs() const override { return timeout_ms_; }

coordinator::InfoIndexPartitionRequest GenerateRequest(
const query::fanout::FanoutSearchTarget&) override {
const vmsdk::cluster_map::NodeInfo&) override {
coordinator::InfoIndexPartitionRequest req;
req.set_db_num(db_num_);
req.set_index_name(index_name_);
return req;
}

void OnResponse(const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const query::fanout::FanoutSearchTarget&
target) override {
void OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) override {
// if the index exist on some node and returns a valid response, treat it as
// inconsistent error
absl::MutexLock lock(&mutex_);
Expand All @@ -59,7 +63,7 @@ class DropConsistencyCheckFanoutOperation
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const query::fanout::FanoutSearchTarget&) override {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override {
return coordinator::Service::GenerateInfoResponse(request);
}

Expand Down
14 changes: 10 additions & 4 deletions src/query/cluster_info_fanout_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ClusterInfoFanoutOperation::ClusterInfoFanoutOperation(
uint32_t db_num, const std::string& index_name, unsigned timeout_ms)
: fanout::FanoutOperationBase<coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
fanout::FanoutTargetMode::kAll>(),
vmsdk::cluster_map::FanoutTargetMode::kAll>(),
db_num_(db_num),
index_name_(index_name),
timeout_ms_(timeout_ms),
Expand All @@ -25,12 +25,18 @@ ClusterInfoFanoutOperation::ClusterInfoFanoutOperation(
backfill_complete_percent_min_(0.0f),
backfill_in_progress_(false) {}

std::vector<vmsdk::cluster_map::NodeInfo>
ClusterInfoFanoutOperation::GetTargets() const {
return ValkeySearch::Instance().GetClusterMap()->GetAllTargets();
}

unsigned ClusterInfoFanoutOperation::GetTimeoutMs() const {
return timeout_ms_;
}

coordinator::InfoIndexPartitionRequest
ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
ClusterInfoFanoutOperation::GenerateRequest(
const vmsdk::cluster_map::NodeInfo&) {
coordinator::InfoIndexPartitionRequest req;
req.set_db_num(db_num_);
req.set_index_name(index_name_);
Expand All @@ -39,7 +45,7 @@ ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {

void ClusterInfoFanoutOperation::OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
if (!resp.error().empty()) {
grpc::Status status =
grpc::Status(grpc::StatusCode::INTERNAL, resp.error());
Expand Down Expand Up @@ -111,7 +117,7 @@ void ClusterInfoFanoutOperation::OnResponse(
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
ClusterInfoFanoutOperation::GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
return coordinator::Service::GenerateInfoResponse(request);
}

Expand Down
23 changes: 14 additions & 9 deletions src/query/cluster_info_fanout_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@

namespace valkey_search::query::cluster_info_fanout {

class ClusterInfoFanoutOperation : public fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
fanout::FanoutTargetMode::kAll> {
class ClusterInfoFanoutOperation
: public fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
vmsdk::cluster_map::FanoutTargetMode::kAll> {
public:
ClusterInfoFanoutOperation(uint32_t db_num, const std::string& index_name,
unsigned timeout_ms);

std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const;

unsigned GetTimeoutMs() const override;

coordinator::InfoIndexPartitionRequest GenerateRequest(
const fanout::FanoutSearchTarget&) override;
const vmsdk::cluster_map::NodeInfo&) override;

void OnResponse(const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const fanout::FanoutSearchTarget&) override;
void OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override;

std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
GetLocalResponse(const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const fanout::FanoutSearchTarget&) override;
GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override;

void InvokeRemoteRpc(
coordinator::Client* client,
Expand Down
78 changes: 48 additions & 30 deletions src/query/fanout_operation_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <thread>

#include "absl/synchronization/mutex.h"
#include "cluster_map.h"
#include "grpcpp/support/status.h"
#include "src/coordinator/client_pool.h"
#include "src/metrics.h"
Expand All @@ -30,7 +31,8 @@ namespace valkey_search::query::fanout {

constexpr unsigned kNoValkeyTimeout = 86400000;

template <typename Request, typename Response, FanoutTargetMode kTargetMode>
template <typename Request, typename Response,
vmsdk::cluster_map::FanoutTargetMode kTargetMode>
class FanoutOperationBase {
public:
explicit FanoutOperationBase() = default;
Expand All @@ -43,7 +45,9 @@ class FanoutOperationBase {
blocked_client_->MeasureTimeStart();
deadline_tp_ = std::chrono::steady_clock::now() +
std::chrono::milliseconds(GetTimeoutMs());
targets_ = GetTargets(ctx);
// Ensure cluster map is fresh
ValkeySearch::Instance().GetOrRefreshClusterMap(ctx);
targets_ = GetTargets();
StartFanoutRound();
}

Expand Down Expand Up @@ -85,16 +89,14 @@ class FanoutOperationBase {
}
}

std::vector<FanoutSearchTarget> GetTargets(ValkeyModuleCtx* ctx) const {
return query::fanout::FanoutTemplate::GetTargets(ctx, kTargetMode);
}
virtual std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const = 0;

void IssueRpc(const FanoutSearchTarget& target, const Request& request,
unsigned timeout_ms) {
void IssueRpc(const vmsdk::cluster_map::NodeInfo& target,
const Request& request, unsigned timeout_ms) {
coordinator::ClientPool* client_pool_ =
ValkeySearch::Instance().GetCoordinatorClientPool();

if (target.type == FanoutSearchTarget::Type::kLocal) {
if (target.is_local) {
vmsdk::RunByMain([this, target, request]() {
auto [status, resp] = this->GetLocalResponse(request, target);
if (status.ok()) {
Expand All @@ -110,12 +112,15 @@ class FanoutOperationBase {
this->RpcDone();
});
} else {
auto client = client_pool_->GetClient(target.address);
std::string client_address = absl::StrCat(
target.socket_address.primary_endpoint, ":",
coordinator::GetCoordinatorPort(target.socket_address.port));
auto client = client_pool_->GetClient(client_address);
if (!client) {
++Metrics::GetStats().info_fanout_fail_cnt;
VMSDK_LOG_EVERY_N_SEC(WARNING, nullptr, 1)
<< "FANOUT_DEBUG: Found invalid client on target "
<< target.address;
<< client_address;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client->GetAddress() ?? If it doesn't have this ability, let's add it.

this->OnError(grpc::Status(grpc::StatusCode::INTERNAL, ""),
coordinator::FanoutErrorType::COMMUNICATION_ERROR,
target);
Expand All @@ -124,14 +129,14 @@ class FanoutOperationBase {
}
this->InvokeRemoteRpc(
client.get(), request,
[this, target](grpc::Status status, Response& resp) {
[this, target, client_address](grpc::Status status, Response& resp) {
if (status.ok()) {
this->OnResponse(resp, target);
} else {
++Metrics::GetStats().info_fanout_fail_cnt;
VMSDK_LOG_EVERY_N_SEC(WARNING, nullptr, 1)
<< "FANOUT_DEBUG: InvokeRemoteRpc error on target "
<< target.address << ", status code: " << status.error_code()
<< client_address << ", status code: " << status.error_code()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same, here you're passing duplicate information. Imagine if there was a programming error that caused the client_address to not match target->GetAddress(), then the log message would actually hide the problem.

<< ", error message: " << status.error_message();
// if grpc failed, the response is invalid, so we need to manually
// set the error type
Expand All @@ -151,7 +156,7 @@ class FanoutOperationBase {
}

virtual std::pair<grpc::Status, Response> GetLocalResponse(
const Request&, [[maybe_unused]] const FanoutSearchTarget&) = 0;
const Request&, [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;

virtual void InvokeRemoteRpc(coordinator::Client*, const Request&,
std::function<void(grpc::Status, Response&)>,
Expand All @@ -160,14 +165,15 @@ class FanoutOperationBase {
virtual unsigned GetTimeoutMs() const = 0;

virtual Request GenerateRequest(
[[maybe_unused]] const FanoutSearchTarget&) = 0;
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;

virtual void OnResponse(const Response&,
[[maybe_unused]] const FanoutSearchTarget&) = 0;
virtual void OnResponse(
const Response&,
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;

virtual void OnError(grpc::Status status,
coordinator::FanoutErrorType error_type,
const FanoutSearchTarget& target) {
const vmsdk::cluster_map::NodeInfo& target) {
absl::MutexLock lock(&mutex_);
if (error_type == coordinator::FanoutErrorType::INDEX_NAME_ERROR) {
index_name_error_nodes.push_back(target);
Expand Down Expand Up @@ -205,39 +211,51 @@ class FanoutOperationBase {
// Log index name errors
if (!index_name_error_nodes.empty()) {
error_message = "Index name not found.";
for (const FanoutSearchTarget& target : index_name_error_nodes) {
if (target.type == FanoutSearchTarget::Type::kLocal) {
for (const vmsdk::cluster_map::NodeInfo& target :
index_name_error_nodes) {
if (target.is_local) {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INDEX_NAME_ERROR_LOG_PREFIX << "LOCAL NODE";
} else {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INDEX_NAME_ERROR_LOG_PREFIX << target.address;
<< INDEX_NAME_ERROR_LOG_PREFIX
<< absl::StrCat(target.socket_address.primary_endpoint, ":",
coordinator::GetCoordinatorPort(
target.socket_address.port));
}
}
}
// Log communication errors
if (!communication_error_nodes.empty()) {
error_message = "Communication error between nodes found.";
for (const FanoutSearchTarget& target : communication_error_nodes) {
if (target.type == FanoutSearchTarget::Type::kLocal) {
for (const vmsdk::cluster_map::NodeInfo& target :
communication_error_nodes) {
if (target.is_local) {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< COMMUNICATION_ERROR_LOG_PREFIX << "LOCAL NODE";
} else {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< COMMUNICATION_ERROR_LOG_PREFIX << target.address;
<< COMMUNICATION_ERROR_LOG_PREFIX
<< absl::StrCat(target.socket_address.primary_endpoint, ":",
coordinator::GetCoordinatorPort(
target.socket_address.port));
}
}
}
// Log inconsistent state errors
if (!inconsistent_state_error_nodes.empty()) {
error_message = "Inconsistent index state error found.";
for (const FanoutSearchTarget& target : inconsistent_state_error_nodes) {
if (target.type == FanoutSearchTarget::Type::kLocal) {
for (const vmsdk::cluster_map::NodeInfo& target :
inconsistent_state_error_nodes) {
if (target.is_local) {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INCONSISTENT_STATE_ERROR_LOG_PREFIX << "LOCAL NODE";
} else {
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
<< INCONSISTENT_STATE_ERROR_LOG_PREFIX << target.address;
<< INCONSISTENT_STATE_ERROR_LOG_PREFIX
<< absl::StrCat(target.socket_address.primary_endpoint, ":",
coordinator::GetCoordinatorPort(
target.socket_address.port));
}
}
}
Expand Down Expand Up @@ -286,10 +304,10 @@ class FanoutOperationBase {
unsigned outstanding_{0};
absl::Mutex mutex_;
std::unique_ptr<vmsdk::BlockedClient> blocked_client_;
std::vector<FanoutSearchTarget> index_name_error_nodes;
std::vector<FanoutSearchTarget> inconsistent_state_error_nodes;
std::vector<FanoutSearchTarget> communication_error_nodes;
std::vector<FanoutSearchTarget> targets_;
std::vector<vmsdk::cluster_map::NodeInfo> index_name_error_nodes;
std::vector<vmsdk::cluster_map::NodeInfo> inconsistent_state_error_nodes;
std::vector<vmsdk::cluster_map::NodeInfo> communication_error_nodes;
std::vector<vmsdk::cluster_map::NodeInfo> targets_;
std::chrono::steady_clock::time_point deadline_tp_;
bool timeout_occurred_ = false;
};
Expand Down
19 changes: 13 additions & 6 deletions src/query/primary_info_fanout_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ namespace valkey_search::query::primary_info_fanout {

PrimaryInfoFanoutOperation::PrimaryInfoFanoutOperation(
uint32_t db_num, const std::string& index_name, unsigned timeout_ms)
: fanout::FanoutOperationBase<coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
fanout::FanoutTargetMode::kPrimary>(),
: fanout::FanoutOperationBase<
coordinator::InfoIndexPartitionRequest,
coordinator::InfoIndexPartitionResponse,
vmsdk::cluster_map::FanoutTargetMode::kPrimary>(),
db_num_(db_num),
index_name_(index_name),
timeout_ms_(timeout_ms),
Expand All @@ -25,12 +26,18 @@ PrimaryInfoFanoutOperation::PrimaryInfoFanoutOperation(
num_records_(0),
hash_indexing_failures_(0) {}

std::vector<vmsdk::cluster_map::NodeInfo>
PrimaryInfoFanoutOperation::GetTargets() const {
return ValkeySearch::Instance().GetClusterMap()->GetPrimaryTargets();
}

unsigned PrimaryInfoFanoutOperation::GetTimeoutMs() const {
return timeout_ms_;
}

coordinator::InfoIndexPartitionRequest
PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
PrimaryInfoFanoutOperation::GenerateRequest(
const vmsdk::cluster_map::NodeInfo&) {
coordinator::InfoIndexPartitionRequest req;
req.set_db_num(db_num_);
req.set_index_name(index_name_);
Expand All @@ -39,7 +46,7 @@ PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {

void PrimaryInfoFanoutOperation::OnResponse(
const coordinator::InfoIndexPartitionResponse& resp,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
if (!resp.error().empty()) {
grpc::Status status =
grpc::Status(grpc::StatusCode::INTERNAL, resp.error());
Expand Down Expand Up @@ -96,7 +103,7 @@ void PrimaryInfoFanoutOperation::OnResponse(
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
PrimaryInfoFanoutOperation::GetLocalResponse(
const coordinator::InfoIndexPartitionRequest& request,
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
return coordinator::Service::GenerateInfoResponse(request);
}

Expand Down
Loading
Loading