Skip to content

Commit

Permalink
feat(derive): Span batch prefix checks (#592)
Browse files Browse the repository at this point in the history
* prefix checks

* fix

* flush prev

* fix test
  • Loading branch information
clabby authored Sep 30, 2024
1 parent 40a47f0 commit 9e9c002
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 36 deletions.
1 change: 1 addition & 0 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
>,
OracleL2ChainProvider<O>,
>,
OracleL2ChainProvider<O>,
>,
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Batch {
pub fn timestamp(&self) -> u64 {
match self {
Self::Single(sb) => sb.timestamp,
Self::Span(sb) => sb.timestamp(),
Self::Span(sb) => sb.starting_timestamp(),
}
}

Expand Down
136 changes: 126 additions & 10 deletions crates/derive/src/batch/span_batch/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,26 @@ pub struct SpanBatch {
}

impl SpanBatch {
/// Returns the timestamp for the first batch in the span.
pub fn timestamp(&self) -> u64 {
/// Returns the starting timestamp for the first batch in the span.
///
/// ## Safety
/// Panics if [Self::batches] is empty.
pub fn starting_timestamp(&self) -> u64 {
self.batches[0].timestamp
}

/// Returns the final timestamp for the last batch in the span.
///
/// ## Safety
/// Panics if [Self::batches] is empty.
pub fn final_timestamp(&self) -> u64 {
self.batches[self.batches.len() - 1].timestamp
}

/// Returns the epoch number for the first batch in the span.
///
/// ## Safety
/// Panics if [Self::batches] is empty.
pub fn starting_epoch_num(&self) -> u64 {
self.batches[0].epoch_num
}
Expand Down Expand Up @@ -97,10 +111,10 @@ impl SpanBatch {

// Skip out of order batches.
let next_timestamp = l2_safe_head.block_info.timestamp + cfg.block_time;
if self.timestamp() > next_timestamp {
if self.starting_timestamp() > next_timestamp {
warn!(
"received out-of-order batch for future processing after next batch ({} > {})",
self.timestamp(),
self.starting_timestamp(),
next_timestamp
);
return BatchValidity::Future;
Expand All @@ -116,18 +130,19 @@ impl SpanBatch {
// safe head.
let mut parent_num = l2_safe_head.block_info.number;
let mut parent_block = l2_safe_head;
if self.timestamp() < next_timestamp {
if self.timestamp() > l2_safe_head.block_info.timestamp {
if self.starting_timestamp() < next_timestamp {
if self.starting_timestamp() > l2_safe_head.block_info.timestamp {
// Batch timestamp cannot be between safe head and next timestamp.
warn!("batch has misaligned timestamp, block time is too short");
return BatchValidity::Drop;
}
if (l2_safe_head.block_info.timestamp - self.timestamp()) % cfg.block_time != 0 {
if (l2_safe_head.block_info.timestamp - self.starting_timestamp()) % cfg.block_time != 0
{
warn!("batch has misaligned timestamp, not overlapped exactly");
return BatchValidity::Drop;
}
parent_num = l2_safe_head.block_info.number -
(l2_safe_head.block_info.timestamp - self.timestamp()) / cfg.block_time -
(l2_safe_head.block_info.timestamp - self.starting_timestamp()) / cfg.block_time -
1;
parent_block = match fetcher.l2_block_info_by_number(parent_num).await {
Ok(block) => block,
Expand Down Expand Up @@ -275,7 +290,7 @@ impl SpanBatch {
}

// Check overlapped blocks
if self.timestamp() < next_timestamp {
if self.starting_timestamp() < next_timestamp {
for i in 0..(l2_safe_head.block_info.number - parent_num) {
let safe_block_num = parent_num + i + 1;
let safe_block_payload = match fetcher.block_by_number(safe_block_num).await {
Expand Down Expand Up @@ -326,6 +341,107 @@ impl SpanBatch {
BatchValidity::Accept
}

/// Checks the validity of the batch's prefix. This function is used in the [BatchStream]
/// post-Holocene as a batch is being loaded in.
///
/// [BatchStream]: crate::stages::BatchStream
pub async fn check_batch_prefix<BF: L2ChainProvider>(
&self,
cfg: &RollupConfig,
l1_origins: &[BlockInfo],
l2_safe_head: BlockInfo,
fetcher: &mut BF,
) -> BatchValidity {
if l1_origins.is_empty() {
warn!("missing L1 block input, cannot proceed with batch checking");
return BatchValidity::Undecided;
}
if self.batches.is_empty() {
warn!("empty span batch, cannot proceed with batch checking");
return BatchValidity::Undecided;
}

let next_timestamp = l2_safe_head.timestamp + cfg.block_time;

// Find the parent block of the span batch.
// If the span batch does not overlap the current safe chain, parent block should be the L2
// safe head.
let mut parent_num = l2_safe_head.number;
let mut parent_block = l2_safe_head;
if self.starting_timestamp() < next_timestamp {
if self.starting_timestamp() > l2_safe_head.timestamp {
// Batch timestamp cannot be between safe head and next timestamp.
warn!("batch has misaligned timestamp, block time is too short");
return BatchValidity::Drop;
}
if (l2_safe_head.timestamp - self.starting_timestamp()) % cfg.block_time != 0 {
warn!("batch has misaligned timestamp, not overlapped exactly");
return BatchValidity::Drop;
}
parent_num = l2_safe_head.number -
(l2_safe_head.timestamp - self.starting_timestamp()) / cfg.block_time -
1;
parent_block = match fetcher.l2_block_info_by_number(parent_num).await {
Ok(block) => block.block_info,
Err(e) => {
warn!("failed to fetch L2 block number {parent_num}: {e}");
// Unable to validate the batch for now. Retry later.
return BatchValidity::Undecided;
}
};
}
if !self.check_parent_hash(parent_block.hash) {
warn!(
"parent block mismatch, expected: {parent_num}, received: {}. parent hash: {}, parent hash check: {}",
parent_block.number,
parent_block.hash,
self.parent_check,
);
return BatchValidity::Drop;
}

// Verify the l1 origin hash for each l1 block.
// SAFETY: `Self::batches` is not empty, so the last element is guaranteed to exist.
let end_epoch_num = self.batches.last().unwrap().epoch_num;
let mut origin_checked = false;
// l1Blocks is supplied from batch queue and its length is limited to SequencerWindowSize.
for l1_block in l1_origins {
if l1_block.number == end_epoch_num {
if !self.check_origin_hash(l1_block.hash) {
warn!(
"batch is for different L1 chain, epoch hash does not match, expected: {}",
l1_block.hash
);
return BatchValidity::Drop;
}
origin_checked = true;
break;
}
}
if !origin_checked {
info!("need more l1 blocks to check entire origins of span batch");
return BatchValidity::Undecided;
}

// Drop the batch if it is out of order. Post-Holocene, gaps are disallowed.
if self.starting_timestamp() > next_timestamp {
warn!(
"received out-of-order batch for future processing after next batch ({} > {})",
self.starting_timestamp(),
next_timestamp
);
return BatchValidity::Drop;
}

// Drop the batch if it has no new blocks after the safe head.
if self.final_timestamp() < next_timestamp {
warn!("span batch has no new blocks after safe head");
return BatchValidity::Drop;
}

BatchValidity::Accept
}

/// Converts all [SpanBatchElement]s after the L2 safe head to [SingleBatch]es. The resulting
/// [SingleBatch]es do not contain a parent hash, as it is populated by the Batch Queue
/// stage.
Expand Down Expand Up @@ -437,7 +553,7 @@ mod tests {
let first_element = SpanBatchElement { timestamp, ..Default::default() };
let batch =
SpanBatch { batches: vec![first_element, Default::default()], ..Default::default() };
assert_eq!(batch.timestamp(), timestamp);
assert_eq!(batch.starting_timestamp(), timestamp);
}

#[test]
Expand Down
7 changes: 4 additions & 3 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type L1RetrievalStage<DAP, P> = L1Retrieval<DAP, L1TraversalStage<P>>;
type FrameQueueStage<DAP, P> = FrameQueue<L1RetrievalStage<DAP, P>>;
type ChannelBankStage<DAP, P> = ChannelBank<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelBankStage<DAP, P>>;
type BatchStreamStage<DAP, P> = BatchStream<ChannelReaderStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P>, T>;
type BatchStreamStage<DAP, P, T> = BatchStream<ChannelReaderStage<DAP, P>, T>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P, T>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchQueueStage<DAP, P, T>, B>;

/// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern.
Expand Down Expand Up @@ -134,7 +134,8 @@ where
let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config));
let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config));
let batch_stream = BatchStream::new(channel_reader, rollup_config.clone());
let batch_stream =
BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone());
let batch_queue =
BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone());
let attributes =
Expand Down
Loading

0 comments on commit 9e9c002

Please sign in to comment.