Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions crates/ingress-rpc/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use alloy_provider::{ProviderBuilder, RootProvider};
use alloy_provider::ProviderBuilder;
use clap::Parser;
use jsonrpsee::server::Server;
use op_alloy_network::Optimism;
Expand All @@ -13,7 +13,7 @@ use tips_ingress_rpc::connect_ingress_to_builder;
use tips_ingress_rpc::health::bind_health_server;
use tips_ingress_rpc::metrics::init_prometheus_exporter;
use tips_ingress_rpc::queue::KafkaQueuePublisher;
use tips_ingress_rpc::service::{IngressApiServer, IngressService};
use tips_ingress_rpc::service::{IngressApiServer, IngressService, Providers};
use tokio::sync::{broadcast, mpsc};
use tracing::info;

Expand All @@ -39,15 +39,22 @@ async fn main() -> anyhow::Result<()> {
health_check_address = %config.health_check_addr,
);

let provider: RootProvider<Optimism> = ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(config.mempool_url);

let simulation_provider: RootProvider<Optimism> = ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(config.simulation_rpc);
let providers = Providers {
mempool: ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(config.mempool_url),
simulation: ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(config.simulation_rpc),
raw_tx_forward: config.raw_tx_forward_rpc.clone().map(|url| {
ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(url)
}),
};

