Skip to content

Commit

Permalink
Update DaService interface to support re-orgs (#1118)
Browse files Browse the repository at this point in the history
* [no ci] Updating `DaService` interface for re-orgs

* Fix tests

* [no ci]  WIP

* Mock Da service uses broadcast stream

* [no ci] Keep with streams

* Implement stream mappers everywhere

* Documentation updates

* Update documentation of `get_last_finalized_block_header`

* Minor comment update

* Make it block on get_finalized_height
  • Loading branch information
citizen-stig authored Nov 20, 2023
1 parent d42e289 commit 1ae8d09
Show file tree
Hide file tree
Showing 20 changed files with 801 additions and 142 deletions.
159 changes: 91 additions & 68 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ byteorder = { version = "1.5.0", default-features = false }
bytes = { version = "1.2.1", default-features = false }
digest = { version = "0.10.6", default-features = false, features = ["alloc"] }
futures = "0.3"
pin-project = { version = "1.1.3" }
hex = { version = "0.4.3", default-features = false, features = ["alloc", "serde"] }
once_cell = { version = "1.10.0", default-features = false, features = ["alloc"] }
prometheus = { version = "0.13.3", default-features = false }
Expand Down
21 changes: 17 additions & 4 deletions adapters/avail/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ repository.workspace = true
[dependencies]
borsh = { workspace = true, features = ["bytes"] }
sov-rollup-interface = { path = "../../rollup-interface" }
bytes = { version = "1.2.1", features = ["serde"]}
primitive-types = { version = "0.12.2", features = ["serde"]}
bytes = { version = "1.2.1", features = ["serde"] }
primitive-types = { version = "0.12.2", features = ["serde"] }
sp-core-hashing = "13.0.0"
subxt = { version = "0.29", optional = true }
avail-subxt = { git = "https://github.com/availproject/avail.git", tag = "v1.6.3", features = ["std"], optional = true }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"], optional = true }

#Convenience
# Convenience
tokio = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
pin-project = { workspace = true, optional = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.17", features = ["fmt"] }
async-trait = { workspace = true }
Expand All @@ -35,4 +37,15 @@ sp-core = { version = "21", optional = true }

[features]
default = ["native"]
native = ["dep:tokio", "dep:codec", "dep:reqwest", "dep:avail-subxt", "dep:subxt", "dep:sp-keyring", "dep:sp-core", "sov-rollup-interface/native"]
native = [
"dep:tokio",
"dep:codec",
"dep:reqwest",
"dep:avail-subxt",
"dep:subxt",
"dep:futures",
"dep:pin-project",
"dep:sp-keyring",
"dep:sp-core",
"sov-rollup-interface/native"
]
72 changes: 63 additions & 9 deletions adapters/avail/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use core::time::Duration;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::anyhow;
use async_trait::async_trait;
use avail_subxt::api::runtime_types::sp_core::bounded::bounded_vec::BoundedVec;
use avail_subxt::primitives::AvailExtrinsicParams;
use avail_subxt::{api, AvailConfig};
use pin_project::pin_project;
use reqwest::StatusCode;
use sov_rollup_interface::da::DaSpec;
use sov_rollup_interface::services::da::DaService;
Expand Down Expand Up @@ -153,19 +156,45 @@ async fn wait_for_appdata(
}
}

/// Convincice copy from [`subxt::blocks::BlocksClient`]
type BlockStream<T> = Pin<Box<dyn futures::Stream<Item = Result<T, subxt::Error>> + Send>>;
/// Wrapper around return of [`subxt::blocks::BlocksClient::subscribe_finalized`]
/// that maps `Ok` variant to [`AvailHeader`] and `Error` variant to [`anyhow::Error`]
#[pin_project]
pub struct AvailBlockHeaderStream {
#[pin]
inner: BlockStream<subxt::blocks::Block<AvailConfig, OnlineClient<AvailConfig>>>,
}

impl futures::Stream for AvailBlockHeaderStream {
type Item = anyhow::Result<AvailHeader>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let poll_result = this.inner.poll_next(cx);
Poll::Ready(match poll_result {
Poll::Ready(Some(Ok(block))) => Some(Ok(AvailHeader::from(block))),
Poll::Ready(Some(Err(e))) => Some(Err(e.into())),
Poll::Ready(None) => None,
Poll::Pending => return Poll::Pending,
})
}
}

#[async_trait]
impl DaService for DaProvider {
type Spec = DaLayerSpec;

type FilteredBlock = AvailBlock;

type Verifier = Verifier;

type FilteredBlock = AvailBlock;
type HeaderStream = AvailBlockHeaderStream;

type Error = anyhow::Error;

// Make an RPC call to the node to get the finalized block at the given height, if one exists.
// Make an RPC call to the node to get the block at the given height, if one exists.
// If no such block exists, block until one does.
async fn get_finalized_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
async fn get_block_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
let node_client = self.node_client.clone();
let confidence_url = self.confidence_url(height);
let appdata_url = self.appdata_url(height);
Expand Down Expand Up @@ -204,10 +233,35 @@ impl DaService for DaProvider {
})
}

