From 7a08e3dc5a91eb0477cbfb101d1b54d79af6fdf5 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 2 Dec 2025 20:29:30 -0300 Subject: [PATCH 1/6] add raw tx forwarder --- crates/ingress-rpc/src/bin/main.rs | 19 +++++++++++++++++++ crates/ingress-rpc/src/lib.rs | 8 ++++++++ crates/ingress-rpc/src/service.rs | 19 +++++++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index d5fd543..706ac7b 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -49,6 +49,24 @@ async fn main() -> anyhow::Result<()> { .network::() .connect_http(config.simulation_rpc); + let raw_tx_forward_provider: Option> = + if config.raw_tx_forward_enabled { + match config.raw_tx_forward_rpc.clone() { + Some(url) => Some( + ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(url), + ), + None => { + tracing::warn!(message = "Raw tx forwarding enabled but no RPC URL configured"); + None + } + } + } else { + None + }; + let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file( &config.ingress_kafka_properties, )?); @@ -85,6 +103,7 @@ async fn main() -> anyhow::Result<()> { let service = IngressService::new( provider, simulation_provider, + raw_tx_forward_provider, queue, audit_tx, builder_tx, diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 9401400..13e8c4b 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -164,6 +164,14 @@ pub struct Config { /// Enable backrun bundle submission to op-rbuilder #[arg(long, env = "TIPS_INGRESS_BACKRUN_ENABLED", default_value = "false")] pub backrun_enabled: bool, + + /// Enable forwarding raw transactions to third-party endpoint + #[arg(long, env = "TIPS_INGRESS_RAW_TX_FORWARD_ENABLED", default_value = "false")] + pub raw_tx_forward_enabled: bool, + + /// URL of third-party RPC endpoint to forward raw transactions to + #[arg(long, env = "TIPS_INGRESS_RAW_TX_FORWARD_RPC")] + pub raw_tx_forward_rpc: Option, } pub fn connect_ingress_to_builder( diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 0755e35..8d5ab89 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -55,6 +55,7 @@ pub trait IngressApi { pub struct IngressService { provider: Arc>, simulation_provider: Arc>, + raw_tx_forward_provider: Option>>, account_abstraction_service: AccountAbstractionServiceImpl, tx_submission_method: TxSubmissionMethod, bundle_queue: Queue, @@ -72,6 +73,7 @@ impl IngressService { pub fn new( provider: RootProvider, simulation_provider: RootProvider, + raw_tx_forward_provider: Option>, queue: Queue, audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, @@ -80,14 +82,17 @@ impl IngressService { ) -> Self { let provider = Arc::new(provider); let simulation_provider = Arc::new(simulation_provider); + let raw_tx_forward_provider = raw_tx_forward_provider.map(Arc::new); let account_abstraction_service: AccountAbstractionServiceImpl = AccountAbstractionServiceImpl::new( simulation_provider.clone(), config.validate_user_operation_timeout_ms, ); + Self { provider, simulation_provider, + raw_tx_forward_provider, account_abstraction_service, tx_submission_method: config.tx_submission_method, bundle_queue: queue, @@ -249,6 +254,20 @@ where } } + if let Some(ref forward_provider) = self.raw_tx_forward_provider { + let response = forward_provider + .send_raw_transaction(data.iter().as_slice()) + .await; + match response { + Ok(_) => { + info!(message = "Forwarded raw tx", hash=%transaction.tx_hash()); + } + Err(e) => { + warn!(message = "Failed to forward raw tx", hash=%transaction.tx_hash(), error = %e); + } + } + } + self.send_audit_event(&accepted_bundle, transaction.tx_hash()); self.metrics From be089747f0da116b3c711fc32921722a2f529b23 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 2 Dec 2025 20:34:02 -0300 Subject: [PATCH 2/6] fmt --- crates/ingress-rpc/src/bin/main.rs | 31 +++++++++++++++--------------- crates/ingress-rpc/src/lib.rs | 6 +++++- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 706ac7b..572c383 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -49,23 +49,22 @@ async fn main() -> anyhow::Result<()> { .network::() .connect_http(config.simulation_rpc); - let raw_tx_forward_provider: Option> = - if config.raw_tx_forward_enabled { - match config.raw_tx_forward_rpc.clone() { - Some(url) => Some( - ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(url), - ), - None => { - tracing::warn!(message = "Raw tx forwarding enabled but no RPC URL configured"); - None - } + let raw_tx_forward_provider: Option> = if config.raw_tx_forward_enabled { + match config.raw_tx_forward_rpc.clone() { + Some(url) => Some( + ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(url), + ), + None => { + tracing::warn!(message = "Raw tx forwarding enabled but no RPC URL configured"); + None } - } else { - None - }; + } + } else { + None + }; let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file( &config.ingress_kafka_properties, diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 13e8c4b..f62155e 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -166,7 +166,11 @@ pub struct Config { pub backrun_enabled: bool, /// Enable forwarding raw transactions to third-party endpoint - #[arg(long, env = "TIPS_INGRESS_RAW_TX_FORWARD_ENABLED", default_value = "false")] + #[arg( + long, + env = "TIPS_INGRESS_RAW_TX_FORWARD_ENABLED", + default_value = "false" + )] pub raw_tx_forward_enabled: bool, /// URL of third-party RPC endpoint to forward raw transactions to From 217f8841e14396eb83d1ddbb03080e6d0e8d8cc1 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Tue, 2 Dec 2025 20:51:43 -0300 Subject: [PATCH 3/6] refactor provider to reduce number of args --- crates/ingress-rpc/src/bin/main.rs | 56 ++++++++++++++---------------- crates/ingress-rpc/src/service.rs | 23 +++++++----- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 572c383..362befb 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -1,4 +1,4 @@ -use alloy_provider::{ProviderBuilder, RootProvider}; +use alloy_provider::ProviderBuilder; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; @@ -13,7 +13,7 @@ use tips_ingress_rpc::connect_ingress_to_builder; use tips_ingress_rpc::health::bind_health_server; use tips_ingress_rpc::metrics::init_prometheus_exporter; use tips_ingress_rpc::queue::KafkaQueuePublisher; -use tips_ingress_rpc::service::{IngressApiServer, IngressService}; +use tips_ingress_rpc::service::{IngressApiServer, IngressService, Providers}; use tokio::sync::{broadcast, mpsc}; use tracing::info; @@ -39,31 +39,31 @@ async fn main() -> anyhow::Result<()> { health_check_address = %config.health_check_addr, ); - let provider: RootProvider = ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(config.mempool_url); - - let simulation_provider: RootProvider = ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(config.simulation_rpc); - - let raw_tx_forward_provider: Option> = if config.raw_tx_forward_enabled { - match config.raw_tx_forward_rpc.clone() { - Some(url) => Some( - ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(url), - ), - None => { - tracing::warn!(message = "Raw tx forwarding enabled but no RPC URL configured"); - None + let providers = Providers { + mempool: ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(config.mempool_url), + simulation: ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(config.simulation_rpc), + raw_tx_forward: if config.raw_tx_forward_enabled { + match config.raw_tx_forward_rpc.clone() { + Some(url) => Some( + ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(url), + ), + None => { + tracing::warn!(message = "Raw tx forwarding enabled but no RPC URL configured"); + None + } } - } - } else { - None + } else { + None + }, }; let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file( @@ -100,9 +100,7 @@ async fn main() -> anyhow::Result<()> { ); let service = IngressService::new( - provider, - simulation_provider, - raw_tx_forward_provider, + providers, queue, audit_tx, builder_tx, diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 8d5ab89..8bb3c8e 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -27,6 +27,13 @@ use account_abstraction_core::types::{SendUserOperationResponse, UserOperationRe use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl}; use std::sync::Arc; +/// RPC providers for different endpoints +pub struct Providers { + pub mempool: RootProvider, + pub simulation: RootProvider, + pub raw_tx_forward: Option>, +} + #[rpc(server, namespace = "eth")] pub trait IngressApi { /// `eth_sendBundle` can be used to send your bundles to the builder. @@ -53,7 +60,7 @@ pub trait IngressApi { } pub struct IngressService { - provider: Arc>, + mempool_provider: Arc>, simulation_provider: Arc>, raw_tx_forward_provider: Option>>, account_abstraction_service: AccountAbstractionServiceImpl, @@ -71,18 +78,16 @@ pub struct IngressService { impl IngressService { pub fn new( - provider: RootProvider, - simulation_provider: RootProvider, - raw_tx_forward_provider: Option>, + providers: Providers, queue: Queue, audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, builder_backrun_tx: broadcast::Sender, config: Config, ) -> Self { - let provider = Arc::new(provider); - let simulation_provider = Arc::new(simulation_provider); - let raw_tx_forward_provider = raw_tx_forward_provider.map(Arc::new); + let mempool_provider = Arc::new(providers.mempool); + let simulation_provider = Arc::new(providers.simulation); + let raw_tx_forward_provider = providers.raw_tx_forward.map(Arc::new); let account_abstraction_service: AccountAbstractionServiceImpl = AccountAbstractionServiceImpl::new( simulation_provider.clone(), @@ -90,7 +95,7 @@ impl IngressService { ); Self { - provider, + mempool_provider, simulation_provider, raw_tx_forward_provider, account_abstraction_service, @@ -241,7 +246,7 @@ where if send_to_mempool { let response = self - .provider + .mempool_provider .send_raw_transaction(data.iter().as_slice()) .await; match response { From 1a5861cc696fbbc01a0b0b4c5e7a1fa9aa4cd336 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 3 Dec 2025 07:08:39 -0500 Subject: [PATCH 4/6] address comments --- crates/ingress-rpc/src/bin/main.rs | 22 ++++++---------------- crates/ingress-rpc/src/lib.rs | 10 +--------- crates/ingress-rpc/src/service.rs | 4 ++-- 3 files changed, 9 insertions(+), 27 deletions(-) diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 362befb..0265dde 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -48,22 +48,12 @@ async fn main() -> anyhow::Result<()> { .disable_recommended_fillers() .network::() .connect_http(config.simulation_rpc), - raw_tx_forward: if config.raw_tx_forward_enabled { - match config.raw_tx_forward_rpc.clone() { - Some(url) => Some( - ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(url), - ), - None => { - tracing::warn!(message = "Raw tx forwarding enabled but no RPC URL configured"); - None - } - } - } else { - None - }, + raw_tx_forward: config.raw_tx_forward_rpc.clone().map(|url| { + ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(url) + }), }; let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file( diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index f62155e..fe98f2b 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -165,15 +165,7 @@ pub struct Config { #[arg(long, env = "TIPS_INGRESS_BACKRUN_ENABLED", default_value = "false")] pub backrun_enabled: bool, - /// Enable forwarding raw transactions to third-party endpoint - #[arg( - long, - env = "TIPS_INGRESS_RAW_TX_FORWARD_ENABLED", - default_value = "false" - )] - pub raw_tx_forward_enabled: bool, - - /// URL of third-party RPC endpoint to forward raw transactions to + /// URL of third-party RPC endpoint to forward raw transactions to (enables forwarding if set) #[arg(long, env = "TIPS_INGRESS_RAW_TX_FORWARD_RPC")] pub raw_tx_forward_rpc: Option, } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 8bb3c8e..e0451b1 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -17,7 +17,7 @@ use tips_core::{ }; use tokio::sync::{broadcast, mpsc}; use tokio::time::{Duration, Instant, timeout}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::metrics::{Metrics, record_histogram}; use crate::queue::QueuePublisher; @@ -265,7 +265,7 @@ where .await; match response { Ok(_) => { - info!(message = "Forwarded raw tx", hash=%transaction.tx_hash()); + debug!(message = "Forwarded raw tx", hash=%transaction.tx_hash()); } Err(e) => { warn!(message = "Failed to forward raw tx", hash=%transaction.tx_hash(), error = %e); From 80c80a94ea0e54370dffd6c62f1b24b66890ec91 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 3 Dec 2025 11:58:50 -0500 Subject: [PATCH 5/6] async forwarding; add metric --- crates/ingress-rpc/src/metrics.rs | 3 +++ crates/ingress-rpc/src/service.rs | 27 ++++++++++++++++----------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/crates/ingress-rpc/src/metrics.rs b/crates/ingress-rpc/src/metrics.rs index b41eb01..1b7c8a5 100644 --- a/crates/ingress-rpc/src/metrics.rs +++ b/crates/ingress-rpc/src/metrics.rs @@ -35,6 +35,9 @@ pub struct Metrics { #[metric(describe = "Duration to send backrun bundle to op-rbuilder")] pub backrun_bundles_sent_duration: Histogram, + + #[metric(describe = "Total raw transactions forwarded to additional endpoint")] + pub raw_tx_forwards_total: Counter, } /// Initialize Prometheus metrics exporter diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index e0451b1..ae7a1ee 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -259,18 +259,23 @@ where } } - if let Some(ref forward_provider) = self.raw_tx_forward_provider { - let response = forward_provider - .send_raw_transaction(data.iter().as_slice()) - .await; - match response { - Ok(_) => { - debug!(message = "Forwarded raw tx", hash=%transaction.tx_hash()); + if let Some(forward_provider) = self.raw_tx_forward_provider.clone() { + self.metrics.raw_tx_forwards_total.increment(1); + let tx_data = data.clone(); + let tx_hash = transaction.tx_hash(); + tokio::spawn(async move { + match forward_provider + .send_raw_transaction(tx_data.iter().as_slice()) + .await + { + Ok(_) => { + debug!(message = "Forwarded raw tx", hash = %tx_hash); + } + Err(e) => { + warn!(message = "Failed to forward raw tx", hash = %tx_hash, error = %e); + } } - Err(e) => { - warn!(message = "Failed to forward raw tx", hash=%transaction.tx_hash(), error = %e); - } - } + }); } self.send_audit_event(&accepted_bundle, transaction.tx_hash()); From 820589da916330e37b2a98699e601d28fd697104 Mon Sep 17 00:00:00 2001 From: Cody Wang Date: Wed, 3 Dec 2025 14:16:28 -0500 Subject: [PATCH 6/6] fix test; add a test to verify raw tx flow --- crates/ingress-rpc/src/service.rs | 78 +++++++++++++++++++++++++++---- 1 file changed, 69 insertions(+), 9 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 62d802d..7f28e67 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -436,7 +436,7 @@ mod tests { use alloy_provider::RootProvider; use async_trait::async_trait; use std::net::{IpAddr, SocketAddr}; - use std::sync::Arc; + use std::str::FromStr; use tips_core::test_utils::create_test_meter_bundle_response; use tokio::sync::{broadcast, mpsc}; use url::Url; @@ -478,6 +478,7 @@ mod tests { max_buffered_backrun_bundles: 100, health_check_addr: SocketAddr::from(([127, 0, 0, 1], 8081)), backrun_enabled: false, + raw_tx_forward_rpc: None, } } @@ -548,20 +549,19 @@ mod tests { let provider: RootProvider = RootProvider::new_http(mock_server.uri().parse().unwrap()); - let simulation_provider = Arc::new(provider.clone()); + + let providers = Providers { + mempool: provider.clone(), + simulation: provider.clone(), + raw_tx_forward: None, + }; let (audit_tx, _audit_rx) = mpsc::unbounded_channel(); let (builder_tx, _builder_rx) = broadcast::channel(1); let (backrun_tx, _backrun_rx) = broadcast::channel(1); let service = IngressService::new( - provider, - simulation_provider.as_ref().clone(), - MockQueue, - audit_tx, - builder_tx, - backrun_tx, - config, + providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, ); let bundle = Bundle::default(); @@ -574,4 +574,64 @@ mod tests { let response = result.unwrap_or_else(|_| MeterBundleResponse::default()); assert_eq!(response, MeterBundleResponse::default()); } + + #[tokio::test] + async fn test_raw_tx_forward() { + let simulation_server = MockServer::start().await; + let forward_server = MockServer::start().await; + + // Mock error response from base_meterBundle + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(500).set_body_json(serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "error": { + "code": -32000, + "message": "Simulation failed" + } + }))) + .mount(&simulation_server) + .await; + + // Mock forward endpoint - expect exactly 1 call + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": "0x0000000000000000000000000000000000000000000000000000000000000000" + }))) + .expect(1) + .mount(&forward_server) + .await; + + let mut config = create_test_config(&simulation_server); + config.tx_submission_method = TxSubmissionMethod::Kafka; // Skip mempool send + + let providers = Providers { + mempool: RootProvider::new_http(simulation_server.uri().parse().unwrap()), + simulation: RootProvider::new_http(simulation_server.uri().parse().unwrap()), + raw_tx_forward: Some(RootProvider::new_http( + forward_server.uri().parse().unwrap(), + )), + }; + + let (audit_tx, _audit_rx) = mpsc::unbounded_channel(); + let (builder_tx, _builder_rx) = broadcast::channel(1); + let (backrun_tx, _backrun_rx) = broadcast::channel(1); + + let service = IngressService::new( + providers, MockQueue, audit_tx, builder_tx, backrun_tx, config, + ); + + // Valid signed transaction bytes + let tx_bytes = Bytes::from_str("0x02f86c0d010183072335825208940000000000000000000000000000000000000000872386f26fc1000080c001a0cdb9e4f2f1ba53f9429077e7055e078cf599786e29059cd80c5e0e923bb2c114a01c90e29201e031baf1da66296c3a5c15c200bcb5e6c34da2f05f7d1778f8be07").unwrap(); + + let result = service.send_raw_transaction(tx_bytes).await; + assert!(result.is_ok()); + + // Wait for spawned forward task to complete + tokio::time::sleep(Duration::from_millis(100)).await; + + // wiremock automatically verifies expect(1) when forward_server is dropped + } }