diff --git a/crates/rbuilder-primitives/benches/ssz_proof.rs b/crates/rbuilder-primitives/benches/ssz_proof.rs index 362ff6346..3b0ba9433 100644 --- a/crates/rbuilder-primitives/benches/ssz_proof.rs +++ b/crates/rbuilder-primitives/benches/ssz_proof.rs @@ -77,9 +77,9 @@ mod impls { let tx_size = 1_024; let mut runner = TestRunner::deterministic(); - let mut vanilla = VanillaSszTxProof::default(); + let mut vanilla = VanillaSszTxProof; let mut vanilla_buf = VanillaBufferedSszTxProof::default(); - let mut compact = CompactSszTxProof::default(); + let mut compact = CompactSszTxProof; for _ in 0..100 { let txs = generate_test_data(&mut runner, num_txs, tx_size); let expected = vanilla.generate(&txs, proof_target); @@ -135,7 +135,7 @@ mod impls { for idx in 0..MAX_CHUNK_COUNT { let leaf = txs .get(idx) - .map(|tx| tx_ssz_leaf_root(&tx)) + .map(|tx| tx_ssz_leaf_root(tx)) .unwrap_or(B256::ZERO); current_buf.insert(idx, leaf); } diff --git a/crates/rbuilder-primitives/src/lib.rs b/crates/rbuilder-primitives/src/lib.rs index 4a860e50e..4970da9e5 100644 --- a/crates/rbuilder-primitives/src/lib.rs +++ b/crates/rbuilder-primitives/src/lib.rs @@ -144,6 +144,8 @@ pub struct ReplacementData { pub sequence_number: u64, } +impl Copy for ReplacementData {} + impl ReplacementData { /// Next sequence_number, useful for testing. pub fn next(&self) -> Self { @@ -439,13 +441,17 @@ impl FakeSidecar for BlobTransactionSidecar { } /// First idea to handle blobs, might change. -/// Don't like the fact that blobs_sidecar exists no matter if Recovered contains a non blob tx. -/// Great effort was put in avoiding simple access to the internal tx so we don't accidentally leak information on logs (particularly the tx sign). +/// +/// Don't like the fact that blobs_sidecar exists no matter if +/// [`Recovered`] contains a non blob tx. +/// +/// Great effort was put in avoiding simple access to the internal tx so we +/// don't accidentally leak information on logs (particularly the tx sign). #[derive(Derivative)] #[derivative(Clone, PartialEq, Eq)] pub struct TransactionSignedEcRecoveredWithBlobs { tx: Recovered, - /// Will have a non empty BlobTransactionSidecarVariant if Recovered is 4844 + /// Will have a non empty [`BlobTransactionSidecarVariant`] if [`Recovered`] is 4844 pub blobs_sidecar: Arc, #[derivative(PartialEq = "ignore", Hash = "ignore")] @@ -858,7 +864,7 @@ impl Order { Order::Bundle(bundle) => bundle .replacement_data .as_ref() - .map(|r| (r.clone().key, r.sequence_number)), + .map(|r| (r.key, r.sequence_number)), Order::Tx(_) => None, } } diff --git a/crates/rbuilder-primitives/src/test_data_generator.rs b/crates/rbuilder-primitives/src/test_data_generator.rs index c8b3017f7..c0bc26974 100644 --- a/crates/rbuilder-primitives/src/test_data_generator.rs +++ b/crates/rbuilder-primitives/src/test_data_generator.rs @@ -93,7 +93,7 @@ impl TestDataGenerator { reverting_tx_hashes: vec![], hash: B256::default(), uuid: Uuid::default(), - replacement_data: replacement_data.clone(), + replacement_data, signer: replacement_data.as_ref().and_then(|r| r.key.signer), refund_identity: None, metadata: Default::default(), @@ -130,7 +130,7 @@ impl TestDataGenerator { reverting_tx_hashes, hash: B256::default(), uuid: Uuid::default(), - replacement_data: replacement_data.clone(), + replacement_data, signer: replacement_data.as_ref().and_then(|r| r.key.signer), refund_identity: None, metadata: Default::default(), diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index f78e2f27e..54695ba27 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -8,8 +8,12 @@ use crate::{ live_builder::{ building::built_block_cache::BuiltBlockCache, order_flow_tracing::order_flow_tracer_manager::OrderFlowTracerManager, - order_input::replaceable_order_sink::ReplaceableOrderSink, - payload_events::MevBoostSlotData, simulation::SlotOrderSimResults, + order_input::{ + blob_type_order_filter::BlobTypeOrderFilter, + replaceable_order_sink::ReplaceableOrderSink, + }, + payload_events::MevBoostSlotData, + simulation::SlotOrderSimResults, }, provider::StateProviderFactory, }; @@ -112,13 +116,13 @@ where .chain_spec .is_osaka_active_at_timestamp(block_ctx.attributes.timestamp) { - Box::new(order_input::blob_type_order_filter::new_fusaka(Box::new( + Box::new(BlobTypeOrderFilter::new_fusaka(Box::new( order_replacement_manager, ))) } else { - Box::new(order_input::blob_type_order_filter::new_pre_fusaka( - Box::new(order_replacement_manager), - )) + Box::new(BlobTypeOrderFilter::new_pre_fusaka(Box::new( + order_replacement_manager, + ))) }; let mempool_txs_detector_sniffer = diff --git a/crates/rbuilder/src/live_builder/order_flow_tracing/events.rs b/crates/rbuilder/src/live_builder/order_flow_tracing/events.rs index 9659addeb..0d40718ad 100644 --- a/crates/rbuilder/src/live_builder/order_flow_tracing/events.rs +++ b/crates/rbuilder/src/live_builder/order_flow_tracing/events.rs @@ -54,4 +54,22 @@ pub enum ReplaceableOrderEvent { RemoveBundle(BundleReplacementData), } +impl From for ReplaceableOrderEvent { + fn from(data: InsertOrderData) -> Self { + ReplaceableOrderEvent::InsertOrder(data) + } +} + +impl From<&BundleReplacementData> for ReplaceableOrderEvent { + fn from(data: &BundleReplacementData) -> Self { + ReplaceableOrderEvent::RemoveBundle(*data) + } +} + +impl From for ReplaceableOrderEvent { + fn from(data: BundleReplacementData) -> Self { + ReplaceableOrderEvent::RemoveBundle(data) + } +} + pub type ReplaceableOrderEventWithTimestamp = EventWithTimestamp; diff --git a/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs b/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs index 77cfdc68d..0b315e386 100644 --- a/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs +++ b/crates/rbuilder/src/live_builder/order_flow_tracing/order_flow_tracer.rs @@ -7,8 +7,8 @@ use crate::{ live_builder::{ block_output::bidding_service_interface::SlotBlockId, order_flow_tracing::events::{ - InsertOrderData, ReplaceableOrderEvent, ReplaceableOrderEventWithTimestamp, - SimulatedOrderData, SimulationEvent, SimulationEventWithTimestamp, + InsertOrderData, ReplaceableOrderEventWithTimestamp, SimulatedOrderData, + SimulationEvent, SimulationEventWithTimestamp, }, order_input::replaceable_order_sink::ReplaceableOrderSink, simulation::simulation_job_tracer::SimulationJobTracer, @@ -24,6 +24,7 @@ pub struct OrderFlowTracer { order_input_events: Mutex>, } +/// Report generated by the [`OrderFlowTracer`]. #[derive(Debug, Serialize, Deserialize)] pub struct OrderFlowTracerReport { pub sim_events: Vec, @@ -31,8 +32,10 @@ pub struct OrderFlowTracerReport { } impl OrderFlowTracer { - /// Takes the next ReplaceableOrderSink on the chain and returns the one that will be used to forward the events. - /// Also returns the OrderFlowTracer itself. + /// Takes the next [`ReplaceableOrderSink`] on the chain and returns the + /// one that will be used to forward the events. + /// + /// Also returns the [`OrderFlowTracer`] itself. pub fn new( id: SlotBlockId, sink: Box, @@ -53,19 +56,18 @@ impl OrderFlowTracer { } fn insert_order(&self, order: &Order) { - let event = ReplaceableOrderEventWithTimestamp::new(ReplaceableOrderEvent::InsertOrder( + let event = ReplaceableOrderEventWithTimestamp::new( InsertOrderData { order_id: order.id(), replacement_key_and_sequence_number: order.replacement_key_and_sequence_number(), tx_hashes: order.list_txs().iter().map(|(tx, _)| tx.hash()).collect(), - }, - )); + } + .into(), + ); self.order_input_events.lock().push(event); } fn remove_bundle(&self, replacement_data: &BundleReplacementData) { - let event = ReplaceableOrderEventWithTimestamp::new(ReplaceableOrderEvent::RemoveBundle( - replacement_data.clone(), - )); + let event = ReplaceableOrderEventWithTimestamp::new(replacement_data.into()); self.order_input_events.lock().push(event); } diff --git a/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs b/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs index 64982cf5f..106f91264 100644 --- a/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs +++ b/crates/rbuilder/src/live_builder/order_input/blob_type_order_filter.rs @@ -1,62 +1,82 @@ -use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718}; - use crate::live_builder::order_input::replaceable_order_sink::ReplaceableOrderSink; use rbuilder_primitives::{BundleReplacementData, Order, TransactionSignedEcRecoveredWithBlobs}; +use tracing::trace; -/// Filters out Orders with incorrect blobs (pre/post fusaka). -/// Since it's very unlikely what we have many wrong blobs we only filter on insert_order without take note of filtered orders. -/// If remove_bundle is called we just forward the call to the sink so it might try to remove a filtered order. -pub struct BlobTypeOrderFilter { +/// Filters out [`Order`]s based on their blob type. [`Order`]s that do not +/// pass the filter are dropped not inserted. Preconfigured filters are +/// provided to remove pre-Fusaka [EIP-4844] blobs ([`Self::new_fusaka`]) or +/// post-Fusaka [EIP-7594] blobs ([`Self::new_pre_fusaka`]). +/// +/// Since it's very unlikely that we have many wrong blobs we only filter on +/// [`ReplaceableOrderSink::insert_order`] without taking note of filtered +/// [`Order`]s. +/// +/// [`ReplaceableOrderSink::remove_bundle`] calls are simply forwarded to the +/// sink, so it might try to remove a filtered [`Order`]. +/// +/// [EIP-4844]: https://eips.ethereum.org/EIPS/eip-4844 +/// [EIP-7594]: https://eips.ethereum.org/EIPS/eip-7594 +pub struct BlobTypeOrderFilter { sink: Box, - ///true if it likes the blob sidecar, false if it doesn't (Order gets filtered). - filter_func: FilterFunc, + + /// Name of the filter for logging purposes. + rule_name: &'static str, + + /// `true` if it likes the blob sidecar, `false` if it doesn't ([`Order`] + /// gets filtered). + filter_func: fn(&TransactionSignedEcRecoveredWithBlobs) -> bool, } -impl std::fmt::Debug for BlobTypeOrderFilter { +impl std::fmt::Debug for BlobTypeOrderFilter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BlobTypeOrderFilter") .field("sink", &"") + .field("rule", &self.rule_name) .finish() } } -/// Filters out EIP-7594 style blobs, supports only EIP-4844 style. -pub fn new_pre_fusaka( - sink: Box, -) -> BlobTypeOrderFilter bool + Send + Sync> { - BlobTypeOrderFilter::new(sink, |tx| { - if tx.is_eip4844() { - matches!(*tx.blobs_sidecar, BlobTransactionSidecarVariant::Eip4844(_)) - } else { - true +impl BlobTypeOrderFilter { + /// Creates a new [`BlobTypeOrderFilter`], using the given `filter_func` to + /// filter out orders. + pub const fn new( + sink: Box, + rule_name: &'static str, + filter_func: fn(&TransactionSignedEcRecoveredWithBlobs) -> bool, + ) -> Self { + Self { + sink, + rule_name, + filter_func, } - }) -} + } -/// Filters out EIP-4844 style, supports only EIP-7594 style blobs. -pub fn new_fusaka( - sink: Box, -) -> BlobTypeOrderFilter bool + Send + Sync> { - BlobTypeOrderFilter::new(sink, |tx| { - if tx.is_eip4844() { - matches!(*tx.blobs_sidecar, BlobTransactionSidecarVariant::Eip7594(_)) - } else { - true + /// Filters out [EIP-4844] style, allowing only [EIP-7594] style blobs. + /// + /// [EIP-4844]: https://eips.ethereum.org/EIPS/eip-4844 + /// [EIP-7594]: https://eips.ethereum.org/EIPS/eip-7594 + pub const fn new_fusaka(sink: Box) -> Self { + fn fusaka(tx: &TransactionSignedEcRecoveredWithBlobs) -> bool { + !tx.as_ref().is_eip4844() || tx.blobs_sidecar.is_eip7594() + } + + Self::new(sink, "fusaka", fusaka) + } + + /// Filters out [EIP-7594] style blobs, allowing only [EIP-4844] style. + /// + /// [EIP-4844]: https://eips.ethereum.org/EIPS/eip-4844 + /// [EIP-7594]: https://eips.ethereum.org/EIPS/eip-7594 + pub const fn new_pre_fusaka(sink: Box) -> Self { + fn pre_fusaka(tx: &TransactionSignedEcRecoveredWithBlobs) -> bool { + !tx.as_ref().is_eip4844() || tx.blobs_sidecar.is_eip4844() } - }) -} -impl bool> - BlobTypeOrderFilter -{ - fn new(sink: Box, filter_func: FilterFunc) -> Self { - Self { sink, filter_func } + Self::new(sink, "pre-fusaka", pre_fusaka) } } -impl bool + Send + Sync> - ReplaceableOrderSink for BlobTypeOrderFilter -{ +impl ReplaceableOrderSink for BlobTypeOrderFilter { fn insert_order(&mut self, order: Order) -> bool { if order .list_txs() @@ -65,6 +85,7 @@ impl bool + Send + Syn { self.sink.insert_order(order) } else { + trace!(order_id = ?order.id(), rule = self.rule_name, "Order filtered out by BlobTypeOrderFilter"); true } } diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index 3d8ca05fc..015cd8b2b 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -20,13 +20,14 @@ use crate::{ }; use alloy_consensus::Header; use alloy_primitives::Address; +use futures::{stream::FuturesUnordered, StreamExt}; use jsonrpsee::RpcModule; use parking_lot::Mutex; use rbuilder_primitives::{BundleReplacementData, Order}; use std::{ net::Ipv4Addr, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, Weak}, time::{Duration, Instant}, }; use tokio::{sync::mpsc, task::JoinHandle}; @@ -35,13 +36,14 @@ use tracing::{debug, error, info, trace, warn}; use super::base_config::BaseConfig; -/// Thread safe access to OrderPool to get orderflow +/// Thread safe access to [`OrderPool`] to get orderflow. #[derive(Debug)] pub struct OrderPoolSubscriber { orderpool: Arc>, } impl OrderPoolSubscriber { + /// Subscribe to events from the [`OrderPool`] by adding a sink. pub fn add_sink( &self, block_number: u64, @@ -50,6 +52,7 @@ impl OrderPoolSubscriber { self.orderpool.lock().add_sink(block_number, sink) } + /// Unsubscribe from the [`OrderPool`] by removing a sink. pub fn remove_sink( &self, id: &OrderPoolSubscriptionId, @@ -57,40 +60,47 @@ impl OrderPoolSubscriber { self.orderpool.lock().remove_sink(id) } - /// Returned AutoRemovingOrderPoolSubscriptionId will call remove when dropped + /// Subscribe to events from the [`OrderPool`] by adding a sink. The sink + /// is automatically removed when the returned id is dropped. pub fn add_sink_auto_remove( &self, block_number: u64, sink: Box, ) -> AutoRemovingOrderPoolSubscriptionId { AutoRemovingOrderPoolSubscriptionId { - orderpool: self.orderpool.clone(), + orderpool: Arc::downgrade(&self.orderpool), id: self.add_sink(block_number, sink), } } } -/// OrderPoolSubscriptionId that removes on drop. -/// Call add_sink to get flow and remove_sink to stop it -/// For easy auto remove we have add_sink_auto_remove +/// [`OrderPoolSubscriptionId`] that unsubscribes itself on drop. Create this +/// struct with [`OrderPoolSubscriber::add_sink_auto_remove`]. pub struct AutoRemovingOrderPoolSubscriptionId { - orderpool: Arc>, + /// Using [`Weak`] prevents subscriptions from keeping the [`OrderPool`] + /// alive. If the [`OrderPool`] has already been dropped, the sink + /// no longer needs to be removed at all. + orderpool: Weak>, id: OrderPoolSubscriptionId, } impl Drop for AutoRemovingOrderPoolSubscriptionId { fn drop(&mut self) { - self.orderpool.lock().remove_sink(&self.id); + if let Some(orderpool) = self.orderpool.upgrade() { + orderpool.lock().remove_sink(&self.id); + } } } +/// Source of mempool transactions stream information. #[derive(Debug, Clone)] pub enum MempoolSource { Ipc(PathBuf), Ws(String), } -/// All the info needed to start all the order related jobs (mempool, rcp, clean) +/// All the info needed to start all the order related jobs (mempool, rcp, +/// clean). #[derive(Debug, Clone)] pub struct OrderInputConfig { /// if true - cancellations are disabled. @@ -117,9 +127,11 @@ pub struct OrderInputConfig { /// The allowlisted recipients for system transactions. system_recipient_allowlist: Vec
, } + pub const DEFAULT_SERVE_MAX_CONNECTIONS: u32 = 4096; pub const DEFAULT_RESULTS_CHANNEL_TIMEOUT: Duration = Duration::from_millis(50); pub const DEFAULT_INPUT_CHANNEL_BUFFER_SIZE: usize = 10_000; + impl OrderInputConfig { #[allow(clippy::too_many_arguments)] pub fn new( @@ -260,7 +272,7 @@ where ) .await?; - let mut handles = vec![clean_job, rpc_server]; + let mut handles: FuturesUnordered<_> = [clean_job, rpc_server].into_iter().collect(); if config.mempool_source.is_some() { info!("Txpool source configured, starting txpool subscription"); @@ -330,9 +342,8 @@ where new_commands.clear(); } - for handle in handles { + while let Some(handle) = handles.next().await { handle - .await .map_err(|err| { tracing::error!(?err, "Error while waiting for OrderPoolJobs to finish") }) diff --git a/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs b/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs index ad71acd3f..78f123861 100644 --- a/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs +++ b/crates/rbuilder/src/live_builder/order_input/order_replacement_manager.rs @@ -4,13 +4,20 @@ use rbuilder_primitives::{BundleReplacementData, BundleReplacementKey, Order, Or use super::{order_sink::OrderSink, replaceable_order_sink::ReplaceableOrderSink}; -/// Handles all replacement and cancellation for bundles by receiving -/// low level orderflow data via ReplaceableOrderSink and forwarding to an OrderSink. -/// The OrderReplacementManager works for a single block. -/// IMPORTANT: Due to infra problems we can get notifications our of order, we must always honor the one -/// with higher sequence_number or the cancel. -/// Although all the structs and fields say "bundle" we always reefer to Bundle or ShareBundle -/// For each bundle we keep the current BundleReplacementState +/// Handle all replacement and cancellation for bundles by receiving +/// low level orderflow data via [`ReplaceableOrderSink`] and forwarding to an +/// [`OrderSink`]. +/// +/// The `OrderReplacementManager` works for a single block. +/// +/// IMPORTANT: Due to infra problems, notifications may arrive out of order. +/// Cancels are treated as highest-priority, and after that we must always +/// honor the replacement with highest `sequence_number`. +/// +/// Although all the structs and fields say "bundle" we always refer to Bundle +/// or ShareBundle. +/// +/// For each bundle we keep the current [`BundleReplacementState`] #[derive(Debug)] pub struct OrderReplacementManager { sink: Box, @@ -68,7 +75,7 @@ impl ReplaceableOrderSink for OrderReplacementManager { } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] struct ValidBundleState { /// Current valid sequence_number (larges we've seen) pub sequence_number: u64, @@ -81,11 +88,11 @@ struct ValidBundleState { /// On new seq: /// Valid upgrades if seq > current. /// Cancelled ignores. -/// On Cancel always ends in Cancelled. -#[derive(Debug)] +/// On Cancel always ends in [`Self::Cancelled`]. +#[derive(Debug, Copy, Clone)] enum BundleReplacementState { Valid(ValidBundleState), - // sequence number of the cancellation. + /// Sequence number of the cancellation. Cancelled(u64), } @@ -211,7 +218,7 @@ mod test { fn test_insert_cancel() { let mut data_gen = TestDataGenerator::new(); let replacement_data = data_gen.create_bundle_replacement_data(); - let bundle = Order::Bundle(data_gen.create_bundle(Some(replacement_data.clone()))); + let bundle = Order::Bundle(data_gen.create_bundle(Some(replacement_data))); let mut order_sink = MockOrderSink::new(); // expect order added @@ -244,7 +251,7 @@ mod test { fn test_insert_ignored_cancel() { let mut data_gen = TestDataGenerator::new(); let replacement_data = data_gen.create_bundle_replacement_data(); - let bundle = Order::Bundle(data_gen.create_bundle(Some(replacement_data.clone()))); + let bundle = Order::Bundle(data_gen.create_bundle(Some(replacement_data))); let mut order_sink = MockOrderSink::new(); // expect order added @@ -275,7 +282,7 @@ mod test { fn test_cancel_insert() { let mut data_gen = TestDataGenerator::new(); let replacement_data = data_gen.create_bundle_replacement_data(); - let bundle = Order::Bundle(data_gen.create_bundle(Some(replacement_data.clone()))); + let bundle = Order::Bundle(data_gen.create_bundle(Some(replacement_data))); let order_sink = MockOrderSink::new(); let mut manager = OrderReplacementManager::new(Box::new(order_sink)); @@ -289,7 +296,7 @@ mod test { let mut data_gen = TestDataGenerator::new(); let old_replacement_data = data_gen.create_bundle_replacement_data(); let new_replacement_data = old_replacement_data.next(); - let old_bundle = Order::Bundle(data_gen.create_bundle(Some(old_replacement_data.clone()))); + let old_bundle = Order::Bundle(data_gen.create_bundle(Some(old_replacement_data))); let new_bundle = Order::Bundle(data_gen.create_bundle(Some(new_replacement_data))); let mut order_sink = MockOrderSink::new(); @@ -329,7 +336,7 @@ mod test { let mut data_gen = TestDataGenerator::new(); let old_replacement_data = data_gen.create_bundle_replacement_data(); let new_replacement_data = old_replacement_data.next(); - let old_bundle = Order::Bundle(data_gen.create_bundle(Some(old_replacement_data.clone()))); + let old_bundle = Order::Bundle(data_gen.create_bundle(Some(old_replacement_data))); let new_bundle = Order::Bundle(data_gen.create_bundle(Some(new_replacement_data))); let mut order_sink = MockOrderSink::new(); diff --git a/crates/rbuilder/src/live_builder/order_input/orderpool.rs b/crates/rbuilder/src/live_builder/order_input/orderpool.rs index 485b610ac..487ac9d47 100644 --- a/crates/rbuilder/src/live_builder/order_input/orderpool.rs +++ b/crates/rbuilder/src/live_builder/order_input/orderpool.rs @@ -1,3 +1,8 @@ +use super::{ + order_sink::{OrderPoolCommand, OrderSender2OrderSink}, + replaceable_order_sink::ReplaceableOrderSink, + ReplaceableOrderPoolCommand, +}; use ahash::HashMap; use lru::LruCache; use rbuilder_primitives::{BundleReplacementData, Order, OrderId}; @@ -11,22 +16,25 @@ use std::{ use tokio::sync::mpsc::{self}; use tracing::{error, trace}; -use super::{ - order_sink::{OrderPoolCommand, OrderSender2OrderSink}, - replaceable_order_sink::ReplaceableOrderSink, - ReplaceableOrderPoolCommand, -}; - const TIME_TO_KEEP_BUNDLE_CANCELLATIONS: Duration = Duration::from_secs(60); -/// Push to pull for OrderSink. Just poll de UnboundedReceiver to get the orders. + +/// Push to pull for OrderSink. Just poll the [`UnboundedReceiver`] to get the +/// orders. +/// +/// [`UnboundedReceiver`]: mpsc::UnboundedReceiver #[derive(Debug)] pub struct OrdersForBlock { + /// Receiver for new orders/cancellations via [`OrderPoolCommand`]. pub new_order_sub: mpsc::UnboundedReceiver, } impl OrdersForBlock { - /// Helper to create a OrdersForBlock "wrapped" with a OrderSender2OrderSink. - /// Give this OrdersForBlock to an order pull stage and push on the returned OrderSender2OrderSink + /// Create a new [`OrdersForBlock`], and return it and a corresponding + /// [`OrderSender2OrderSink`]. Orders sent via this sink will be available + /// on the returned [`OrdersForBlock`]. + /// + /// The [`OrdersForBlock`] can be given to an order pull stage to receive + /// orders for a specific block, allowing convenient testing. pub fn new_with_sink() -> (Self, OrderSender2OrderSink) { let (sink, sender) = OrderSender2OrderSink::new(); ( @@ -38,28 +46,38 @@ impl OrdersForBlock { } } -/// Events (orders/cancellations) for a single block -#[derive(Debug, Default)] +/// [`Order`]s for a specific block. +#[derive(Debug, Default, Clone)] +#[repr(transparent)] struct BundleBlockStore { - /// Bundles and SharedBundles + /// The stored orders bundles: Vec, } +/// A sink for [`ReplaceableOrderPoolCommand`] for a specific block. This sink +/// is stored in the [`OrderPool`], and used to notify tasks about new orders +/// and cancellations. #[derive(Debug)] struct SinkSubscription { sink: Box, block_number: u64, } -/// returned by add_sink to be used on remove_sink +/// Identifier for an [`OrderPool`] subscription, returned by +/// [`OrderPool::add_sink`]. May be passed to [`OrderPool::remove_sink`] to +/// cancel the subscription. #[derive(Debug, Eq, Hash, PartialEq, Clone)] pub struct OrderPoolSubscriptionId(u64); -/// Repository of ALL orders and cancellations that arrives us via process_commands. No processing is done here. -/// The idea is that OrderPool is alive from the start of the universe and we can ask for the -/// events (Orders and cancellations) for a particular block even if the orders arrived in the past. -/// Since by infra restrictions bundle cancellations don't have an associated block so we store them for a while and asume -/// they are valid for all in progress sinks +/// Repository of ALL orders and cancellations that arrives us via +/// [`Self::process_commands`]. No simulation is done here. +/// The idea is that OrderPool is alive from the start of the universe and we +/// can ask for the events ([`Order`]s and [`BundleReplacementData`]) for a +/// particular block even if the orders arrived in the past. +/// +/// Since by infra restrictions bundle cancellations don't have an associated +/// block, we store them for a while and assume they are valid for all in +/// progress sinks. #[derive(Debug)] pub struct OrderPool { mempool_txs: Vec<(Order, Instant)>, @@ -78,6 +96,7 @@ pub struct OrderPool { } impl OrderPool { + /// Instantiate a new OrderPool. pub fn new(time_to_keep_mempool_txs: Duration) -> Self { OrderPool { mempool_txs: Vec::new(), @@ -92,7 +111,11 @@ impl OrderPool { } } - pub fn process_commands(&mut self, commands: Vec) { + /// Process a set of [`ReplaceableOrderPoolCommand`]. + pub fn process_commands(&mut self, commands: I) + where + I: IntoIterator, + { commands.into_iter().for_each(|oc| self.process_command(oc)); } @@ -136,8 +159,7 @@ impl OrderPool { } fn process_remove_bundle(&mut self, key: &BundleReplacementData) { - self.bundle_cancellations - .push_back((key.clone(), Instant::now())); + self.bundle_cancellations.push_back((*key, Instant::now())); } fn process_command(&mut self, command: ReplaceableOrderPoolCommand) { @@ -175,7 +197,7 @@ impl OrderPool { sink.insert_order(order); } for replacement_data in self.bundle_cancellations.iter().map(|(key, _)| key) { - sink.remove_bundle(replacement_data.clone()); + sink.remove_bundle(*replacement_data); } if let Some(bundle_store) = self.bundles_by_target_block.get(&block_number) {