// Make an RPC call to the node to get the block at the given height
// If no such block exists, block until one does.
async fn get_block_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
self.get_finalized_at(height).await
async fn get_last_finalized_block_header(
&self,
) -> Result<<Self::Spec as DaSpec>::BlockHeader, Self::Error> {
let node_client = self.node_client.clone();
let finalized_header_hash = node_client.rpc().finalized_head().await?;

let header = node_client
.rpc()
.header(Some(finalized_header_hash))
.await?
.ok_or(anyhow::anyhow!("No finalized head found"))?;
let header = AvailHeader::new(header, finalized_header_hash);
Ok(header)
}

async fn subscribe_finalized_header(&self) -> Result<Self::HeaderStream, Self::Error> {
let block_stream = self.node_client.blocks().subscribe_finalized().await?;
Ok(AvailBlockHeaderStream {
inner: block_stream,
})
}

async fn get_head_block_header(
&self,
) -> Result<<Self::Spec as DaSpec>::BlockHeader, Self::Error> {
let node_client = self.node_client.clone();
let latest_block = node_client.blocks().at_latest().await?;

Ok(latest_block.into())
}

// Extract the blob transactions relevant to a particular rollup from a block.
Expand All @@ -220,7 +274,7 @@ impl DaService for DaProvider {
block.transactions.clone()
}

// Extract the inclusion and completenss proof for filtered block provided.
// Extract the inclusion and completeness proof for filtered block provided.
// The output of this method will be passed to the verifier.
// NOTE: The light client here has already completed DA sampling and verification of inclusion and soundness.
async fn get_extraction_proof(
Expand Down
19 changes: 19 additions & 0 deletions adapters/avail/src/spec/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,22 @@ impl BlockHeaderTrait for AvailHeader {
)
}
}

#[cfg(feature = "native")]
impl
From<
subxt::blocks::Block<
avail_subxt::AvailConfig,
subxt::OnlineClient<avail_subxt::AvailConfig>,
>,
> for AvailHeader
{
fn from(
block: subxt::blocks::Block<
avail_subxt::AvailConfig,
subxt::OnlineClient<avail_subxt::AvailConfig>,
>,
) -> Self {
AvailHeader::new(block.header().clone(), block.hash())
}
}
8 changes: 4 additions & 4 deletions adapters/avail/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ pub mod transaction;
pub struct DaLayerSpec;

impl DaSpec for DaLayerSpec {
type ValidityCondition = ChainValidityCondition;

type SlotHash = hash::AvailHash;

type ChainParams = ();

type BlockHeader = header::AvailHeader;

type BlobTransaction = transaction::AvailBlobTransaction;

type Address = address::AvailAddress;

type ValidityCondition = ChainValidityCondition;

type InclusionMultiProof = ();

type CompletenessProof = ();

type ChainParams = ();
}
24 changes: 14 additions & 10 deletions adapters/celestia/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ celestia-types = { git = "https://github.com/eigerco/celestia-node-rs.git", rev
tendermint = { git = "https://github.com/eigerco/celestia-tendermint-rs.git", rev = "ef58b85", default-features = false }
tendermint-proto = { git = "https://github.com/eigerco/celestia-tendermint-rs.git", rev = "ef58b85" }
nmt-rs = { git = "https://github.com/Sovereign-Labs/nmt-rs.git", rev = "d821332", features = [
"serde",
"borsh",
"serde",
"borsh",
] }

# Convenience
Expand All @@ -34,6 +34,8 @@ jsonrpsee = { workspace = true, features = ["http-client"], optional = true }
serde = { workspace = true }
serde_json = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
pin-project = { workspace = true, optional = true }
thiserror = { workspace = true }
tracing = { workspace = true }
sov-zk-cycle-macros = { path = "../../utils/zk-cycle-macros", version = "0.3", optional = true }
Expand All @@ -52,16 +54,18 @@ wiremock = "0.5"
[features]
default = []
native = [
"dep:tokio",
"dep:jsonrpsee",
"dep:serde_json",
"dep:celestia-rpc",
"tendermint/default",
"sov-rollup-interface/native",
"dep:tokio",
"dep:futures",
"dep:pin-project",
"dep:jsonrpsee",
"dep:serde_json",
"dep:celestia-rpc",
"tendermint/default",
"sov-rollup-interface/native",
]
risc0 = [
"dep:risc0-zkvm",
"dep:risc0-zkvm-platform"
"dep:risc0-zkvm",
"dep:risc0-zkvm-platform"
]
bench = [
"sov-zk-cycle-macros/bench",
Expand Down
8 changes: 7 additions & 1 deletion adapters/celestia/src/celestia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Context;
use borsh::{BorshDeserialize, BorshSerialize};
use celestia_proto::celestia::blob::v1::MsgPayForBlobs;
use celestia_proto::cosmos::tx::v1beta1::Tx;
use celestia_types::DataAvailabilityHeader;
use celestia_types::{DataAvailabilityHeader, ExtendedHeader};
use prost::bytes::Buf;
use prost::Message;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -187,6 +187,12 @@ impl CelestiaHeader {
}
}

impl From<celestia_types::ExtendedHeader> for CelestiaHeader {
fn from(extended_header: ExtendedHeader) -> Self {
CelestiaHeader::new(extended_header.dah, extended_header.header.into())
}
}

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] // TODO: , BorshDeserialize, BorshSerialize)]
pub struct BlobWithSender {
pub blob: CountedBufReader<BlobIterator>,
Expand Down
64 changes: 60 additions & 4 deletions adapters/celestia/src/da_service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::pin::Pin;
use std::task::{Context, Poll};

use async_trait::async_trait;
use celestia_rpc::prelude::*;
use celestia_types::blob::{Blob as JsonBlob, Commitment, SubmitOptions};
use celestia_types::consts::appconsts::{
CONTINUATION_SPARSE_SHARE_CONTENT_SIZE, FIRST_SPARSE_SHARE_CONTENT_SIZE, SHARE_SIZE,
};
use celestia_types::nmt::Namespace;
use celestia_types::ExtendedHeader;
use jsonrpsee::core::client::Subscription;
use jsonrpsee::http_client::{HeaderMap, HttpClient};
use pin_project::pin_project;
use sov_rollup_interface::da::CountedBufReader;
use sov_rollup_interface::services::da::DaService;
use tracing::{debug, info, instrument, trace};
Expand All @@ -15,7 +21,7 @@ use crate::types::FilteredCelestiaBlock;
use crate::utils::BoxError;
use crate::verifier::proofs::{CompletenessProof, CorrectnessProof};
use crate::verifier::{CelestiaSpec, CelestiaVerifier, RollupParams, PFB_NAMESPACE};
use crate::BlobWithSender;
use crate::{BlobWithSender, CelestiaHeader};

// Approximate value, just to make it work.
const GAS_PER_BYTE: usize = 20;
Expand Down Expand Up @@ -89,18 +95,51 @@ impl CelestiaService {
}
}

