diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 8f53ee4bd230..a0c6f1b0a53c 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -47,6 +47,7 @@ use reth_stages_api::ControlFlow; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError}; use revm_primitives::ResultAndState; +use root::{StateRootConfig, StateRootTask, StdReceiverStream}; use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, VecDeque}, @@ -2191,13 +2192,26 @@ where let exec_time = Instant::now(); - // TODO: create StateRootTask with the receiving end of a channel and - // pass the sending end of the channel to the state hook. - let noop_state_hook = |_result_and_state: &ResultAndState| {}; + let (state_root_tx, state_root_rx) = std::sync::mpsc::channel(); + + let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; + + let input = self + .compute_trie_input(consistent_view.clone(), block.parent_hash) + .map_err(|e| InsertBlockErrorKindTwo::Other(Box::new(e)))?; + let state_root_config = + StateRootConfig { consistent_view: consistent_view.clone(), input: Arc::new(input) }; + let receiver_stream = StdReceiverStream::new(state_root_rx); + let state_root_task = StateRootTask::new(state_root_config, receiver_stream); + let state_root_handle = state_root_task.spawn(); + let state_hook = move |result_and_state: &ResultAndState| { + let _ = state_root_tx.send(result_and_state.state.clone()); + }; + let output = self.metrics.executor.execute_metered( executor, (&block, U256::MAX).into(), - Box::new(noop_state_hook), + Box::new(state_hook), )?; trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block"); @@ -2222,8 +2236,6 @@ where let root_time = Instant::now(); let mut state_root_result = None; - // TODO: switch to calculate state root using `StateRootTask`. - // We attempt to compute state root in parallel if we are currently not persisting anything // to database. This is safe, because the database state cannot change until we // finish parallel computation. It is important that nothing is being persisted as @@ -2231,9 +2243,13 @@ where // per thread and it might end up with a different view of the database. let persistence_in_progress = self.persistence_state.in_progress(); if !persistence_in_progress { - state_root_result = match self - .compute_state_root_parallel(block.parent_hash, &hashed_state) - { + let mut input = self + .compute_trie_input(consistent_view.clone(), block.parent_hash) + .map_err(|e| InsertBlockErrorKindTwo::Other(Box::new(e)))?; + // Extend with block we are validating root for. + input.append_ref(&hashed_state); + + state_root_result = match self.compute_state_root_parallel(consistent_view, input) { Ok((state_root, trie_output)) => Some((state_root, trie_output)), Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => { debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back"); @@ -2244,6 +2260,9 @@ where } let (state_root, trie_output) = if let Some(result) = state_root_result { + if let Ok(state_root_task_result) = state_root_handle.wait_for_result() { + debug!(target: "engine::tree", block=?sealed_block.num_hash(), state_root_task_result=?state_root_task_result.0, regular_state_root_result = ?result.0); + } result } else { debug!(target: "engine::tree", block=?sealed_block.num_hash(), persistence_in_progress, "Failed to compute state root in parallel"); @@ -2298,23 +2317,11 @@ where Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid)) } - /// Compute state root for the given hashed post state in parallel. - /// - /// # Returns - /// - /// Returns `Ok(_)` if computed successfully. - /// Returns `Err(_)` if error was encountered during computation. - /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation - /// should be used instead. - fn compute_state_root_parallel( + fn compute_trie_input( &self, + consistent_view: ConsistentDbView

, parent_hash: B256, - hashed_state: &HashedPostState, - ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { - // TODO: when we switch to calculate state root using `StateRootTask` this - // method can be still useful to calculate the required `TrieInput` to - // create the task. - let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; + ) -> Result { let mut input = TrieInput::default(); if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) { @@ -2334,9 +2341,22 @@ where input.append(revert_state); } - // Extend with block we are validating root for. - input.append_ref(hashed_state); + Ok(input) + } + /// Compute state root for the given hashed post state in parallel. + /// + /// # Returns + /// + /// Returns `Ok(_)` if computed successfully. + /// Returns `Err(_)` if error was encountered during computation. + /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation + /// should be used instead. + fn compute_state_root_parallel( + &self, + consistent_view: ConsistentDbView

, + input: TrieInput, + ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates() }