Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/gas_limiter/args.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use clap::Args;

#[derive(Debug, Clone, Default, PartialEq, Eq, Args)]
pub struct GasLimiterArgs {
/// Enable address-based gas rate limiting
#[arg(long = "gas-limiter.enabled", env)]
pub gas_limiter_enabled: bool,

/// Maximum gas per address in token bucket. Defaults to 10 million gas.
#[arg(
long = "gas-limiter.max-gas-per-address",
env,
default_value = "10000000"
)]
pub max_gas_per_address: u64,

/// Gas refill rate per block. Defaults to 1 million gas per block.
#[arg(
long = "gas-limiter.refill-rate-per-block",
env,
default_value = "1000000"
)]
pub refill_rate_per_block: u64,

/// How many blocks to wait before cleaning up stale buckets for addresses.
#[arg(long = "gas-limiter.cleanup-interval", env, default_value = "100")]
pub cleanup_interval: u64,
}
13 changes: 13 additions & 0 deletions src/gas_limiter/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use alloy_primitives::Address;

#[derive(Debug, thiserror::Error)]
pub enum GasLimitError {
#[error(
"Address {address} exceeded gas limit: {requested} gwei requested, {available} gwei available"
)]
AddressLimitExceeded {
address: Address,
requested: u64,
available: u64,
},
}
47 changes: 47 additions & 0 deletions src/gas_limiter/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::time::Duration;

use metrics::{Counter, Gauge, Histogram};
use reth_metrics::Metrics;

use crate::gas_limiter::error::GasLimitError;

#[derive(Metrics, Clone)]
#[metrics(scope = "op_rbuilder.gas_limiter")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be #[derive(MetricsSet)]

See:

#[derive(MetricsSet)]

Pipeline steps learn about the name of their scope when the pipeline is instantiated. So far, the pattern that has emerged is that metrics are initialized (::with_scope(..)) in Step::setup. Try to find an elegant way to initialize the filter-specific metrics scope at runtime in the setup function. We might need to change the filter interface if there is no clean way of doing it.

pub(super) struct GasLimiterMetrics {
/// Transactions rejected by gas limits Labeled by reason: "per_address",
/// "global", "burst"
pub rejections: Counter,

/// Time spent in rate limiting logic
pub check_time: Histogram,

/// Number of addresses with active budgets
pub active_address_count: Gauge,

/// Time to refill buckets
pub refresh_duration: Histogram,
}

impl GasLimiterMetrics {
pub(super) fn record_gas_check(
&self,
check_result: &Result<bool, GasLimitError>,
duration: Duration,
) {
if let Ok(created_new_bucket) = check_result {
if *created_new_bucket {
self.active_address_count.increment(1);
}
} else {
self.rejections.increment(1);
}

self.check_time.record(duration);
}

pub(super) fn record_refresh(&self, removed_addresses: usize, duration: Duration) {
self.active_address_count
.decrement(removed_addresses as f64);
self.refresh_duration.record(duration);
}
}
296 changes: 296 additions & 0 deletions src/gas_limiter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
use std::{cmp::min, marker::PhantomData, sync::Arc, time::Instant};

use alloy_primitives::Address;
use dashmap::DashMap;

use crate::{
alloy::consensus::Transaction,
gas_limiter::metrics::GasLimiterMetrics,
pool::Order,
prelude::*,
};

pub mod args;
pub mod error;
mod metrics;

pub use args::GasLimiterArgs;
pub use error::GasLimitError;

#[derive(Debug, Clone)]
pub struct AddressGasLimiter {
inner: Option<AddressGasLimiterInner>,
}
Comment on lines +22 to +24
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, maybe this alternative shape of the data structure?

enum AddressGasLimiter {
   Disabled,
   Enabled { config, address_buckets, metrics }
}


#[derive(Debug, Clone)]
struct AddressGasLimiterInner {
config: GasLimiterArgs,
// We don't need an Arc<Mutex<_>> here, we can get away with RefCell, but
// the reth PayloadBuilder trait needs this to be Send + Sync
address_buckets: Arc<DashMap<Address, TokenBucket>>,
metrics: GasLimiterMetrics,
}