/// A Wrapper around [`Subscription`] that converts [`ExtendedHeader`] to [`CelestiaHeader`]
/// and converts [`Error`] to [`BoxError`]
#[pin_project]
pub struct CelestiaBlockHeaderSubscription {
#[pin]
inner: Subscription<ExtendedHeader>,
}

impl CelestiaBlockHeaderSubscription {
/// Create a new [`CelestiaBlockHeaderSubscription`] from [`Subscription`]
pub fn new(inner: Subscription<ExtendedHeader>) -> Self {
Self { inner }
}
}

impl futures::Stream for CelestiaBlockHeaderSubscription {
type Item = Result<CelestiaHeader, BoxError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let poll_result = this.inner.poll_next(cx);
Poll::Ready(match poll_result {
Poll::Ready(Some(Ok(extended_header))) => {
Some(Ok(CelestiaHeader::from(extended_header)))
}
Poll::Ready(Some(Err(e))) => Some(Err(e.into())),
Poll::Ready(None) => None,
Poll::Pending => return Poll::Pending,
})
}
}

#[async_trait]
impl DaService for CelestiaService {
type Spec = CelestiaSpec;

type Verifier = CelestiaVerifier;

type FilteredBlock = FilteredCelestiaBlock;
type HeaderStream = CelestiaBlockHeaderSubscription;

type Error = BoxError;

#[instrument(skip(self), err)]
async fn get_finalized_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
async fn get_block_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
let client = self.client.clone();
let rollup_namespace = self.rollup_namespace;

Expand All @@ -127,8 +166,25 @@ impl DaService for CelestiaService {
)
}

async fn get_block_at(&self, height: u64) -> Result<Self::FilteredBlock, Self::Error> {
self.get_finalized_at(height).await
async fn get_last_finalized_block_header(
&self,
) -> Result<<Self::Spec as sov_rollup_interface::da::DaSpec>::BlockHeader, Self::Error> {
// Tendermint has instant finality, so head block is the one that finalized
// and network is always guaranteed to be secure,
// it can work even if the node is still catching up
self.get_head_block_header().await
}

async fn subscribe_finalized_header(&self) -> Result<Self::HeaderStream, Self::Error> {
let subscription = self.client.header_subscribe().await?;
Ok(CelestiaBlockHeaderSubscription::new(subscription))
}

async fn get_head_block_header(
&self,
) -> Result<<Self::Spec as sov_rollup_interface::da::DaSpec>::BlockHeader, Self::Error> {
let header = self.client.header_network_head().await?;
Ok(CelestiaHeader::from(header))
}

fn extract_relevant_blobs(
Expand Down
Loading

0 comments on commit 1ae8d09

Please sign in to comment.