Skip to content

Commit

Permalink
fix(derive): ugly refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Apr 22, 2024
1 parent b6346b2 commit ae4166a
Show file tree
Hide file tree
Showing 19 changed files with 508 additions and 135 deletions.
133 changes: 75 additions & 58 deletions crates/derive/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,103 @@
//! Contains a concrete implementation of the [DerivationPipeline].

use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
stages::NextAttributes,
traits::ResettableStage,
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig,
},
traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider},
types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult},
};
use alloc::sync::Arc;
use alloc::{boxed::Box, collections::VecDeque};
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides the [BlockInfo] and [SystemConfig] for the stack to reset the stages.
#[async_trait]
pub trait ResetProvider {
/// Returns the current [BlockInfo] for the pipeline to reset.
async fn block_info(&self) -> BlockInfo;

/// Returns the current [SystemConfig] for the pipeline to reset.
async fn system_config(&self) -> SystemConfig;
}

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug)]
pub struct DerivationPipeline<N: NextAttributes + Debug> {
/// The attributes queue to retrieve the next attributes.
pub attributes: N,
pub struct DerivationPipeline<
S: NextAttributes + ResettableStage + Debug + Send,
R: ResetProvider + Send,
> {
/// The stack of stages in the pipeline.
/// The stack is reponsible for advancing the L1 traversal stage.
pub stack: S,
/// Reset provider for the pipeline.
pub reset: R,
/// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer.
pub prepared: VecDeque<L2AttributesWithParent>,
/// A flag to tell the pipeline to reset.
pub needs_reset: bool,
/// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes.
pub cursor: L2BlockInfo,
}

impl<N: NextAttributes + Debug + Send> DerivationPipeline<N> {
impl<S: NextAttributes + ResettableStage + Debug + Send, R: ResetProvider + Send>
DerivationPipeline<S, R>
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: N, cursor: L2BlockInfo) -> Self {
Self { attributes, cursor }
pub fn new(stack: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { stack, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Get the next attributes from the pipeline.
pub async fn next(&mut self) -> StageResult<L2AttributesWithParent> {
self.attributes.next_attributes(self.cursor).await
/// Returns the next [L2AttributesWithParent] from the pipeline.
pub fn next_attributes(&mut self) -> Option<L2AttributesWithParent> {
self.prepared.pop_front()
}
}

impl<P, DAP, F, B> DerivationPipeline<KonaAttributes<P, DAP, F, B>>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Creates a new instance of the [DerivationPipeline] from the given attributes.
pub fn new_online_pipeline(
attributes: KonaAttributes<P, DAP, F, B>,
cursor: L2BlockInfo,
) -> Self {
Self::new(attributes, cursor)
/// Flags the pipeline to reset on the next [DerivationPipeline::step] call.
pub fn reset(&mut self) {
self.needs_reset = true;
}
}

/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type.
pub type KonaDerivationPipeline<P, DAP, F, B> = DerivationPipeline<KonaAttributes<P, DAP, F, B>>;
/// Attempts to progress the pipeline.
/// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data.
/// Any other error is critical and the derivation pipeline should be reset.
/// An error is expected when the underlying source closes.
/// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the
/// derivation process.
pub async fn step(&mut self) -> StageResult<()> {
tracing::info!("DerivationPipeline::step");

/// [KonaAttributes] is a concrete [NextAttributes] type.
pub type KonaAttributes<P, DAP, F, B> = AttributesQueue<
BatchQueue<ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<P>>>>>, F>,
B,
>;
// Reset the pipeline if needed.
if self.needs_reset {
let block_info = self.reset.block_info().await;
let system_config = self.reset.system_config().await;
self.stack.reset(block_info, &system_config).await?;
self.needs_reset = false;
}

/// Creates a new [KonaAttributes] instance.
pub fn new_online_pipeline<P, DAP, F, B>(
rollup_config: Arc<RollupConfig>,
chain_provider: P,
dap_source: DAP,
fetcher: F,
builder: B,
) -> KonaAttributes<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
// Step over the engine queue.
match self.stack.next_attributes(self.cursor).await {
Ok(a) => {
tracing::info!("attributes queue stage step returned l2 attributes");
tracing::info!("prepared L2 attributes: {:?}", a);
self.prepared.push_back(a);
return Ok(());
}
Err(StageError::Eof) => {
tracing::info!("attributes queue stage complete");
}
// TODO: match on the EngineELSyncing error here and log
Err(err) => {
tracing::error!("attributes queue stage failed: {:?}", err);
return Err(err);
}
}

Ok(())
}
}
5 changes: 5 additions & 0 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ pub mod stages;
pub mod traits;
pub mod types;

#[cfg(feature = "online")]
mod stack;
#[cfg(feature = "online")]
pub use stack::*;

#[cfg(feature = "online")]
mod online;
#[cfg(feature = "online")]
Expand Down
128 changes: 128 additions & 0 deletions crates/derive/src/stack.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//! Contains a stack of Stages for the [crate::DerivationPipeline].

use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
},
traits::{
ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage,
},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, RollupConfig, StageError, StageResult,
SystemConfig,
},
};
use alloc::{boxed::Box, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
use spin::Mutex;

/// The [AttributesQueue] type alias.
pub type AttributesQueueType<P, DAP, F, B> = AttributesQueue<
BatchQueue<ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<P>>>>>, F>,
B,
>;

/// An online stack of stages.
#[derive(Debug)]
pub struct OnlineStageStack<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Flag to tell the L1Traversal stage to advance to the next L1 block.
pub advance: Arc<Mutex<bool>>,
/// The [AttributesQueue] stage.
pub attributes: AttributesQueueType<P, DAP, F, B>,
}

