Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 34 additions & 22 deletions substrate/utils/frame/remote-externalities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ mod logging;

use codec::{Compact, Decode, Encode};
use indicatif::{ProgressBar, ProgressStyle};
use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient};
use jsonrpsee::{
core::params::ArrayParams,
ws_client::{WsClient, WsClientBuilder},
};
use log::*;
use serde::de::DeserializeOwned;
use sp_core::{
Expand Down Expand Up @@ -158,34 +161,35 @@ pub struct OfflineConfig {
pub enum Transport {
/// Use the `URI` to open a new WebSocket connection.
Uri(String),
/// Use HTTP connection.
RemoteClient(HttpClient),
/// Use WS connection.
RemoteClient(Arc<WsClient>),
}

impl Transport {
fn as_client(&self) -> Option<&HttpClient> {
fn as_client(&self) -> Option<&WsClient> {
match self {
Self::RemoteClient(client) => Some(client),
_ => None,
}
}

// Build an HttpClient from a URI.
// Build an [`Self::RemoteClient`] from a URI.
async fn init(&mut self) -> Result<()> {
if let Self::Uri(uri) = self {
debug!(target: LOG_TARGET, "initializing remote client to {uri:?}");

let http_client = HttpClient::builder()
let ws_client = WsClientBuilder::default()
.max_request_size(u32::MAX)
.max_response_size(u32::MAX)
.request_timeout(std::time::Duration::from_secs(60 * 5))
.build(uri)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "error: {e:?}");
"failed to build http client"
})?;

*self = Self::RemoteClient(http_client)
*self = Self::RemoteClient(Arc::new(ws_client))
}

Ok(())
Expand All @@ -198,9 +202,9 @@ impl From<String> for Transport {
}
}

impl From<HttpClient> for Transport {
fn from(client: HttpClient) -> Self {
Transport::RemoteClient(client)
impl From<WsClient> for Transport {
fn from(client: WsClient) -> Self {
Transport::RemoteClient(Arc::new(client))
}
}

Expand Down Expand Up @@ -228,11 +232,11 @@ pub struct OnlineConfig<H> {
}

impl<H: Clone> OnlineConfig<H> {
/// Return rpc (http) client reference.
fn rpc_client(&self) -> &HttpClient {
/// Return rpc (ws) client reference.
fn rpc_client(&self) -> &WsClient {
self.transport
.as_client()
.expect("http client must have been initialized by now; qed.")
.expect("ws client must have been initialized by now; qed.")
}

fn at_expected(&self) -> H {
Expand Down Expand Up @@ -338,7 +342,7 @@ where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
const PARALLEL_REQUESTS: usize = 4;
const PARALLEL_REQUESTS: usize = 8;
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
Expand Down Expand Up @@ -433,7 +437,8 @@ where
let builder = Arc::new(self.clone());
let mut handles = vec![];

for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
for (worker_index, (start_key, end_key)) in start_keys.into_iter().zip(end_keys).enumerate()
{
let permit = parallel
.clone()
.acquire_owned()
Expand All @@ -447,7 +452,13 @@ where

let handle = tokio::spawn(async move {
let res = builder
.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
.rpc_get_keys_in_range(
&prefix,
block,
start_key.as_ref(),
end_key.as_ref(),
worker_index,
)
.await;
drop(permit);
res
Expand Down Expand Up @@ -479,6 +490,7 @@ where
block: B::Hash,
start_key: Option<&StorageKey>,
end_key: Option<&StorageKey>,
worker_index: usize,
) -> Result<Vec<StorageKey>> {
let mut last_key: Option<&StorageKey> = start_key;
let mut keys: Vec<StorageKey> = vec![];
Expand Down Expand Up @@ -512,7 +524,7 @@ where

debug!(
target: LOG_TARGET,
"new total = {}, full page received: {}",
"new total = {}, full page received: {}, worker = {worker_index}",
keys.len(),
HexDisplay::from(last_key.expect("full page received, cannot be None"))
);
Expand Down Expand Up @@ -566,7 +578,7 @@ where
/// }
/// ```
async fn get_storage_data_dynamic_batch_size(
client: &HttpClient,
client: &WsClient,
payloads: Vec<(String, ArrayParams)>,
bar: &ProgressBar,
) -> Result<Vec<Option<StorageData>>, String> {
Expand Down Expand Up @@ -767,7 +779,7 @@ where

/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
pub(crate) async fn rpc_child_get_storage_paged(
client: &HttpClient,
client: &WsClient,
prefixed_top_key: &StorageKey,
child_keys: Vec<StorageKey>,
at: B::Hash,
Expand Down Expand Up @@ -814,7 +826,7 @@ where
}

pub(crate) async fn rpc_child_get_keys(
client: &HttpClient,
client: &WsClient,
prefixed_top_key: &StorageKey,
child_prefix: StorageKey,
at: B::Hash,
Expand Down Expand Up @@ -1589,12 +1601,12 @@ mod remote_tests {
let at = builder.as_online().at.unwrap();

let prefix = StorageKey(vec![13]);
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None, 0).await.unwrap();
let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
assert_eq!(paged, para);

let prefix = StorageKey(vec![]);
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None, 0).await.unwrap();
let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
assert_eq!(paged, para);
}
Expand Down
Loading