let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file(
&config.ingress_kafka_properties,
Expand Down Expand Up @@ -83,8 +90,7 @@ async fn main() -> anyhow::Result<()> {
);

let service = IngressService::new(
provider,
simulation_provider,
providers,
queue,
audit_tx,
builder_tx,
Expand Down
4 changes: 4 additions & 0 deletions crates/ingress-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ pub struct Config {
/// Enable backrun bundle submission to op-rbuilder
#[arg(long, env = "TIPS_INGRESS_BACKRUN_ENABLED", default_value = "false")]
pub backrun_enabled: bool,

/// URL of third-party RPC endpoint to forward raw transactions to (enables forwarding if set)
#[arg(long, env = "TIPS_INGRESS_RAW_TX_FORWARD_RPC")]
pub raw_tx_forward_rpc: Option<Url>,
}

pub fn connect_ingress_to_builder(
Expand Down
3 changes: 3 additions & 0 deletions crates/ingress-rpc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub struct Metrics {

#[metric(describe = "Duration to send backrun bundle to op-rbuilder")]
pub backrun_bundles_sent_duration: Histogram,

#[metric(describe = "Total raw transactions forwarded to additional endpoint")]
pub raw_tx_forwards_total: Counter,
}

/// Initialize Prometheus metrics exporter
Expand Down
123 changes: 106 additions & 17 deletions crates/ingress-rpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tips_core::{
};
use tokio::sync::{broadcast, mpsc};
use tokio::time::{Duration, Instant, timeout};
use tracing::{info, warn};
use tracing::{debug, info, warn};

use crate::metrics::{Metrics, record_histogram};
use crate::queue::QueuePublisher;
Expand All @@ -27,6 +27,13 @@ use account_abstraction_core::types::{SendUserOperationResponse, UserOperationRe
use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl};
use std::sync::Arc;

/// RPC providers for different endpoints
pub struct Providers {
pub mempool: RootProvider<Optimism>,
pub simulation: RootProvider<Optimism>,
pub raw_tx_forward: Option<RootProvider<Optimism>>,
}

#[rpc(server, namespace = "eth")]
pub trait IngressApi {
/// `eth_sendBundle` can be used to send your bundles to the builder.
Expand All @@ -53,8 +60,9 @@ pub trait IngressApi {
}

pub struct IngressService<Queue> {
provider: Arc<RootProvider<Optimism>>,
mempool_provider: Arc<RootProvider<Optimism>>,
simulation_provider: Arc<RootProvider<Optimism>>,
raw_tx_forward_provider: Option<Arc<RootProvider<Optimism>>>,
account_abstraction_service: AccountAbstractionServiceImpl,
tx_submission_method: TxSubmissionMethod,
bundle_queue: Queue,
Expand All @@ -70,24 +78,26 @@ pub struct IngressService<Queue> {

impl<Queue> IngressService<Queue> {
pub fn new(
provider: RootProvider<Optimism>,
simulation_provider: RootProvider<Optimism>,
providers: Providers,
queue: Queue,
audit_channel: mpsc::UnboundedSender<BundleEvent>,
builder_tx: broadcast::Sender<MeterBundleResponse>,
builder_backrun_tx: broadcast::Sender<Bundle>,
config: Config,
) -> Self {
let provider = Arc::new(provider);
let simulation_provider = Arc::new(simulation_provider);
let mempool_provider = Arc::new(providers.mempool);
let simulation_provider = Arc::new(providers.simulation);
let raw_tx_forward_provider = providers.raw_tx_forward.map(Arc::new);
let account_abstraction_service: AccountAbstractionServiceImpl =
AccountAbstractionServiceImpl::new(
simulation_provider.clone(),
config.validate_user_operation_timeout_ms,
);

Self {
provider,
mempool_provider,
simulation_provider,
raw_tx_forward_provider,
account_abstraction_service,
tx_submission_method: config.tx_submission_method,
bundle_queue: queue,
Expand Down Expand Up @@ -241,7 +251,7 @@ where

if send_to_mempool {
let response = self
.provider
.mempool_provider
.send_raw_transaction(data.iter().as_slice())
.await;
match response {
Expand All @@ -254,6 +264,25 @@ where
}
}

if let Some(forward_provider) = self.raw_tx_forward_provider.clone() {
self.metrics.raw_tx_forwards_total.increment(1);
let tx_data = data.clone();
let tx_hash = transaction.tx_hash();
tokio::spawn(async move {
match forward_provider
.send_raw_transaction(tx_data.iter().as_slice())
.await
{
Ok(_) => {
debug!(message = "Forwarded raw tx", hash = %tx_hash);
}
Err(e) => {
warn!(message = "Failed to forward raw tx", hash = %tx_hash, error = %e);
}
}
});
}

self.send_audit_event(&accepted_bundle, transaction.tx_hash());

self.metrics
Expand Down Expand Up @@ -407,7 +436,7 @@ mod tests {
use alloy_provider::RootProvider;
use async_trait::async_trait;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::str::FromStr;
use tips_core::test_utils::create_test_meter_bundle_response;
use tokio::sync::{broadcast, mpsc};
use url::Url;
Expand Down Expand Up @@ -449,6 +478,7 @@ mod tests {
max_buffered_backrun_bundles: 100,
health_check_addr: SocketAddr::from(([127, 0, 0, 1], 8081)),
backrun_enabled: false,
raw_tx_forward_rpc: None,
}
}

Expand Down Expand Up @@ -519,20 +549,19 @@ mod tests {

let provider: RootProvider<Optimism> =
RootProvider::new_http(mock_server.uri().parse().unwrap());
let simulation_provider = Arc::new(provider.clone());

let providers = Providers {
mempool: provider.clone(),
simulation: provider.clone(),
raw_tx_forward: None,
};

let (audit_tx, _audit_rx) = mpsc::unbounded_channel();
let (builder_tx, _builder_rx) = broadcast::channel(1);
let (backrun_tx, _backrun_rx) = broadcast::channel(1);

let service = IngressService::new(
provider,
simulation_provider.as_ref().clone(),
MockQueue,
audit_tx,
builder_tx,
backrun_tx,
config,
providers, MockQueue, audit_tx, builder_tx, backrun_tx, config,
);

let bundle = Bundle::default();
Expand All @@ -545,4 +574,64 @@ mod tests {
let response = result.unwrap_or_else(|_| MeterBundleResponse::default());
assert_eq!(response, MeterBundleResponse::default());
}

#[tokio::test]
async fn test_raw_tx_forward() {
let simulation_server = MockServer::start().await;
let forward_server = MockServer::start().await;

// Mock error response from base_meterBundle
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(500).set_body_json(serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32000,
"message": "Simulation failed"
}
})))
.mount(&simulation_server)
.await;

// Mock forward endpoint - expect exactly 1 call
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"result": "0x0000000000000000000000000000000000000000000000000000000000000000"
})))
.expect(1)
.mount(&forward_server)
.await;

let mut config = create_test_config(&simulation_server);
config.tx_submission_method = TxSubmissionMethod::Kafka; // Skip mempool send

let providers = Providers {
mempool: RootProvider::new_http(simulation_server.uri().parse().unwrap()),
simulation: RootProvider::new_http(simulation_server.uri().parse().unwrap()),
raw_tx_forward: Some(RootProvider::new_http(
forward_server.uri().parse().unwrap(),
)),
};

let (audit_tx, _audit_rx) = mpsc::unbounded_channel();
let (builder_tx, _builder_rx) = broadcast::channel(1);
let (backrun_tx, _backrun_rx) = broadcast::channel(1);

let service = IngressService::new(
providers, MockQueue, audit_tx, builder_tx, backrun_tx, config,
);

// Valid signed transaction bytes
let tx_bytes = Bytes::from_str("0x02f86c0d010183072335825208940000000000000000000000000000000000000000872386f26fc1000080c001a0cdb9e4f2f1ba53f9429077e7055e078cf599786e29059cd80c5e0e923bb2c114a01c90e29201e031baf1da66296c3a5c15c200bcb5e6c34da2f05f7d1778f8be07").unwrap();

let result = service.send_raw_transaction(tx_bytes).await;
assert!(result.is_ok());

// Wait for spawned forward task to complete
tokio::time::sleep(Duration::from_millis(100)).await;

// wiremock automatically verifies expect(1) when forward_server is dropped
}
}
Loading