diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index a6203ea2a73e..0210142be713 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -73,7 +73,7 @@ impl> Command { { // building network downloaders using the fetch client let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) - .build(client.clone(), Arc::clone(&consensus)) + .build(client.clone(), consensus.clone().as_header_validator()) .into_task_with(task_executor); let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies) diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 64daba2b453d..0ebef1efe6e6 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -370,7 +370,7 @@ where .with_tip_sender(tip_tx), TestPipelineConfig::Real => { let header_downloader = ReverseHeadersDownloaderBuilder::default() - .build(client.clone(), consensus.clone()) + .build(client.clone(), consensus.clone().as_header_validator()) .into_task(); let body_downloader = BodiesDownloaderBuilder::default() diff --git a/crates/consensus/consensus/src/lib.rs b/crates/consensus/consensus/src/lib.rs index e059305911f6..da90439af7ff 100644 --- a/crates/consensus/consensus/src/lib.rs +++ b/crates/consensus/consensus/src/lib.rs @@ -11,7 +11,7 @@ extern crate alloc; -use alloc::{fmt::Debug, vec::Vec}; +use alloc::{fmt::Debug, sync::Arc, vec::Vec}; use alloy_consensus::Header; use alloy_eips::eip7685::Requests; use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256, U256}; @@ -46,7 +46,9 @@ impl<'a> PostExecutionInput<'a> { /// Consensus is a protocol that chooses canonical chain. #[auto_impl::auto_impl(&, Arc)] -pub trait Consensus: HeaderValidator + Debug + Send + Sync { +pub trait Consensus: + AsHeaderValidator + HeaderValidator + Debug + Send + Sync +{ /// Ensures that body field values match the header. fn validate_body_against_header( &self, @@ -143,6 +145,23 @@ pub trait HeaderValidator: Debug + Send + Sync { ) -> Result<(), ConsensusError>; } +/// Helper trait to cast `Arc` to `Arc` +pub trait AsHeaderValidator: HeaderValidator { + /// Converts the [`Arc`] of self to [`Arc`] of [`HeaderValidator`] + fn as_header_validator<'a>(self: Arc) -> Arc + 'a> + where + Self: 'a; +} + +impl, H> AsHeaderValidator for T { + fn as_header_validator<'a>(self: Arc) -> Arc + 'a> + where + Self: 'a, + { + self + } +} + /// Consensus Errors #[derive(Debug, PartialEq, Eq, Clone, derive_more::Display, derive_more::Error)] pub enum ConsensusError { diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index bebc51ad7725..82f45dd23bfe 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -20,6 +20,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use std::{ cmp::Ordering, collections::BinaryHeap, + fmt::Debug, mem, ops::RangeInclusive, pin::Pin, @@ -298,7 +299,7 @@ where impl BodyDownloader for BodiesDownloader where - B: BodiesClient + 'static, + B: BodiesClient + 'static, Provider: HeaderProvider + Unpin + 'static, { type Body = B::Body; diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index de1638f3e665..a2b63c8ed186 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -8,6 +8,7 @@ use reth_network_p2p::{ }; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use std::{ + fmt::Debug, future::Future, ops::RangeInclusive, pin::Pin, @@ -47,10 +48,10 @@ impl TaskDownloader { /// use reth_network_p2p::bodies::client::BodiesClient; /// use reth_primitives_traits::InMemorySize; /// use reth_storage_api::HeaderProvider; - /// use std::sync::Arc; + /// use std::{fmt::Debug, sync::Arc}; /// /// fn t< - /// B: BodiesClient + 'static, + /// B: BodiesClient + 'static, /// Provider: HeaderProvider + Unpin + 'static, /// >( /// client: Arc, @@ -90,7 +91,7 @@ impl TaskDownloader { } } -impl BodyDownloader for TaskDownloader { +impl BodyDownloader for TaskDownloader { type Body = B; fn set_download_range(&mut self, range: RangeInclusive) -> DownloadResult<()> { diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index 486d4a05127a..ff352bc23049 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -1,8 +1,8 @@ use std::{collections::HashMap, io, path::Path}; -use alloy_consensus::Header; +use alloy_consensus::BlockHeader; use alloy_eips::BlockHashOrNumber; -use alloy_primitives::{BlockHash, BlockNumber, B256}; +use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256}; use futures::Future; use itertools::Either; use reth_network_p2p::{ @@ -13,7 +13,8 @@ use reth_network_p2p::{ priority::Priority, }; use reth_network_peers::PeerId; -use reth_primitives::{BlockBody, SealedHeader}; +use reth_primitives::SealedHeader; +use reth_primitives_traits::{Block, BlockBody, FullBlock}; use thiserror::Error; use tokio::{fs::File, io::AsyncReadExt}; use tokio_stream::StreamExt; @@ -40,15 +41,15 @@ pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000; /// /// This reads the entire file into memory, so it is not suitable for large files. #[derive(Debug)] -pub struct FileClient { +pub struct FileClient { /// The buffered headers retrieved when fetching new bodies. - headers: HashMap, + headers: HashMap, /// A mapping between block hash and number. hash_to_number: HashMap, /// The buffered bodies retrieved when fetching new headers. - bodies: HashMap, + bodies: HashMap, } /// An error that can occur when constructing and using a [`FileClient`]. @@ -73,7 +74,7 @@ impl From<&'static str> for FileClientError { } } -impl FileClient { +impl FileClient { /// Create a new file client from a file path. pub async fn new>(path: P) -> Result { let file = File::open(path).await?; @@ -114,7 +115,7 @@ impl FileClient { /// Clones and returns the highest header of this client has or `None` if empty. Seals header /// before returning. - pub fn tip_header(&self) -> Option { + pub fn tip_header(&self) -> Option> { self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal(h.clone())) } @@ -137,13 +138,13 @@ impl FileClient { } /// Use the provided bodies as the file client's block body buffer. - pub fn with_bodies(mut self, bodies: HashMap) -> Self { + pub fn with_bodies(mut self, bodies: HashMap) -> Self { self.bodies = bodies; self } /// Use the provided headers as the file client's block body buffer. - pub fn with_headers(mut self, headers: HashMap) -> Self { + pub fn with_headers(mut self, headers: HashMap) -> Self { self.headers = headers; for (number, header) in &self.headers { self.hash_to_number.insert(header.hash_slow(), *number); @@ -162,14 +163,14 @@ impl FileClient { } /// Returns an iterator over headers in the client. - pub fn headers_iter(&self) -> impl Iterator { + pub fn headers_iter(&self) -> impl Iterator { self.headers.values() } /// Returns a mutable iterator over bodies in the client. /// /// Panics, if file client headers and bodies are not mapping 1-1. - pub fn bodies_iter_mut(&mut self) -> impl Iterator { + pub fn bodies_iter_mut(&mut self) -> impl Iterator { let bodies = &mut self.bodies; let numbers = &self.hash_to_number; bodies.iter_mut().map(|(hash, body)| (numbers[hash], body)) @@ -177,27 +178,28 @@ impl FileClient { /// Returns the current number of transactions in the client. pub fn total_transactions(&self) -> usize { - self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions.len()) + self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len()) } } -impl FromReader for FileClient { +impl FromReader for FileClient { type Error = FileClientError; /// Initialize the [`FileClient`] from bytes that have been read from file. - fn from_reader( - reader: B, + fn from_reader( + reader: R, num_bytes: u64, ) -> impl Future, Self::Error>> where - B: AsyncReadExt + Unpin, + R: AsyncReadExt + Unpin, { let mut headers = HashMap::default(); let mut hash_to_number = HashMap::default(); let mut bodies = HashMap::default(); // use with_capacity to make sure the internal buffer contains the entire chunk - let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize); + let mut stream = + FramedRead::with_capacity(reader, BlockFileCodec::::default(), num_bytes as usize); trace!(target: "downloaders::file", target_num_bytes=num_bytes, @@ -225,13 +227,13 @@ impl FromReader for FileClient { } Err(err) => return Err(err), }; - let block_number = block.header.number; - let block_hash = block.header.hash_slow(); + let block_number = block.header().number(); + let block_hash = block.header().hash_slow(); // add to the internal maps - headers.insert(block.header.number, block.header.clone()); - hash_to_number.insert(block_hash, block.header.number); - bodies.insert(block_hash, block.into()); + headers.insert(block.header().number(), block.header().clone()); + hash_to_number.insert(block_hash, block.header().number()); + bodies.insert(block_hash, block.body().clone()); if log_interval == 0 { trace!(target: "downloaders::file", @@ -260,9 +262,9 @@ impl FromReader for FileClient { } } -impl HeadersClient for FileClient { - type Header = Header; - type Output = HeadersFut; +impl HeadersClient for FileClient { + type Header = B::Header; + type Output = HeadersFut; fn get_headers_with_priority( &self, @@ -311,9 +313,9 @@ impl HeadersClient for FileClient { } } -impl BodiesClient for FileClient { - type Body = BlockBody; - type Output = BodiesFut; +impl BodiesClient for FileClient { + type Body = B::Body; + type Output = BodiesFut; fn get_block_bodies_with_priority( &self, @@ -336,7 +338,7 @@ impl BodiesClient for FileClient { } } -impl DownloadClient for FileClient { +impl DownloadClient for FileClient { fn report_bad_message(&self, _peer_id: PeerId) { warn!("Reported a bad message on a file client, the file may be corrupted or invalid"); // noop @@ -542,7 +544,7 @@ mod tests { // create an empty file let file = tempfile::tempfile().unwrap(); - let client = + let client: Arc = Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone())); let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), @@ -567,14 +569,14 @@ mod tests { let p0 = child_header(&p1); let file = tempfile::tempfile().unwrap(); - let client = Arc::new(FileClient::from_file(file.into()).await.unwrap().with_headers( - HashMap::from([ + let client: Arc = Arc::new( + FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([ (0u64, p0.clone().unseal()), (1, p1.clone().unseal()), (2, p2.clone().unseal()), (3, p3.clone().unseal()), - ]), - )); + ])), + ); let mut downloader = ReverseHeadersDownloaderBuilder::default() .stream_batch_size(3) @@ -596,7 +598,7 @@ mod tests { // Generate some random blocks let (file, headers, _) = generate_bodies_file(0..=19).await; // now try to read them back - let client = Arc::new(FileClient::from_file(file).await.unwrap()); + let client: Arc = Arc::new(FileClient::from_file(file).await.unwrap()); // construct headers downloader and use first header let mut header_downloader = ReverseHeadersDownloaderBuilder::default() @@ -621,7 +623,7 @@ mod tests { let (file, headers, mut bodies) = generate_bodies_file(0..=19).await; // now try to read them back - let client = Arc::new(FileClient::from_file(file).await.unwrap()); + let client: Arc = Arc::new(FileClient::from_file(file).await.unwrap()); // insert headers in db for the bodies downloader insert_headers(factory.db_ref().db(), &headers); diff --git a/crates/net/downloaders/src/file_codec.rs b/crates/net/downloaders/src/file_codec.rs index 3e754f9cf49b..57a15b6c888c 100644 --- a/crates/net/downloaders/src/file_codec.rs +++ b/crates/net/downloaders/src/file_codec.rs @@ -3,7 +3,6 @@ use crate::file_client::FileClientError; use alloy_primitives::bytes::{Buf, BytesMut}; use alloy_rlp::{Decodable, Encodable}; -use reth_primitives::Block; use tokio_util::codec::{Decoder, Encoder}; /// Codec for reading raw block bodies from a file. @@ -19,10 +18,16 @@ use tokio_util::codec::{Decoder, Encoder}; /// /// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set /// the capacity of the framed reader to the size of the file. -pub(crate) struct BlockFileCodec; +pub(crate) struct BlockFileCodec(std::marker::PhantomData); -impl Decoder for BlockFileCodec { - type Item = Block; +impl Default for BlockFileCodec { + fn default() -> Self { + Self(std::marker::PhantomData) + } +} + +impl Decoder for BlockFileCodec { + type Item = B; type Error = FileClientError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -31,18 +36,17 @@ impl Decoder for BlockFileCodec { } let buf_slice = &mut src.as_ref(); - let body = - Block::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?; + let body = B::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?; src.advance(src.len() - buf_slice.len()); Ok(Some(body)) } } -impl Encoder for BlockFileCodec { +impl Encoder for BlockFileCodec { type Error = FileClientError; - fn encode(&mut self, item: Block, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: B, dst: &mut BytesMut) -> Result<(), Self::Error> { item.encode(dst); Ok(()) } diff --git a/crates/net/downloaders/src/headers/reverse_headers.rs b/crates/net/downloaders/src/headers/reverse_headers.rs index 2d79e0a7af6d..63a20ff27f5b 100644 --- a/crates/net/downloaders/src/headers/reverse_headers.rs +++ b/crates/net/downloaders/src/headers/reverse_headers.rs @@ -9,7 +9,7 @@ use futures::{stream::Stream, FutureExt}; use futures_util::{stream::FuturesUnordered, StreamExt}; use rayon::prelude::*; use reth_config::config::HeadersConfig; -use reth_consensus::{Consensus, HeaderValidator}; +use reth_consensus::HeaderValidator; use reth_network_p2p::{ error::{DownloadError, DownloadResult, PeerRequestResult}, headers::{ @@ -68,7 +68,7 @@ impl From for ReverseHeadersDownloaderError { #[derive(Debug)] pub struct ReverseHeadersDownloader { /// Consensus client used to validate headers - consensus: Arc>, + consensus: Arc>, /// Client used to download headers. client: Arc, /// The local head of the chain. @@ -1165,7 +1165,7 @@ impl ReverseHeadersDownloaderBuilder { pub fn build( self, client: H, - consensus: Arc>, + consensus: Arc>, ) -> ReverseHeadersDownloader where H: HeadersClient + 'static, diff --git a/crates/net/downloaders/src/headers/task.rs b/crates/net/downloaders/src/headers/task.rs index 81c4cd80da3f..3dbfd5e3615e 100644 --- a/crates/net/downloaders/src/headers/task.rs +++ b/crates/net/downloaders/src/headers/task.rs @@ -8,6 +8,7 @@ use reth_network_p2p::headers::{ use reth_primitives::SealedHeader; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use std::{ + fmt::Debug, future::Future, pin::Pin, task::{ready, Context, Poll}, @@ -44,10 +45,10 @@ impl TaskDownloader { /// # use std::sync::Arc; /// # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloader; /// # use reth_downloaders::headers::task::TaskDownloader; - /// # use reth_consensus::Consensus; + /// # use reth_consensus::HeaderValidator; /// # use reth_network_p2p::headers::client::HeadersClient; /// # use reth_primitives_traits::BlockHeader; - /// # fn t + 'static>(consensus:Arc>, client: Arc) { + /// # fn t + 'static>(consensus:Arc>, client: Arc) { /// let downloader = ReverseHeadersDownloader::::builder().build( /// client, /// consensus @@ -82,7 +83,7 @@ impl TaskDownloader { } } -impl HeaderDownloader for TaskDownloader { +impl HeaderDownloader for TaskDownloader { type Header = H; fn update_sync_gap(&mut self, head: SealedHeader, target: SyncTarget) { diff --git a/crates/net/downloaders/src/test_utils/mod.rs b/crates/net/downloaders/src/test_utils/mod.rs index 7755c5e6017c..635383ce3f34 100644 --- a/crates/net/downloaders/src/test_utils/mod.rs +++ b/crates/net/downloaders/src/test_utils/mod.rs @@ -43,7 +43,7 @@ pub(crate) async fn generate_bodies_file( let raw_block_bodies = create_raw_bodies(headers.iter().cloned(), &mut bodies.clone()); let file: File = tempfile::tempfile().unwrap().into(); - let mut writer = FramedWrite::new(file, BlockFileCodec); + let mut writer = FramedWrite::new(file, BlockFileCodec::default()); // rlp encode one after the other for block in raw_block_bodies { diff --git a/crates/net/p2p/src/bodies/downloader.rs b/crates/net/p2p/src/bodies/downloader.rs index f335b21438b7..7008c08e522e 100644 --- a/crates/net/p2p/src/bodies/downloader.rs +++ b/crates/net/p2p/src/bodies/downloader.rs @@ -2,7 +2,7 @@ use super::response::BlockResponse; use crate::error::DownloadResult; use alloy_primitives::BlockNumber; use futures::Stream; -use std::ops::RangeInclusive; +use std::{fmt::Debug, ops::RangeInclusive}; /// Body downloader return type. pub type BodyDownloaderResult = DownloadResult>>; @@ -16,7 +16,7 @@ pub trait BodyDownloader: Send + Sync + Stream> + Unpin { /// The type of the body that is being downloaded. - type Body: Send + Sync + Unpin + 'static; + type Body: Debug + Send + Sync + Unpin + 'static; /// Method for setting the download range. fn set_download_range(&mut self, range: RangeInclusive) -> DownloadResult<()>; diff --git a/crates/net/p2p/src/headers/client.rs b/crates/net/p2p/src/headers/client.rs index 3e8f9296e076..4be6208c4a2c 100644 --- a/crates/net/p2p/src/headers/client.rs +++ b/crates/net/p2p/src/headers/client.rs @@ -50,7 +50,8 @@ impl HeadersRequest { } /// The headers future type -pub type HeadersFut = Pin>> + Send + Sync>>; +pub type HeadersFut = + Pin>> + Send + Sync>>; /// The block headers downloader client #[auto_impl::auto_impl(&, Arc, Box)] diff --git a/crates/net/p2p/src/headers/downloader.rs b/crates/net/p2p/src/headers/downloader.rs index 03ab467bafb3..eca03bdb4e79 100644 --- a/crates/net/p2p/src/headers/downloader.rs +++ b/crates/net/p2p/src/headers/downloader.rs @@ -7,6 +7,8 @@ use futures::Stream; use reth_consensus::HeaderValidator; use reth_primitives::SealedHeader; use reth_primitives_traits::BlockWithParent; +use std::fmt::Debug; + /// A downloader capable of fetching and yielding block headers. /// /// A downloader represents a distinct strategy for submitting requests to download block headers, @@ -21,7 +23,7 @@ pub trait HeaderDownloader: + Unpin { /// The header type being downloaded. - type Header: Send + Sync + Unpin + 'static; + type Header: Debug + Send + Sync + Unpin + 'static; /// Updates the gap to sync which ranges from local head to the sync target /// diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 337e37eeedd4..400e3d844565 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -27,7 +27,7 @@ use tokio::sync::watch; pub fn build_networked_pipeline( config: &StageConfig, client: Client, - consensus: Arc, + consensus: Arc>, provider_factory: ProviderFactory, task_executor: &TaskExecutor, metrics_tx: reth_stages::MetricEventsSender, @@ -46,7 +46,7 @@ where { // building network downloaders using the fetch client let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) - .build(client.clone(), Arc::clone(&consensus)) + .build(client.clone(), consensus.clone().as_header_validator()) .into_task_with(task_executor); let body_downloader = BodiesDownloaderBuilder::new(config.bodies) diff --git a/crates/primitives-traits/src/block/body.rs b/crates/primitives-traits/src/block/body.rs index 66c9c2d2e3a2..11c4dd785dd8 100644 --- a/crates/primitives-traits/src/block/body.rs +++ b/crates/primitives-traits/src/block/body.rs @@ -3,14 +3,13 @@ use alloc::fmt; use alloy_consensus::Transaction; -use reth_codecs::Compact; use crate::{FullSignedTx, InMemorySize, MaybeSerde}; /// Helper trait that unifies all behaviour required by transaction to support full node operations. -pub trait FullBlockBody: BlockBody + Compact {} +pub trait FullBlockBody: BlockBody {} -impl FullBlockBody for T where T: BlockBody + Compact {} +impl FullBlockBody for T where T: BlockBody {} /// Abstraction for block's body. #[auto_impl::auto_impl(&, Arc)] diff --git a/crates/primitives-traits/src/block/mod.rs b/crates/primitives-traits/src/block/mod.rs index 67658c39e07d..01ed75bd9673 100644 --- a/crates/primitives-traits/src/block/mod.rs +++ b/crates/primitives-traits/src/block/mod.rs @@ -5,14 +5,20 @@ pub mod header; use alloc::fmt; -use reth_codecs::Compact; +use alloy_rlp::{Decodable, Encodable}; -use crate::{BlockHeader, FullBlockHeader, InMemorySize, MaybeSerde}; +use crate::{BlockHeader, FullBlockBody, FullBlockHeader, InMemorySize, MaybeSerde}; /// Helper trait that unifies all behaviour required by block to support full node operations. -pub trait FullBlock: Block {} +pub trait FullBlock: + Block + Encodable + Decodable +{ +} -impl FullBlock for T where T: Block {} +impl FullBlock for T where + T: Block + Encodable + Decodable +{ +} /// Abstraction of block data type. // todo: make sealable super-trait, depends on diff --git a/crates/stages/stages/src/lib.rs b/crates/stages/stages/src/lib.rs index 38a0f209dbdd..ce6a96cf3496 100644 --- a/crates/stages/stages/src/lib.rs +++ b/crates/stages/stages/src/lib.rs @@ -37,7 +37,7 @@ //! # let consensus: Arc = Arc::new(TestConsensus::default()); //! # let headers_downloader = ReverseHeadersDownloaderBuilder::default().build( //! # Arc::new(TestHeadersClient::default()), -//! # consensus.clone() +//! # consensus.clone().as_header_validator() //! # ); //! # let provider_factory = create_test_provider_factory(); //! # let bodies_downloader = BodiesDownloaderBuilder::default().build( diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index a25fcd4e1e57..d04a96470a03 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -76,7 +76,11 @@ use tokio::sync::watch; /// - [`PruneStage`] (execute) /// - [`FinishStage`] #[derive(Debug)] -pub struct DefaultStages { +pub struct DefaultStages +where + H: HeaderDownloader, + B: BodyDownloader, +{ /// Configuration for the online stages online: OnlineStages, /// Executor factory needs for execution stage @@ -87,13 +91,17 @@ pub struct DefaultStages { prune_modes: PruneModes, } -impl DefaultStages { +impl DefaultStages +where + H: HeaderDownloader, + B: BodyDownloader, +{ /// Create a new set of default stages with default values. #[allow(clippy::too_many_arguments)] pub fn new( provider: Provider, tip: watch::Receiver, - consensus: Arc, + consensus: Arc>, header_downloader: H, body_downloader: B, executor_factory: E, @@ -122,6 +130,8 @@ impl DefaultStages { impl DefaultStages where E: BlockExecutorProvider, + H: HeaderDownloader, + B: BodyDownloader, { /// Appends the default offline stages and default finish stage to the given builder. pub fn add_offline_stages( @@ -164,13 +174,17 @@ where /// These stages *can* be run without network access if the specified downloaders are /// themselves offline. #[derive(Debug)] -pub struct OnlineStages { +pub struct OnlineStages +where + H: HeaderDownloader, + B: BodyDownloader, +{ /// Sync gap provider for the headers stage. provider: Provider, /// The tip for the headers stage. tip: watch::Receiver, /// The consensus engine used to validate incoming data. - consensus: Arc, + consensus: Arc>, /// The block header downloader header_downloader: H, /// The block body downloader @@ -179,12 +193,16 @@ pub struct OnlineStages { stages_config: StageConfig, } -impl OnlineStages { +impl OnlineStages +where + H: HeaderDownloader, + B: BodyDownloader, +{ /// Create a new set of online stages with default values. pub fn new( provider: Provider, tip: watch::Receiver, - consensus: Arc, + consensus: Arc>, header_downloader: H, body_downloader: B, stages_config: StageConfig, @@ -196,7 +214,7 @@ impl OnlineStages { impl OnlineStages where P: HeaderSyncGapProvider + 'static, - H: HeaderDownloader + 'static, + H: HeaderDownloader
+ 'static, B: BodyDownloader + 'static, { /// Create a new builder using the given headers stage. @@ -229,7 +247,7 @@ where provider, header_downloader, tip, - consensus.clone(), + consensus.clone().as_header_validator(), stages_config.etl, )) .add_stage(bodies) @@ -239,7 +257,7 @@ where impl StageSet for OnlineStages where P: HeaderSyncGapProvider + 'static, - H: HeaderDownloader + 'static, + H: HeaderDownloader
+ 'static, B: BodyDownloader + 'static, HeaderStage: Stage, BodyStage: Stage, @@ -250,7 +268,7 @@ where self.provider, self.header_downloader, self.tip, - self.consensus.clone(), + self.consensus.clone().as_header_validator(), self.stages_config.etl.clone(), )) .add_stage(BodyStage::new(self.body_downloader)) diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs index 1ec55f7fd801..100fe4e979a7 100644 --- a/crates/stages/stages/src/stages/headers.rs +++ b/crates/stages/stages/src/stages/headers.rs @@ -1,7 +1,7 @@ use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256}; use futures_util::StreamExt; use reth_config::config::EtlConfig; -use reth_consensus::Consensus; +use reth_consensus::HeaderValidator; use reth_db::{tables, transaction::DbTx, RawKey, RawTable, RawValue}; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, @@ -48,7 +48,7 @@ pub struct HeaderStage { /// The tip for the stage. tip: watch::Receiver, /// Consensus client implementation - consensus: Arc, + consensus: Arc>, /// Current sync gap. sync_gap: Option, /// ETL collector with `HeaderHash` -> `BlockNumber` @@ -63,14 +63,14 @@ pub struct HeaderStage { impl HeaderStage where - Downloader: HeaderDownloader, + Downloader: HeaderDownloader
, { /// Create a new header stage pub fn new( database: Provider, downloader: Downloader, tip: watch::Receiver, - consensus: Arc, + consensus: Arc>, etl_config: EtlConfig, ) -> Self { Self {