#[derive(Debug, Clone)]
struct TokenBucket {
capacity: u64,
available: u64,
}

/// A filter wrapper around AddressGasLimiter that can be used with AppendOrders.
/// This provides a convenient way to integrate per-address gas limiting into
/// the payload building pipeline.
#[derive(Debug, Clone)]
pub struct GasLimitFilter<P: Platform> {
limiter: AddressGasLimiter,
_phantom: PhantomData<P>,
}

impl<P: Platform> GasLimitFilter<P> {
/// Creates a new gas limit filter with the given configuration.
pub fn new(config: GasLimiterArgs) -> Self {
Self {
limiter: AddressGasLimiter::new(config),
_phantom: PhantomData,
}
}

/// Creates a filter closure that can be used with AppendOrders::with_filter().
/// The filter will reject orders if any of their transactions would exceed
/// the per-address gas limit. The filter automatically refreshes gas buckets
/// when it detects a new block number.
pub fn create_filter(
self,
) -> impl Fn(&Checkpoint<P>, &Order<P>) -> bool + Send + Sync + 'static {
use std::sync::atomic::{AtomicU64, Ordering};

// Track the last block number we refreshed for
let last_refreshed_block = Arc::new(AtomicU64::new(0));

move |payload: &Checkpoint<P>, order: &Order<P>| -> bool {
// Get current block number from the payload's context
let current_block = payload.block().number();
let last_block = last_refreshed_block.load(Ordering::Relaxed);

// Refresh gas buckets if we're on a new block
if current_block > last_block {
self.limiter.refresh(current_block);
last_refreshed_block.store(current_block, Ordering::Relaxed);
}

// For each transaction in the order, check if the signer has enough gas budget
for tx in order.transactions() {
let signer = tx.signer();
// Access the underlying transaction to get gas limit

let gas_limit = tx.gas_limit();

// Try to consume gas from the signer's bucket
if self.limiter.consume_gas(signer, gas_limit).is_err() {
// Not enough gas in bucket, reject this order
return true; // true means "skip this order"
}
}
false // false means "don't skip this order"
}
}

/// Refreshes the gas buckets, typically called at the start of each block.
/// This refills buckets and performs garbage collection of stale entries.
pub fn refresh(&self, block_number: u64) {
self.limiter.refresh(block_number);
}
Comment on lines +97 to +101
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright... we're seeing an emerging pattern here. Maybe more filters will need to have some logic that kicks in before or after a payload job.

Try to think about an interface design that optionally allows devs to add pre/post job logic, while still allowing the minimalistic syntax that we have right now.

}

impl AddressGasLimiter {
pub fn new(config: GasLimiterArgs) -> Self {
Self {
inner: AddressGasLimiterInner::try_new(config),
}
}

/// Check if there's enough gas for this address and consume it. Returns
/// Ok(()) if there's enough otherwise returns an error.
pub fn consume_gas(&self, address: Address, gas_requested: u64) -> Result<(), GasLimitError> {
if let Some(inner) = &self.inner {
inner.consume_gas(address, gas_requested)
} else {
Ok(())
}
}

/// Should be called upon each new block. Refills buckets/Garbage collection
pub fn refresh(&self, block_number: u64) {
if let Some(inner) = self.inner.as_ref() {
inner.refresh(block_number)
}
}
}

