Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(derive): Pipeline Builder and Fixes #124

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions crates/derive/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! Contains a concrete implementation of the [DerivationPipeline].

use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
},
traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider},
types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult},
};
use alloc::sync::Arc;
use core::fmt::Debug;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug)]
pub struct DerivationPipeline<N: NextAttributes + Debug> {
/// The attributes queue to retrieve the next attributes.
pub attributes: N,
/// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes.
pub cursor: L2BlockInfo,
}

impl<N: NextAttributes + Debug + Send> DerivationPipeline<N> {
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: N, cursor: L2BlockInfo) -> Self {
Self { attributes, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Get the next attributes from the pipeline.
pub async fn next(&mut self) -> StageResult<L2AttributesWithParent> {
self.attributes.next_attributes(self.cursor).await
}
}

impl<P, DAP, F, B> DerivationPipeline<KonaAttributes<P, DAP, F, B>>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Creates a new instance of the [DerivationPipeline] from the given attributes.
pub fn new_online_pipeline(
attributes: KonaAttributes<P, DAP, F, B>,
cursor: L2BlockInfo,
) -> Self {
Self::new(attributes, cursor)
}
}

/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type.
pub type KonaDerivationPipeline<P, DAP, F, B> = DerivationPipeline<KonaAttributes<P, DAP, F, B>>;

/// [KonaAttributes] is a concrete [NextAttributes] type.
pub type KonaAttributes<P, DAP, F, B> = AttributesQueue<
BatchQueue<ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<P>>>>>, F>,
B,
>;

/// Creates a new [KonaAttributes] instance.
pub fn new_online_pipeline<P, DAP, F, B>(
rollup_config: Arc<RollupConfig>,
chain_provider: P,
dap_source: DAP,
fetcher: F,
builder: B,
) -> KonaAttributes<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
}
27 changes: 9 additions & 18 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@

extern crate alloc;

use alloc::sync::Arc;
use core::fmt::Debug;
use traits::ChainProvider;
use types::RollupConfig;
/// Prelude exports common types and traits.
pub mod prelude {
pub use super::{builder::DerivationPipeline, params::*};
// pub use super::traits::prelude::*;
// pub use super::types::prelude::*;
// pub use super::stages::prelude::*;
// pub use super::sources::prelude::*;
}

mod params;
pub use params::{
Expand All @@ -19,6 +23,7 @@ pub use params::{
MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS,
};

pub mod builder;
pub mod sources;
pub mod stages;
pub mod traits;
Expand All @@ -28,17 +33,3 @@ pub mod types;
mod online;
#[cfg(feature = "online")]
pub use online::prelude::*;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;

impl DerivationPipeline {
/// Creates a new instance of the [DerivationPipeline].
pub fn new<P>(_rollup_config: Arc<RollupConfig>, _chain_provider: P) -> Self
where
P: ChainProvider + Clone + Debug + Send,
{
unimplemented!("TODO: High-level pipeline composition helper.")
}
}
22 changes: 22 additions & 0 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ pub trait AttributesProvider {
fn is_last_in_span(&self) -> bool;
}

/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [L2PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
Expand Down Expand Up @@ -139,6 +147,20 @@ where
}
}

#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn next_attributes(
&mut self,
parent: L2BlockInfo,
) -> StageResult<L2AttributesWithParent> {
self.next_attributes(parent).await
}
}

impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
Expand Down
72 changes: 51 additions & 21 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ where
// We always update the origin of this stage if it's not the same so
// after the update code runs, this is consistent.
let origin_behind =
self.origin.map_or(true, |origin| origin.number < parent.l1_origin.number);
self.prev.origin().map_or(true, |origin| origin.number < parent.l1_origin.number);

// Advance the origin if needed.
// The entire pipeline has the same origin.
Expand Down Expand Up @@ -334,6 +334,7 @@ where
}

