Add state update data (#391)
### Summary

This PR adds state update data to the Starknet stream: storage diffs, contract and class changes (declare, replace, deploy), and nonce updates.
It also reworks the `HeaderFilter` option for all three networks, replacing the boolean `always` flag with a three-state enum (`Always`, `OnData`, `OnDataOrOnNewBlock`).
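
For context, here is a minimal sketch of the new header delivery rule. The enum mirrors `HeaderFilter` in `common/src/query.rs` below; the standalone helper is illustrative only and is not part of this PR:

```rust
// Illustrative sketch of the new header delivery rule.
enum HeaderFilter {
    /// Always send the block header.
    Always,
    /// Send the header only when another filter matched data (the default).
    OnData,
    /// Send the header when data matched, or when the block is not yet finalized.
    OnDataOrOnNewBlock,
}

/// Decide whether to include the block header in the response.
fn should_send_header(filter: &HeaderFilter, has_matches: bool, finalized: bool) -> bool {
    match filter {
        HeaderFilter::Always => true,
        HeaderFilter::OnData => has_matches,
        HeaderFilter::OnDataOrOnNewBlock => has_matches || !finalized,
    }
}
```

With the previous boolean flag it was not possible to request headers for new (non-finalized) blocks without also receiving them for every historical block; `OnDataOrOnNewBlock` covers that case.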
fracek authored Oct 20, 2024
2 parents 48f5356 + c8db6ce commit b1bdd04
Showing 22 changed files with 786 additions and 83 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock


2 changes: 1 addition & 1 deletion beaconchain/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "apibara-dna-beaconchain"
version = "2.0.0-beta.2"
version = "2.0.0-beta.3"
edition.workspace = true
authors.workspace = true
repository.workspace = true
19 changes: 15 additions & 4 deletions beaconchain/src/filter/mod.rs
@@ -3,7 +3,10 @@ mod helpers;
mod transaction;
mod validator;

use apibara_dna_common::{data_stream::BlockFilterFactory, query::BlockFilter};
use apibara_dna_common::{
data_stream::BlockFilterFactory,
query::{BlockFilter, HeaderFilter},
};
use apibara_dna_protocol::beaconchain;
use prost::Message;

@@ -44,9 +47,17 @@ impl BlockFilterExt for beaconchain::Filter {
fn compile_to_block_filter(&self) -> tonic::Result<BlockFilter, tonic::Status> {
let mut block_filter = BlockFilter::default();

if self.header.map(|h| h.always()).unwrap_or(false) {
block_filter.set_always_include_header(true);
let header_filter = match beaconchain::HeaderFilter::try_from(self.header) {
Ok(beaconchain::HeaderFilter::Always) => Some(HeaderFilter::Always),
Ok(beaconchain::HeaderFilter::OnData) => Some(HeaderFilter::OnData),
Ok(beaconchain::HeaderFilter::OnDataOrOnNewBlock) => {
Some(HeaderFilter::OnDataOrOnNewBlock)
}
_ => None,
}
.unwrap_or_default();

block_filter.set_header_filter(header_filter);

for filter in self.transactions.iter() {
let filter = filter.compile_to_filter()?;
@@ -63,7 +74,7 @@ impl BlockFilterExt for beaconchain::Filter {
block_filter.add_filter(filter);
}

if !block_filter.always_include_header && block_filter.is_empty() {
if !block_filter.always_include_header() && block_filter.is_empty() {
return Err(tonic::Status::invalid_argument("no filters provided"));
}

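On the wire, `Filter.header` changes from an optional `HeaderFilter` message to an enum value. A hypothetical client-side construction, assuming prost's usual codegen (the enum field stored as `i32`, `Default` derived for `Filter`):

```rust
// Hypothetical client-side filter; assumes prost's usual representation of enum fields.
use apibara_dna_protocol::beaconchain;

fn new_block_header_filter() -> beaconchain::Filter {
    beaconchain::Filter {
        // Deliver the header when data matches, and for every new (non-finalized) block.
        header: beaconchain::HeaderFilter::OnDataOrOnNewBlock as i32,
        ..Default::default()
    }
}
```

An unrecognized or unspecified value falls back to the default `OnData` behavior, as shown by the `_ => None` / `unwrap_or_default()` branch in `compile_to_block_filter` above.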
35 changes: 28 additions & 7 deletions common/src/data_stream/stream.rs
@@ -22,7 +22,7 @@ use crate::{
file_cache::FileCacheError,
fragment::{FragmentId, HEADER_FRAGMENT_ID},
join::ArchivedJoinTo,
query::BlockFilter,
query::{BlockFilter, HeaderFilter},
segment::SegmentGroup,
Cursor,
};
@@ -332,6 +332,7 @@ impl DataStream {
segments.push((current_segment_cursor, blocks));

// prefetch_tasks.join_all().await;
let finality = DataFinality::Finalized;

for (segment_cursor, segment_data) in segments {
if ct.is_cancelled() || tx.is_closed() {
@@ -351,12 +352,15 @@
let proto_end_cursor: Option<ProtoCursor> = Some(block.end_cursor.clone().into());

let mut blocks = Vec::new();
if self.filter_fragment(&fragment_access, &mut blocks).await? {
if self
.filter_fragment(&fragment_access, &finality, &mut blocks)
.await?
{
let data = Message::Data(Data {
cursor: proto_cursor.clone(),
end_cursor: proto_end_cursor.clone(),
data: blocks,
finality: DataFinality::Finalized as i32,
finality: finality as i32,
});

let Some(Ok(permit)) = ct.run_until_cancelled(tx.reserve()).await else {
@@ -435,6 +439,8 @@ impl DataStream {

let segment_cursor = Cursor::new_finalized(segment_start);

let finality = DataFinality::Finalized;

for block in blocks {
use apibara_dna_protocol::dna::stream::Cursor as ProtoCursor;

@@ -447,12 +453,15 @@
let proto_end_cursor: Option<ProtoCursor> = Some(block.end_cursor.clone().into());

let mut blocks = Vec::new();
if self.filter_fragment(&fragment_access, &mut blocks).await? {
if self
.filter_fragment(&fragment_access, &finality, &mut blocks)
.await?
{
let data = Message::Data(Data {
cursor: proto_cursor.clone(),
end_cursor: proto_end_cursor.clone(),
data: blocks,
finality: DataFinality::Finalized as i32,
finality: finality as i32,
});

let Some(Ok(permit)) = ct.run_until_cancelled(tx.reserve()).await else {
@@ -501,7 +510,10 @@ impl DataStream {
let fragment_access = FragmentAccess::new_in_block(self.store.clone(), cursor.clone());

let mut blocks = Vec::new();
if self.filter_fragment(&fragment_access, &mut blocks).await? {
if self
.filter_fragment(&fragment_access, &finality, &mut blocks)
.await?
{
let data = Message::Data(Data {
cursor: proto_cursor.clone(),
end_cursor: proto_end_cursor.clone(),
@@ -532,6 +544,7 @@ impl DataStream {
async fn filter_fragment(
&mut self,
fragment_access: &FragmentAccess,
finality: &DataFinality,
output: &mut Vec<Bytes>,
) -> Result<bool, DataStreamError> {
let current_span = tracing::Span::current();
@@ -627,7 +640,15 @@ impl DataStream {
}
}

if block_filter.always_include_header || !fragment_matches.is_empty() {
let should_send_header = match block_filter.header_filter {
HeaderFilter::Always => true,
HeaderFilter::OnData => !fragment_matches.is_empty(),
HeaderFilter::OnDataOrOnNewBlock => {
!fragment_matches.is_empty() || *finality != DataFinality::Finalized
}
};

if should_send_header {
let header = fragment_access
.get_header_fragment()
.await
23 changes: 20 additions & 3 deletions common/src/query.rs
@@ -11,6 +11,13 @@ use crate::{

pub type FilterId = u32;

#[derive(Debug, Clone)]
pub enum HeaderFilter {
Always,
OnData,
OnDataOrOnNewBlock,
}

/// Filter a fragment based on the values from this index.
#[derive(Debug, Clone)]
pub struct Condition {
@@ -38,13 +45,17 @@ pub struct Filter {
/// A collection of filters.
#[derive(Debug, Clone, Default)]
pub struct BlockFilter {
pub always_include_header: bool,
pub header_filter: HeaderFilter,
filters: BTreeMap<FragmentId, Vec<Filter>>,
}

impl BlockFilter {
pub fn set_always_include_header(&mut self, value: bool) {
self.always_include_header = value;
pub fn always_include_header(&self) -> bool {
matches!(self.header_filter, HeaderFilter::Always)
}

pub fn set_header_filter(&mut self, value: HeaderFilter) {
self.header_filter = value;
}

/// Add a filter to the block filter.
@@ -110,3 +121,9 @@ impl std::fmt::Display for FilterError {
write!(f, "failed to filter block")
}
}

impl Default for HeaderFilter {
fn default() -> Self {
Self::OnData
}
}
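
On the server side, `BlockFilter` now stores a `HeaderFilter` instead of the `always_include_header` boolean, keeping a read-only accessor with the old name. A short usage sketch (illustrative, not taken from the PR):

```rust
// Illustrative usage of the new BlockFilter API from common/src/query.rs.
use apibara_dna_common::query::{BlockFilter, HeaderFilter};

fn example() {
    let mut block_filter = BlockFilter::default();

    // The default is `HeaderFilter::OnData`: the header is sent only when
    // another filter matched data in the block.
    assert!(!block_filter.always_include_header());

    // Opt in to receiving a header for every block.
    block_filter.set_header_filter(HeaderFilter::Always);
    assert!(block_filter.always_include_header());
}
```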
2 changes: 1 addition & 1 deletion evm/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "apibara-dna-evm"
version = "2.0.0-beta.2"
version = "2.0.0-beta.3"
edition.workspace = true
authors.workspace = true
repository.workspace = true
17 changes: 13 additions & 4 deletions evm/src/filter/mod.rs
@@ -3,7 +3,10 @@ mod log;
mod transaction;
mod withdrawal;

use apibara_dna_common::{data_stream::BlockFilterFactory, query::BlockFilter};
use apibara_dna_common::{
data_stream::BlockFilterFactory,
query::{BlockFilter, HeaderFilter},
};
use apibara_dna_protocol::evm;
use prost::Message;

@@ -44,9 +47,15 @@ impl BlockFilterExt for evm::Filter {
fn compile_to_block_filter(&self) -> tonic::Result<BlockFilter, tonic::Status> {
let mut block_filter = BlockFilter::default();

if self.header.map(|h| h.always()).unwrap_or(false) {
block_filter.set_always_include_header(true);
let header_filter = match evm::HeaderFilter::try_from(self.header) {
Ok(evm::HeaderFilter::Always) => Some(HeaderFilter::Always),
Ok(evm::HeaderFilter::OnData) => Some(HeaderFilter::OnData),
Ok(evm::HeaderFilter::OnDataOrOnNewBlock) => Some(HeaderFilter::OnDataOrOnNewBlock),
_ => None,
}
.unwrap_or_default();

block_filter.set_header_filter(header_filter);

for filter in self.withdrawals.iter() {
let filter = filter.compile_to_filter()?;
@@ -63,7 +72,7 @@ impl BlockFilterExt for evm::Filter {
block_filter.add_filter(filter);
}

if !block_filter.always_include_header && block_filter.is_empty() {
if !block_filter.always_include_header() && block_filter.is_empty() {
return Err(tonic::Status::invalid_argument("no filters provided"));
}

8 changes: 5 additions & 3 deletions protocol/proto/beaconchain/v2/filter.proto
@@ -16,9 +16,11 @@ message Filter {
repeated BlobFilter blobs = 4;
}

message HeaderFilter {
// Always include header data. Defaults to `false`.
optional bool always = 1;
enum HeaderFilter {
HEADER_FILTER_UNSPECIFIED = 0;
HEADER_FILTER_ALWAYS = 1;
HEADER_FILTER_ON_DATA = 2;
HEADER_FILTER_ON_DATA_OR_ON_NEW_BLOCK = 3;
}

message TransactionFilter {
8 changes: 5 additions & 3 deletions protocol/proto/evm/v2/filter.proto
@@ -16,9 +16,11 @@ message Filter {
repeated LogFilter logs = 4;
}

message HeaderFilter {
// Always include header data. Defaults to `false`.
optional bool always = 1;
enum HeaderFilter {
HEADER_FILTER_UNSPECIFIED = 0;
HEADER_FILTER_ALWAYS = 1;
HEADER_FILTER_ON_DATA = 2;
HEADER_FILTER_ON_DATA_OR_ON_NEW_BLOCK = 3;
}

message WithdrawalFilter {
68 changes: 68 additions & 0 deletions protocol/proto/starknet/v2/data.proto
@@ -18,6 +18,12 @@ message Block {
repeated Event events = 4;
// List of messages.
repeated MessageToL1 messages = 5;
// List of storage changes by contract.
repeated StorageDiff storage_diffs = 6;
// List of contract/class changes.
repeated ContractChange contract_changes = 7;
// List of nonce updates.
repeated NonceUpdate nonce_updates = 8;
}

// Block header.
@@ -383,3 +389,65 @@ enum DataAvailabilityMode {
// L2.
DATA_AVAILABILITY_MODE_L2 = 2;
}

// Difference in storage values for a contract.
message StorageDiff {
repeated uint32 filter_ids = 1;
// The contract address.
FieldElement contract_address = 2;
// Entries that changed.
repeated StorageEntry storage_entries = 3;
}

// Storage entry.
message StorageEntry {
// Storage location.
FieldElement key = 1;
// Storage value.
FieldElement value = 2;
}

// A class/contract change.
message ContractChange {
repeated uint32 filter_ids = 1;
oneof change {
DeclaredClass declared_class = 2;
ReplacedClass replaced_class = 3;
DeployedContract deployed_contract = 4;
}
}

// Class declared.
message DeclaredClass {
// Class hash of the newly declared class.
FieldElement class_hash = 1;
// Hash of the cairo assembly resulting from the sierra compilation.
//
// If undefined, it's the result of a deprecated Cairo 0 declaration.
FieldElement compiled_class_hash = 2;
}

// Class replaced.
message ReplacedClass {
// The address of the contract whose class was replaced.
FieldElement contract_address = 1;
// The new class hash.
FieldElement class_hash = 2;
}

// Contract deployed.
message DeployedContract {
// Address of the newly deployed contract.
FieldElement contract_address = 1;
// Class hash of the deployed contract.
FieldElement class_hash = 2;
}

// Nonce update.
message NonceUpdate {
repeated uint32 filter_ids = 1;
// Contract address.
FieldElement contract_address = 2;
// New nonce value.
FieldElement nonce = 3;
}
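
For consumers of the Starknet stream, the three new repeated fields on `Block` surface the state update for each block. A sketch of reading them, assuming the usual prost-generated types in `apibara_dna_protocol::starknet` (message-typed fields wrapped in `Option`, the `change` oneof as a nested enum); field names follow `data.proto` above:

```rust
// Sketch of consuming the new Starknet state update data; assumes prost's
// usual codegen for the messages defined in starknet/v2/data.proto.
use apibara_dna_protocol::starknet::{self, contract_change::Change};

fn print_state_updates(block: &starknet::Block) {
    for diff in &block.storage_diffs {
        // Each diff groups storage writes by contract.
        for entry in &diff.storage_entries {
            println!(
                "storage change: contract={:?} key={:?} value={:?}",
                diff.contract_address, entry.key, entry.value
            );
        }
    }

    for change in &block.contract_changes {
        match &change.change {
            Some(Change::DeclaredClass(declared)) => {
                println!("declared class {:?}", declared.class_hash)
            }
            Some(Change::ReplacedClass(replaced)) => {
                println!("replaced class on {:?}", replaced.contract_address)
            }
            Some(Change::DeployedContract(deployed)) => {
                println!("deployed contract {:?}", deployed.contract_address)
            }
            None => {}
        }
    }

    for update in &block.nonce_updates {
        println!("nonce of {:?} is now {:?}", update.contract_address, update.nonce);
    }
}
```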