impl AddressGasLimiterInner {
fn try_new(config: GasLimiterArgs) -> Option<Self> {
if !config.gas_limiter_enabled {
return None;
}

Some(Self {
config,
address_buckets: Default::default(),
metrics: Default::default(),
})
}

fn consume_gas_inner(
&self,
address: Address,
gas_requested: u64,
) -> Result<bool, GasLimitError> {
let mut created_new_bucket = false;
let mut bucket = self
.address_buckets
.entry(address)
// if we don't find a bucket we need to initialize a new one
.or_insert_with(|| {
created_new_bucket = true;
TokenBucket::new(self.config.max_gas_per_address)
});

if gas_requested > bucket.available {
return Err(GasLimitError::AddressLimitExceeded {
address,
requested: gas_requested,
available: bucket.available,
});
}

bucket.available -= gas_requested;

Ok(created_new_bucket)
}

fn consume_gas(&self, address: Address, gas_requested: u64) -> Result<(), GasLimitError> {
let start = Instant::now();
let result = self.consume_gas_inner(address, gas_requested);

self.metrics.record_gas_check(&result, start.elapsed());

result.map(|_| ())
}

fn refresh_inner(&self, block_number: u64) -> usize {
let active_addresses = self.address_buckets.len();

self.address_buckets.iter_mut().for_each(|mut bucket| {
bucket.available = min(
bucket.capacity,
bucket.available + self.config.refill_rate_per_block,
)
});

// Only clean up stale buckets every `cleanup_interval` blocks
if block_number % self.config.cleanup_interval == 0 {
self.address_buckets
.retain(|_, bucket| bucket.available <= bucket.capacity);
}

active_addresses - self.address_buckets.len()
}

fn refresh(&self, block_number: u64) {
let start = Instant::now();
let removed_addresses = self.refresh_inner(block_number);

self.metrics
.record_refresh(removed_addresses, start.elapsed());
}
}

impl TokenBucket {
fn new(capacity: u64) -> Self {
Self {
capacity,
available: capacity,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::Address;

fn create_test_config(max_gas: u64, refill_rate: u64, cleanup_interval: u64) -> GasLimiterArgs {
GasLimiterArgs {
gas_limiter_enabled: true,
max_gas_per_address: max_gas,
refill_rate_per_block: refill_rate,
cleanup_interval,
}
}

fn test_address() -> Address {
Address::from([1u8; 20])
}

#[test]
fn test_basic_refill() {
let config = create_test_config(1000, 200, 10);
let limiter = AddressGasLimiter::new(config);

// Consume all gas
assert!(limiter.consume_gas(test_address(), 1000).is_ok());
assert!(limiter.consume_gas(test_address(), 1).is_err());

// Refill and check available gas increased
limiter.refresh(1);
assert!(limiter.consume_gas(test_address(), 200).is_ok());
assert!(limiter.consume_gas(test_address(), 1).is_err());
}

#[test]
fn test_over_capacity_request() {
let config = create_test_config(1000, 100, 10);
let limiter = AddressGasLimiter::new(config);

// Request more than capacity should fail
let result = limiter.consume_gas(test_address(), 1500);
assert!(result.is_err());

if let Err(GasLimitError::AddressLimitExceeded { available, .. }) = result {
assert_eq!(available, 1000);
}

// Bucket should still be full after failed request
assert!(limiter.consume_gas(test_address(), 1000).is_ok());
}

#[test]
fn test_multiple_users() {
// Simulate more realistic scenario
let config = create_test_config(10_000_000, 1_000_000, 100); // 10M max, 1M refill
let limiter = AddressGasLimiter::new(config);

let searcher1 = Address::from([0x1; 20]);
let searcher2 = Address::from([0x2; 20]);
let attacker = Address::from([0x3; 20]);

// Normal searchers use reasonable amounts
assert!(limiter.consume_gas(searcher1, 500_000).is_ok());
assert!(limiter.consume_gas(searcher2, 750_000).is_ok());

// Attacker tries to consume massive amounts
assert!(limiter.consume_gas(attacker, 15_000_000).is_err()); // Should fail - over capacity
assert!(limiter.consume_gas(attacker, 5_000_000).is_ok()); // Should succeed - within capacity

// Attacker tries to consume more
assert!(limiter.consume_gas(attacker, 6_000_000).is_err()); // Should fail - would exceed remaining

// New block - refill
limiter.refresh(1);

// Everyone should get some gas back
assert!(limiter.consume_gas(searcher1, 1_000_000).is_ok()); // Had 9.5M + 1M refill, now 9.5M
assert!(limiter.consume_gas(searcher2, 1_000_000).is_ok()); // Had 9.25M + 1M refill, now 9.25M
assert!(limiter.consume_gas(attacker, 1_000_000).is_ok()); // Had 5M + 1M refill, now 5M
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub mod pool;
/// Common steps library
pub mod steps;

/// Gas Limiter
pub mod gas_limiter;

/// Externally available test utilities
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
Expand Down
Loading
Loading