From b265e999452996da8909ace200f1e6b46dfad498 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Thu, 29 Aug 2024 20:34:13 +0200 Subject: [PATCH] feat: sync cursor --- Cargo.lock | 1 + crates/rollup/Cargo.toml | 1 + crates/rollup/src/driver.rs | 154 ++++++++++++++++++------------------ 3 files changed, 80 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 70e4689..0d3ce47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8433,6 +8433,7 @@ dependencies = [ "async-trait", "clap", "eyre", + "hashbrown 0.14.5", "kona-derive", "kona-primitives", "kona-providers", diff --git a/crates/rollup/Cargo.toml b/crates/rollup/Cargo.toml index 7c1ec46..7021c09 100644 --- a/crates/rollup/Cargo.toml +++ b/crates/rollup/Cargo.toml @@ -19,6 +19,7 @@ clap.workspace = true async-trait.workspace = true tokio.workspace = true alloy.workspace = true +hashbrown.workspace = true # Reth Dependencies reth.workspace = true diff --git a/crates/rollup/src/driver.rs b/crates/rollup/src/driver.rs index 29311aa..ada9580 100644 --- a/crates/rollup/src/driver.rs +++ b/crates/rollup/src/driver.rs @@ -1,5 +1,6 @@ //! Rollup Node Driver +use hashbrown::HashMap; use std::{ collections::{BTreeMap, VecDeque}, fmt::Debug, @@ -75,7 +76,7 @@ pub struct Driver { blob_provider: BP, /// The L2 chain provider l2_chain_provider: L2CP, - /// Cursor to keep track of the current L1 and L2 tip blocks + /// Cursor to keep track of the L2 tip cursor: SyncCursor, } @@ -95,7 +96,7 @@ where chain_provider: cp, blob_provider: bp, l2_chain_provider: l2_cp, - cursor: SyncCursor::with_capacity(128), + cursor: SyncCursor::new(), } } } @@ -116,7 +117,7 @@ impl Driver) -> Result<()> { + let (l2_tip, l1_origin) = self.cursor.tip(); + let _ = pipeline.step(l2_tip).await; + self.cursor.advance(l1_origin, l2_tip); + + unimplemented!() + } + async fn handle_notification( &mut self, notif: ExExNotification, pipeline: &mut RollupPipeline, ) -> Result<()> { if let Some(reverted_chain) = notif.reverted_chain() { + // The reverted chain contains the list of blocks that were invalidated by the + // reorg. we need to reset the cursor to the last canonical block, which corresponds + // to the block before the reorg happened. let last_canonical_block = reverted_chain.first().number - 1; - let first_reorged_block = info_from_header(&reverted_chain.first().block); - let l2_safe_tip = self.cursor.reset(last_canonical_block); - if let Err(e) = pipeline.reset(l2_safe_tip, first_reorged_block).await { + // Find the last known L2 block that is still valid after the reorg, + // and reset the cursor and pipeline to it. + let (l2_safe_tip, l2_safe_tip_l1_origin) = self.cursor.reset(last_canonical_block); + + warn!("Reverting derivation pipeline to L2 block: {}", l2_safe_tip.number); + if let Err(e) = pipeline.reset(l2_safe_tip, l2_safe_tip_l1_origin).await { bail!("Failed to reset pipeline: {:?}", e); } } @@ -205,13 +221,16 @@ where pub async fn start(mut self) -> Result<()> { // Step 1: Wait for the L2 origin block to be available self.wait_for_l2_genesis_l1_block().await?; - info!("Chain synced to rollup genesis"); + info!("L1 chain synced to the rollup genesis block"); // Step 2: Initialize the rollup pipeline let mut pipeline = self.init_pipeline(); // Step 3: Start processing events loop { + // TODO: handle pipeline step (stubbed) + let _ = self.step(&mut pipeline).await; + if let Some(notification) = self.ctx.recv_notification().await { self.handle_notification(notification, &mut pipeline).await?; } @@ -219,111 +238,94 @@ where } } -#[allow(unused)] -#[derive(Debug, Default)] +/// A cursor that keeps track of the L2 tip block for a given L1 origin block. +/// +/// The cursor is used to advance the pipeline to the next L2 block, and to reset +/// the pipeline to a previous L2 block when a reorg happens. +#[derive(Debug)] pub struct SyncCursor { - /// The current L1 origin tip block number - l1_tip: u64, - /// The current L2 tip block number - l2_tip: u64, /// The block cache capacity before evicting old entries /// (to avoid unbounded memory growth) capacity: usize, - /// The L1 origin block info for which we have an L2 block in the cache. + /// The L1 origin block numbers for which we have an L2 block in the cache. /// Used to keep track of the order of insertion and evict the oldest entry. - l1_origin_block_info: VecDeque, - /// Map of the L1 origin block number to its corresponding L2 tip block info + l1_origin_key_order: VecDeque, + /// The L1 origin block info for which we have an L2 block in the cache. + l1_origin_block_info: HashMap, + /// Map of the L1 origin block number to its corresponding tip L2 block l1_origin_to_l2_blocks: BTreeMap, } #[allow(unused)] impl SyncCursor { + /// Create a new cursor with the default cache capacity. + pub fn new() -> Self { + // NOTE: this value must be greater than the `CHANNEL_TIMEOUT` to allow + // for derivation to proceed through a deep reorg. This value is set + // to 300 blocks before the Granite hardfork and 50 blocks after it. + // Ref: + Self::with_capacity(350) + } + /// Create a new cursor with the given cache capacity. - pub fn with_capacity(capacity: usize) -> Self { + fn with_capacity(capacity: usize) -> Self { Self { capacity, - l1_origin_block_info: VecDeque::with_capacity(capacity), + l1_origin_key_order: VecDeque::with_capacity(capacity), + l1_origin_block_info: HashMap::with_capacity(capacity), l1_origin_to_l2_blocks: BTreeMap::new(), - ..Default::default() } } - pub fn l1_tip(&self) -> u64 { - self.l1_tip - } - - pub fn l2_tip(&self) -> u64 { - self.l2_tip + /// Get the current L2 tip and the corresponding L1 origin block info. + pub fn tip(&self) -> (L2BlockInfo, BlockInfo) { + if let Some((origin_number, l2_tip)) = self.l1_origin_to_l2_blocks.last_key_value() { + let origin_block = self.l1_origin_block_info[origin_number]; + (*l2_tip, origin_block) + } else { + unreachable!("cursor must be initialized with one block before advancing") + } } /// Advance the cursor to the provided L2 block, given the corresponding L1 origin block. - /// This is a no-op if the L2 origin block does not match the provided L1 block. - pub fn advance_tip(&mut self, l1_block: BlockInfo, l2_block: L2BlockInfo) { - if l2_block.l1_origin.hash != l1_block.hash { - warn!( - "Unexpected L1 origin block: {} (expected {})", - l1_block.hash, l2_block.l1_origin.hash - ); - return; + /// + /// If the cache is full, the oldest entry is evicted. + pub fn advance(&mut self, l1_origin_block: BlockInfo, l2_tip_block: L2BlockInfo) { + if self.l1_origin_to_l2_blocks.len() >= self.capacity { + let key = self.l1_origin_key_order.pop_front().unwrap(); + self.l1_origin_to_l2_blocks.remove(&key); } - self.l1_tip = l1_block.number; - self.l2_tip = l2_block.block_info.number; - self.insert(l1_block, l2_block); + self.l1_origin_key_order.push_back(l1_origin_block.number); + self.l1_origin_block_info.insert(l1_origin_block.number, l1_origin_block); + self.l1_origin_to_l2_blocks.insert(l1_origin_block.number, l2_tip_block); } - /// When the L1 undergoes a reorg, we need to reset the cursor - /// to the last canonical known L1 block for which we have a - /// corresponding entry in the cache. + /// When the L1 undergoes a reorg, we need to reset the cursor to the last canonical + /// known L1 block for which we have a corresponding entry in the cache. /// - /// Returns the L2 block info for the new safe tip. - pub fn reset(&mut self, last_canonical_l1_block_number: u64) -> BlockInfo { + /// Returns the (L2 block info, L1 origin block info) tuple for the new cursor state. + pub fn reset(&mut self, last_canonical_l1_block_number: u64) -> (BlockInfo, BlockInfo) { match self.l1_origin_to_l2_blocks.get(&last_canonical_l1_block_number) { Some(l2_safe_tip) => { - self.l1_tip = last_canonical_l1_block_number; - self.l2_tip = l2_safe_tip.block_info.number; - - l2_safe_tip.block_info + // The last canonical L1 block is in the cache, we can use it to reset the cursor. + // INVARIANT: the L1 origin info must be present in the cache + // since we always insert them in tandem. + (l2_safe_tip.block_info, self.l1_origin_block_info[&last_canonical_l1_block_number]) } None => { - // If the last canonical L1 block is not in the cache, - // we reset the cursor to the last known L1 block for which - // we have a corresponding L2 block. + // If the last canonical L1 block is not in the cache, we reset the cursor + // to the last known L1 block for which we have a corresponding L2 block. let (last_l1_known_tip, l2_known_tip) = self .l1_origin_to_l2_blocks .range(..=last_canonical_l1_block_number) .next_back() .expect("walked back to genesis without finding anchor origin block"); - self.l1_tip = *last_l1_known_tip; - self.l2_tip = l2_known_tip.block_info.number; - - l2_known_tip.block_info + // INVARIANT: the L1 origin info must be present in the cache + // since we always insert them in tandem. + (l2_known_tip.block_info, self.l1_origin_block_info[last_l1_known_tip]) } } } - - /// Insert a new L1 origin block info and its corresponding - /// tip L2 block info into the cache. - /// - /// If the cache is full, the oldest entry is evicted. - fn insert(&mut self, l1_block: BlockInfo, l2_block: L2BlockInfo) { - if self.l1_origin_to_l2_blocks.len() >= self.capacity { - let key = self.l1_origin_block_info.pop_front().unwrap(); - self.l1_origin_to_l2_blocks.remove(&key.number); - } - - self.l1_origin_block_info.push_back(l1_block); - self.l1_origin_to_l2_blocks.insert(l1_block.number, l2_block); - } -} - -/// Helper to extract block info from a sealed block -fn info_from_header(block: &reth::primitives::SealedBlock) -> BlockInfo { - BlockInfo { - hash: block.hash(), - number: block.number, - timestamp: block.timestamp, - parent_hash: block.parent_hash, - } }