Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 4 additions & 50 deletions crates/op-rbuilder/src/launcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::sync::Arc;

use eyre::Result;
use futures::FutureExt;
use reth_optimism_rpc::OpEthApiBuilder;
use tracing::info;

Expand All @@ -11,23 +8,21 @@ use crate::{
BackrunBundleApiServer, BackrunBundleRpc, maintain_backrun_bundle_pool_future,
},
builder::{BuilderConfig, FlashblocksServiceBuilder},
metrics::{OpRBuilderMetrics, VERSION, record_flag_gauge_metrics},
metrics::{VERSION, record_flag_gauge_metrics},
monitor_tx_pool::monitor_tx_pool,
pool::FlashpoolBuilder,
presim::{TopOfBlockSimulator, maintain_pending_simulations, maintain_tip_state},
revert_protection::{EthApiExtServer, RevertProtectionExt},
};
use reth::builder::{NodeBuilder, WithLaunchContext};
use reth_chain_state::CanonStateSubscriptions;
use reth_cli_commands::launcher::Launcher;
use reth_db::mdbx::DatabaseEnv;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_cli::chainspec::OpChainSpecParser;
use reth_optimism_evm::OpEvmConfig;
use reth_optimism_node::{
OpNode,
node::{OpAddOns, OpAddOnsBuilder, OpEngineValidatorBuilder},
};
use reth_provider::{CanonStateSubscriptions, ChainSpecProvider};
use reth_transaction_pool::TransactionPool;
pub fn launch() -> Result<()> {
let cli = Cli::parsed();
Expand Down Expand Up @@ -97,18 +92,9 @@ impl Launcher<OpChainSpecParser, OpRbuilderArgs> for BuilderLauncher {
let rollup_args = &builder_args.rollup_args;
let op_node = OpNode::new(rollup_args.clone());
let backrun_bundle_enabled = builder_args.backrun_bundle.backruns_enabled;
let block_time_secs = builder_config.block_time.as_millis() as u64 / 1000;
let backrun_bundle_pool = builder_config.backrun_bundle_pool.clone();
let backrun_bundle_pool_maintain = backrun_bundle_pool.clone();

let simulator = if builder_args.pre_simulate_bundles {
Some(Arc::new(TopOfBlockSimulator::new()))
} else {
None
};
let simulator_for_rpc = simulator.clone();
let simulator_for_maintenance = simulator.clone();

let addons: OpAddOns<_, OpEthApiBuilder, OpEngineValidatorBuilder> =
OpAddOnsBuilder::default()
.with_sequencer(rollup_args.sequencer.clone())
Expand All @@ -132,12 +118,8 @@ impl Launcher<OpChainSpecParser, OpRbuilderArgs> for BuilderLauncher {

let pool = ctx.pool().clone();
let provider = ctx.provider().clone();
let revert_protection_ext = RevertProtectionExt::new(
pool,
provider,
ctx.registry.eth_api().clone(),
simulator_for_rpc,
);
let revert_protection_ext =
RevertProtectionExt::new(pool, provider, ctx.registry.eth_api().clone());

ctx.modules
.add_or_replace_configured(revert_protection_ext.into_rpc())?;
Expand Down Expand Up @@ -178,34 +160,6 @@ impl Launcher<OpChainSpecParser, OpRbuilderArgs> for BuilderLauncher {
));
}

if let Some(simulator) = simulator_for_maintenance {
let metrics = Arc::new(OpRBuilderMetrics::default());
let chain_events = ctx.provider.canonical_state_stream();
let evm_config = OpEvmConfig::optimism(ctx.provider.chain_spec());
ctx.task_executor.spawn_task(
maintain_tip_state(
simulator.clone(),
ctx.provider.clone(),
evm_config,
block_time_secs,
metrics.clone(),
chain_events,
)
.boxed(),
);

let pending_events = ctx.pool.all_transactions_event_listener();
ctx.task_executor.spawn_task(
maintain_pending_simulations(
simulator,
ctx.pool.clone(),
metrics,
pending_events,
)
.boxed(),
);
}

Ok(())
})
.launch()
Expand Down
1 change: 0 additions & 1 deletion crates/op-rbuilder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub mod launcher;
pub mod metrics;
mod monitor_tx_pool;
pub mod pool;
pub mod presim;
pub mod primitives;
pub mod revert_protection;
pub(crate) mod runtime_ext;
Expand Down
10 changes: 0 additions & 10 deletions crates/op-rbuilder/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,6 @@ pub struct OpRBuilderMetrics {
pub bundles_reverted: Histogram,
/// Histogram of eth_sendBundle request duration
pub bundle_receive_duration: Histogram,
/// Number of bundles dropped by pre-simulation (reverted)
pub bundle_pre_simulation_reverts: Counter,
/// Number of bundles that passed pre-simulation
pub bundle_pre_simulation_passes: Counter,
/// Histogram of bundle pre-simulation duration
pub bundle_pre_simulation_duration: Histogram,
/// Number of updates to the tip state for the top of block simulator
pub presim_tip_state_updates: Counter,
/// Number of pending txs evicted due to failing top of block simulation
pub presim_pending_evictions: Counter,
/// Transactions rejected by per-tx DA size limit
pub tx_da_size_exceeded_total: Counter,
/// Transactions rejected by cumulative block DA limit
Expand Down
82 changes: 76 additions & 6 deletions crates/op-rbuilder/src/pool/builder.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,41 @@
use std::sync::Arc;

use alloy_primitives::TxHash;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use moka::sync::Cache;
use reth_chain_state::CanonStateSubscriptions;
use reth_evm::ConfigureEvm;
use reth_node_api::{FullNodeTypes, NodeTypes, PrimitivesTy, TxTy};
use reth_node_builder::{BuilderContext, components::PoolBuilder};
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_evm::OpEvmConfig;
use reth_optimism_forks::OpHardforks;
use reth_optimism_node::OpPoolBuilder;
use reth_optimism_txpool::{OpPooledTx, OpTransactionPool};
use reth_optimism_txpool::{OpPooledTx, OpTransactionPool, OpTransactionValidator};
use reth_primitives_traits::{Block, NodePrimitives};
use reth_provider::{BlockReaderIdExt, ChainSpecProvider, NodePrimitivesProvider};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{
AllTransactionsEvents, EthPoolTransaction, FullTransactionEvent, TransactionPool,
blobstore::DiskFileBlobStore,
TransactionValidationTaskExecutor, blobstore::DiskFileBlobStore,
};

use crate::{args::OpRbuilderArgs, pool::Flashpool, tx::FBPooledTransaction};
use crate::{
args::OpRbuilderArgs,
pool::{
Flashpool,
metrics::PoolMetrics,
presim::{TopOfBlockSimulator, maintain_pending_simulations, maintain_tip_state},
},
tx::FBPooledTransaction,
};

pub struct FlashpoolBuilder {
op_pool_builder: OpPoolBuilder<FBPooledTransaction>,

enable_revert_protection: bool,
pre_simulate_bundles: bool,
block_time_secs: u64,
}

impl FlashpoolBuilder {
Expand All @@ -34,18 +51,29 @@ impl FlashpoolBuilder {
Self {
op_pool_builder,
enable_revert_protection: builder_args.enable_revert_protection,

pre_simulate_bundles: builder_args.pre_simulate_bundles,
block_time_secs: builder_args.chain_block_time / 1000,
}
}
}

impl<Node, Evm> PoolBuilder<Node, Evm> for FlashpoolBuilder
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
Node::Provider: ChainSpecProvider<ChainSpec = OpChainSpec>
+ BlockReaderIdExt<Header = alloy_consensus::Header>,
<Node::Provider as NodePrimitivesProvider>::Primitives:
NodePrimitives<Block: Block<Header = alloy_consensus::Header>>,
FBPooledTransaction: EthPoolTransaction<Consensus = TxTy<Node::Types>> + OpPooledTx,
Evm: ConfigureEvm<Primitives = PrimitivesTy<Node::Types>> + Clone + 'static,
{
type Pool =
Flashpool<OpTransactionPool<Node::Provider, DiskFileBlobStore, Evm, FBPooledTransaction>>;
type Pool = Flashpool<
OpTransactionPool<Node::Provider, DiskFileBlobStore, Evm, FBPooledTransaction>,
TransactionValidationTaskExecutor<
OpTransactionValidator<Node::Provider, FBPooledTransaction, Evm>,
>,
>;

async fn build_pool(
self,
Expand All @@ -55,6 +83,8 @@ where
let Self {
op_pool_builder,
enable_revert_protection,
pre_simulate_bundles,
block_time_secs,
} = self;

let inner_pool = op_pool_builder.build_pool(ctx, evm_config).await?;
Expand All @@ -64,9 +94,49 @@ where
inner_pool.all_transactions_event_listener(),
));

let validator = inner_pool.validator().clone();
let metrics = Arc::new(PoolMetrics::default());

let simulator = if pre_simulate_bundles {
let simulator = Arc::new(TopOfBlockSimulator::new());

let chain_events = ctx.provider().canonical_state_stream();
let op_evm_config = OpEvmConfig::optimism(ctx.provider().chain_spec());
ctx.task_executor().spawn_task(
maintain_tip_state(
simulator.clone(),
ctx.provider().clone(),
op_evm_config,
block_time_secs,
metrics.clone(),
chain_events,
)
.boxed(),
);

let pending_events = inner_pool.all_transactions_event_listener();
ctx.task_executor().spawn_task(
maintain_pending_simulations(
simulator.clone(),
inner_pool.clone(),
metrics.clone(),
pending_events,
)
.boxed(),
);

Some(simulator)
} else {
None
};

Ok(Flashpool {
inner: inner_pool,
validator,
simulator,
task_executor: ctx.task_executor().clone(),
reverted_cache,
metrics,
})
}
}
Expand Down
22 changes: 15 additions & 7 deletions crates/op-rbuilder/src/pool/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,27 @@ use reth_transaction_pool::{
BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit, NewBlobSidecar,
NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PropagatedTransactions,
TransactionEvents, TransactionListenerKind, TransactionOrigin, TransactionPool,
ValidPoolTransaction, blobstore::BlobStoreError,
TransactionValidator, ValidPoolTransaction, blobstore::BlobStoreError,
};
use tokio::sync::mpsc::Receiver;