impl<P, DAP, F, B> OnlineStageStack<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Creates a new [OnlineStageStack].
pub fn new(
rollup_config: Arc<RollupConfig>,
chain_provider: P,
dap_source: DAP,
fetcher: F,
builder: B,
) -> Self {
let advance = Arc::new(Mutex::new(false));
let l1_traversal =
L1Traversal::new(chain_provider, Arc::clone(&advance), rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
let attributes = AttributesQueue::new(*rollup_config, batch_queue, builder);
Self { advance, attributes }
}
}

#[async_trait]
impl<P, DAP, F, B> NextAttributes for OnlineStageStack<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Advances the pipeline to the next attributes.
async fn next_attributes(
&mut self,
parent: L2BlockInfo,
) -> StageResult<L2AttributesWithParent> {
match self.attributes.next_attributes(parent).await {
Ok(a) => {
tracing::info!("attributes queue stage step returned l2 attributes");
tracing::info!("prepared L2 attributes: {:?}", a);
return Ok(a);
}
Err(StageError::Eof) => {
tracing::info!("attributes queue stage complete");
let mut advance = self.advance.lock();
*advance = true;
return Err(StageError::Eof);
}
// TODO: match on the EngineELSyncing error here and log
Err(err) => {
tracing::error!("attributes queue stage failed: {:?}", err);
return Err(err);
}
}
}
}

#[async_trait]
impl<P, DAP, F, B> ResettableStage for OnlineStageStack<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Resets all stages in the stack.
async fn reset(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
}
}
31 changes: 24 additions & 7 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the logic for the `AttributesQueue` stage.

use crate::{
traits::{OriginProvider, ResettableStage},
traits::{OriginProvider, PreviousStage, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -53,7 +53,7 @@ pub trait NextAttributes {
#[derive(Debug)]
pub struct AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + ResettableStage + PreviousStage + OriginProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
Expand All @@ -70,7 +70,7 @@ where

impl<P, AB> AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
Expand Down Expand Up @@ -147,10 +147,22 @@ where
}
}

impl<P, AB> PreviousStage for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug,
AB: AttributesBuilder + Debug,
{
type Previous = P;

fn previous(&self) -> Option<&P> {
Some(&self.prev)
}
}

#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug + Send,
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn next_attributes(
Expand All @@ -163,7 +175,7 @@ where

impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug,
AB: AttributesBuilder + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
Expand All @@ -174,10 +186,15 @@ where
#[async_trait]
impl<P, AB> ResettableStage for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Send + Debug,
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
async fn reset(
&mut self,
block_info: BlockInfo,
system_config: &SystemConfig,
) -> StageResult<()> {
self.prev.reset(block_info, system_config).await?;
info!("resetting attributes queue");
self.batch = None;
self.is_last_in_span = false;
Expand Down
Loading

0 comments on commit ae4166a

Please sign in to comment.