Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate HeaderValidator + make FileClient generic over block #12681

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
{
// building network downloaders using the fetch client
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(client.clone(), Arc::clone(&consensus))
.build(client.clone(), consensus.clone().as_header_validator())
.into_task_with(task_executor);

let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
Expand Down
2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ where
.with_tip_sender(tip_tx),
TestPipelineConfig::Real => {
let header_downloader = ReverseHeadersDownloaderBuilder::default()
.build(client.clone(), consensus.clone())
.build(client.clone(), consensus.clone().as_header_validator())
.into_task();

let body_downloader = BodiesDownloaderBuilder::default()
Expand Down
23 changes: 21 additions & 2 deletions crates/consensus/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

extern crate alloc;

use alloc::{fmt::Debug, vec::Vec};
use alloc::{fmt::Debug, sync::Arc, vec::Vec};
use alloy_consensus::Header;
use alloy_eips::eip7685::Requests;
use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256, U256};
Expand Down Expand Up @@ -46,7 +46,9 @@ impl<'a> PostExecutionInput<'a> {

/// Consensus is a protocol that chooses canonical chain.
#[auto_impl::auto_impl(&, Arc)]
pub trait Consensus<H = Header, B = BlockBody>: HeaderValidator<H> + Debug + Send + Sync {
pub trait Consensus<H = Header, B = BlockBody>:
AsHeaderValidator<H> + HeaderValidator<H> + Debug + Send + Sync
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, isn't this AsHeaderValidator<H> bound redundant if AsHeaderValidator is impl for all HeaderValidator

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't compile without it, unsure why exactly, probably has something to do with the fact that we are invoking this on Arc impl

error[E0599]: no method named `as_header_validator` found for struct `Arc<dyn reth_consensus::Consensus>` in the current scope
   --> crates/stages/stages/src/sets.rs:250:35
    |
250 |                 consensus.clone().as_header_validator(),
    |                                   ^^^^^^^^^^^^^^^^^^^ method not found in `Arc<dyn Consensus>`
    |
   ::: /Users/klkvr/github/reth/crates/consensus/consensus/src/lib.rs:150:8
    |
150 |     fn as_header_validator<'a>(self: Arc<Self>) -> Arc<dyn HeaderValidator<H> + 'a>
    |        -------------------
    |        |
    |        the method is available for `Arc<Arc<dyn reth_consensus::Consensus>>` here
    |        the method is available for `Arc<&Arc<dyn reth_consensus::Consensus>>` here
    |
help: consider wrapping the receiver expression with the appropriate type
    |
250 |                 Arc::new(consensus.clone()).as_header_validator(),
    |                 +++++++++                 +
help: consider wrapping the receiver expression with the appropriate type
    |
250 |                 Arc::new(&consensus.clone()).as_header_validator(),

{
/// Ensures that body field values match the header.
fn validate_body_against_header(
&self,
Expand Down Expand Up @@ -143,6 +145,23 @@ pub trait HeaderValidator<H = Header>: Debug + Send + Sync {
) -> Result<(), ConsensusError>;
}

/// Helper trait to cast `Arc<dyn Consensus>` to `Arc<dyn HeaderValidator>`
pub trait AsHeaderValidator<H>: HeaderValidator<H> {
/// Converts the [`Arc`] of self to [`Arc`] of [`HeaderValidator`]
fn as_header_validator<'a>(self: Arc<Self>) -> Arc<dyn HeaderValidator<H> + 'a>
where
Self: 'a;
}

impl<T: HeaderValidator<H>, H> AsHeaderValidator<H> for T {
fn as_header_validator<'a>(self: Arc<Self>) -> Arc<dyn HeaderValidator<H> + 'a>
where
Self: 'a,
{
self
}
}

/// Consensus Errors
#[derive(Debug, PartialEq, Eq, Clone, derive_more::Display, derive_more::Error)]
pub enum ConsensusError {
Expand Down
3 changes: 2 additions & 1 deletion crates/net/downloaders/src/bodies/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
cmp::Ordering,
collections::BinaryHeap,
fmt::Debug,
mem,
ops::RangeInclusive,
pin::Pin,
Expand Down Expand Up @@ -298,7 +299,7 @@ where

impl<B, Provider> BodyDownloader for BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: InMemorySize> + 'static,
B: BodiesClient<Body: Debug + InMemorySize> + 'static,
Provider: HeaderProvider + Unpin + 'static,
{
type Body = B::Body;
Expand Down
7 changes: 4 additions & 3 deletions crates/net/downloaders/src/bodies/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use reth_network_p2p::{
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
fmt::Debug,
future::Future,
ops::RangeInclusive,
pin::Pin,
Expand Down Expand Up @@ -47,10 +48,10 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
/// use reth_network_p2p::bodies::client::BodiesClient;
/// use reth_primitives_traits::InMemorySize;
/// use reth_storage_api::HeaderProvider;
/// use std::sync::Arc;
/// use std::{fmt::Debug, sync::Arc};
///
/// fn t<
/// B: BodiesClient<Body: InMemorySize> + 'static,
/// B: BodiesClient<Body: Debug + InMemorySize> + 'static,
/// Provider: HeaderProvider + Unpin + 'static,
/// >(
/// client: Arc<B>,
Expand Down Expand Up @@ -90,7 +91,7 @@ impl<B: Send + Sync + Unpin + 'static> TaskDownloader<B> {
}
}

impl<B: Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader<B> {
impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for TaskDownloader<B> {
type Body = B;

fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
Expand Down
76 changes: 39 additions & 37 deletions crates/net/downloaders/src/file_client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{collections::HashMap, io, path::Path};

use alloy_consensus::Header;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, B256};
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_network_p2p::{
Expand All @@ -13,7 +13,8 @@ use reth_network_p2p::{
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_primitives::{BlockBody, SealedHeader};
use reth_primitives::SealedHeader;
use reth_primitives_traits::{Block, BlockBody, FullBlock};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
Expand All @@ -40,15 +41,15 @@ pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
///
/// This reads the entire file into memory, so it is not suitable for large files.
#[derive(Debug)]
pub struct FileClient {
pub struct FileClient<B: Block = reth_primitives::Block> {
/// The buffered headers retrieved when fetching new bodies.
headers: HashMap<BlockNumber, Header>,
headers: HashMap<BlockNumber, B::Header>,

/// A mapping between block hash and number.
hash_to_number: HashMap<BlockHash, BlockNumber>,

/// The buffered bodies retrieved when fetching new headers.
bodies: HashMap<BlockHash, BlockBody>,
bodies: HashMap<BlockHash, B::Body>,
}

/// An error that can occur when constructing and using a [`FileClient`].
Expand All @@ -73,7 +74,7 @@ impl From<&'static str> for FileClientError {
}
}

impl FileClient {
impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
Expand Down Expand Up @@ -114,7 +115,7 @@ impl FileClient {

/// Clones and returns the highest header of this client has or `None` if empty. Seals header
/// before returning.
pub fn tip_header(&self) -> Option<SealedHeader> {
pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal(h.clone()))
}

Expand All @@ -137,13 +138,13 @@ impl FileClient {
}

/// Use the provided bodies as the file client's block body buffer.
pub fn with_bodies(mut self, bodies: HashMap<BlockHash, BlockBody>) -> Self {
pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
self.bodies = bodies;
self
}

/// Use the provided headers as the file client's block body buffer.
pub fn with_headers(mut self, headers: HashMap<BlockNumber, Header>) -> Self {
pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
self.headers = headers;
for (number, header) in &self.headers {
self.hash_to_number.insert(header.hash_slow(), *number);
Expand All @@ -162,42 +163,43 @@ impl FileClient {
}

/// Returns an iterator over headers in the client.
pub fn headers_iter(&self) -> impl Iterator<Item = &Header> {
pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
self.headers.values()
}

/// Returns a mutable iterator over bodies in the client.
///
/// Panics, if file client headers and bodies are not mapping 1-1.
pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut BlockBody)> {
pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
let bodies = &mut self.bodies;
let numbers = &self.hash_to_number;
bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
}

/// Returns the current number of transactions in the client.
pub fn total_transactions(&self) -> usize {
self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions.len())
self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
}
}

impl FromReader for FileClient {
impl<B: FullBlock> FromReader for FileClient<B> {
type Error = FileClientError;

/// Initialize the [`FileClient`] from bytes that have been read from file.
fn from_reader<B>(
reader: B,
fn from_reader<R>(
reader: R,
num_bytes: u64,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
B: AsyncReadExt + Unpin,
R: AsyncReadExt + Unpin,
{
let mut headers = HashMap::default();
let mut hash_to_number = HashMap::default();
let mut bodies = HashMap::default();

// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, BlockFileCodec, num_bytes as usize);
let mut stream =
FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
Expand Down Expand Up @@ -225,13 +227,13 @@ impl FromReader for FileClient {
}
Err(err) => return Err(err),
};
let block_number = block.header.number;
let block_hash = block.header.hash_slow();
let block_number = block.header().number();
let block_hash = block.header().hash_slow();

// add to the internal maps
headers.insert(block.header.number, block.header.clone());
hash_to_number.insert(block_hash, block.header.number);
bodies.insert(block_hash, block.into());
headers.insert(block.header().number(), block.header().clone());
hash_to_number.insert(block_hash, block.header().number());
bodies.insert(block_hash, block.body().clone());

if log_interval == 0 {
trace!(target: "downloaders::file",
Expand Down Expand Up @@ -260,9 +262,9 @@ impl FromReader for FileClient {
}
}

impl HeadersClient for FileClient {
type Header = Header;
type Output = HeadersFut;
impl<B: FullBlock> HeadersClient for FileClient<B> {
type Header = B::Header;
type Output = HeadersFut<B::Header>;

fn get_headers_with_priority(
&self,
Expand Down Expand Up @@ -311,9 +313,9 @@ impl HeadersClient for FileClient {
}
}

impl BodiesClient for FileClient {
type Body = BlockBody;
type Output = BodiesFut;
impl<B: FullBlock> BodiesClient for FileClient<B> {
type Body = B::Body;
type Output = BodiesFut<B::Body>;

fn get_block_bodies_with_priority(
&self,
Expand All @@ -336,7 +338,7 @@ impl BodiesClient for FileClient {
}
}

impl DownloadClient for FileClient {
impl<B: FullBlock> DownloadClient for FileClient<B> {
fn report_bad_message(&self, _peer_id: PeerId) {
warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
// noop
Expand Down Expand Up @@ -542,7 +544,7 @@ mod tests {
// create an empty file
let file = tempfile::tempfile().unwrap();

let client =
let client: Arc<FileClient> =
Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
let mut downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Expand All @@ -567,14 +569,14 @@ mod tests {
let p0 = child_header(&p1);

let file = tempfile::tempfile().unwrap();
let client = Arc::new(FileClient::from_file(file.into()).await.unwrap().with_headers(
HashMap::from([
let client: Arc<FileClient> = Arc::new(
FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
(0u64, p0.clone().unseal()),
(1, p1.clone().unseal()),
(2, p2.clone().unseal()),
(3, p3.clone().unseal()),
]),
));
])),
);

let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(3)
Expand All @@ -596,7 +598,7 @@ mod tests {
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=19).await;
// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
Expand All @@ -621,7 +623,7 @@ mod tests {
let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

// now try to read them back
let client = Arc::new(FileClient::from_file(file).await.unwrap());
let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

// insert headers in db for the bodies downloader
insert_headers(factory.db_ref().db(), &headers);
Expand Down
20 changes: 12 additions & 8 deletions crates/net/downloaders/src/file_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::file_client::FileClientError;
use alloy_primitives::bytes::{Buf, BytesMut};
use alloy_rlp::{Decodable, Encodable};
use reth_primitives::Block;
use tokio_util::codec::{Decoder, Encoder};

/// Codec for reading raw block bodies from a file.
Expand All @@ -19,10 +18,16 @@ use tokio_util::codec::{Decoder, Encoder};
///
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file.
pub(crate) struct BlockFileCodec;
pub(crate) struct BlockFileCodec<B>(std::marker::PhantomData<B>);

impl Decoder for BlockFileCodec {
type Item = Block;
impl<B> Default for BlockFileCodec<B> {
fn default() -> Self {
Self(std::marker::PhantomData)
}
}

impl<B: Decodable> Decoder for BlockFileCodec<B> {
type Item = B;
type Error = FileClientError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand All @@ -31,18 +36,17 @@ impl Decoder for BlockFileCodec {
}

let buf_slice = &mut src.as_ref();
let body =
Block::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
let body = B::decode(buf_slice).map_err(|err| FileClientError::Rlp(err, src.to_vec()))?;
src.advance(src.len() - buf_slice.len());

Ok(Some(body))
}
}

impl Encoder<Block> for BlockFileCodec {
impl<B: Encodable> Encoder<B> for BlockFileCodec<B> {
type Error = FileClientError;

fn encode(&mut self, item: Block, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: B, dst: &mut BytesMut) -> Result<(), Self::Error> {
item.encode(dst);
Ok(())
}
Expand Down
Loading
Loading