// Attempt to derive more batches.
assert!(self.l1_blocks.is_empty());
let batch = match self.derive_next_batch(out_of_data, parent).await {
Ok(b) => b,
Err(e) => match e {
Expand Down Expand Up @@ -408,12 +409,17 @@ where
mod tests {
use super::*;
use crate::{
stages::{channel_reader::BatchReader, test_utils::MockBatchQueueProvider},
stages::{
channel_reader::BatchReader,
test_utils::{CollectingLayer, MockBatchQueueProvider, TraceStorage},
},
traits::test_utils::MockBlockFetcher,
types::BatchType,
types::{BatchType, BlockID},
};
use alloc::vec;
use miniz_oxide::deflate::compress_to_vec_zlib;
use tracing::Level;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn new_batch_reader() -> BatchReader {
let raw_data = include_bytes!("../../testdata/raw_batch.hex");
Expand Down Expand Up @@ -448,24 +454,48 @@ mod tests {
assert!(bq.is_last_in_span());
}

// TODO(refcell): The batch reader here loops forever.
// Maybe the cursor isn't being used?
// UPDATE: the batch data is not valid
// #[tokio::test]
// async fn test_next_batch_succeeds() {
// let mut reader = new_batch_reader();
// let mut batch_vec: Vec<StageResult<Batch>> = vec![];
// while let Some(batch) = reader.next_batch() {
// batch_vec.push(Ok(batch));
// }
// let mock = MockBatchQueueProvider::new(batch_vec);
// let telemetry = TestTelemetry::new();
// let fetcher = MockBlockFetcher::default();
// let mut bq = BatchQueue::new(RollupConfig::default(), mock, telemetry, fetcher);
// let res = bq.next_batch(L2BlockInfo::default()).await.unwrap();
// assert_eq!(res, SingleBatch::default());
// assert!(bq.is_last_in_span());
// }
#[tokio::test]
async fn test_next_batch_origin_behind() {
let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<StageResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let parent = L2BlockInfo {
l1_origin: BlockID { number: 10, ..Default::default() },
..Default::default()
};
let res = bq.next_batch(parent).await.unwrap_err();
assert_eq!(res, StageError::NotEnoughData);
}

// TODO: fix
#[tokio::test]
async fn test_next_batch_succeeds() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<StageResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err();
let logs = trace_store.get_by_level(Level::WARN);
let str = "Deriving next batch for epoch: 1";
assert_eq!(logs[0], str);
}

#[tokio::test]
async fn test_batch_queue_empty_bytes() {
Expand Down
2 changes: 0 additions & 2 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ where
let first = self.channel_queue[0];
let channel = self.channels.get(&first).ok_or(StageError::ChannelNotFound)?;
let origin = self.origin().ok_or(StageError::MissingOrigin)?;

// Remove all timed out channels from the front of the `channel_queue`.
if channel.open_block_number() + self.cfg.channel_timeout < origin.number {
warn!("Channel {:?} timed out", first);
self.channels.remove(&first);
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ impl BatchReader {
}

// Decompress and RLP decode the batch data, before finally decoding the batch itself.
let mut decompressed_reader = self.decompressed.as_slice();
let mut decompressed_reader = self.decompressed.as_slice()[self.cursor..].as_ref();
let batch = Batch::decode(&mut decompressed_reader, cfg).ok()?;

// Advance the cursor on the reader.
self.cursor += self.decompressed.len() - decompressed_reader.len();
self.cursor = self.decompressed.len() - decompressed_reader.len();

Some(batch)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub use batch_queue::{BatchQueue, BatchQueueProvider};

mod attributes_queue;
pub use attributes_queue::{
AttributesBuilder, AttributesProvider, AttributesQueue, StatefulAttributesBuilder,
SystemConfigL2Fetcher,
AttributesBuilder, AttributesProvider, AttributesQueue, NextAttributes,
StatefulAttributesBuilder, SystemConfigL2Fetcher,
};

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/test_utils/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use async_trait::async_trait;
#[derive(Debug, Default)]
pub struct MockBatchQueueProvider {
/// The origin of the L1 block.
origin: Option<BlockInfo>,
pub origin: Option<BlockInfo>,
/// A list of batches to return.
batches: Vec<StageResult<Batch>>,
pub batches: Vec<StageResult<Batch>>,
}

impl MockBatchQueueProvider {
Expand Down
Loading