Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion source/extensions/filters/common/local_ratelimit/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,23 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "dynamic_descriptor_lib",
hdrs = ["dynamic_descriptor.h"],
deps = [
"//envoy/ratelimit:ratelimit_interface",
],
)

envoy_cc_library(
name = "local_ratelimit_lib",
srcs = ["local_ratelimit_impl.cc"],
srcs = [
"dynamic_descriptor.cc",
"local_ratelimit_impl.cc",
],
hdrs = ["local_ratelimit_impl.h"],
deps = [
":dynamic_descriptor_lib",
":local_ratelimit_interface",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
Expand Down
109 changes: 109 additions & 0 deletions source/extensions/filters/common/local_ratelimit/dynamic_descriptor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#include "source/extensions/filters/common/local_ratelimit/dynamic_descriptor.h"

#include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h"

namespace Envoy {
namespace Extensions {
namespace Filters {
namespace Common {
namespace LocalRateLimit {

// Compare the request descriptor entries with the user descriptor entries. If all non-empty user
// descriptor values match the request descriptor values, return true
bool DynamicDescriptorMap::matchDescriptorEntries(
const std::vector<RateLimit::DescriptorEntry>& request_entries,
const std::vector<RateLimit::DescriptorEntry>& config_entries) {
// Check for equality of sizes
if (request_entries.size() != config_entries.size()) {
return false;
}

for (size_t i = 0; i < request_entries.size(); ++i) {
// Check if the keys are equal.
if (request_entries[i].key_ != config_entries[i].key_) {
return false;
}

// Check values are equal or wildcard value is used.
if (config_entries[i].value_.empty()) {
continue;
}
if (request_entries[i].value_ != config_entries[i].value_) {
return false;
}
}
return true;
}

void DynamicDescriptorMap::addDescriptor(const RateLimit::LocalDescriptor& config_descriptor,
DynamicDescriptorSharedPtr dynamic_descriptor) {
auto result = config_descriptors_.emplace(config_descriptor, std::move(dynamic_descriptor));
if (!result.second) {
throw EnvoyException(absl::StrCat("duplicate descriptor in the local rate descriptor: ",
result.first->first.toString()));
}
}

RateLimitTokenBucketSharedPtr
DynamicDescriptorMap::getBucket(const RateLimit::Descriptor request_descriptor) {
for (const auto& pair : config_descriptors_) {
auto config_descriptor = pair.first;
if (!matchDescriptorEntries(request_descriptor.entries_, config_descriptor.entries_)) {
continue;
}

// here is when a user configured wildcard descriptor matches the request descriptor.
return pair.second->addOrGetDescriptor(request_descriptor);
}
return nullptr;
}

DynamicDescriptor::DynamicDescriptor(uint64_t per_descriptor_max_tokens,
uint64_t per_descriptor_tokens_per_fill,
std::chrono::milliseconds per_descriptor_fill_interval,
uint32_t lru_size, TimeSource& time_source, bool shadow_mode)
: max_tokens_(per_descriptor_max_tokens), tokens_per_fill_(per_descriptor_tokens_per_fill),
fill_interval_(per_descriptor_fill_interval), lru_size_(lru_size), time_source_(time_source),
shadow_mode_(shadow_mode) {}

RateLimitTokenBucketSharedPtr
DynamicDescriptor::addOrGetDescriptor(const RateLimit::Descriptor& request_descriptor) {
absl::WriterMutexLock lock(dyn_desc_lock_);
auto iter = dynamic_descriptors_.find(request_descriptor);
if (iter != dynamic_descriptors_.end()) {
if (iter->second.second != lru_list_.begin()) {
lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second);
}
return iter->second.first;
}
// add a new descriptor to the set along with its token bucket
RateLimitTokenBucketSharedPtr per_descriptor_token_bucket;
ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor");
ENVOY_LOG(trace, "max_tokens: {}, tokens_per_fill: {}, fill_interval: {}", max_tokens_,
tokens_per_fill_, std::chrono::duration<double>(fill_interval_).count());
per_descriptor_token_bucket = std::make_shared<RateLimitTokenBucket>(
max_tokens_, tokens_per_fill_, fill_interval_, time_source_, shadow_mode_);

ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}",
request_descriptor.toString());
lru_list_.emplace_front(request_descriptor);
auto result = dynamic_descriptors_.emplace(
request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin()));
auto token_bucket = result.first->second.first;
if (lru_list_.size() > lru_size_) {
ENVOY_LOG(trace,
"DynamicDescriptor::addorGetDescriptor: lru_size({}) overflow. Removing dynamic "
"descriptor: {}",
lru_size_, lru_list_.back().toString());
dynamic_descriptors_.erase(lru_list_.back());
lru_list_.pop_back();
}
ASSERT(lru_list_.size() == dynamic_descriptors_.size());
return token_bucket;
}

} // namespace LocalRateLimit
} // namespace Common
} // namespace Filters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#pragma once

#include <chrono>

#include "envoy/event/dispatcher.h"
#include "envoy/ratelimit/ratelimit.h"

