Skip to content

Commit c4b147c

Browse files
Rjectedrkrasiukfgimenez
authored
chore: move state root task result handling to fn (paradigmxyz#13892)
Co-authored-by: Roman Krasiuk <[email protected]> Co-authored-by: Federico Gimenez <[email protected]>
1 parent 50dae68 commit c4b147c

File tree

1 file changed

+76
-52
lines changed
  • crates/engine/tree/src/tree

1 file changed

+76
-52
lines changed

crates/engine/tree/src/tree/mod.rs

+76-52
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ use reth_trie::{
5050
hashed_cursor::HashedPostStateCursorFactory,
5151
prefix_set::TriePrefixSetsMut,
5252
proof::ProofBlindedProviderFactory,
53-
trie_cursor::InMemoryTrieCursorFactory,
53+
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
5454
updates::{TrieUpdates, TrieUpdatesSorted},
5555
HashedPostState, HashedPostStateSorted, TrieInput,
5656
};
5757
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
5858
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
5959
use revm_primitives::EvmState;
60-
use root::{StateRootComputeOutcome, StateRootConfig, StateRootTask};
60+
use root::{StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootTask};
6161
use std::{
6262
cmp::Ordering,
6363
collections::{btree_map, hash_map, BTreeMap, VecDeque},
@@ -67,7 +67,7 @@ use std::{
6767
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
6868
Arc,
6969
},
70-
time::Instant,
70+
time::{Duration, Instant},
7171
};
7272
use tokio::sync::{
7373
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
@@ -2364,55 +2364,20 @@ where
23642364
// different view of the database.
23652365
let (state_root, trie_updates, root_elapsed) = if persistence_not_in_progress {
23662366
if self.config.use_state_root_task() {
2367-
match state_root_handle
2368-
.expect("state root handle must exist if use_state_root_task is true")
2369-
.wait_for_result()
2370-
{
2371-
Ok(StateRootComputeOutcome {
2372-
state_root: (task_state_root, task_trie_updates),
2373-
time_from_last_update,
2374-
..
2375-
}) => {
2376-
info!(
2377-
target: "engine::tree",
2378-
block = ?sealed_block.num_hash(),
2379-
?task_state_root,
2380-
task_elapsed = ?time_from_last_update,
2381-
"Task state root finished"
2382-
);
2383-
2384-
if task_state_root != block.header().state_root() ||
2385-
self.config.always_compare_trie_updates()
2386-
{
2387-
if task_state_root != block.header().state_root() {
2388-
debug!(target: "engine::tree", "Task state root does not match block state root");
2389-
}
2390-
2391-
let (regular_root, regular_updates) =
2392-
state_provider.state_root_with_updates(hashed_state.clone())?;
2393-
2394-
if regular_root == block.header().state_root() {
2395-
compare_trie_updates(
2396-
in_memory_trie_cursor.expect("in memory trie cursor must exist if use_state_root_task is true"),
2397-
task_trie_updates.clone(),
2398-
regular_updates,
2399-
)
2400-
.map_err(ProviderError::from)?;
2401-
} else {
2402-
debug!(target: "engine::tree", "Regular state root does not match block state root");
2403-
}
2404-
}
2405-
2406-
(task_state_root, task_trie_updates, time_from_last_update)
2407-
}
2408-
Err(error) => {
2409-
info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
2410-
// Fall back to sequential calculation
2411-
let (root, updates) =
2412-
state_provider.state_root_with_updates(hashed_state.clone())?;
2413-
(root, updates, root_time.elapsed())
2414-
}
2415-
}
2367+
let state_root_handle = state_root_handle
2368+
.expect("state root handle must exist if use_state_root_task is true");
2369+
let in_memory_trie_cursor = in_memory_trie_cursor
2370+
.expect("in memory trie cursor must exist if use_state_root_task is true");
2371+
2372+
// Handle state root result from task using handle
2373+
self.handle_state_root_result(
2374+
state_root_handle,
2375+
sealed_block.as_ref(),
2376+
&hashed_state,
2377+
&state_provider,
2378+
in_memory_trie_cursor,
2379+
root_time,
2380+
)?
24162381
} else {
24172382
match self
24182383
.compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
@@ -2589,6 +2554,65 @@ where
25892554
))
25902555
}
25912556

2557+
/// Waits for the result on the input [`StateRootHandle`], and handles it, falling back to
2558+
/// the hash builder-based state root calculation if it fails.
2559+
fn handle_state_root_result(
2560+
&self,
2561+
state_root_handle: StateRootHandle,
2562+
sealed_block: &SealedBlock<N::Block>,
2563+
hashed_state: &HashedPostState,
2564+
state_provider: impl StateRootProvider,
2565+
in_memory_trie_cursor: impl TrieCursorFactory,
2566+
root_time: Instant,
2567+
) -> Result<(B256, TrieUpdates, Duration), InsertBlockErrorKind> {
2568+
match state_root_handle.wait_for_result() {
2569+
Ok(StateRootComputeOutcome {
2570+
state_root: (task_state_root, task_trie_updates),
2571+
time_from_last_update,
2572+
..
2573+
}) => {
2574+
info!(
2575+
target: "engine::tree",
2576+
block = ?sealed_block.num_hash(),
2577+
?task_state_root,
2578+
task_elapsed = ?time_from_last_update,
2579+
"Task state root finished"
2580+
);
2581+
2582+
if task_state_root != sealed_block.header().state_root() ||
2583+
self.config.always_compare_trie_updates()
2584+
{
2585+
if task_state_root != sealed_block.header().state_root() {
2586+
debug!(target: "engine::tree", "Task state root does not match block state root");
2587+
}
2588+
2589+
let (regular_root, regular_updates) =
2590+
state_provider.state_root_with_updates(hashed_state.clone())?;
2591+
2592+
if regular_root == sealed_block.header().state_root() {
2593+
compare_trie_updates(
2594+
in_memory_trie_cursor,
2595+
task_trie_updates.clone(),
2596+
regular_updates,
2597+
)
2598+
.map_err(ProviderError::from)?;
2599+
} else {
2600+
debug!(target: "engine::tree", "Regular state root does not match block state root");
2601+
}
2602+
}
2603+
2604+
Ok((task_state_root, task_trie_updates, time_from_last_update))
2605+
}
2606+
Err(error) => {
2607+
info!(target: "engine::tree", ?error, "Failed to wait for state root task result");
2608+
// Fall back to sequential calculation
2609+
let (root, updates) =
2610+
state_provider.state_root_with_updates(hashed_state.clone())?;
2611+
Ok((root, updates, root_time.elapsed()))
2612+
}
2613+
}
2614+
}
2615+
25922616
/// Attempts to find the header for the given block hash if it is canonical.
25932617
pub fn find_canonical_header(
25942618
&self,

0 commit comments

Comments
 (0)