From f75eb5e4f3d8aa1c8b0237e5ced498d35863de73 Mon Sep 17 00:00:00 2001 From: refcell Date: Fri, 19 Apr 2024 13:00:40 -0400 Subject: [PATCH] feat(derive): pipeline builder --- crates/derive/src/builder.rs | 86 +++++++++++++++++++ crates/derive/src/lib.rs | 27 ++---- crates/derive/src/stages/attributes_queue.rs | 22 +++++ crates/derive/src/stages/batch_queue.rs | 72 +++++++++++----- crates/derive/src/stages/channel_bank.rs | 2 - crates/derive/src/stages/channel_reader.rs | 4 +- crates/derive/src/stages/mod.rs | 4 +- .../src/stages/test_utils/batch_queue.rs | 4 +- 8 files changed, 174 insertions(+), 47 deletions(-) create mode 100644 crates/derive/src/builder.rs diff --git a/crates/derive/src/builder.rs b/crates/derive/src/builder.rs new file mode 100644 index 00000000..07b1f937 --- /dev/null +++ b/crates/derive/src/builder.rs @@ -0,0 +1,86 @@ +//! Contains a concrete implementation of the [DerivationPipeline]. + +use crate::{ + stages::{ + AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, + L1Retrieval, L1Traversal, NextAttributes, + }, + traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider}, + types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult}, +}; +use alloc::sync::Arc; +use core::fmt::Debug; + +/// The derivation pipeline is responsible for deriving L2 inputs from L1 data. +#[derive(Debug)] +pub struct DerivationPipeline { + /// The attributes queue to retrieve the next attributes. + pub attributes: N, + /// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes. + pub cursor: L2BlockInfo, +} + +impl DerivationPipeline { + /// Creates a new instance of the [DerivationPipeline]. + pub fn new(attributes: N, cursor: L2BlockInfo) -> Self { + Self { attributes, cursor } + } + + /// Set the [L2BlockInfo] cursor to be used when pulling the next attributes. + pub fn set_cursor(&mut self, cursor: L2BlockInfo) { + self.cursor = cursor; + } + + /// Get the next attributes from the pipeline. + pub async fn next(&mut self) -> StageResult { + self.attributes.next_attributes(self.cursor).await + } +} + +impl DerivationPipeline> +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Creates a new instance of the [DerivationPipeline] from the given attributes. + pub fn new_online_pipeline( + attributes: KonaAttributes, + cursor: L2BlockInfo, + ) -> Self { + Self::new(attributes, cursor) + } +} + +/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type. +pub type KonaDerivationPipeline = DerivationPipeline>; + +/// [KonaAttributes] is a concrete [NextAttributes] type. +pub type KonaAttributes = AttributesQueue< + BatchQueue>>>>, F>, + B, +>; + +/// Creates a new [KonaAttributes] instance. +pub fn new_online_pipeline( + rollup_config: Arc, + chain_provider: P, + dap_source: DAP, + fetcher: F, + builder: B, +) -> KonaAttributes +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone()); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); + let frame_queue = FrameQueue::new(l1_retrieval); + let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); + let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); + let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); + AttributesQueue::new(*rollup_config, batch_queue, builder) +} diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 00c8bc34..17001815 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -6,10 +6,14 @@ extern crate alloc; -use alloc::sync::Arc; -use core::fmt::Debug; -use traits::ChainProvider; -use types::RollupConfig; +/// Prelude exports common types and traits. +pub mod prelude { + pub use super::{builder::DerivationPipeline, params::*}; + // pub use super::traits::prelude::*; + // pub use super::types::prelude::*; + // pub use super::stages::prelude::*; + // pub use super::sources::prelude::*; +} mod params; pub use params::{ @@ -19,6 +23,7 @@ pub use params::{ MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS, }; +pub mod builder; pub mod sources; pub mod stages; pub mod traits; @@ -28,17 +33,3 @@ pub mod types; mod online; #[cfg(feature = "online")] pub use online::prelude::*; - -/// The derivation pipeline is responsible for deriving L2 inputs from L1 data. -#[derive(Debug, Clone, Copy)] -pub struct DerivationPipeline; - -impl DerivationPipeline { - /// Creates a new instance of the [DerivationPipeline]. - pub fn new