#include "source/common/common/logger.h"

#include "absl/synchronization/mutex.h"

namespace Envoy {
namespace Extensions {
namespace Filters {
namespace Common {
namespace LocalRateLimit {

class RateLimitTokenBucket;
using RateLimitTokenBucketSharedPtr = std::shared_ptr<RateLimitTokenBucket>;

class DynamicDescriptor : public Logger::Loggable<Logger::Id::rate_limit_quota> {
public:
DynamicDescriptor(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, uint32_t lru_size,
TimeSource& time_source, bool shadow_mode);
// add a new user configured descriptor to the set.
RateLimitTokenBucketSharedPtr
addOrGetDescriptor(const Envoy::RateLimit::Descriptor& request_descriptor);

private:
using LruList = std::list<Envoy::RateLimit::Descriptor>;

mutable absl::Mutex dyn_desc_lock_;
Envoy::RateLimit::Descriptor::Map<std::pair<RateLimitTokenBucketSharedPtr, LruList::iterator>>
dynamic_descriptors_ ABSL_GUARDED_BY(dyn_desc_lock_);

uint64_t max_tokens_;
uint64_t tokens_per_fill_;
const std::chrono::milliseconds fill_interval_;
LruList lru_list_;
uint32_t lru_size_;
TimeSource& time_source_;
const bool shadow_mode_{false};
};

using DynamicDescriptorSharedPtr = std::shared_ptr<DynamicDescriptor>;

class DynamicDescriptorMap : public Logger::Loggable<Logger::Id::rate_limit_quota> {
public:
// add a new user configured descriptor to the set.
void addDescriptor(const Envoy::RateLimit::LocalDescriptor& descriptor,
DynamicDescriptorSharedPtr dynamic_descriptor);
// pass request_descriptors to the dynamic descriptor set to get the token bucket.
RateLimitTokenBucketSharedPtr getBucket(const Envoy::RateLimit::Descriptor);

private:
bool matchDescriptorEntries(const std::vector<Envoy::RateLimit::DescriptorEntry>& request_entries,
const std::vector<Envoy::RateLimit::DescriptorEntry>& user_entries);
Envoy::RateLimit::LocalDescriptor::Map<DynamicDescriptorSharedPtr> config_descriptors_;
};

} // namespace LocalRateLimit
} // namespace Common
} // namespace Filters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ class LocalRateLimiter {
struct Result {
bool allowed{};
std::shared_ptr<const TokenBucketContext> token_bucket_context;
RateLimit::XRateLimitOption x_ratelimit_option{};
Envoy::RateLimit::XRateLimitOption x_ratelimit_option{};
};

virtual ~LocalRateLimiter() = default;

// Returns true if the request should be rate limited.
virtual Result requestAllowed(absl::Span<const RateLimit::Descriptor> request_descriptors) = 0;
virtual Result
requestAllowed(absl::Span<const Envoy::RateLimit::Descriptor> request_descriptors) = 0;
};
using LocalRateLimiterSharedPtr = std::shared_ptr<LocalRateLimiter>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
}