use crate::{pool::Flashpool, tx::FBPooledTransaction};

impl<P: TransactionPool<Transaction = FBPooledTransaction>> TransactionPool for Flashpool<P> {
impl<
P: TransactionPool<Transaction = FBPooledTransaction> + 'static,
V: TransactionValidator<Transaction = FBPooledTransaction> + Clone,
> TransactionPool for Flashpool<P, V>
{
type Transaction = FBPooledTransaction;

fn add_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send {
self.add_transaction_override(origin, transaction)
}

delegate! {
to self.inner {
fn pool_size(&self) -> PoolSize;
Expand All @@ -31,11 +43,7 @@ impl<P: TransactionPool<Transaction = FBPooledTransaction>> TransactionPool for
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> impl Future<Output = PoolResult<TransactionEvents>> + Send;
fn add_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send;

fn add_transactions_with_origins(
&self,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
Expand Down
22 changes: 22 additions & 0 deletions crates/op-rbuilder/src/pool/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use metrics::{Counter, Histogram, counter};
use reth_metrics::Metrics;

#[derive(Metrics, Clone)]
#[metrics(scope = "op_rbuilder.pool")]
pub(super) struct PoolMetrics {
/// Histogram of bundle pre-simulation duration
pub presim_duration: Histogram,
/// Number of updates to the tip state for the top of block simulator
pub presim_tip_state_updates: Counter,
/// Number of pending txs evicted due to failing top of block simulation
pub presim_pending_evictions: Counter,
}

pub(super) fn increment_presim_count(sim_result: &eyre::Result<bool>) {
let label = match sim_result {
Ok(true) => "passed",
Ok(false) => "reverted",
Err(_) => "failed",
};
counter!("op_rbuilder.pool.presim_count", "result" => label).increment(1);
}
29 changes: 26 additions & 3 deletions crates/op-rbuilder/src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,43 @@
mod builder;
mod delegate;
mod metrics;
mod overrides;
mod presim;

use alloy_primitives::TxHash;
pub use builder::FlashpoolBuilder;

use moka::sync::Cache;
use reth_transaction_pool::TransactionPool;
use std::sync::Arc;

use crate::tx::FBPooledTransaction;
use reth_tasks::TaskExecutor;

use crate::{
pool::{metrics::PoolMetrics, presim::TopOfBlockSimulator},
tx::FBPooledTransaction,
};

#[derive(Debug, Clone)]
pub struct Flashpool<P: TransactionPool<Transaction = FBPooledTransaction>> {
pub struct Flashpool<P, V> {
/// The reth transaction pool we're wrapping around
inner: P,

/// The transaction validator
validator: V,

/// Optional pre-simulator: when present, revert-protected txs are simulated
/// before being added to the pool; those that would revert are rejected.
simulator: Option<Arc<TopOfBlockSimulator>>,

/// Task executor for spawning presim tasks
task_executor: TaskExecutor,

/// Cache to store reverted tx hashes
reverted_cache: Option<Cache<TxHash, ()>>,

/// Metrics
metrics: Arc<PoolMetrics>,
}

/// Custom extensions on the pool where it doesn't make sense to intercept an
Expand All @@ -25,7 +48,7 @@ pub trait FlashpoolExt {
fn is_tx_reverted(&self, hash: TxHash) -> bool;
}

impl<P: TransactionPool<Transaction = FBPooledTransaction>> FlashpoolExt for Flashpool<P> {
impl<P: TransactionPool<Transaction = FBPooledTransaction>, V> FlashpoolExt for Flashpool<P, V> {
fn is_tx_reverted(&self, hash: TxHash) -> bool {
self.reverted_cache
.as_ref()
Expand Down
Loading
Loading