Skip to content

Commit

Permalink
feat: sync cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Aug 29, 2024
1 parent d512e1a commit b265e99
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/rollup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clap.workspace = true
async-trait.workspace = true
tokio.workspace = true
alloy.workspace = true
hashbrown.workspace = true

# Reth Dependencies
reth.workspace = true
Expand Down
154 changes: 78 additions & 76 deletions crates/rollup/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Rollup Node Driver
use hashbrown::HashMap;
use std::{
collections::{BTreeMap, VecDeque},
fmt::Debug,
Expand Down Expand Up @@ -75,7 +76,7 @@ pub struct Driver<DC, CP, BP, L2CP> {
blob_provider: BP,
/// The L2 chain provider
l2_chain_provider: L2CP,
/// Cursor to keep track of the current L1 and L2 tip blocks
/// Cursor to keep track of the L2 tip
cursor: SyncCursor,
}

Expand All @@ -95,7 +96,7 @@ where
chain_provider: cp,
blob_provider: bp,
l2_chain_provider: l2_cp,
cursor: SyncCursor::with_capacity(128),
cursor: SyncCursor::new(),
}
}
}
Expand All @@ -116,7 +117,7 @@ impl Driver<StandaloneContext, AlloyChainProvider, DurableBlobProvider, AlloyL2C
chain_provider: cp,
blob_provider: bp,
l2_chain_provider: l2_cp,
cursor: SyncCursor::with_capacity(128),
cursor: SyncCursor::new(),
}
}
}
Expand Down Expand Up @@ -167,17 +168,32 @@ where
)
}

/// Advance the pipeline to the next L2 block.
async fn step(&mut self, pipeline: &mut RollupPipeline<CP, BP, L2CP>) -> Result<()> {
let (l2_tip, l1_origin) = self.cursor.tip();
let _ = pipeline.step(l2_tip).await;
self.cursor.advance(l1_origin, l2_tip);

unimplemented!()
}

