Skip to content

Commit 9614d41

Browse files
committed
chore(test-suite): improved nonce manager
1 parent 991df00 commit 9614d41

File tree

5 files changed

+201
-58
lines changed

5 files changed

+201
-58
lines changed

test-suite/gateway-stress/src/blockchain/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod manager;
2+
pub mod nonce_manager;
23
pub mod provider;
34
pub mod wallet;
45

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use alloy::{
2+
network::Network,
3+
primitives::Address,
4+
providers::{Provider, fillers::NonceManager},
5+
transports::TransportResult,
6+
};
7+
use async_trait::async_trait;
8+
use futures::lock::{Mutex, MutexGuard};
9+
use std::collections::{BTreeSet, HashMap, hash_map::Entry};
10+
use std::sync::Arc;
11+
use tracing::debug;
12+
13+
/// A robust, in-memory nonce manager for a scalable transaction engine.
14+
#[derive(Clone, Debug, Default)]
15+
pub struct ZamaNonceManager {
16+
/// Nonce state for each account, shared across all tasks/threads using the nonce manager.
17+
accounts: Arc<Mutex<HashMap<Address, AccountState>>>,
18+
}
19+
20+
/// Represents the complete nonce state for a single account.
21+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
22+
pub struct AccountState {
23+
/// The "high-water mark" nonce. Used only when no gaps are available.
24+
pub next_nonce: u64,
25+
/// Nonces that have been dispatched but not yet confirmed or rejected.
26+
pub locked_nonces: BTreeSet<u64>,
27+
/// Nonces that were previously locked but have been released, creating gaps.
28+
pub available_nonces: BTreeSet<u64>,
29+
}
30+
31+
impl ZamaNonceManager {
32+
pub fn new() -> Self {
33+
Self::default()
34+
}
35+
36+
/// The primary logic for acquiring and locking the next valid nonce.
37+
///
38+
/// The logic prioritizes filling gaps from `available_nonces` before
39+
/// incrementing the main `next_nonce` counter.
40+
pub async fn get_increase_and_lock_nonce<P, N>(
41+
&self,
42+
provider: &P,
43+
address: Address,
44+
) -> TransportResult<u64>
45+
where
46+
P: Provider<N>,
47+
N: Network,
48+
{
49+
let mut accounts_guard = self.accounts.lock().await;
50+
let account =
51+
Self::get_or_init_account_state(&mut accounts_guard, provider, address).await?;
52+
let nonce_to_use =
53+
if let Some(available_nonce) = account.available_nonces.iter().next().copied() {
54+
account.available_nonces.remove(&available_nonce);
55+
debug!(%address, nonce = available_nonce, "Reusing available nonce");
56+
available_nonce
57+
} else {
58+
let next = account.next_nonce;
59+
account.next_nonce += 1;
60+
debug!(%address, nonce = next, "Using next sequential nonce");
61+
next
62+
};
63+
64+
account.locked_nonces.insert(nonce_to_use);
65+
Ok(nonce_to_use)
66+
}
67+
68+
/// Releases a locked nonce, making it available for reuse.
69+
pub async fn release_nonce(&self, address: Address, nonce: u64) {
70+
let mut accounts = self.accounts.lock().await;
71+
if let Some(account) = accounts.get_mut(&address)
72+
&& account.locked_nonces.remove(&nonce)
73+
{
74+
account.available_nonces.insert(nonce);
75+
}
76+
}
77+
78+
/// Confirms a nonce has been used on-chain, removing it permanently.
79+
pub async fn confirm_nonce(&self, address: Address, nonce: u64) {
80+
let mut accounts = self.accounts.lock().await;
81+
if let Some(account) = accounts.get_mut(&address) {
82+
account.locked_nonces.remove(&nonce);
83+
}
84+
}
85+
86+
/// Helper to retrieve or initialize the `AccountState` for an address.
87+
async fn get_or_init_account_state<'a, P, N>(
88+
accounts_guard: &'a mut MutexGuard<'_, HashMap<Address, AccountState>>,
89+
provider: &P,
90+
address: Address,
91+
) -> TransportResult<&'a mut AccountState>
92+
where
93+
P: Provider<N>,
94+
N: Network,
95+
{
96+
let account = match accounts_guard.entry(address) {
97+
Entry::Occupied(entry) => entry.into_mut(),
98+
Entry::Vacant(entry) => {
99+
let initial_nonce = provider.get_transaction_count(address).await?;
100+
entry.insert(AccountState {
101+
next_nonce: initial_nonce,
102+
..Default::default()
103+
})
104+
}
105+
};
106+
Ok(account)
107+
}
108+
}
109+
110+
// Implements the `NonceManager` trait for seamless integration with Alloy's provider stack.
111+
#[async_trait]
112+
impl NonceManager for ZamaNonceManager {
113+
async fn get_next_nonce<P, N>(&self, provider: &P, address: Address) -> TransportResult<u64>
114+
where
115+
P: Provider<N>,
116+
N: Network,
117+
{
118+
self.get_increase_and_lock_nonce(provider, address).await
119+
}
120+
}

