diff --git a/crates/derive/src/builder.rs b/crates/derive/src/builder.rs index 07b1f937..943af642 100644 --- a/crates/derive/src/builder.rs +++ b/crates/derive/src/builder.rs @@ -1,29 +1,51 @@ //! Contains a concrete implementation of the [DerivationPipeline]. use crate::{ - stages::{ - AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, - L1Retrieval, L1Traversal, NextAttributes, + stages::NextAttributes, + traits::ResettableStage, + types::{ + BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig, }, - traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider}, - types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult}, }; -use alloc::sync::Arc; +use alloc::{boxed::Box, collections::VecDeque}; +use async_trait::async_trait; use core::fmt::Debug; +/// Provides the [BlockInfo] and [SystemConfig] for the stack to reset the stages. +#[async_trait] +pub trait ResetProvider { + /// Returns the current [BlockInfo] for the pipeline to reset. + async fn block_info(&self) -> BlockInfo; + + /// Returns the current [SystemConfig] for the pipeline to reset. + async fn system_config(&self) -> SystemConfig; +} + /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] -pub struct DerivationPipeline { - /// The attributes queue to retrieve the next attributes. - pub attributes: N, +pub struct DerivationPipeline< + S: NextAttributes + ResettableStage + Debug + Send, + R: ResetProvider + Send, +> { + /// The stack of stages in the pipeline. + /// The stack is reponsible for advancing the L1 traversal stage. + pub stack: S, + /// Reset provider for the pipeline. + pub reset: R, + /// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer. + pub prepared: VecDeque, + /// A flag to tell the pipeline to reset. + pub needs_reset: bool, /// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes. pub cursor: L2BlockInfo, } -impl DerivationPipeline { +impl + DerivationPipeline +{ /// Creates a new instance of the [DerivationPipeline]. - pub fn new(attributes: N, cursor: L2BlockInfo) -> Self { - Self { attributes, cursor } + pub fn new(stack: S, reset: R, cursor: L2BlockInfo) -> Self { + Self { stack, prepared: VecDeque::new(), reset, needs_reset: false, cursor } } /// Set the [L2BlockInfo] cursor to be used when pulling the next attributes. @@ -31,56 +53,51 @@ impl DerivationPipeline { self.cursor = cursor; } - /// Get the next attributes from the pipeline. - pub async fn next(&mut self) -> StageResult { - self.attributes.next_attributes(self.cursor).await + /// Returns the next [L2AttributesWithParent] from the pipeline. + pub fn next_attributes(&mut self) -> Option { + self.prepared.pop_front() } -} -impl DerivationPipeline> -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Creates a new instance of the [DerivationPipeline] from the given attributes. - pub fn new_online_pipeline( - attributes: KonaAttributes, - cursor: L2BlockInfo, - ) -> Self { - Self::new(attributes, cursor) + /// Flags the pipeline to reset on the next [DerivationPipeline::step] call. + pub fn reset(&mut self) { + self.needs_reset = true; } -} -/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type. -pub type KonaDerivationPipeline = DerivationPipeline>; + /// Attempts to progress the pipeline. + /// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data. + /// Any other error is critical and the derivation pipeline should be reset. + /// An error is expected when the underlying source closes. + /// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the + /// derivation process. + pub async fn step(&mut self) -> StageResult<()> { + tracing::info!("DerivationPipeline::step"); -/// [KonaAttributes] is a concrete [NextAttributes] type. -pub type KonaAttributes = AttributesQueue< - BatchQueue>>>>, F>, - B, ->; + // Reset the pipeline if needed. + if self.needs_reset { + let block_info = self.reset.block_info().await; + let system_config = self.reset.system_config().await; + self.stack.reset(block_info, &system_config).await?; + self.needs_reset = false; + } -/// Creates a new [KonaAttributes] instance. -pub fn new_online_pipeline( - rollup_config: Arc, - chain_provider: P, - dap_source: DAP, - fetcher: F, - builder: B, -) -> KonaAttributes -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone()); - let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); - let frame_queue = FrameQueue::new(l1_retrieval); - let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); - let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); - let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); - AttributesQueue::new(*rollup_config, batch_queue, builder) + // Step over the engine queue. + match self.stack.next_attributes(self.cursor).await { + Ok(a) => { + tracing::info!("attributes queue stage step returned l2 attributes"); + tracing::info!("prepared L2 attributes: {:?}", a); + self.prepared.push_back(a); + return Ok(()); + } + Err(StageError::Eof) => { + tracing::info!("attributes queue stage complete"); + } + // TODO: match on the EngineELSyncing error here and log + Err(err) => { + tracing::error!("attributes queue stage failed: {:?}", err); + return Err(err); + } + } + + Ok(()) + } } diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 17001815..cb6b6b29 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -29,6 +29,11 @@ pub mod stages; pub mod traits; pub mod types; +#[cfg(feature = "online")] +mod stack; +#[cfg(feature = "online")] +pub use stack::*; + #[cfg(feature = "online")] mod online; #[cfg(feature = "online")] diff --git a/crates/derive/src/stack.rs b/crates/derive/src/stack.rs new file mode 100644 index 00000000..3e15614e --- /dev/null +++ b/crates/derive/src/stack.rs @@ -0,0 +1,128 @@ +//! Contains a stack of Stages for the [crate::DerivationPipeline]. + +use crate::{ + stages::{ + AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, + L1Retrieval, L1Traversal, NextAttributes, + }, + traits::{ + ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage, + }, + types::{ + BlockInfo, L2AttributesWithParent, L2BlockInfo, RollupConfig, StageError, StageResult, + SystemConfig, + }, +}; +use alloc::{boxed::Box, sync::Arc}; +use async_trait::async_trait; +use core::fmt::Debug; +use spin::Mutex; + +/// The [AttributesQueue] type alias. +pub type AttributesQueueType = AttributesQueue< + BatchQueue>>>>, F>, + B, +>; + +/// An online stack of stages. +#[derive(Debug)] +pub struct OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Flag to tell the L1Traversal stage to advance to the next L1 block. + pub advance: Arc>, + /// The [AttributesQueue] stage. + pub attributes: AttributesQueueType, +} + +impl OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Creates a new [OnlineStageStack]. + pub fn new( + rollup_config: Arc, + chain_provider: P, + dap_source: DAP, + fetcher: F, + builder: B, + ) -> Self { + let advance = Arc::new(Mutex::new(false)); + let l1_traversal = + L1Traversal::new(chain_provider, Arc::clone(&advance), rollup_config.clone()); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); + let frame_queue = FrameQueue::new(l1_retrieval); + let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); + let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); + let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); + let attributes = AttributesQueue::new(*rollup_config, batch_queue, builder); + Self { advance, attributes } + } +} + +#[async_trait] +impl NextAttributes for OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Advances the pipeline to the next attributes. + async fn next_attributes( + &mut self, + parent: L2BlockInfo, + ) -> StageResult { + match self.attributes.next_attributes(parent).await { + Ok(a) => { + tracing::info!("attributes queue stage step returned l2 attributes"); + tracing::info!("prepared L2 attributes: {:?}", a); + return Ok(a); + } + Err(StageError::Eof) => { + tracing::info!("attributes queue stage complete"); + let mut advance = self.advance.lock(); + *advance = true; + return Err(StageError::Eof); + } + // TODO: match on the EngineELSyncing error here and log + Err(err) => { + tracing::error!("attributes queue stage failed: {:?}", err); + return Err(err); + } + } + } +} + +#[async_trait] +impl ResettableStage for OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Resets all stages in the stack. + async fn reset(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { + match self.attributes.reset(bi, sc).await { + Ok(()) => { + tracing::info!("Stages reset"); + } + Err(StageError::Eof) => { + tracing::info!("Stages reset with EOF"); + } + Err(err) => { + tracing::error!("Stages reset failed: {:?}", err); + return Err(err); + } + } + Ok(()) + } +} diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index ac83943e..d0b2823f 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,7 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - traits::{OriginProvider, ResettableStage}, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -53,7 +53,7 @@ pub trait NextAttributes { #[derive(Debug)] pub struct AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + ResettableStage + PreviousStage + OriginProvider + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. @@ -70,7 +70,7 @@ where impl AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. @@ -147,10 +147,22 @@ where } } +impl PreviousStage for AttributesQueue +where + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + AB: AttributesBuilder + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&P> { + Some(&self.prev) + } +} + #[async_trait] impl NextAttributes for AttributesQueue where - P: AttributesProvider + OriginProvider + Debug + Send, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, AB: AttributesBuilder + Debug + Send, { async fn next_attributes( @@ -163,7 +175,7 @@ where impl OriginProvider for AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, AB: AttributesBuilder + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -174,10 +186,15 @@ where #[async_trait] impl ResettableStage for AttributesQueue where - P: AttributesProvider + OriginProvider + Send + Debug, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, AB: AttributesBuilder + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; info!("resetting attributes queue"); self.batch = None; self.is_last_in_span = false; diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 194c90c9..7d66c5ad 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::attributes_queue::AttributesProvider, - traits::{L2ChainProvider, OriginProvider, ResettableStage}, + traits::{L2ChainProvider, OriginProvider, PreviousStage, ResettableStage}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -43,7 +43,7 @@ pub trait BatchQueueProvider { #[derive(Debug)] pub struct BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, BF: L2ChainProvider + Debug, { /// The rollup config. @@ -75,7 +75,7 @@ where impl BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. @@ -245,7 +245,7 @@ where #[async_trait] impl AttributesProvider for BatchQueue where - P: BatchQueueProvider + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, BF: L2ChainProvider + Send + Debug, { /// Returns the next valid batch upon the given safe head. @@ -374,7 +374,7 @@ where impl OriginProvider for BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -382,13 +382,26 @@ where } } +impl PreviousStage for BatchQueue +where + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + BF: L2ChainProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl ResettableStage for BatchQueue where - P: BatchQueueProvider + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, BF: L2ChainProvider + Send + Debug, { - async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, system_config).await?; // Copy over the Origin from the next stage. // It is set in the engine queue (two stages away) // such that the L2 Safe Head origin is the progress. @@ -602,6 +615,7 @@ mod tests { let fetcher = MockBlockFetcher { blocks: vec![block_nine, block_seven], payloads: vec![payload, second], + ..Default::default() }; let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo { @@ -616,15 +630,18 @@ mod tests { }; let res = bq.next_batch(parent).await.unwrap_err(); let logs = trace_store.get_by_level(Level::INFO); - assert_eq!(logs.len(), 5); + assert_eq!(logs.len(), 4); let str = alloc::format!("Advancing batch queue origin: {:?}", origin); assert!(logs[0].contains(&str)); - assert!(logs[1].contains("Deriving next batch for epoch: 16988980031808077784")); - assert!(logs[2].contains("Next batch found:")); + assert!(logs[1].contains("need more l1 blocks to check entire origins of span batch")); + assert!(logs[2].contains("Deriving next batch for epoch: 16988980031808077784")); + assert!(logs[3].contains("need more l1 blocks to check entire origins of span batch")); + // assert!(logs[4].contains("Next batch found:")); let warns = trace_store.get_by_level(Level::WARN); assert_eq!(warns.len(), 0); - let str = "Could not get singular batches from span batch: Missing L1 origin"; - assert_eq!(res, StageError::Custom(anyhow::anyhow!(str))); + assert_eq!(res, StageError::NotEnoughData); + // let str = "Could not get singular batches from span batch: Missing L1 origin"; + // assert_eq!(res, StageError::Custom(anyhow::anyhow!(str))); } #[tokio::test] diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 0fc10afe..04378b91 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -3,7 +3,7 @@ use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, stages::ChannelReaderProvider, - traits::{OriginProvider, ResettableStage}, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -37,7 +37,7 @@ pub trait ChannelBankProvider { #[derive(Debug)] pub struct ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// The rollup configuration. cfg: Arc, @@ -51,7 +51,7 @@ where impl

ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { @@ -166,7 +166,7 @@ where #[async_trait] impl

ChannelReaderProvider for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { async fn next_data(&mut self) -> StageResult> { match self.read() { @@ -188,19 +188,35 @@ where impl

OriginProvider for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl

PreviousStage for ChannelBank

+where + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl

ResettableStage for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; self.channels.clear(); self.channel_queue = VecDeque::with_capacity(10); Err(StageError::Eof) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 2beae7ef..6efcae9e 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,8 +2,8 @@ use crate::{ stages::BatchQueueProvider, - traits::OriginProvider, - types::{Batch, BlockInfo, RollupConfig, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{Batch, BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; @@ -27,7 +27,7 @@ pub trait ChannelReaderProvider { #[derive(Debug)] pub struct ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// The previous stage of the derivation pipeline. prev: P, @@ -39,7 +39,7 @@ where impl

ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// Create a new [ChannelReader] stage. pub fn new(prev: P, cfg: Arc) -> Self { @@ -65,7 +65,7 @@ where #[async_trait] impl

BatchQueueProvider for ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Send + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { @@ -91,13 +91,36 @@ where impl

OriginProvider for ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +#[async_trait] +impl

ResettableStage for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, +{ + async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, cfg).await?; + self.next_channel(); + Ok(()) + } +} + +impl

PreviousStage for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + /// Batch Reader provides a function that iteratively consumes batches from the reader. /// The L1Inclusion block is also provided at creation time. /// Warning: the batch reader can read every batch-type. diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index be7ac6bc..4d1f42ea 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelBankProvider, - traits::{OriginProvider, ResettableStage}, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -31,7 +31,7 @@ pub trait FrameQueueProvider { #[derive(Debug)] pub struct FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// The previous stage in the pipeline. pub prev: P, @@ -41,7 +41,7 @@ where impl

FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// @@ -54,7 +54,7 @@ where #[async_trait] impl

ChannelBankProvider for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { @@ -87,19 +87,35 @@ where impl

OriginProvider for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl

PreviousStage for FrameQueue

+where + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl

ResettableStage for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; self.queue = VecDeque::default(); Err(StageError::Eof) } diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 0986a94a..74ab4684 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -2,7 +2,9 @@ use crate::{ stages::FrameQueueProvider, - traits::{AsyncIterator, DataAvailabilityProvider, OriginProvider, ResettableStage}, + traits::{ + AsyncIterator, DataAvailabilityProvider, OriginProvider, PreviousStage, ResettableStage, + }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; @@ -12,6 +14,7 @@ use async_trait::async_trait; /// Provides L1 blocks for the [L1Retrieval] stage. /// This is the previous stage in the pipeline. +#[async_trait] pub trait L1RetrievalProvider { /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. /// This function can only be called once while the stage is in progress, and will return @@ -19,7 +22,7 @@ pub trait L1RetrievalProvider { /// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned. /// /// [L1Traversal]: crate::stages::L1Traversal - fn next_l1_block(&mut self) -> StageResult>; + async fn next_l1_block(&mut self) -> StageResult>; /// Returns the batcher [Address] from the [crate::types::SystemConfig]. fn batcher_addr(&self) -> Address; @@ -36,7 +39,7 @@ pub trait L1RetrievalProvider { pub struct L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, { /// The previous stage in the pipeline. pub prev: P, @@ -49,7 +52,7 @@ where impl L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] stage and given /// [DataAvailabilityProvider]. @@ -64,7 +67,7 @@ where impl FrameQueueProvider for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, { type Item = DAP::Item; @@ -72,7 +75,8 @@ where if self.data.is_none() { let next = self .prev - .next_l1_block()? // SAFETY: This question mark bubbles up the Eof error. + .next_l1_block() + .await? // SAFETY: This question mark bubbles up the Eof error. .ok_or_else(|| anyhow!("No block to retrieve data from"))?; self.data = Some(self.provider.open_data(&next, self.prev.batcher_addr()).await?); } @@ -92,20 +96,33 @@ where impl OriginProvider for L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl PreviousStage for L1Retrieval +where + DAP: DataAvailabilityProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, cfg).await?; self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?); Ok(()) } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 6403291f..80de5606 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,14 +2,22 @@ use crate::{ stages::L1RetrievalProvider, - traits::{ChainProvider, OriginProvider, ResettableStage}, + traits::{ChainProvider, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Address; use async_trait::async_trait; +use spin::Mutex; use tracing::warn; +/// Defines a trait for advancing the L1 block in the [L1Traversal] stage. +#[async_trait] +pub trait L1BlockAdvance { + /// Advances the internal state of the [L1Traversal] stage to the next L1 block. + async fn advance_l1_block(&mut self) -> StageResult<()>; +} + /// The [L1Traversal] stage of the derivation pipeline. /// /// This stage sits at the bottom of the pipeline, holding a handle to the data source @@ -25,18 +33,31 @@ pub struct L1Traversal { data_source: Provider, /// Signals whether or not the traversal stage is complete. done: bool, + /// Marks if the L1Traversal stage should attempt to advance to the next block. + pub advance: Arc>, /// The system config. pub system_config: SystemConfig, /// A reference to the rollup config. pub rollup_config: Arc, } -impl L1RetrievalProvider for L1Traversal { +#[async_trait] +impl L1RetrievalProvider for L1Traversal { fn batcher_addr(&self) -> Address { self.system_config.batcher_addr } - fn next_l1_block(&mut self) -> StageResult> { + async fn next_l1_block(&mut self) -> StageResult> { + // let advance = match { + // Ok(advance) => advance, + // Err(_) => return Err(StageError::Custom(anyhow::anyhow!("Failed to lock advance + // mutex"))), }; + let mut advance = self.advance.lock(); + if *advance { + *advance = false; + drop(advance); + self.advance_l1_block().await?; + } if !self.done { self.done = true; Ok(self.block) @@ -48,11 +69,12 @@ impl L1RetrievalProvider for L1Traversal { impl L1Traversal { /// Creates a new [L1Traversal] instance. - pub fn new(data_source: F, cfg: Arc) -> Self { + pub fn new(data_source: F, advance: Arc>, cfg: Arc) -> Self { Self { block: Some(BlockInfo::default()), data_source, done: false, + advance, system_config: SystemConfig::default(), rollup_config: cfg, } @@ -62,11 +84,14 @@ impl L1Traversal { pub fn data_source(&self) -> &F { &self.data_source } +} +#[async_trait] +impl L1BlockAdvance for L1Traversal { /// Advances the internal state of the [L1Traversal] stage to the next L1 block. /// This function fetches the next L1 [BlockInfo] from the data source and updates the /// [SystemConfig] with the receipts from the block. - pub async fn advance_l1_block(&mut self) -> StageResult<()> { + async fn advance_l1_block(&mut self) -> StageResult<()> { // Pull the next block or return EOF. // StageError::EOF has special handling further up the pipeline. let block = match self.block { @@ -112,6 +137,14 @@ impl OriginProvider for L1Traversal { } } +impl PreviousStage for L1Traversal { + type Previous = L1Traversal; + + fn previous(&self) -> Option<&Self::Previous> { + None + } +} + #[async_trait] impl ResettableStage for L1Traversal { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { @@ -179,7 +212,7 @@ pub(crate) mod tests { let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); provider.insert_receipts(hash, vec![receipt.clone()]); } - L1Traversal::new(provider, Arc::new(rollup_config)) + L1Traversal::new(provider, Arc::new(Mutex::new(false)), Arc::new(rollup_config)) } pub(crate) fn new_populated_test_traversal() -> L1Traversal { @@ -193,8 +226,8 @@ pub(crate) mod tests { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); assert!(traversal.advance_l1_block().await.is_ok()); } @@ -202,8 +235,8 @@ pub(crate) mod tests { async fn test_l1_traversal_missing_receipts() { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let mut traversal = new_test_traversal(blocks, vec![]); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_)); } @@ -222,8 +255,8 @@ pub(crate) mod tests { #[tokio::test] async fn test_l1_traversal_missing_blocks() { let mut traversal = new_test_traversal(vec![], vec![]); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_)); } @@ -248,8 +281,8 @@ pub(crate) mod tests { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); assert!(traversal.advance_l1_block().await.is_ok()); let expected = address!("000000000000000000000000000000000000bEEF"); assert_eq!(traversal.system_config.batcher_addr, expected); diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 2034cf5d..355793fa 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -14,7 +14,7 @@ //! 8. (Omitted) Engine Queue mod l1_traversal; -pub use l1_traversal::L1Traversal; +pub use l1_traversal::{L1BlockAdvance, L1Traversal}; mod l1_retrieval; pub use l1_retrieval::{L1Retrieval, L1RetrievalProvider}; diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs index a630b912..591caad2 100644 --- a/crates/derive/src/stages/test_utils/attributes_queue.rs +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -2,10 +2,10 @@ use crate::{ stages::attributes_queue::{AttributesBuilder, AttributesProvider}, - traits::OriginProvider, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{ BlockID, BlockInfo, BuilderError, L2BlockInfo, L2PayloadAttributes, SingleBatch, - StageError, StageResult, + StageError, StageResult, SystemConfig, }, }; use alloc::{boxed::Box, vec::Vec}; @@ -49,6 +49,21 @@ impl OriginProvider for MockAttributesProvider { } } +#[async_trait] +impl ResettableStage for MockAttributesProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockAttributesProvider { + type Previous = MockAttributesProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} + #[async_trait] impl AttributesProvider for MockAttributesProvider { async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index 809863fd..abae762f 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -2,8 +2,8 @@ use crate::{ stages::batch_queue::BatchQueueProvider, - traits::OriginProvider, - types::{Batch, BlockInfo, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{Batch, BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; @@ -36,3 +36,18 @@ impl BatchQueueProvider for MockBatchQueueProvider { self.batches.pop().ok_or(StageError::Eof)? } } + +#[async_trait] +impl ResettableStage for MockBatchQueueProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockBatchQueueProvider { + type Previous = MockBatchQueueProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/stages/test_utils/channel_bank.rs b/crates/derive/src/stages/test_utils/channel_bank.rs index 62db2933..9e540038 100644 --- a/crates/derive/src/stages/test_utils/channel_bank.rs +++ b/crates/derive/src/stages/test_utils/channel_bank.rs @@ -2,14 +2,14 @@ use crate::{ stages::ChannelBankProvider, - traits::OriginProvider, - types::{BlockInfo, Frame, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; /// A mock [ChannelBankProvider] for testing the [ChannelBank] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockChannelBankProvider { /// The data to return. pub data: Vec>, @@ -36,3 +36,18 @@ impl ChannelBankProvider for MockChannelBankProvider { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockChannelBankProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockChannelBankProvider { + type Previous = MockChannelBankProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs index 23cea641..d1b531f3 100644 --- a/crates/derive/src/stages/test_utils/channel_reader.rs +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -2,15 +2,15 @@ use crate::{ stages::ChannelReaderProvider, - traits::OriginProvider, - types::{BlockInfo, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; /// A mock [ChannelReaderProvider] for testing the [ChannelReader] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockChannelReaderProvider { /// The data to return. pub data: Vec>>, @@ -37,3 +37,18 @@ impl ChannelReaderProvider for MockChannelReaderProvider { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockChannelReaderProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockChannelReaderProvider { + type Previous = MockChannelReaderProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/stages/test_utils/frame_queue.rs b/crates/derive/src/stages/test_utils/frame_queue.rs index a7cd4f8e..491bf2a2 100644 --- a/crates/derive/src/stages/test_utils/frame_queue.rs +++ b/crates/derive/src/stages/test_utils/frame_queue.rs @@ -2,15 +2,15 @@ use crate::{ stages::FrameQueueProvider, - traits::OriginProvider, - types::{BlockInfo, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; /// A mock [FrameQueueProvider] for testing the [FrameQueue] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockFrameQueueProvider { /// The data to return. pub data: Vec>, @@ -37,3 +37,18 @@ impl FrameQueueProvider for MockFrameQueueProvider { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockFrameQueueProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockFrameQueueProvider { + type Previous = MockFrameQueueProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index ccd92b10..8ee12b53 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -5,7 +5,7 @@ mod data_sources; pub use data_sources::*; mod stages; -pub use stages::{OriginProvider, ResettableStage}; +pub use stages::{OriginProvider, PreviousStage, ResettableStage}; mod ecrecover; pub use ecrecover::SignedRecoverable; diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index 7c436265..18ecc6cc 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -16,3 +16,12 @@ pub trait OriginProvider { /// Returns the optional L1 [BlockInfo] origin. fn origin(&self) -> Option<&BlockInfo>; } + +/// Provides a method for accessing a previous stage. +pub trait PreviousStage { + /// The previous stage. + type Previous: ResettableStage + PreviousStage; + + /// Returns the previous stage. + fn previous(&self) -> Option<&Self::Previous>; +} diff --git a/crates/derive/src/types/payload.rs b/crates/derive/src/types/payload.rs index ae1e10e8..d6d89aa9 100644 --- a/crates/derive/src/types/payload.rs +++ b/crates/derive/src/types/payload.rs @@ -3,7 +3,7 @@ use alloc::vec::Vec; use alloy_primitives::{Address, Bloom, Bytes, B256, U256}; use anyhow::Result; -use op_alloy_consensus::TxDeposit; +use op_alloy_consensus::{OpTxEnvelope, TxDeposit}; /// Fixed and variable memory costs for a payload. /// ~1000 bytes per payload, with some margin for overhead like map data.