for (const auto& descriptor : descriptors) {
RateLimit::LocalDescriptor new_descriptor;
Envoy::RateLimit::LocalDescriptor new_descriptor;
bool wildcard_found = false;
new_descriptor.entries_.reserve(descriptor.entries_size());
for (const auto& entry : descriptor.entries()) {
Expand Down Expand Up @@ -154,11 +154,11 @@ LocalRateLimiterImpl::~LocalRateLimiterImpl() = default;

struct MatchResult {
RateLimitTokenBucketSharedPtr token_bucket;
std::reference_wrapper<const RateLimit::Descriptor> request_descriptor;
std::reference_wrapper<const Envoy::RateLimit::Descriptor> request_descriptor;
};

LocalRateLimiterImpl::Result
LocalRateLimiterImpl::requestAllowed(absl::Span<const RateLimit::Descriptor> request_descriptors) {
LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(
absl::Span<const Envoy::RateLimit::Descriptor> request_descriptors) {

// In most cases the request descriptors has only few elements. We use a inlined vector to
// avoid heap allocation.
Expand Down Expand Up @@ -212,7 +212,7 @@ LocalRateLimiterImpl::requestAllowed(absl::Span<const RateLimit::Descriptor> req
? std::shared_ptr<TokenBucketContext>(nullptr)
: std::shared_ptr<TokenBucketContext>(matched_results[0].token_bucket),
matched_results.empty()
? RateLimit::XRateLimitOption::RateLimit_XRateLimitOption_UNSPECIFIED
? Envoy::RateLimit::XRateLimitOption::RateLimit_XRateLimitOption_UNSPECIFIED
: matched_results[0].request_descriptor.get().x_ratelimit_option_,

};
Expand All @@ -223,14 +223,14 @@ LocalRateLimiterImpl::requestAllowed(absl::Span<const RateLimit::Descriptor> req
// If the request is forbidden by the default token bucket, return the result and the
// default token bucket.
return {false, std::shared_ptr<TokenBucketContext>(default_token_bucket_),
RateLimit::XRateLimitOption::RateLimit_XRateLimitOption_UNSPECIFIED};
Envoy::RateLimit::XRateLimitOption::RateLimit_XRateLimitOption_UNSPECIFIED};
}

// If the request is allowed then return the result the token bucket. The descriptor
// token bucket will be selected as priority if it exists.
return {true, matched_results.empty() ? default_token_bucket_ : matched_results[0].token_bucket,
matched_results.empty()
? RateLimit::XRateLimitOption::RateLimit_XRateLimitOption_UNSPECIFIED
? Envoy::RateLimit::XRateLimitOption::RateLimit_XRateLimitOption_UNSPECIFIED
: matched_results[0].request_descriptor.get().x_ratelimit_option_};
};

Expand All @@ -240,100 +240,6 @@ LocalRateLimiterImpl::requestAllowed(absl::Span<const RateLimit::Descriptor> req
return {true, bucket_context, matched_results[0].request_descriptor.get().x_ratelimit_option_};
}

// Compare the request descriptor entries with the user descriptor entries. If all non-empty user
// descriptor values match the request descriptor values, return true
bool DynamicDescriptorMap::matchDescriptorEntries(
const std::vector<RateLimit::DescriptorEntry>& request_entries,
const std::vector<RateLimit::DescriptorEntry>& config_entries) {
// Check for equality of sizes
if (request_entries.size() != config_entries.size()) {
return false;
}

for (size_t i = 0; i < request_entries.size(); ++i) {
// Check if the keys are equal.
if (request_entries[i].key_ != config_entries[i].key_) {
return false;
}

// Check values are equal or wildcard value is used.
if (config_entries[i].value_.empty()) {
continue;
}
if (request_entries[i].value_ != config_entries[i].value_) {
return false;
}
}
return true;
}

void DynamicDescriptorMap::addDescriptor(const RateLimit::LocalDescriptor& config_descriptor,
DynamicDescriptorSharedPtr dynamic_descriptor) {
auto result = config_descriptors_.emplace(config_descriptor, std::move(dynamic_descriptor));
if (!result.second) {
throw EnvoyException(absl::StrCat("duplicate descriptor in the local rate descriptor: ",
result.first->first.toString()));
}
}

RateLimitTokenBucketSharedPtr
DynamicDescriptorMap::getBucket(const RateLimit::Descriptor request_descriptor) {
for (const auto& pair : config_descriptors_) {
auto config_descriptor = pair.first;
if (!matchDescriptorEntries(request_descriptor.entries_, config_descriptor.entries_)) {
continue;
}

// here is when a user configured wildcard descriptor matches the request descriptor.
return pair.second->addOrGetDescriptor(request_descriptor);
}
return nullptr;
}

DynamicDescriptor::DynamicDescriptor(uint64_t per_descriptor_max_tokens,
uint64_t per_descriptor_tokens_per_fill,
std::chrono::milliseconds per_descriptor_fill_interval,
uint32_t lru_size, TimeSource& time_source, bool shadow_mode)
: max_tokens_(per_descriptor_max_tokens), tokens_per_fill_(per_descriptor_tokens_per_fill),
fill_interval_(per_descriptor_fill_interval), lru_size_(lru_size), time_source_(time_source),
shadow_mode_(shadow_mode) {}

RateLimitTokenBucketSharedPtr
DynamicDescriptor::addOrGetDescriptor(const RateLimit::Descriptor& request_descriptor) {
absl::WriterMutexLock lock(dyn_desc_lock_);
auto iter = dynamic_descriptors_.find(request_descriptor);
if (iter != dynamic_descriptors_.end()) {
if (iter->second.second != lru_list_.begin()) {
lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second);
}
return iter->second.first;
}
// add a new descriptor to the set along with its token bucket
RateLimitTokenBucketSharedPtr per_descriptor_token_bucket;
ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor");
ENVOY_LOG(trace, "max_tokens: {}, tokens_per_fill: {}, fill_interval: {}", max_tokens_,
tokens_per_fill_, std::chrono::duration<double>(fill_interval_).count());
per_descriptor_token_bucket = std::make_shared<RateLimitTokenBucket>(
max_tokens_, tokens_per_fill_, fill_interval_, time_source_, shadow_mode_);

ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}",
request_descriptor.toString());
lru_list_.emplace_front(request_descriptor);
auto result = dynamic_descriptors_.emplace(
request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin()));
auto token_bucket = result.first->second.first;
if (lru_list_.size() > lru_size_) {
ENVOY_LOG(trace,
"DynamicDescriptor::addorGetDescriptor: lru_size({}) overflow. Removing dynamic "
"descriptor: {}",
lru_size_, lru_list_.back().toString());
dynamic_descriptors_.erase(lru_list_.back());
lru_list_.pop_back();
}
ASSERT(lru_list_.size() == dynamic_descriptors_.size());
return token_bucket;
}

} // namespace LocalRateLimit
} // namespace Common
} // namespace Filters
Expand Down
Loading
Loading