test-suite/gateway-stress/src/blockchain/provider.rs

Lines changed: 57 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::blockchain::nonce_manager::ZamaNonceManager;
12
use alloy::{
23
consensus::Account,
34
eips::{BlockId, BlockNumberOrTag, Encodable2718},
@@ -7,29 +8,25 @@ use alloy::{
78
U256,
89
},
910
providers::{
10-
EthCall, EthCallMany, EthGetBlock, FilterPollerBuilder, GetSubscription,
11-
PendingTransaction, PendingTransactionBuilder, PendingTransactionConfig,
12-
PendingTransactionError, Provider, ProviderCall, RootProvider, RpcWithBlock, SendableTx,
13-
fillers::{
14-
BlobGasFiller, CachedNonceManager, FillProvider, GasFiller, JoinFill, NonceManager,
15-
TxFiller,
16-
},
11+
EthCall, EthCallMany, EthGetBlock, FilterPollerBuilder, PendingTransaction,
12+
PendingTransactionBuilder, PendingTransactionConfig, PendingTransactionError, Provider,
13+
ProviderCall, RootProvider, RpcWithBlock, SendableTx,
14+
fillers::{BlobGasFiller, FillProvider, GasFiller, JoinFill, NonceManager, TxFiller},
1715
},
1816
rpc::{
1917
client::NoParams,
18+
json_rpc::ErrorPayload,
2019
types::{
2120
AccessListResult, Bundle, EIP1186AccountProofResponse, EthCallResponse, FeeHistory,
2221
Filter, FilterChanges, Index, Log, SyncStatus, TransactionReceipt, TransactionRequest,
2322
erc4337::TransactionConditional,
24-
pubsub::{Params, SubscriptionKind},
2523
simulate::{SimulatePayload, SimulatedBlock},
2624
},
2725
},
28-
transports::{TransportError, TransportResult},
26+
transports::{RpcError, TransportError, TransportErrorKind, TransportResult},
2927
};
30-
use futures::lock::Mutex;
3128
use serde_json::value::RawValue;
32-
use std::{borrow::Cow, sync::Arc};
29+
use std::borrow::Cow;
3330

3431
pub type FillersWithoutNonceManagement = JoinFill<GasFiller, BlobGasFiller>;
3532

@@ -46,7 +43,7 @@ where
4643
{
4744
inner: FillProvider<F, P, N>,
4845
signer_address: Address,
49-
nonce_manager: Arc<Mutex<CachedNonceManager>>,
46+
nonce_manager: ZamaNonceManager,
5047
}
5148

5249
impl<F, P> NonceManagedProvider<F, P>
@@ -58,19 +55,18 @@ where
5855
Self {
5956
inner: provider,
6057
signer_address,
61-
nonce_manager: Default::default(),
58+
nonce_manager: ZamaNonceManager::new(),
6259
}
6360
}
6461

6562
pub async fn send_transaction_sync(
6663
&self,
6764
mut tx: TransactionRequest,
6865
) -> TransportResult<TransactionReceipt> {
66+
let signer_addr = self.signer_address;
6967
let nonce = self
7068
.nonce_manager
71-
.lock()
72-
.await
73-
.get_next_nonce(&self.inner, self.signer_address)
69+
.get_next_nonce(&self.inner, signer_addr)
7470
.await?;
7571
tx.set_nonce(nonce);
7672

@@ -82,15 +78,22 @@ where
8278
.map_err(|e| TransportError::LocalUsageError(Box::new(e)))?
8379
.encode_2718(&mut tx_bytes);
8480

85-
let res = self
81+
let send_tx_result = self
8682
.client()
8783
.request("eth_sendRawTransactionSync", (Bytes::from(tx_bytes),))
8884
.await;
89-
if res.is_err() {
90-
// Reset the nonce manager if the transaction sending failed.
91-
*self.nonce_manager.lock().await = Default::default();
85+
86+
match &send_tx_result {
87+
Err(e) if is_nonce_too_low(e) => {
88+
self.nonce_manager.confirm_nonce(signer_addr, nonce).await;
89+
}
90+
Err(_) => {
91+
self.nonce_manager.release_nonce(signer_addr, nonce).await;
92+
}
93+
Ok(_) => self.nonce_manager.confirm_nonce(signer_addr, nonce).await,
9294
}
93-
res
95+
96+
send_tx_result
9497
}
9598
}
9699

@@ -109,6 +112,25 @@ where
109112
}
110113
}
111114

