Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Str 867 update l1 reader worker #627

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 0 additions & 46 deletions bin/strata-client/src/l1_reader.rs

This file was deleted.

24 changes: 14 additions & 10 deletions bin/strata-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rpc_client::sync_client;
use strata_bridge_relay::relayer::RelayerHandle;
use strata_btcio::{
broadcaster::{spawn_broadcaster_task, L1BroadcastHandle},
reader::query::bitcoin_data_reader_task,
rpc::{traits::ReaderRpc, BitcoinClient},
writer::start_envelope_task,
};
Expand Down Expand Up @@ -48,7 +49,6 @@ mod args;
mod errors;
mod extractor;
mod helpers;
mod l1_reader;
mod network;
mod rpc_client;
mod rpc_server;
Expand Down Expand Up @@ -340,16 +340,20 @@ fn start_core_tasks(
)?
.into();

let l1db = database.l1_db().clone();
let l1_manager = Arc::new(L1BlockManager::new(pool.clone(), l1db));
// Start the L1 tasks to get that going.
l1_reader::start_reader_tasks(
executor,
sync_manager.get_params(),
config,
bitcoin_client.clone(),
storage.as_ref(),
sync_manager.get_csm_ctl(),
status_channel.clone(),
)?;
executor.spawn_critical_async(
"bitcoin_data_reader_task",
bitcoin_data_reader_task(
bitcoin_client.clone(),
l1_manager.clone(),
Arc::new(config.btcio.reader.clone()),
sync_manager.get_params(),
status_channel.clone(),
sync_manager.get_csm_ctl(),
),
);

