diff --git a/crates/op-rbuilder/src/launcher.rs b/crates/op-rbuilder/src/launcher.rs index 66617c1f5..0641a9789 100644 --- a/crates/op-rbuilder/src/launcher.rs +++ b/crates/op-rbuilder/src/launcher.rs @@ -1,7 +1,4 @@ -use std::sync::Arc; - use eyre::Result; -use futures::FutureExt; use reth_optimism_rpc::OpEthApiBuilder; use tracing::info; @@ -11,23 +8,21 @@ use crate::{ BackrunBundleApiServer, BackrunBundleRpc, maintain_backrun_bundle_pool_future, }, builder::{BuilderConfig, FlashblocksServiceBuilder}, - metrics::{OpRBuilderMetrics, VERSION, record_flag_gauge_metrics}, + metrics::{VERSION, record_flag_gauge_metrics}, monitor_tx_pool::monitor_tx_pool, pool::FlashpoolBuilder, - presim::{TopOfBlockSimulator, maintain_pending_simulations, maintain_tip_state}, revert_protection::{EthApiExtServer, RevertProtectionExt}, }; use reth::builder::{NodeBuilder, WithLaunchContext}; +use reth_chain_state::CanonStateSubscriptions; use reth_cli_commands::launcher::Launcher; use reth_db::mdbx::DatabaseEnv; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_cli::chainspec::OpChainSpecParser; -use reth_optimism_evm::OpEvmConfig; use reth_optimism_node::{ OpNode, node::{OpAddOns, OpAddOnsBuilder, OpEngineValidatorBuilder}, }; -use reth_provider::{CanonStateSubscriptions, ChainSpecProvider}; use reth_transaction_pool::TransactionPool; pub fn launch() -> Result<()> { let cli = Cli::parsed(); @@ -97,18 +92,9 @@ impl Launcher for BuilderLauncher { let rollup_args = &builder_args.rollup_args; let op_node = OpNode::new(rollup_args.clone()); let backrun_bundle_enabled = builder_args.backrun_bundle.backruns_enabled; - let block_time_secs = builder_config.block_time.as_millis() as u64 / 1000; let backrun_bundle_pool = builder_config.backrun_bundle_pool.clone(); let backrun_bundle_pool_maintain = backrun_bundle_pool.clone(); - let simulator = if builder_args.pre_simulate_bundles { - Some(Arc::new(TopOfBlockSimulator::new())) - } else { - None - }; - let simulator_for_rpc = simulator.clone(); - let simulator_for_maintenance = simulator.clone(); - let addons: OpAddOns<_, OpEthApiBuilder, OpEngineValidatorBuilder> = OpAddOnsBuilder::default() .with_sequencer(rollup_args.sequencer.clone()) @@ -132,12 +118,8 @@ impl Launcher for BuilderLauncher { let pool = ctx.pool().clone(); let provider = ctx.provider().clone(); - let revert_protection_ext = RevertProtectionExt::new( - pool, - provider, - ctx.registry.eth_api().clone(), - simulator_for_rpc, - ); + let revert_protection_ext = + RevertProtectionExt::new(pool, provider, ctx.registry.eth_api().clone()); ctx.modules .add_or_replace_configured(revert_protection_ext.into_rpc())?; @@ -178,34 +160,6 @@ impl Launcher for BuilderLauncher { )); } - if let Some(simulator) = simulator_for_maintenance { - let metrics = Arc::new(OpRBuilderMetrics::default()); - let chain_events = ctx.provider.canonical_state_stream(); - let evm_config = OpEvmConfig::optimism(ctx.provider.chain_spec()); - ctx.task_executor.spawn_task( - maintain_tip_state( - simulator.clone(), - ctx.provider.clone(), - evm_config, - block_time_secs, - metrics.clone(), - chain_events, - ) - .boxed(), - ); - - let pending_events = ctx.pool.all_transactions_event_listener(); - ctx.task_executor.spawn_task( - maintain_pending_simulations( - simulator, - ctx.pool.clone(), - metrics, - pending_events, - ) - .boxed(), - ); - } - Ok(()) }) .launch() diff --git a/crates/op-rbuilder/src/lib.rs b/crates/op-rbuilder/src/lib.rs index 928de532b..40c9a2fa2 100644 --- a/crates/op-rbuilder/src/lib.rs +++ b/crates/op-rbuilder/src/lib.rs @@ -9,7 +9,6 @@ pub mod launcher; pub mod metrics; mod monitor_tx_pool; pub mod pool; -pub mod presim; pub mod primitives; pub mod revert_protection; pub(crate) mod runtime_ext; diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index 20c692a02..7b1907588 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -159,16 +159,6 @@ pub struct OpRBuilderMetrics { pub bundles_reverted: Histogram, /// Histogram of eth_sendBundle request duration pub bundle_receive_duration: Histogram, - /// Number of bundles dropped by pre-simulation (reverted) - pub bundle_pre_simulation_reverts: Counter, - /// Number of bundles that passed pre-simulation - pub bundle_pre_simulation_passes: Counter, - /// Histogram of bundle pre-simulation duration - pub bundle_pre_simulation_duration: Histogram, - /// Number of updates to the tip state for the top of block simulator - pub presim_tip_state_updates: Counter, - /// Number of pending txs evicted due to failing top of block simulation - pub presim_pending_evictions: Counter, /// Transactions rejected by per-tx DA size limit pub tx_da_size_exceeded_total: Counter, /// Transactions rejected by cumulative block DA limit diff --git a/crates/op-rbuilder/src/pool/builder.rs b/crates/op-rbuilder/src/pool/builder.rs index 9202d09ac..94015c26b 100644 --- a/crates/op-rbuilder/src/pool/builder.rs +++ b/crates/op-rbuilder/src/pool/builder.rs @@ -1,24 +1,41 @@ +use std::sync::Arc; + use alloy_primitives::TxHash; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use moka::sync::Cache; +use reth_chain_state::CanonStateSubscriptions; use reth_evm::ConfigureEvm; use reth_node_api::{FullNodeTypes, NodeTypes, PrimitivesTy, TxTy}; use reth_node_builder::{BuilderContext, components::PoolBuilder}; +use reth_optimism_chainspec::OpChainSpec; +use reth_optimism_evm::OpEvmConfig; use reth_optimism_forks::OpHardforks; use reth_optimism_node::OpPoolBuilder; -use reth_optimism_txpool::{OpPooledTx, OpTransactionPool}; +use reth_optimism_txpool::{OpPooledTx, OpTransactionPool, OpTransactionValidator}; +use reth_primitives_traits::{Block, NodePrimitives}; +use reth_provider::{BlockReaderIdExt, ChainSpecProvider, NodePrimitivesProvider}; use reth_tasks::TaskExecutor; use reth_transaction_pool::{ AllTransactionsEvents, EthPoolTransaction, FullTransactionEvent, TransactionPool, - blobstore::DiskFileBlobStore, + TransactionValidationTaskExecutor, blobstore::DiskFileBlobStore, }; -use crate::{args::OpRbuilderArgs, pool::Flashpool, tx::FBPooledTransaction}; +use crate::{ + args::OpRbuilderArgs, + pool::{ + Flashpool, + metrics::PoolMetrics, + presim::{TopOfBlockSimulator, maintain_pending_simulations, maintain_tip_state}, + }, + tx::FBPooledTransaction, +}; pub struct FlashpoolBuilder { op_pool_builder: OpPoolBuilder, enable_revert_protection: bool, + pre_simulate_bundles: bool, + block_time_secs: u64, } impl FlashpoolBuilder { @@ -34,6 +51,9 @@ impl FlashpoolBuilder { Self { op_pool_builder, enable_revert_protection: builder_args.enable_revert_protection, + + pre_simulate_bundles: builder_args.pre_simulate_bundles, + block_time_secs: builder_args.chain_block_time / 1000, } } } @@ -41,11 +61,19 @@ impl FlashpoolBuilder { impl PoolBuilder for FlashpoolBuilder where Node: FullNodeTypes>, + Node::Provider: ChainSpecProvider + + BlockReaderIdExt
, + ::Primitives: + NodePrimitives>, FBPooledTransaction: EthPoolTransaction> + OpPooledTx, Evm: ConfigureEvm> + Clone + 'static, { - type Pool = - Flashpool>; + type Pool = Flashpool< + OpTransactionPool, + TransactionValidationTaskExecutor< + OpTransactionValidator, + >, + >; async fn build_pool( self, @@ -55,6 +83,8 @@ where let Self { op_pool_builder, enable_revert_protection, + pre_simulate_bundles, + block_time_secs, } = self; let inner_pool = op_pool_builder.build_pool(ctx, evm_config).await?; @@ -64,9 +94,49 @@ where inner_pool.all_transactions_event_listener(), )); + let validator = inner_pool.validator().clone(); + let metrics = Arc::new(PoolMetrics::default()); + + let simulator = if pre_simulate_bundles { + let simulator = Arc::new(TopOfBlockSimulator::new()); + + let chain_events = ctx.provider().canonical_state_stream(); + let op_evm_config = OpEvmConfig::optimism(ctx.provider().chain_spec()); + ctx.task_executor().spawn_task( + maintain_tip_state( + simulator.clone(), + ctx.provider().clone(), + op_evm_config, + block_time_secs, + metrics.clone(), + chain_events, + ) + .boxed(), + ); + + let pending_events = inner_pool.all_transactions_event_listener(); + ctx.task_executor().spawn_task( + maintain_pending_simulations( + simulator.clone(), + inner_pool.clone(), + metrics.clone(), + pending_events, + ) + .boxed(), + ); + + Some(simulator) + } else { + None + }; + Ok(Flashpool { inner: inner_pool, + validator, + simulator, + task_executor: ctx.task_executor().clone(), reverted_cache, + metrics, }) } } diff --git a/crates/op-rbuilder/src/pool/delegate.rs b/crates/op-rbuilder/src/pool/delegate.rs index 67ceae368..631d0dc3b 100644 --- a/crates/op-rbuilder/src/pool/delegate.rs +++ b/crates/op-rbuilder/src/pool/delegate.rs @@ -13,15 +13,27 @@ use reth_transaction_pool::{ BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit, NewBlobSidecar, NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PropagatedTransactions, TransactionEvents, TransactionListenerKind, TransactionOrigin, TransactionPool, - ValidPoolTransaction, blobstore::BlobStoreError, + TransactionValidator, ValidPoolTransaction, blobstore::BlobStoreError, }; use tokio::sync::mpsc::Receiver; use crate::{pool::Flashpool, tx::FBPooledTransaction}; -impl> TransactionPool for Flashpool

{ +impl< + P: TransactionPool + 'static, + V: TransactionValidator + Clone, +> TransactionPool for Flashpool +{ type Transaction = FBPooledTransaction; + fn add_transaction( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> impl Future> + Send { + self.add_transaction_override(origin, transaction) + } + delegate! { to self.inner { fn pool_size(&self) -> PoolSize; @@ -31,11 +43,7 @@ impl> TransactionPool for origin: TransactionOrigin, transaction: Self::Transaction, ) -> impl Future> + Send; - fn add_transaction( - &self, - origin: TransactionOrigin, - transaction: Self::Transaction, - ) -> impl Future> + Send; + fn add_transactions_with_origins( &self, transactions: impl IntoIterator + Send, diff --git a/crates/op-rbuilder/src/pool/metrics.rs b/crates/op-rbuilder/src/pool/metrics.rs new file mode 100644 index 000000000..0c3b7ee2f --- /dev/null +++ b/crates/op-rbuilder/src/pool/metrics.rs @@ -0,0 +1,22 @@ +use metrics::{Counter, Histogram, counter}; +use reth_metrics::Metrics; + +#[derive(Metrics, Clone)] +#[metrics(scope = "op_rbuilder.pool")] +pub(super) struct PoolMetrics { + /// Histogram of bundle pre-simulation duration + pub presim_duration: Histogram, + /// Number of updates to the tip state for the top of block simulator + pub presim_tip_state_updates: Counter, + /// Number of pending txs evicted due to failing top of block simulation + pub presim_pending_evictions: Counter, +} + +pub(super) fn increment_presim_count(sim_result: &eyre::Result) { + let label = match sim_result { + Ok(true) => "passed", + Ok(false) => "reverted", + Err(_) => "failed", + }; + counter!("op_rbuilder.pool.presim_count", "result" => label).increment(1); +} diff --git a/crates/op-rbuilder/src/pool/mod.rs b/crates/op-rbuilder/src/pool/mod.rs index bb9c34a07..f0429b506 100644 --- a/crates/op-rbuilder/src/pool/mod.rs +++ b/crates/op-rbuilder/src/pool/mod.rs @@ -1,20 +1,43 @@ mod builder; mod delegate; +mod metrics; +mod overrides; +mod presim; use alloy_primitives::TxHash; pub use builder::FlashpoolBuilder; use moka::sync::Cache; use reth_transaction_pool::TransactionPool; +use std::sync::Arc; -use crate::tx::FBPooledTransaction; +use reth_tasks::TaskExecutor; + +use crate::{ + pool::{metrics::PoolMetrics, presim::TopOfBlockSimulator}, + tx::FBPooledTransaction, +}; #[derive(Debug, Clone)] -pub struct Flashpool> { +pub struct Flashpool { + /// The reth transaction pool we're wrapping around inner: P, + /// The transaction validator + validator: V, + + /// Optional pre-simulator: when present, revert-protected txs are simulated + /// before being added to the pool; those that would revert are rejected. + simulator: Option>, + + /// Task executor for spawning presim tasks + task_executor: TaskExecutor, + /// Cache to store reverted tx hashes reverted_cache: Option>, + + /// Metrics + metrics: Arc, } /// Custom extensions on the pool where it doesn't make sense to intercept an @@ -25,7 +48,7 @@ pub trait FlashpoolExt { fn is_tx_reverted(&self, hash: TxHash) -> bool; } -impl> FlashpoolExt for Flashpool

{ +impl, V> FlashpoolExt for Flashpool { fn is_tx_reverted(&self, hash: TxHash) -> bool { self.reverted_cache .as_ref() diff --git a/crates/op-rbuilder/src/pool/overrides.rs b/crates/op-rbuilder/src/pool/overrides.rs new file mode 100644 index 000000000..bb7b68a60 --- /dev/null +++ b/crates/op-rbuilder/src/pool/overrides.rs @@ -0,0 +1,78 @@ +use std::time::Instant; + +use reth_transaction_pool::{ + AddedTransactionOutcome, PoolResult, PoolTransaction, TransactionOrigin, TransactionPool, + TransactionValidationOutcome, TransactionValidator, error::PoolError, + pool::AddedTransactionState, +}; +use tracing::error; + +use crate::{ + pool::{Flashpool, metrics}, + tx::FBPooledTransaction, +}; + +impl< + P: TransactionPool + Clone + 'static, + V: TransactionValidator + Clone, +> Flashpool +{ + pub(super) async fn add_transaction_override( + &self, + origin: TransactionOrigin, + transaction: FBPooledTransaction, + ) -> PoolResult { + let validation_outcome = self + .validator + .validate_transaction(origin, transaction) + .await; + + use TransactionValidationOutcome::*; + let transaction = match validation_outcome { + Valid { transaction, .. } => transaction, + Invalid(tx, err) => return Err(PoolError::new(*tx.hash(), err)), + Error(hash, err) => return Err(PoolError::other(hash, err)), + }; + + if transaction.transaction().revert_protected() + && let Some(ref simulator) = self.simulator + { + let tx_hash = *transaction.hash(); + let consensus_tx = transaction.transaction().clone_into_consensus(); + let simulator = simulator.clone(); + let inner_pool = self.inner.clone(); + let metrics = self.metrics.clone(); + + self.task_executor.spawn_task(async move { + let sim_start = Instant::now(); + let sim_result = simulator.simulate_tx(consensus_tx).await; + metrics::increment_presim_count(&sim_result); + + match sim_result { + Ok(true) => { + let _ = inner_pool + .add_transaction(origin, transaction.into_transaction()) + .await; + } + Ok(false) => {} + Err(e) => { + error!(tx_hash = %tx_hash, error = %e, "pre-simulation task failed"); + let _ = inner_pool + .add_transaction(origin, transaction.into_transaction()) + .await; + } + } + metrics.presim_duration.record(sim_start.elapsed()); + }); + + return Ok(AddedTransactionOutcome { + hash: tx_hash, + state: AddedTransactionState::Pending, + }); + } + + self.inner + .add_transaction(origin, transaction.into_transaction()) + .await + } +} diff --git a/crates/op-rbuilder/src/presim.rs b/crates/op-rbuilder/src/pool/presim.rs similarity index 97% rename from crates/op-rbuilder/src/presim.rs rename to crates/op-rbuilder/src/pool/presim.rs index 1c73f38c9..7a3a23af2 100644 --- a/crates/op-rbuilder/src/presim.rs +++ b/crates/op-rbuilder/src/pool/presim.rs @@ -22,7 +22,7 @@ use revm::{ }; use tracing::{debug, error, warn}; -use crate::{evm::OpBlockEvmFactory, metrics::OpRBuilderMetrics, tx::FBPooledTransaction}; +use crate::{evm::OpBlockEvmFactory, pool::metrics::PoolMetrics, tx::FBPooledTransaction}; /// Pre-simulates transactions against the current head state to filter out /// reverting transactions before they enter the pool. @@ -30,6 +30,13 @@ pub(crate) struct TopOfBlockSimulator { tip_state: RwLock>>, } +impl std::fmt::Debug for TopOfBlockSimulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TopOfBlockSimulator") + .finish_non_exhaustive() + } +} + pub(crate) struct TipState { evm_factory: OpBlockEvmFactory, state_provider: Box, @@ -150,7 +157,7 @@ pub(crate) async fn maintain_tip_state( provider: Provider, evm_config: OpEvmConfig, block_time_secs: u64, - metrics: Arc, + metrics: Arc, mut events: St, ) where N: NodePrimitives>, @@ -183,7 +190,7 @@ pub(crate) async fn maintain_tip_state( pub(crate) async fn maintain_pending_simulations( simulator: Arc, pool: Pool, - metrics: Arc, + metrics: Arc, mut events: St, ) where Pool: TransactionPool + 'static, diff --git a/crates/op-rbuilder/src/revert_protection.rs b/crates/op-rbuilder/src/revert_protection.rs index ad8c93d89..a9ecd609e 100644 --- a/crates/op-rbuilder/src/revert_protection.rs +++ b/crates/op-rbuilder/src/revert_protection.rs @@ -3,7 +3,6 @@ use std::{sync::Arc, time::Instant}; use crate::{ metrics::OpRBuilderMetrics, pool::FlashpoolExt, - presim::TopOfBlockSimulator, primitives::bundle::{Bundle, BundleResult}, tx::{FBPooledTransaction, MaybeFlashblockFilter}, }; @@ -14,10 +13,8 @@ use jsonrpsee::{ core::{RpcResult, async_trait}, proc_macros::rpc, }; -use op_alloy_consensus::OpTxEnvelope; use reth::rpc::api::eth::{RpcReceipt, helpers::FullEthApi}; use reth_optimism_chainspec::OpChainSpec; -use reth_optimism_primitives::OpTransactionSigned; use reth_optimism_txpool::{OpPooledTransaction, conditional::MaybeConditionalTransaction}; use reth_primitives_traits::Recovered; use reth_provider::{BlockReaderIdExt, ChainSpecProvider, StateProviderFactory}; @@ -41,7 +38,6 @@ pub struct RevertProtectionExt { provider: Provider, eth_api: Eth, metrics: Arc, - simulator: Option>, } impl RevertProtectionExt @@ -50,18 +46,12 @@ where Provider: Clone, Eth: Clone, { - pub(crate) fn new( - pool: Pool, - provider: Provider, - eth_api: Eth, - simulator: Option>, - ) -> Self { + pub(crate) fn new(pool: Pool, provider: Provider, eth_api: Eth) -> Self { Self { pool, provider, eth_api, metrics: Arc::new(OpRBuilderMetrics::default()), - simulator, } } } @@ -182,45 +172,8 @@ where .await .map_err(EthApiError::from)?; - // Pre-simulate the transaction against current head state if: - // - pre-simulation is enabled (simulator is Some) - // - the tx is allowed to revert - if bundle - .reverting_tx_hashes - .as_ref() - .is_some_and(|hashes| !hashes.is_empty()) - && let Some(simulator) = &self.simulator - { - let pool = self.pool.clone(); - let metrics = self.metrics.clone(); - let simulator = simulator.clone(); - tokio::task::spawn(async move { - let sim_start = Instant::now(); - let sim_tx: Recovered = recovered - .clone() - .map(|tx| OpTransactionSigned::from(OpTxEnvelope::from(tx))); - let sim_tx_hash = *sim_tx.hash(); - match simulator.clone().simulate_tx(sim_tx).await { - Ok(true) => { - metrics.bundle_pre_simulation_passes.increment(1); - } - Ok(false) => { - metrics.bundle_pre_simulation_reverts.increment(1); - pool.remove_transaction(sim_tx_hash); - } - Err(e) => { - error!(error = %e, "pre-simulation task failed"); - } - } - metrics - .bundle_pre_simulation_duration - .record(sim_start.elapsed()); - }); - } - - let result = BundleResult { + Ok(BundleResult { bundle_hash: outcome.hash, - }; - Ok(result) + }) } } diff --git a/crates/op-rbuilder/src/tests/framework/instance.rs b/crates/op-rbuilder/src/tests/framework/instance.rs index fe9fa9060..c939f01ea 100644 --- a/crates/op-rbuilder/src/tests/framework/instance.rs +++ b/crates/op-rbuilder/src/tests/framework/instance.rs @@ -2,9 +2,7 @@ use crate::{ args::OpRbuilderArgs, backrun_bundle::{BackrunBundleApiServer, BackrunBundleRpc}, builder::{BuilderConfig, FlashblocksServiceBuilder}, - metrics::OpRBuilderMetrics, pool::FlashpoolBuilder, - presim::{TopOfBlockSimulator, maintain_pending_simulations, maintain_tip_state}, revert_protection::{EthApiExtServer, RevertProtectionExt}, tests::{ EngineApi, Ipc, TEE_DEBUG_ADDRESS, TransactionPoolObserver, builder_signer, create_test_db, @@ -39,13 +37,11 @@ use reth::{ }; use reth_node_builder::{NodeBuilder, NodeConfig}; use reth_optimism_chainspec::OpChainSpec; -use reth_optimism_evm::OpEvmConfig; use reth_optimism_node::{ OpNode, node::{OpAddOns, OpAddOnsBuilder, OpEngineValidatorBuilder}, }; use reth_optimism_rpc::OpEthApiBuilder; -use reth_provider::{CanonStateSubscriptions, ChainSpecProvider}; use reth_transaction_pool::{AllTransactionsEvents, TransactionPool}; use std::{ net::SocketAddr, @@ -114,16 +110,6 @@ impl LocalInstance { let da_config = builder_config.da_config.clone(); let gas_limit_config = builder_config.gas_limit_config.clone(); let backrun_bundle_pool = builder_config.backrun_bundle_pool.clone(); - let block_time_secs = builder_config.block_time.as_millis() as u64 / 1000; - - let simulator = if args.pre_simulate_bundles { - Some(Arc::new(TopOfBlockSimulator::new())) - } else { - None - }; - let simulator_for_rpc = simulator.clone(); - let simulator_for_maintenance = simulator.clone(); - let addons: OpAddOns<_, OpEthApiBuilder, OpEngineValidatorBuilder> = OpAddOnsBuilder::default() .with_sequencer(args.rollup_args.sequencer.clone()) @@ -149,12 +135,8 @@ impl LocalInstance { let pool = ctx.pool().clone(); let provider = ctx.provider().clone(); - let revert_protection_ext = RevertProtectionExt::new( - pool, - provider, - ctx.registry.eth_api().clone(), - simulator_for_rpc, - ); + let revert_protection_ext = + RevertProtectionExt::new(pool, provider, ctx.registry.eth_api().clone()); ctx.modules .add_or_replace_configured(revert_protection_ext.into_rpc())?; @@ -182,34 +164,6 @@ impl LocalInstance { .send(ctx.pool.all_transactions_event_listener()) .expect("Failed to send txpool ready signal"); - if let Some(simulator) = simulator_for_maintenance { - let metrics = Arc::new(OpRBuilderMetrics::default()); - let chain_events = ctx.provider.canonical_state_stream(); - let evm_config = OpEvmConfig::optimism(ctx.provider.chain_spec()); - ctx.task_executor.spawn_task( - maintain_tip_state( - simulator.clone(), - ctx.provider.clone(), - evm_config, - block_time_secs, - metrics.clone(), - chain_events, - ) - .boxed(), - ); - - let pending_events = ctx.pool.all_transactions_event_listener(); - ctx.task_executor.spawn_task( - maintain_pending_simulations( - simulator, - ctx.pool.clone(), - metrics, - pending_events, - ) - .boxed(), - ); - } - Ok(()) }); diff --git a/crates/op-rbuilder/src/tests/revert.rs b/crates/op-rbuilder/src/tests/revert.rs index 0131e1973..6b60c50f8 100644 --- a/crates/op-rbuilder/src/tests/revert.rs +++ b/crates/op-rbuilder/src/tests/revert.rs @@ -445,9 +445,7 @@ async fn check_transaction_receipt_status_message(rbuilder: LocalInstance) -> ey Ok(()) } -/// Pre-simulation evicts reverting bundles from the pool asynchronously after -/// they are accepted. The handler accepts the bundle immediately; a background -/// task runs the simulation and removes the transaction when it reverts. +/// Presim rejects reverting bundles before they are added to the pool. #[rb_test(args = OpRbuilderArgs { enable_revert_protection: true, pre_simulate_bundles: true, @@ -460,28 +458,45 @@ async fn presim_rejects_reverting_bundle(rbuilder: LocalInstance) -> eyre::Resul driver.build_new_block().await?; // The handler accepts the bundle immediately; presim runs in the background - let bundle = driver + let reverting_bundle = driver .create_transaction() .random_reverting_transaction() - .with_reverted_hash() .with_bundle(BundleOpts::default()) .send() .await?; - let tx_hash = *bundle.tx_hash(); + // Submit a valid bundle as a sentinel: once it is pending in the pool we + // know presim has started for both submissions (tasks are spawned in + // order). + let valid_bundle = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default()) + .send() + .await?; + + let reverting_hash = *reverting_bundle.tx_hash(); + let valid_hash = *valid_bundle.tx_hash(); - // Wait for the background pre-simulation task to evict the reverting tx + // Wait for the sentinel bundle to be accepted into the pool, and then + // assume the presim task for the other bundle is complete. let deadline = tokio::time::Instant::now() + Duration::from_secs(5); loop { - if rbuilder.pool().is_dropped(tx_hash) { + if rbuilder.pool().is_pending(valid_hash) { break; } assert!( tokio::time::Instant::now() < deadline, - "reverting bundle was not evicted by pre-simulation within timeout" + "valid bundle was not added to pool within timeout" ); tokio::time::sleep(Duration::from_millis(50)).await; } + // The reverting bundle should never have been added to the pool. + assert!( + rbuilder.pool().tx_status(reverting_hash).is_none(), + "reverting bundle should have been rejected by pre-simulation before being added to the pool" + ); + Ok(()) }