115+
// See https://ethereum-json-rpc.com/errors
116+
const ETH_INVALID_INPUT_RPC_ERROR_CODE: i64 = -32000;
117+
118+
/// Returns `true` if the RPC error is "nonce too low" or "already known", `false` otherwise.
119+
fn is_nonce_too_low(error: &RpcError<TransportErrorKind>) -> bool {
120+
match error {
121+
RpcError::ErrorResp(ErrorPayload { code, message, .. }) => {
122+
if *code == ETH_INVALID_INPUT_RPC_ERROR_CODE {
123+
let lowercase_msg = message.to_lowercase();
124+
lowercase_msg.starts_with("nonce too low")
125+
|| lowercase_msg.starts_with("already known")
126+
} else {
127+
false
128+
}
129+
}
130+
_ => false,
131+
}
132+
}
133+
112134
#[async_trait::async_trait]
113135
impl<F, P, N> Provider<N> for NonceManagedProvider<F, P, N>
114136
where
@@ -124,19 +146,25 @@ where
124146
&self,
125147
mut tx: N::TransactionRequest,
126148
) -> TransportResult<PendingTransactionBuilder<N>> {
149+
let signer_addr = self.signer_address;
127150
let nonce = self
128151
.nonce_manager
129-
.lock()
130-
.await
131-
.get_next_nonce(&self.inner, self.signer_address)
152+
.get_next_nonce(&self.inner, signer_addr)
132153
.await?;
133154
tx.set_nonce(nonce);
134-
let res = self.inner.send_transaction(tx).await;
135-
if res.is_err() {
136-
// Reset the nonce manager if the transaction sending failed.
137-
*self.nonce_manager.lock().await = Default::default();
155+
let send_tx_result = self.inner.send_transaction(tx).await;
156+
157+
match &send_tx_result {
158+
Err(e) if is_nonce_too_low(e) => {
159+
self.nonce_manager.confirm_nonce(signer_addr, nonce).await;
160+
}
161+
Err(_) => {
162+
self.nonce_manager.release_nonce(signer_addr, nonce).await;
163+
}
164+
Ok(_) => self.nonce_manager.confirm_nonce(signer_addr, nonce).await,
138165
}
139-
res
166+
167+
send_tx_result
140168
}
141169

142170
fn get_accounts(&self) -> ProviderCall<NoParams, Vec<Address>> {
@@ -416,28 +444,6 @@ where
416444
self.inner.sign_transaction(tx).await
417445
}
418446

419-
fn subscribe_blocks(&self) -> GetSubscription<(SubscriptionKind,), N::HeaderResponse> {
420-
self.inner.subscribe_blocks()
421-
}
422-
423-
fn subscribe_pending_transactions(&self) -> GetSubscription<(SubscriptionKind,), B256> {
424-
self.inner.subscribe_pending_transactions()
425-
}
426-
427-
fn subscribe_full_pending_transactions(
428-
&self,
429-
) -> GetSubscription<(SubscriptionKind, Params), N::TransactionResponse> {
430-
self.inner.subscribe_full_pending_transactions()
431-
}
432-
433-
fn subscribe_logs(&self, filter: &Filter) -> GetSubscription<(SubscriptionKind, Params), Log> {
434-
self.inner.subscribe_logs(filter)
435-
}
436-
437-
async fn unsubscribe(&self, id: B256) -> TransportResult<()> {
438-
self.inner.unsubscribe(id).await
439-
}
440-
441447
fn syncing(&self) -> ProviderCall<NoParams, SyncStatus> {
442448
self.inner.syncing()
443449
}

test-suite/gateway-stress/src/decryption/mod.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,8 @@ where
6262
for i in 1..=TX_RETRIES {
6363
overprovision_gas(provider, &mut decryption_call).await;
6464

65-
trace!("Sending transaction to the Gateway");
66-
match provider
67-
.send_transaction_sync(decryption_call.clone())
68-
.await
69-
{
65+
debug!("Sending transaction to the Gateway");
66+
match send_tx_sync_with_increased_gas_limit(provider, &mut decryption_call).await {
7067
Ok(receipt) => {
7168
let id = extract_id_fn(&receipt)?;
7269
id_sender.send(id)?;
@@ -83,6 +80,21 @@ where
8380
))
8481
}
8582

83+
async fn send_tx_sync_with_increased_gas_limit(
84+
provider: &AppProvider,
85+
call: &mut TransactionRequest,
86+
) -> anyhow::Result<TransactionReceipt> {
87+
// Force a fresh gas estimation on each attempt to account for state drift
88+
call.gas = None;
89+
overprovision_gas(provider, call).await;
90+
91+
let receipt = provider.send_transaction_sync(call.clone()).await?;
92+
if !receipt.status() {
93+
return Err(anyhow!("Tx {} was reverted", receipt.transaction_hash));
94+
}
95+
Ok(receipt)
96+
}
97+
8698
async fn overprovision_gas<P: Provider>(provider: &P, call: &mut TransactionRequest) {
8799
let current_gas = match call.gas {
88100
Some(gas) => gas,

test-suite/gateway-stress/templates/gw_bench.csv

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ parallel_requests;number_of_measures;decryption_type
77
50;50;public
88
100;50;public
99
200;10;public
10+
300;5;public
11+
400;5;public
1012
1;50;user
1113
2;50;user
1214
5;50;user
1315
10;50;user
1416
20;50;user
1517
50;30;user
16-
100;10;user
17-
200;5;user
18+
100;50;user
19+
200;10;user
20+
300;5;user
21+
400;5;user

0 commit comments

Comments
 (0)