// Start relayer task.
let relayer_handle = strata_bridge_relay::relayer::start_bridge_relayer_task(
Expand Down
3 changes: 2 additions & 1 deletion bin/strata-client/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use strata_btcio::{broadcaster::L1BroadcastHandle, writer::EnvelopeHandle};
#[cfg(feature = "debug-utils")]
use strata_common::bail_manager::BAIL_SENDER;
use strata_consensus_logic::{
csm::state_tracker::reconstruct_state, l1_handler::verify_proof, sync_manager::SyncManager,
checkpoint::CheckpointHandle, csm::state_tracker::reconstruct_state, sync_manager::SyncManager,
util::verify_proof,
};
use strata_db::{
traits::*,
Expand Down
1 change: 1 addition & 0 deletions crates/btcio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ hex.workspace = true
musig2 = { workspace = true, features = ["serde"] }
rand.workspace = true
reqwest.workspace = true
secp256k1 = { workspace = true, features = ["rand-std"] }
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,79 +1,42 @@
// TODO move this to btcio crate, maybe consolidating into a single task with
// the query task

use std::sync::Arc;

use bitcoin::{consensus::serialize, hashes::Hash, Block};
use secp256k1::XOnlyPublicKey;
use strata_l1tx::messages::{BlockData, L1Event};
use strata_primitives::{
block_credential::CredRule,
buf::Buf32,
l1::{L1BlockManifest, L1BlockRecord},
params::{Params, RollupParams},
proof::RollupVerifyingKey,
};
use strata_state::{
batch::{
BatchCheckpoint, BatchCheckpointWithCommitment, CheckpointProofOutput, CommitmentInfo,
},
batch::{BatchCheckpoint, BatchCheckpointWithCommitment, CommitmentInfo},
l1::{generate_l1_tx, L1Tx},
sync_event::SyncEvent,
sync_event::{EventSubmitter, SyncEvent},
tx::ProtocolOperation,
};
use strata_storage::L1BlockManager;
use tokio::sync::mpsc;
use tracing::*;
use zkaleido::{ProofReceipt, ZkVmError, ZkVmResult};
use zkaleido_risc0_adapter;
use zkaleido_sp1_adapter;

use crate::csm::ctl::CsmController;

/// Consumes L1 events and reflects them in the database.
pub fn bitcoin_data_handler_task(
l1man: Arc<L1BlockManager>,
csm_ctl: Arc<CsmController>,
mut event_rx: mpsc::Receiver<L1Event>,
params: Arc<Params>,
) -> anyhow::Result<()> {
// Parse the sequencer pubkey once here as this involves and FFI call that we don't want to be
// calling per event although it can be generated from the params passed to the relevant event
// handler.
let seq_pubkey = match params.rollup.cred_rule {
CredRule::Unchecked => None,
CredRule::SchnorrKey(buf32) => Some(
XOnlyPublicKey::try_from(buf32)
.expect("the sequencer pubkey must be valid in the params"),
),
};

while let Some(event) = event_rx.blocking_recv() {
if let Err(e) = handle_bitcoin_event(event, &l1man, csm_ctl.as_ref(), &params, seq_pubkey) {
error!(err = %e, "failed to handle L1 event");
}
}

info!("L1 event stream closed, store task exiting...");
Ok(())
}
use super::query::ReaderContext;
use crate::rpc::traits::ReaderRpc;

fn handle_bitcoin_event(
pub(crate) async fn handle_bitcoin_event<R: ReaderRpc, E: EventSubmitter>(
event: L1Event,
l1man: &L1BlockManager,
csm_ctl: &CsmController,
params: &Arc<Params>,
seq_pubkey: Option<XOnlyPublicKey>,
ctx: &ReaderContext<R>,
event_submitter: &E,
) -> anyhow::Result<()> {
let ReaderContext {
seq_pubkey,
params,
l1_manager,
..
} = ctx;
match event {
L1Event::RevertTo(revert_blk_num) => {
// L1 reorgs will be handled in L2 STF, we just have to reflect
// what the client is telling us in the database.
l1man.revert_to_height(revert_blk_num)?;
l1_manager.revert_to_height_async(revert_blk_num).await?;
debug!(%revert_blk_num, "wrote revert");

// Write to sync event db.
let ev = SyncEvent::L1Revert(revert_blk_num);
csm_ctl.submit_event(ev)?;
event_submitter.submit_event_async(ev).await?;

Ok(())
}
Expand All @@ -93,21 +56,23 @@ fn handle_bitcoin_event(
let manifest = generate_block_manifest(blockdata.block(), epoch);
let l1txs: Vec<_> = generate_l1txs(&blockdata);
let num_txs = l1txs.len();
l1man.put_block_data(blockdata.block_num(), manifest, l1txs.clone())?;
l1_manager
.put_block_data_async(blockdata.block_num(), manifest, l1txs.clone())
.await?;
info!(%height, %l1blkid, txs = %num_txs, "wrote L1 block manifest");

// Write to sync event db if it's something we care about.
let blkid: Buf32 = blockdata.block().block_hash().into();
let ev = SyncEvent::L1Block(blockdata.block_num(), blkid.into());
csm_ctl.submit_event(ev)?;
event_submitter.submit_event_async(ev).await?;

// Check for da batch and send event accordingly
debug!(?height, "Checking for da batch");
let checkpoints = check_for_da_batch(&blockdata, seq_pubkey);
let checkpoints = check_for_da_batch(&blockdata, *seq_pubkey);
debug!(?checkpoints, "Received checkpoints");
if !checkpoints.is_empty() {
let ev = SyncEvent::L1DABatch(height, checkpoints);
csm_ctl.submit_event(ev)?;
event_submitter.submit_event_async(ev).await?;
}

// TODO: Check for deposits and forced inclusions and emit appropriate events
Expand All @@ -117,7 +82,7 @@ fn handle_bitcoin_event(

L1Event::GenesisVerificationState(height, header_verification_state) => {
let ev = SyncEvent::L1BlockGenesis(height, header_verification_state);
csm_ctl.submit_event(ev)?;
event_submitter.submit_event_async(ev).await?;
Ok(())
}
}
Expand All @@ -130,15 +95,14 @@ fn check_for_da_batch(
) -> Vec<BatchCheckpointWithCommitment> {
let protocol_ops_txs = blockdata.protocol_ops_txs();

let signed_checkpts = protocol_ops_txs
.iter()
.filter_map(|ops_txs| match ops_txs.proto_op() {
strata_state::tx::ProtocolOperation::Checkpoint(envelope) => Some((
envelope,
&blockdata.block().txdata[ops_txs.index() as usize],
)),
let signed_checkpts = protocol_ops_txs.iter().flat_map(|txref| {
txref.proto_ops().iter().filter_map(|op| match op {
ProtocolOperation::Checkpoint(envelope) => {
Some((envelope, &blockdata.block().txdata[txref.index() as usize]))
}
_ => None,
});
})
});

let sig_verified_checkpoints = signed_checkpts.filter_map(|(signed_checkpoint, tx)| {
if let Some(seq_pubkey) = seq_pubkey {
Expand Down Expand Up @@ -173,56 +137,6 @@ fn check_for_da_batch(
sig_verified_checkpoints.collect()
}

/// Verify that the provided checkpoint proof is valid for the verifier key.
///
/// # Caution
///
/// If the checkpoint proof is empty, this function returns an `Ok(())`.
pub fn verify_proof(
checkpoint: &BatchCheckpoint,
proof_receipt: &ProofReceipt,
rollup_params: &RollupParams,
) -> ZkVmResult<()> {
let rollup_vk = rollup_params.rollup_vk;
let checkpoint_idx = checkpoint.batch_info().idx();
info!(%checkpoint_idx, "verifying proof");

// FIXME: we are accepting empty proofs for now (devnet) to reduce dependency on the prover
// infra.
if rollup_params.proof_publish_mode.allow_empty()
&& proof_receipt.proof().is_empty()
&& proof_receipt.public_values().is_empty()
{
warn!(%checkpoint_idx, "verifying empty proof as correct");
return Ok(());
}

let expected_public_output = checkpoint.proof_output();
let actual_public_output: CheckpointProofOutput =
borsh::from_slice(proof_receipt.public_values().as_bytes())
.map_err(|e| ZkVmError::OutputExtractionError { source: e.into() })?;
if expected_public_output != actual_public_output {
dbg!(actual_public_output, expected_public_output);
return Err(ZkVmError::ProofVerificationError(
"Public output mismatch during proof verification".to_string(),
));
}

// NOTE/TODO: this should also verify that this checkpoint is based on top of some previous
// checkpoint
match rollup_vk {
RollupVerifyingKey::Risc0VerifyingKey(vk) => {
zkaleido_risc0_adapter::verify_groth16(proof_receipt, vk.as_ref())
}
RollupVerifyingKey::SP1VerifyingKey(vk) => {
zkaleido_sp1_adapter::verify_groth16(proof_receipt, vk.as_ref())
}
// In Native Execution mode, we do not actually generate the proof to verify. Checking
// public parameters is sufficient.
RollupVerifyingKey::NativeVerifyingKey(_) => Ok(()),
}
}

/// Given a block, generates a manifest of the parts we care about that we can
/// store in the database.
fn generate_block_manifest(block: &Block, epoch: u64) -> L1BlockManifest {
Expand All @@ -239,13 +153,13 @@ fn generate_block_manifest(block: &Block, epoch: u64) -> L1BlockManifest {

fn generate_l1txs(blockdata: &BlockData) -> Vec<L1Tx> {
blockdata
.protocol_ops_txs()
.protocol_txs()
.iter()
.map(|ops_txs| {
generate_l1_tx(
blockdata.block(),
ops_txs.index(),
ops_txs.proto_op().clone(),
ops_txs.proto_ops().to_vec(),
)
})
.collect()
Expand Down
1 change: 1 addition & 0 deletions crates/btcio/src/reader/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
mod handler;
pub mod query;
mod state;
Loading