(_rollup_config: Arc, _chain_provider: P) -> Self - where - P: ChainProvider + Clone + Debug + Send, - { - unimplemented!("TODO: High-level pipeline composition helper.") - } -} diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index fe549fae..29eb2d1e 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -30,6 +30,14 @@ pub trait AttributesProvider { fn is_last_in_span(&self) -> bool; } +/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage. +#[async_trait] +pub trait NextAttributes { + /// Returns the next [L2AttributesWithParent] from the current batch. + async fn next_attributes(&mut self, parent: L2BlockInfo) + -> StageResult; +} + /// [AttributesQueue] accepts batches from the [BatchQueue] stage /// and transforms them into [L2PayloadAttributes]. The outputted payload /// attributes cannot be buffered because each batch->attributes transformation @@ -139,6 +147,20 @@ where } } +#[async_trait] +impl NextAttributes for AttributesQueue +where + P: AttributesProvider + OriginProvider + Debug + Send, + AB: AttributesBuilder + Debug + Send, +{ + async fn next_attributes( + &mut self, + parent: L2BlockInfo, + ) -> StageResult { + self.next_attributes(parent).await + } +} + impl OriginProvider for AttributesQueue where P: AttributesProvider + OriginProvider + Debug, diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 9a66fb20..649ed5f5 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -290,7 +290,7 @@ where // We always update the origin of this stage if it's not the same so // after the update code runs, this is consistent. let origin_behind = - self.origin.map_or(true, |origin| origin.number < parent.l1_origin.number); + self.prev.origin().map_or(true, |origin| origin.number < parent.l1_origin.number); // Advance the origin if needed. // The entire pipeline has the same origin. @@ -334,6 +334,7 @@ where } // Attempt to derive more batches. + assert!(self.l1_blocks.is_empty()); let batch = match self.derive_next_batch(out_of_data, parent).await { Ok(b) => b, Err(e) => match e { @@ -408,12 +409,17 @@ where mod tests { use super::*; use crate::{ - stages::{channel_reader::BatchReader, test_utils::MockBatchQueueProvider}, + stages::{ + channel_reader::BatchReader, + test_utils::{CollectingLayer, MockBatchQueueProvider, TraceStorage}, + }, traits::test_utils::MockBlockFetcher, - types::BatchType, + types::{BatchType, BlockID}, }; use alloc::vec; use miniz_oxide::deflate::compress_to_vec_zlib; + use tracing::Level; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; fn new_batch_reader() -> BatchReader { let raw_data = include_bytes!("../../testdata/raw_batch.hex"); @@ -448,24 +454,48 @@ mod tests { assert!(bq.is_last_in_span()); } - // TODO(refcell): The batch reader here loops forever. - // Maybe the cursor isn't being used? - // UPDATE: the batch data is not valid - // #[tokio::test] - // async fn test_next_batch_succeeds() { - // let mut reader = new_batch_reader(); - // let mut batch_vec: Vec> = vec![]; - // while let Some(batch) = reader.next_batch() { - // batch_vec.push(Ok(batch)); - // } - // let mock = MockBatchQueueProvider::new(batch_vec); - // let telemetry = TestTelemetry::new(); - // let fetcher = MockBlockFetcher::default(); - // let mut bq = BatchQueue::new(RollupConfig::default(), mock, telemetry, fetcher); - // let res = bq.next_batch(L2BlockInfo::default()).await.unwrap(); - // assert_eq!(res, SingleBatch::default()); - // assert!(bq.is_last_in_span()); - // } + #[tokio::test] + async fn test_next_batch_origin_behind() { + let mut reader = new_batch_reader(); + let cfg = Arc::new(RollupConfig::default()); + let mut batch_vec: Vec> = vec![]; + while let Some(batch) = reader.next_batch(cfg.as_ref()) { + batch_vec.push(Ok(batch)); + } + let mut mock = MockBatchQueueProvider::new(batch_vec); + mock.origin = Some(BlockInfo::default()); + let fetcher = MockBlockFetcher::default(); + let mut bq = BatchQueue::new(cfg, mock, fetcher); + let parent = L2BlockInfo { + l1_origin: BlockID { number: 10, ..Default::default() }, + ..Default::default() + }; + let res = bq.next_batch(parent).await.unwrap_err(); + assert_eq!(res, StageError::NotEnoughData); + } + + // TODO: fix + #[tokio::test] + async fn test_next_batch_succeeds() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + let mut reader = new_batch_reader(); + let cfg = Arc::new(RollupConfig::default()); + let mut batch_vec: Vec> = vec![]; + while let Some(batch) = reader.next_batch(cfg.as_ref()) { + batch_vec.push(Ok(batch)); + } + let mut mock = MockBatchQueueProvider::new(batch_vec); + mock.origin = Some(BlockInfo::default()); + let fetcher = MockBlockFetcher::default(); + let mut bq = BatchQueue::new(cfg, mock, fetcher); + let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); + let logs = trace_store.get_by_level(Level::WARN); + let str = "Deriving next batch for epoch: 1"; + assert_eq!(logs[0], str); + } #[tokio::test] async fn test_batch_queue_empty_bytes() { diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 03027515..0fc10afe 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -117,8 +117,6 @@ where let first = self.channel_queue[0]; let channel = self.channels.get(&first).ok_or(StageError::ChannelNotFound)?; let origin = self.origin().ok_or(StageError::MissingOrigin)?; - - // Remove all timed out channels from the front of the `channel_queue`. if channel.open_block_number() + self.cfg.channel_timeout < origin.number { warn!("Channel {:?} timed out", first); self.channels.remove(&first); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index e6cf7761..2beae7ef 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -122,11 +122,11 @@ impl BatchReader { } // Decompress and RLP decode the batch data, before finally decoding the batch itself. - let mut decompressed_reader = self.decompressed.as_slice(); + let mut decompressed_reader = self.decompressed.as_slice()[self.cursor..].as_ref(); let batch = Batch::decode(&mut decompressed_reader, cfg).ok()?; // Advance the cursor on the reader. - self.cursor += self.decompressed.len() - decompressed_reader.len(); + self.cursor = self.decompressed.len() - decompressed_reader.len(); Some(batch) } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 9517d343..5b637cec 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -33,8 +33,8 @@ pub use batch_queue::{BatchQueue, BatchQueueProvider}; mod attributes_queue; pub use attributes_queue::{ - AttributesBuilder, AttributesProvider, AttributesQueue, StatefulAttributesBuilder, - SystemConfigL2Fetcher, + AttributesBuilder, AttributesProvider, AttributesQueue, NextAttributes, + StatefulAttributesBuilder, SystemConfigL2Fetcher, }; #[cfg(test)] diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index 0165b452..809863fd 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -12,9 +12,9 @@ use async_trait::async_trait; #[derive(Debug, Default)] pub struct MockBatchQueueProvider { /// The origin of the L1 block. - origin: Option, + pub origin: Option, /// A list of batches to return. - batches: Vec>, + pub batches: Vec>, } impl MockBatchQueueProvider {