diff --git a/Cargo.lock b/Cargo.lock index c6dd12958..342b0154e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8064,7 +8064,7 @@ dependencies = [ "rand 0.8.5", "rayon", "redis", - "reqwest 0.11.27", + "reqwest 0.12.5", "reth", "reth-basic-payload-builder", "reth-chainspec", @@ -8277,8 +8277,11 @@ checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", + "futures-channel", "futures-core", "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -8302,6 +8305,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", + "system-configuration", "tokio", "tokio-native-tls", "tokio-rustls 0.26.0", diff --git a/config-backtest-example.toml b/config-backtest-example.toml index f789ce939..fe81a04ef 100644 --- a/config-backtest-example.toml +++ b/config-backtest-example.toml @@ -2,8 +2,10 @@ log_level = "info,rbuilder=debug" chain = "mainnet" reth_datadir = "/mnt/data/reth" +relay_secret_key = "5eae315483f028b5cdd5d1090ff0c7618b18737ea9bf3c35047189db22835c48" +coinbase_secret_key = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" -backtest_fetch_eth_rpc_url = "http://127.0.0.1:8545" +backtest_fetch_eth_rpc_url = "http://localhost:8545" backtest_fetch_eth_rpc_parallel = 400 backtest_fetch_output_file = "~/.rbuilder/backtest/main.sqlite" backtest_fetch_mempool_data_dir = "~/.rbuilder/mempool-data" diff --git a/crates/rbuilder/Cargo.toml b/crates/rbuilder/Cargo.toml index 251b43fa0..6c7120430 100644 --- a/crates/rbuilder/Cargo.toml +++ b/crates/rbuilder/Cargo.toml @@ -56,7 +56,7 @@ ethereum_ssz.workspace = true test_utils = { path = "src/test_utils" } -reqwest = { version = "0.11.20", features = ["blocking"] } +reqwest = { version = "0.12", features = ["blocking"] } serde_with = { version = "3.8.1", features = ["time_0_3"] } primitive-types = "0.12.1" url = "2.4.1" diff --git a/crates/rbuilder/src/backtest/backtest_build_block.rs b/crates/rbuilder/src/backtest/backtest_build_block.rs index a0ad88f26..778fb92de 100644 --- a/crates/rbuilder/src/backtest/backtest_build_block.rs +++ b/crates/rbuilder/src/backtest/backtest_build_block.rs @@ -20,6 +20,7 @@ use crate::{ building::builders::BacktestSimulateBlockInput, live_builder::{base_config::load_config_toml_and_env, cli::LiveBuilderConfig}, primitives::{Order, OrderId, SimulatedOrder}, + provider::http_provider::HttpProvider, utils::timestamp_as_u64, }; use clap::Parser; @@ -80,15 +81,18 @@ pub async fn run_backtest_build_block() -> eyre:: .unzip(); println!("Available orders: {}", orders.len()); - + println!("A"); if cli.show_orders { print_order_and_timestamp(&block_data.available_orders, &block_data); } - let provider_factory = config.base_config().provider_factory()?; + // let provider_factory = config.base_config().provider_factory()?; + let provider_factory = + HttpProvider::new_with_url(config.base_config().backtest_fetch_eth_rpc_url.as_str()); + let chain_spec = config.base_config().chain_spec()?; let sbundle_mergeabe_signers = config.base_config().sbundle_mergeabe_signers(); - + println!("B"); if cli.sim_landed_block { let tx_sim_results = sim_historical_block( provider_factory.clone(), @@ -97,7 +101,7 @@ pub async fn run_backtest_build_block() -> eyre:: )?; print_onchain_block_data(tx_sim_results, &orders, &block_data); } - + println!("B1"); let BacktestBlockInput { ctx, sim_orders, .. } = backtest_prepare_ctx_for_block( @@ -108,16 +112,20 @@ pub async fn run_backtest_build_block() -> eyre:: config.base_config().blocklist()?, config.base_config().coinbase_signer()?, )?; - + println!("C"); if cli.show_sim { print_simulated_orders(&sim_orders, &order_and_timestamp, &block_data); } + println!("D"); + if !cli.no_block_building { let winning_builder = cli .builders .iter() .filter_map(|builder_name: &String| { + println!("E"); + let input = BacktestSimulateBlockInput { ctx: ctx.clone(), builder_name: builder_name.clone(), diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs index aefb10a06..571e8a1d3 100644 --- a/crates/rbuilder/src/building/mod.rs +++ b/crates/rbuilder/src/building/mod.rs @@ -688,6 +688,9 @@ impl PartialBlock { ctx: &BlockBuildingContext, state: &mut BlockState, ) -> eyre::Result<()> { + println!("SKIP FOR NOW; FIX LATER PRECALL"); + return Ok(()); + let mut db = state.new_db_ref(); pre_block_beacon_root_contract_call( db.as_mut(), diff --git a/crates/rbuilder/src/building/sim.rs b/crates/rbuilder/src/building/sim.rs index 07fdfb54b..1d793ba3b 100644 --- a/crates/rbuilder/src/building/sim.rs +++ b/crates/rbuilder/src/building/sim.rs @@ -206,7 +206,13 @@ impl SimTree { pub fn push_orders(&mut self, orders: Vec) -> Result<(), ProviderError> { let state = self.nonce_cache.get_ref()?; + let total = orders.len(); + println!("Nim orders: {:?}", orders.len()); + let mut count = 0; for order in orders { + println!("Nim order: {:?}/{:?}", count, total); + count += 1; + self.push_order(order, &state)?; } Ok(()) @@ -311,23 +317,34 @@ pub fn simulate_all_orders_with_sim_tree orders: &[Order], randomize_insertion: bool, ) -> Result<(Vec, Vec), CriticalCommitOrderError> { + println!("HERE!"); + let mut sim_tree = SimTree::new(factory.clone(), ctx.attributes.parent); + println!("HERE2!"); + let mut orders = orders.to_vec(); let random_insert_size = max(orders.len() / 20, 1); if randomize_insertion { + println!("HERE2.1!"); + let mut rng = rand::thread_rng(); // shuffle orders orders.shuffle(&mut rng); } else { + println!("HERE2.2!"); + sim_tree.push_orders(orders.clone())?; } + println!("HERE3!"); let mut sim_errors = Vec::new(); let mut state_for_sim = Arc::::from(factory.history_by_block_hash(ctx.attributes.parent)?); let mut cache_reads = Some(CachedReads::default()); loop { + println!("Simulating orders, orders len: {}", orders.len()); + // mix new orders into the sim_tree if randomize_insertion && !orders.is_empty() { let insert_size = min(random_insert_size, orders.len()); @@ -344,6 +361,8 @@ pub fn simulate_all_orders_with_sim_tree } } + println!("Simulating orders, sim_tasks len: {}", sim_tasks.len()); + let mut sim_results = Vec::new(); for sim_task in sim_tasks { let start_time = Instant::now(); diff --git a/crates/rbuilder/src/provider/http_provider.rs b/crates/rbuilder/src/provider/http_provider.rs new file mode 100644 index 000000000..423a782b1 --- /dev/null +++ b/crates/rbuilder/src/provider/http_provider.rs @@ -0,0 +1,321 @@ +use super::StateProviderFactory; +use alloy_provider::{Provider, ProviderBuilder, RootProvider}; +use alloy_rpc_types::{BlockId, BlockNumberOrTag, BlockTransactionsKind}; +use alloy_transport_http::Http; +use reqwest::Client; +use reth_errors::ProviderResult; +use reth_primitives::{ + Account, Address, Block, BlockHash, BlockNumber, Bytecode, Header, StorageKey, StorageValue, + B256, +}; +use reth_provider::{ + AccountReader, BlockHashReader, ExecutionOutcome, ProviderError, StateProofProvider, + StateProvider, StateProviderBox, StateRootProvider, +}; +use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState}; +use tokio::runtime::Handle; + +#[derive(Clone)] +pub struct HttpProvider { + provider: RootProvider>, +} + +impl HttpProvider { + pub fn new_with_url(url: &str) -> Self { + let provider = ProviderBuilder::new().on_http(url.parse().unwrap()); + + Self { provider } + } +} + +impl StateProviderFactory for HttpProvider { + fn history_by_block_number( + &self, + block_number: BlockNumber, + ) -> ProviderResult { + let handle = Handle::current(); + let res = futures::executor::block_on(async { + self.provider + .get_block_by_number(BlockNumberOrTag::Number(block_number), false) + .await + }) + .unwrap() + .expect("a block"); + + Ok(HttpProviderState::new( + self.provider.clone(), + res.header.hash.unwrap(), + )) + } + + fn latest(&self) -> ProviderResult { + let handle = Handle::current(); + let res = futures::executor::block_on(async { + self.provider + .get_block(BlockId::latest(), BlockTransactionsKind::Hashes) + .await + }) + .unwrap() + .expect("a block"); + + Ok(HttpProviderState::new( + self.provider.clone(), + res.header.hash.unwrap(), + )) + } + + fn history_by_block_hash(&self, block_hash: B256) -> ProviderResult { + let handle = Handle::current(); + let res = futures::executor::block_on(async { + self.provider + .get_block_by_hash(block_hash, BlockTransactionsKind::Hashes) + .await + }) + .unwrap() + .expect("a block"); + + Ok(HttpProviderState::new( + self.provider.clone(), + res.header.hash.unwrap(), + )) + } + + /// Get header by block hash + fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + let handle = Handle::current(); + let _res = futures::executor::block_on(async { + self.provider + .get_block_by_hash(*block_hash, BlockTransactionsKind::Hashes) + .await + }) + .unwrap() + .expect("a block"); + + unimplemented!("TODO") + } + + fn last_block_number(&self) -> ProviderResult { + unimplemented!("TODO") + } + + fn block_by_number(&self, _num: u64) -> ProviderResult> { + unimplemented!("TODO") + } + + fn state_root( + &self, + _parent_hash: B256, + _output: &ExecutionOutcome, + ) -> Result { + unimplemented!("TODO") + } +} + +pub struct HttpProviderState { + provider: RootProvider>, + hash: B256, +} + +impl HttpProviderState { + pub fn new(provider: RootProvider>, hash: B256) -> Box { + Box::new(Self { provider, hash }) + } +} + +impl StateProvider for HttpProviderState { + /// Get storage of given account. + fn storage( + &self, + address: Address, + storage_key: StorageKey, + ) -> ProviderResult> { + let block_id = BlockId::hash(self.hash); + + let handle = Handle::current(); + println!("[DEBUG] storage fetch: {:?} {:?}", address, storage_key); + + let res = futures::executor::block_on(async { + self.provider + .get_storage_at(address, storage_key.into()) + .block_id(block_id) + .await + }) + .unwrap(); + + Ok(Some(res.into())) + } + + /// Get account code by its hash + fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { + let address = Address::from_word(code_hash); + let block_id = BlockId::hash(self.hash); + + let handle = Handle::current(); + let res = futures::executor::block_on(async { + self.provider.get_code_at(address).block_id(block_id).await + }) + .unwrap(); + + println!("[DEBUG] bytecode fetch: {:?} {:?}", address, res.clone()); + + Ok(Some(Bytecode::new_raw(res))) + } +} + +impl BlockHashReader for HttpProviderState { + /// Get the hash of the block with the given number. Returns `None` if no block with this number + /// exists. + fn block_hash(&self, number: BlockNumber) -> ProviderResult> { + let handle = Handle::current(); + let res = futures::executor::block_on( + self.provider + .get_block_by_number(BlockNumberOrTag::Number(number), false), + ) + .unwrap() + .unwrap(); + + Ok(res.header.hash) + } + + fn canonical_hashes_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> ProviderResult> { + let mut res = vec![]; + + let handle = Handle::current(); + for i in start..end { + let block: alloy_rpc_types::Block = futures::executor::block_on( + self.provider + .get_block_by_number(BlockNumberOrTag::Number(i), false), + ) + .unwrap() + .unwrap(); + + res.push(block.header.hash.unwrap()); + } + + Ok(res) + } +} + +impl AccountReader for HttpProviderState { + fn basic_account(&self, address: Address) -> ProviderResult> { + let block_id = BlockId::hash(self.hash); + + //let handle = Handle::current(); + // let _ = handle.enter(); + + // let rt = tokio::runtime::Runtime::new().unwrap(); + // rt.block_on(async {}); + + let res: Result = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + println!( + "[DEBUG] basic account fetch: {:?} {:?}", + address, + block_id.clone() + ); + + let balance = self + .provider + .get_balance(address) + .block_id(block_id) + .await + .unwrap(); + + println!("balance: {:?}", balance); + + let nonce = self + .provider + .get_transaction_count(address) + .block_id(block_id) + .await + .unwrap(); + + println!("nonce: {:?}", nonce); + + Ok(Account { + balance, + nonce, + bytecode_hash: Some(address.into_word()), + }) + }) + }); + + /* + let res: Result = futures::executor::block_on(async { + println!( + "[DEBUG] basic account fetch: {:?} {:?}", + address, + block_id.clone() + ); + + let balance = self + .provider + .get_balance(address) + .block_id(block_id) + .await + .unwrap(); + + println!("balance: {:?}", balance); + + let nonce = self + .provider + .get_transaction_count(address) + .block_id(block_id) + .await + .unwrap(); + + println!("nonce: {:?}", nonce); + + Ok(Account { + balance, + nonce, + bytecode_hash: Some(address.into_word()), + }) + }); + */ + + let res = res.unwrap(); + + println!( + "[DEBUG] basic account fetch: {:?} {:?} {:?}", + address, + res.nonce.clone(), + res.balance.clone(), + ); + + Ok(Some(res)) + } +} + +impl StateRootProvider for HttpProviderState { + /// Returns the state root of the `HashedPostState` on top of the current state. + fn hashed_state_root(&self, _hashed_state: &HashedPostState) -> ProviderResult { + unimplemented!(); + } + + /// Returns the state root of the `HashedPostState` on top of the current state with trie + /// updates to be committed to the database. + fn hashed_state_root_with_updates( + &self, + _hashed_state: &HashedPostState, + ) -> ProviderResult<(B256, TrieUpdates)> { + unimplemented!(); + } +} + +impl StateProofProvider for HttpProviderState { + /// Get account and storage proofs of target keys in the `HashedPostState` + /// on top of the current state. + fn hashed_proof( + &self, + _hashed_state: &HashedPostState, + _address: Address, + _slots: &[B256], + ) -> ProviderResult { + unimplemented!(); + } +} diff --git a/crates/rbuilder/src/provider/mod.rs b/crates/rbuilder/src/provider/mod.rs index 19c47209f..9bd6b130d 100644 --- a/crates/rbuilder/src/provider/mod.rs +++ b/crates/rbuilder/src/provider/mod.rs @@ -2,6 +2,8 @@ use reth_errors::ProviderResult; use reth_primitives::{Block, BlockHash, BlockNumber, Header, B256}; use reth_provider::{ExecutionOutcome, StateProviderBox}; +pub mod http_provider; + pub trait StateProviderFactory: Send + Sync { fn history_by_block_number( &self,