Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions crates/op-rbuilder/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::{
presim::{TopOfBlockSimulator, maintain_pending_simulations, maintain_tip_state},
revert_protection::{EthApiExtServer, RevertProtectionExt},
};
use moka::future::Cache;
use reth::builder::{NodeBuilder, WithLaunchContext};
use reth_cli_commands::launcher::Launcher;
use reth_db::mdbx::DatabaseEnv;
Expand Down Expand Up @@ -97,8 +96,6 @@ impl Launcher<OpChainSpecParser, OpRbuilderArgs> for BuilderLauncher {
let gas_limit_config = builder_config.gas_limit_config.clone();
let rollup_args = &builder_args.rollup_args;
let op_node = OpNode::new(rollup_args.clone());
let reverted_cache = Cache::builder().max_capacity(100).build();
let reverted_cache_copy = reverted_cache.clone();
let backrun_bundle_enabled = builder_args.backrun_bundle.backruns_enabled;
let block_time_secs = builder_config.block_time.as_millis() as u64 / 1000;
let backrun_bundle_pool = builder_config.backrun_bundle_pool.clone();
Expand Down Expand Up @@ -139,7 +136,6 @@ impl Launcher<OpChainSpecParser, OpRbuilderArgs> for BuilderLauncher {
pool,
provider,
ctx.registry.eth_api().clone(),
reverted_cache,
simulator_for_rpc,
);

Expand All @@ -166,11 +162,8 @@ impl Launcher<OpChainSpecParser, OpRbuilderArgs> for BuilderLauncher {
if builder_args.log_pool_transactions {
info!("Logging pool transactions");
let listener = ctx.pool.all_transactions_event_listener();
let task = monitor_tx_pool(
listener,
reverted_cache_copy,
builder_args.enable_tx_tracking_debug_logs,
);
let task =
monitor_tx_pool(listener, builder_args.enable_tx_tracking_debug_logs);
ctx.task_executor.spawn_critical_task("txlogging", task);
}

Expand Down
12 changes: 2 additions & 10 deletions crates/op-rbuilder/src/monitor_tx_pool.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
use crate::tx::FBPooledTransaction;
use alloy_primitives::B256;
use futures_util::StreamExt;
use moka::future::Cache;
use reth_transaction_pool::{AllTransactionsEvents, FullTransactionEvent};
use tracing::debug;

pub(crate) async fn monitor_tx_pool(
mut new_transactions: AllTransactionsEvents<FBPooledTransaction>,
reverted_cache: Cache<B256, ()>,
enable_tx_tracking_debug_logs: bool,
) {
while let Some(event) = new_transactions.next().await {
transaction_event_log(event, &reverted_cache, enable_tx_tracking_debug_logs).await;
transaction_event_log(event, enable_tx_tracking_debug_logs);
}
}

async fn transaction_event_log(
fn transaction_event_log(
event: FullTransactionEvent<FBPooledTransaction>,
reverted_cache: &Cache<B256, ()>,
enable_tx_tracking_debug_logs: bool,
) {
if !enable_tx_tracking_debug_logs {
Expand Down Expand Up @@ -62,10 +58,6 @@ async fn transaction_event_log(
"Transaction event received"
),
FullTransactionEvent::Discarded(hash) => {
// add the transaction hash to the reverted cache to notify the
// eth get transaction receipt method
reverted_cache.insert(hash, ()).await;

debug!(
target: "tx_trace",
tx_hash = %hash,
Expand Down
56 changes: 53 additions & 3 deletions crates/op-rbuilder/src/pool/builder.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
use alloy_primitives::TxHash;
use futures::StreamExt;
use moka::sync::Cache;
use reth_evm::ConfigureEvm;
use reth_node_api::{FullNodeTypes, NodeTypes, PrimitivesTy, TxTy};
use reth_node_builder::{BuilderContext, components::PoolBuilder};
use reth_optimism_forks::OpHardforks;
use reth_optimism_node::OpPoolBuilder;
use reth_optimism_txpool::{OpPooledTx, OpTransactionPool};
use reth_transaction_pool::{EthPoolTransaction, blobstore::DiskFileBlobStore};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{
AllTransactionsEvents, EthPoolTransaction, FullTransactionEvent, TransactionPool,
blobstore::DiskFileBlobStore,
};

use crate::{args::OpRbuilderArgs, pool::Flashpool, tx::FBPooledTransaction};

pub struct FlashpoolBuilder {
op_pool_builder: OpPoolBuilder<FBPooledTransaction>,

enable_revert_protection: bool,
}

impl FlashpoolBuilder {
Expand All @@ -22,7 +31,10 @@ impl FlashpoolBuilder {
rollup_args.enable_tx_conditional || builder_args.enable_revert_protection,
);

Self { op_pool_builder }
Self {
op_pool_builder,
enable_revert_protection: builder_args.enable_revert_protection,
}
}
}

Expand All @@ -40,8 +52,46 @@ where
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> eyre::Result<Self::Pool> {
let Self {
op_pool_builder,
enable_revert_protection,
} = self;

let inner_pool = op_pool_builder.build_pool(ctx, evm_config).await?;

let reverted_cache = enable_revert_protection.then_some(setup_revert_protection(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a reverted cache? is it for easier test assertions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no this is a core feature for revert protection functionality. it's so senders can track if their txs have been evicted from the pool because they were reverted

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't expose rpc methods for transaction receipts though? rollup-boost only forwards bundles and raw transactions

Copy link
Copy Markdown
Collaborator

@avalonche avalonche May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if its for debug purposes, the cache size should be configurable - right now the bundle request rate is 300 req/s for reverted transactions on mainnet when the cache max_capacity is only 100

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this pr is meant to just fix the bug, not add features. i think the cache capacity should be configured in terms of storage size (like configured to a max of 4mb).

and i thought the plan was to expose the rpc for receipts at some point

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no plans for receipts rpc

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is what i'm referring to #188

ctx.task_executor(),
inner_pool.all_transactions_event_listener(),
));

Ok(Flashpool {
inner: self.op_pool_builder.build_pool(ctx, evm_config).await?,
inner: inner_pool,
reverted_cache,
})
}
}

fn setup_revert_protection(
task_executor: &TaskExecutor,
mut events: AllTransactionsEvents<FBPooledTransaction>,
) -> Cache<TxHash, ()> {
let reverted_cache: Cache<_, ()> = Cache::builder().max_capacity(100).build();
// Reverted transactions are removed from the pool by the conditional-tx GC
// maintenance task. This is spawned during `OpPoolBuilder::build_pool` and
// accesses the inner `OpTransactionPool` directly, calling
// `remove_transactions`. Unfortunately, this bypasses our Flashpool
// wrapper. So to ensure the reverted_cache is populated, we need to
// subscribe to pool events and insert on Discarded events.
task_executor.spawn_task({
let reverted_cache = reverted_cache.clone();
async move {
while let Some(event) = events.next().await {
if let FullTransactionEvent::Discarded(hash) = event {
reverted_cache.insert(hash, ());
}
}
}
});

reverted_cache
}
21 changes: 21 additions & 0 deletions crates/op-rbuilder/src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
mod builder;
mod delegate;

use alloy_primitives::TxHash;
pub use builder::FlashpoolBuilder;

use moka::sync::Cache;
use reth_transaction_pool::TransactionPool;

use crate::tx::FBPooledTransaction;

#[derive(Debug, Clone)]
pub struct Flashpool<P: TransactionPool<Transaction = FBPooledTransaction>> {
inner: P,

/// Cache to store reverted tx hashes
reverted_cache: Option<Cache<TxHash, ()>>,
}

/// Custom extensions on the pool where it doesn't make sense to intercept an
/// existing pool method.
pub trait FlashpoolExt {
/// Checks if a transaction is reverted by checking if the given transaction
/// hash is present in the reverted cache.
fn is_tx_reverted(&self, hash: TxHash) -> bool;
}

impl<P: TransactionPool<Transaction = FBPooledTransaction>> FlashpoolExt for Flashpool<P> {
fn is_tx_reverted(&self, hash: TxHash) -> bool {
self.reverted_cache
.as_ref()
.is_some_and(|cache| cache.get(&hash).is_some())
}
}
9 changes: 3 additions & 6 deletions crates/op-rbuilder/src/revert_protection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Instant};

use crate::{
metrics::OpRBuilderMetrics,
pool::FlashpoolExt,
presim::TopOfBlockSimulator,
primitives::bundle::{Bundle, BundleResult},
tx::{FBPooledTransaction, MaybeFlashblockFilter},
Expand All @@ -13,7 +14,6 @@ use jsonrpsee::{
core::{RpcResult, async_trait},
proc_macros::rpc,
};
use moka::future::Cache;
use op_alloy_consensus::OpTxEnvelope;
use reth::rpc::api::eth::{RpcReceipt, helpers::FullEthApi};
use reth_optimism_chainspec::OpChainSpec;
Expand Down Expand Up @@ -41,7 +41,6 @@ pub struct RevertProtectionExt<Pool, Provider, Eth> {
provider: Provider,
eth_api: Eth,
metrics: Arc<OpRBuilderMetrics>,
reverted_cache: Cache<B256, ()>,
simulator: Option<Arc<TopOfBlockSimulator>>,
}

Expand All @@ -55,15 +54,13 @@ where
pool: Pool,
provider: Provider,
eth_api: Eth,
reverted_cache: Cache<B256, ()>,
simulator: Option<Arc<TopOfBlockSimulator>>,
) -> Self {
Self {
pool,
provider,
eth_api,
metrics: Arc::new(OpRBuilderMetrics::default()),
reverted_cache,
simulator,
}
}
Expand All @@ -73,7 +70,7 @@ where
impl<Pool, Provider, Eth> EthApiExtServer<RpcReceipt<Eth::NetworkTypes>>
for RevertProtectionExt<Pool, Provider, Eth>
where
Pool: TransactionPool<Transaction = FBPooledTransaction> + Clone + 'static,
Pool: TransactionPool<Transaction = FBPooledTransaction> + FlashpoolExt + Clone + 'static,
Provider: StateProviderFactory
+ BlockReaderIdExt<Header = Header>
+ ChainSpecProvider<ChainSpec = OpChainSpec>
Expand Down Expand Up @@ -119,7 +116,7 @@ where
return Ok(Some(receipt));
}

if self.reverted_cache.get(&hash).await.is_some() {
if self.pool.is_tx_reverted(hash) {
return Err(EthApiError::InvalidParams(
"the transaction was dropped from the pool".into(),
)
Expand Down
6 changes: 1 addition & 5 deletions crates/op-rbuilder/src/tests/framework/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use http::{Request, Response, StatusCode};
use http_body_util::Full;
use hyper::{body::Bytes as HyperBytes, server::conn::http1, service::service_fn};
use hyper_util::rt::TokioIo;
use moka::future::Cache;
use nanoid::nanoid;
use op_alloy_network::Optimism;
use op_alloy_rpc_types_engine::OpFlashblockPayload;
Expand Down Expand Up @@ -92,8 +91,6 @@ impl LocalInstance {
let mut args = args;
let task_runtime = task_runtime();
let op_node = OpNode::new(args.rollup_args.clone());
let reverted_cache = Cache::builder().max_capacity(100).build();
let reverted_cache_clone = reverted_cache.clone();

let (rpc_ready_tx, rpc_ready_rx) = oneshot::channel::<()>();
let (txpool_ready_tx, txpool_ready_rx) =
Expand Down Expand Up @@ -156,7 +153,6 @@ impl LocalInstance {
pool,
provider,
ctx.registry.eth_api().clone(),
reverted_cache,
simulator_for_rpc,
);

Expand Down Expand Up @@ -235,7 +231,7 @@ impl LocalInstance {
exit_future,
_node_handle: node_handle,
task_runtime: Some(task_runtime),
pool_observer: TransactionPoolObserver::new(pool_monitor, reverted_cache_clone),
pool_observer: TransactionPoolObserver::new(pool_monitor),
attestation_server,
})
}
Expand Down
7 changes: 1 addition & 6 deletions crates/op-rbuilder/src/tests/framework/txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use alloy_provider::{PendingTransactionBuilder, Provider, RootProvider};
use core::cmp::max;
use dashmap::DashMap;
use futures::StreamExt;
use moka::future::Cache;
use op_alloy_consensus::{OpTxEnvelope, OpTypedTransaction};
use op_alloy_network::Optimism;
use reth_primitives_traits::Recovered;
Expand Down Expand Up @@ -295,10 +294,7 @@ impl Drop for TransactionPoolObserver {
}

impl TransactionPoolObserver {
pub fn new(
stream: AllTransactionsEvents<FBPooledTransaction>,
reverts: Cache<B256, ()>,
) -> Self {
pub fn new(stream: AllTransactionsEvents<FBPooledTransaction>) -> Self {
let mut stream = stream;
let observations = Arc::new(ObservationsMap::new());
let observations_clone = Arc::clone(&observations);
Expand Down Expand Up @@ -344,7 +340,6 @@ impl TransactionPoolObserver {
Some(FullTransactionEvent::Discarded(hash)) => {
debug!(hash = %hash, "Transaction discarded");
observations.entry(hash).or_default().push_back(TransactionEvent::Discarded);
reverts.insert(hash, ()).await;
},
Some(FullTransactionEvent::Invalid(hash)) => {
debug!(hash = %hash, "Transaction invalid");
Expand Down
10 changes: 8 additions & 2 deletions crates/op-rbuilder/src/tests/revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,15 @@ async fn check_transaction_receipt_status_message(rbuilder: LocalInstance) -> ey

// Dropped
let _ = driver.build_new_block().await?;
let receipt = provider.get_transaction_receipt(*tx_hash).await;
let err = provider
.get_transaction_receipt(*tx_hash)
.await
.expect_err("expected an error for a dropped reverting tx");

assert!(receipt.is_err());
assert!(
err.to_string()
.contains("the transaction was dropped from the pool")
);

Ok(())
}
Expand Down
Loading