diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 9eee2071ce289..9f530106f2a44 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -24,7 +24,10 @@ mod logging; use codec::{Compact, Decode, Encode}; use indicatif::{ProgressBar, ProgressStyle}; -use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient}; +use jsonrpsee::{ + core::params::ArrayParams, + ws_client::{WsClient, WsClientBuilder}, +}; use log::*; use serde::de::DeserializeOwned; use sp_core::{ @@ -158,34 +161,35 @@ pub struct OfflineConfig { pub enum Transport { /// Use the `URI` to open a new WebSocket connection. Uri(String), - /// Use HTTP connection. - RemoteClient(HttpClient), + /// Use WS connection. + RemoteClient(Arc), } impl Transport { - fn as_client(&self) -> Option<&HttpClient> { + fn as_client(&self) -> Option<&WsClient> { match self { Self::RemoteClient(client) => Some(client), _ => None, } } - // Build an HttpClient from a URI. + // Build an [`Self::RemoteClient`] from a URI. async fn init(&mut self) -> Result<()> { if let Self::Uri(uri) = self { debug!(target: LOG_TARGET, "initializing remote client to {uri:?}"); - let http_client = HttpClient::builder() + let ws_client = WsClientBuilder::default() .max_request_size(u32::MAX) .max_response_size(u32::MAX) .request_timeout(std::time::Duration::from_secs(60 * 5)) .build(uri) + .await .map_err(|e| { error!(target: LOG_TARGET, "error: {e:?}"); "failed to build http client" })?; - *self = Self::RemoteClient(http_client) + *self = Self::RemoteClient(Arc::new(ws_client)) } Ok(()) @@ -198,9 +202,9 @@ impl From for Transport { } } -impl From for Transport { - fn from(client: HttpClient) -> Self { - Transport::RemoteClient(client) +impl From for Transport { + fn from(client: WsClient) -> Self { + Transport::RemoteClient(Arc::new(client)) } } @@ -228,11 +232,11 @@ pub struct OnlineConfig { } impl OnlineConfig { - /// Return rpc (http) client reference. - fn rpc_client(&self) -> &HttpClient { + /// Return rpc (ws) client reference. + fn rpc_client(&self) -> &WsClient { self.transport .as_client() - .expect("http client must have been initialized by now; qed.") + .expect("ws client must have been initialized by now; qed.") } fn at_expected(&self) -> H { @@ -338,7 +342,7 @@ where B::Hash: DeserializeOwned, B::Header: DeserializeOwned, { - const PARALLEL_REQUESTS: usize = 4; + const PARALLEL_REQUESTS: usize = 8; const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10; const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50; const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15); @@ -433,7 +437,8 @@ where let builder = Arc::new(self.clone()); let mut handles = vec![]; - for (start_key, end_key) in start_keys.into_iter().zip(end_keys) { + for (worker_index, (start_key, end_key)) in start_keys.into_iter().zip(end_keys).enumerate() + { let permit = parallel .clone() .acquire_owned() @@ -447,7 +452,13 @@ where let handle = tokio::spawn(async move { let res = builder - .rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref()) + .rpc_get_keys_in_range( + &prefix, + block, + start_key.as_ref(), + end_key.as_ref(), + worker_index, + ) .await; drop(permit); res @@ -479,6 +490,7 @@ where block: B::Hash, start_key: Option<&StorageKey>, end_key: Option<&StorageKey>, + worker_index: usize, ) -> Result> { let mut last_key: Option<&StorageKey> = start_key; let mut keys: Vec = vec![]; @@ -512,7 +524,7 @@ where debug!( target: LOG_TARGET, - "new total = {}, full page received: {}", + "new total = {}, full page received: {}, worker = {worker_index}", keys.len(), HexDisplay::from(last_key.expect("full page received, cannot be None")) ); @@ -566,7 +578,7 @@ where /// } /// ``` async fn get_storage_data_dynamic_batch_size( - client: &HttpClient, + client: &WsClient, payloads: Vec<(String, ArrayParams)>, bar: &ProgressBar, ) -> Result>, String> { @@ -767,7 +779,7 @@ where /// Get the values corresponding to `child_keys` at the given `prefixed_top_key`. pub(crate) async fn rpc_child_get_storage_paged( - client: &HttpClient, + client: &WsClient, prefixed_top_key: &StorageKey, child_keys: Vec, at: B::Hash, @@ -814,7 +826,7 @@ where } pub(crate) async fn rpc_child_get_keys( - client: &HttpClient, + client: &WsClient, prefixed_top_key: &StorageKey, child_prefix: StorageKey, at: B::Hash, @@ -1589,12 +1601,12 @@ mod remote_tests { let at = builder.as_online().at.unwrap(); let prefix = StorageKey(vec![13]); - let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap(); + let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None, 0).await.unwrap(); let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap(); assert_eq!(paged, para); let prefix = StorageKey(vec![]); - let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap(); + let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None, 0).await.unwrap(); let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap(); assert_eq!(paged, para); }