async fn handle_notification(
&mut self,
notif: ExExNotification,
pipeline: &mut RollupPipeline<CP, BP, L2CP>,
) -> Result<()> {
if let Some(reverted_chain) = notif.reverted_chain() {
// The reverted chain contains the list of blocks that were invalidated by the
// reorg. we need to reset the cursor to the last canonical block, which corresponds
// to the block before the reorg happened.
let last_canonical_block = reverted_chain.first().number - 1;
let first_reorged_block = info_from_header(&reverted_chain.first().block);

let l2_safe_tip = self.cursor.reset(last_canonical_block);
if let Err(e) = pipeline.reset(l2_safe_tip, first_reorged_block).await {
// Find the last known L2 block that is still valid after the reorg,
// and reset the cursor and pipeline to it.
let (l2_safe_tip, l2_safe_tip_l1_origin) = self.cursor.reset(last_canonical_block);

warn!("Reverting derivation pipeline to L2 block: {}", l2_safe_tip.number);
if let Err(e) = pipeline.reset(l2_safe_tip, l2_safe_tip_l1_origin).await {
bail!("Failed to reset pipeline: {:?}", e);
}
}
Expand Down Expand Up @@ -205,125 +221,111 @@ where
pub async fn start(mut self) -> Result<()> {
// Step 1: Wait for the L2 origin block to be available
self.wait_for_l2_genesis_l1_block().await?;
info!("Chain synced to rollup genesis");
info!("L1 chain synced to the rollup genesis block");

// Step 2: Initialize the rollup pipeline
let mut pipeline = self.init_pipeline();

// Step 3: Start processing events
loop {
// TODO: handle pipeline step (stubbed)
let _ = self.step(&mut pipeline).await;

if let Some(notification) = self.ctx.recv_notification().await {
self.handle_notification(notification, &mut pipeline).await?;
}
}
}
}

#[allow(unused)]
#[derive(Debug, Default)]
/// A cursor that keeps track of the L2 tip block for a given L1 origin block.
///
/// The cursor is used to advance the pipeline to the next L2 block, and to reset
/// the pipeline to a previous L2 block when a reorg happens.
#[derive(Debug)]
pub struct SyncCursor {
/// The current L1 origin tip block number
l1_tip: u64,
/// The current L2 tip block number
l2_tip: u64,
/// The block cache capacity before evicting old entries
/// (to avoid unbounded memory growth)
capacity: usize,
/// The L1 origin block info for which we have an L2 block in the cache.
/// The L1 origin block numbers for which we have an L2 block in the cache.
/// Used to keep track of the order of insertion and evict the oldest entry.
l1_origin_block_info: VecDeque<BlockInfo>,
/// Map of the L1 origin block number to its corresponding L2 tip block info
l1_origin_key_order: VecDeque<u64>,
/// The L1 origin block info for which we have an L2 block in the cache.
l1_origin_block_info: HashMap<u64, BlockInfo>,
/// Map of the L1 origin block number to its corresponding tip L2 block
l1_origin_to_l2_blocks: BTreeMap<u64, L2BlockInfo>,
}

#[allow(unused)]
impl SyncCursor {
/// Create a new cursor with the default cache capacity.
pub fn new() -> Self {
// NOTE: this value must be greater than the `CHANNEL_TIMEOUT` to allow
// for derivation to proceed through a deep reorg. This value is set
// to 300 blocks before the Granite hardfork and 50 blocks after it.
// Ref: <https://specs.optimism.io/protocol/derivation.html#timeouts>
Self::with_capacity(350)
}

/// Create a new cursor with the given cache capacity.
pub fn with_capacity(capacity: usize) -> Self {
fn with_capacity(capacity: usize) -> Self {
Self {
capacity,
l1_origin_block_info: VecDeque::with_capacity(capacity),
l1_origin_key_order: VecDeque::with_capacity(capacity),
l1_origin_block_info: HashMap::with_capacity(capacity),
l1_origin_to_l2_blocks: BTreeMap::new(),
..Default::default()
}
}

pub fn l1_tip(&self) -> u64 {
self.l1_tip
}

pub fn l2_tip(&self) -> u64 {
self.l2_tip
/// Get the current L2 tip and the corresponding L1 origin block info.
pub fn tip(&self) -> (L2BlockInfo, BlockInfo) {
if let Some((origin_number, l2_tip)) = self.l1_origin_to_l2_blocks.last_key_value() {
let origin_block = self.l1_origin_block_info[origin_number];
(*l2_tip, origin_block)
} else {
unreachable!("cursor must be initialized with one block before advancing")
}
}

/// Advance the cursor to the provided L2 block, given the corresponding L1 origin block.
/// This is a no-op if the L2 origin block does not match the provided L1 block.
pub fn advance_tip(&mut self, l1_block: BlockInfo, l2_block: L2BlockInfo) {
if l2_block.l1_origin.hash != l1_block.hash {
warn!(
"Unexpected L1 origin block: {} (expected {})",
l1_block.hash, l2_block.l1_origin.hash
);
return;
///
/// If the cache is full, the oldest entry is evicted.
pub fn advance(&mut self, l1_origin_block: BlockInfo, l2_tip_block: L2BlockInfo) {
if self.l1_origin_to_l2_blocks.len() >= self.capacity {
let key = self.l1_origin_key_order.pop_front().unwrap();
self.l1_origin_to_l2_blocks.remove(&key);
}

self.l1_tip = l1_block.number;
self.l2_tip = l2_block.block_info.number;
self.insert(l1_block, l2_block);
self.l1_origin_key_order.push_back(l1_origin_block.number);
self.l1_origin_block_info.insert(l1_origin_block.number, l1_origin_block);
self.l1_origin_to_l2_blocks.insert(l1_origin_block.number, l2_tip_block);
}

/// When the L1 undergoes a reorg, we need to reset the cursor
/// to the last canonical known L1 block for which we have a
/// corresponding entry in the cache.
/// When the L1 undergoes a reorg, we need to reset the cursor to the last canonical
/// known L1 block for which we have a corresponding entry in the cache.
///
/// Returns the L2 block info for the new safe tip.
pub fn reset(&mut self, last_canonical_l1_block_number: u64) -> BlockInfo {
/// Returns the (L2 block info, L1 origin block info) tuple for the new cursor state.
pub fn reset(&mut self, last_canonical_l1_block_number: u64) -> (BlockInfo, BlockInfo) {
match self.l1_origin_to_l2_blocks.get(&last_canonical_l1_block_number) {
Some(l2_safe_tip) => {
self.l1_tip = last_canonical_l1_block_number;
self.l2_tip = l2_safe_tip.block_info.number;

l2_safe_tip.block_info
// The last canonical L1 block is in the cache, we can use it to reset the cursor.
// INVARIANT: the L1 origin info must be present in the cache
// since we always insert them in tandem.
(l2_safe_tip.block_info, self.l1_origin_block_info[&last_canonical_l1_block_number])
}
None => {
// If the last canonical L1 block is not in the cache,
// we reset the cursor to the last known L1 block for which
// we have a corresponding L2 block.
// If the last canonical L1 block is not in the cache, we reset the cursor
// to the last known L1 block for which we have a corresponding L2 block.
let (last_l1_known_tip, l2_known_tip) = self
.l1_origin_to_l2_blocks
.range(..=last_canonical_l1_block_number)
.next_back()
.expect("walked back to genesis without finding anchor origin block");

self.l1_tip = *last_l1_known_tip;
self.l2_tip = l2_known_tip.block_info.number;

l2_known_tip.block_info
// INVARIANT: the L1 origin info must be present in the cache
// since we always insert them in tandem.
(l2_known_tip.block_info, self.l1_origin_block_info[last_l1_known_tip])
}
}
}

/// Insert a new L1 origin block info and its corresponding
/// tip L2 block info into the cache.
///
/// If the cache is full, the oldest entry is evicted.
fn insert(&mut self, l1_block: BlockInfo, l2_block: L2BlockInfo) {
if self.l1_origin_to_l2_blocks.len() >= self.capacity {
let key = self.l1_origin_block_info.pop_front().unwrap();
self.l1_origin_to_l2_blocks.remove(&key.number);
}

self.l1_origin_block_info.push_back(l1_block);
self.l1_origin_to_l2_blocks.insert(l1_block.number, l2_block);
}
}

/// Helper to extract block info from a sealed block
fn info_from_header(block: &reth::primitives::SealedBlock) -> BlockInfo {
BlockInfo {
hash: block.hash(),
number: block.number,
timestamp: block.timestamp,
parent_hash: block.parent_hash,
}
}

0 comments on commit b265e99

Please sign in to comment.