Skip to content

Commit

Permalink
Add blob
Browse files Browse the repository at this point in the history
  • Loading branch information
zolting committed Nov 14, 2024
1 parent 7c1a395 commit e440533
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 58 deletions.
43 changes: 22 additions & 21 deletions blocks/beacon-parquet/src/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
use common::{blocks::insert_timestamp, utils::hex_array_to_string};
use substreams::{pb::substreams::Clock, Hex};
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use common::utils::bytes_to_hex;

use crate::{keys::blob_keys, pb::sf::beacon::r#type::v1::Blob};
use crate::{
pb::{beacon::rawblocks::Blob as RawBlob, sf::beacon::r#type::v1::Blob},
structs::BlockTimestamp,
utils::encode_hex_2d_array,
};

pub fn insert_blobs(tables: &mut DatabaseChanges, clock: &Clock, blobs: &Vec<Blob>) {
for b in blobs {
let index = b.index;
let blob = Hex::encode(&b.blob);
let kzg_commitment = Hex::encode(&b.kzg_commitment);
let kzg_proof = Hex::encode(&b.kzg_proof);
let kzg_commitment_inclusion_proof = hex_array_to_string(&b.kzg_commitment_inclusion_proof);

let keys = blob_keys(&clock.id, index);

let row = tables
.push_change_composite("blobs", keys, 0, table_change::Operation::Create)
.change("blob", ("", blob.as_str()))
.change("kzg_commitment", ("", kzg_commitment.as_str()))
.change("kzg_proof", ("", kzg_proof.as_str()))
.change("kzg_commitment_inclusion_proof", ("", kzg_commitment_inclusion_proof.as_str()));
pub fn collect_blobs(blobs: &Vec<Blob>, timestamp: &BlockTimestamp) -> Vec<RawBlob> {
let mut blobs_vec = Vec::<RawBlob>::new();

insert_timestamp(row, clock, false, true);
for b in blobs {
blobs_vec.push(RawBlob {
block_time: Some(timestamp.time),
block_number: timestamp.number,
block_date: timestamp.date.clone(),
block_hash: timestamp.hash.clone(),
index: b.index,
blob: bytes_to_hex(&b.blob),
kzg_commitment: bytes_to_hex(&b.kzg_commitment),
kzg_proof: bytes_to_hex(&b.kzg_proof),
kzg_commitment_inclusion_proof: encode_hex_2d_array(&b.kzg_commitment_inclusion_proof),
});
}

blobs_vec
}
110 changes: 74 additions & 36 deletions blocks/beacon-parquet/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,30 @@
use crate::{blobs::collect_blobs, pb::sf::beacon::r#type::v1::block::Body::*, structs::BlockTimestamp};
use substreams::{errors::Error, pb::substreams::Clock};

use crate::{
attestations::insert_attestations,
attester_slashings::insert_attester_slashings,
blocks::collect_blocks,
bls_to_execution_changes::insert_bls_to_execution_changes,
deposits::insert_deposits,
pb::{
beacon::rawblocks::Events,
sf::beacon::r#type::v1::{AltairBody, BellatrixBody, Block as BeaconBlock, CapellaBody, DenebBody, Phase0Body},
},
proposer_slashings::insert_proposer_slashings,
utils::build_timestamp,
voluntary_exits::insert_voluntary_exits,
withdrawals::insert_withdrawals,
};

use crate::blobs::insert_blobs;

#[substreams::handlers::map]
pub fn ch_out(clock: Clock, block: BeaconBlock) -> Result<Events, Error> {
let spec = spec_to_string(block.spec);

// insert_blocks(&mut tables, &block, &spec, &clock);

let body = block.body.as_ref().unwrap();
let timestamp = build_timestamp(&clock);

Ok(Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
proposer_slashings: vec![],
voluntary_exits: vec![],
})

// match (spec.as_str(), body) {
// ("Deneb", Deneb(body)) => insert_deneb_body(&mut tables, &clock, &body),
// ("Capella", Capella(body)) => insert_capella_body(&mut tables, &clock, &body),
// ("Bellatrix", Bellatrix(body)) => insert_bellatrix_body(&mut tables, &clock, &body),
// ("Altair", Altair(body)) => insert_altair_body(&mut tables, &clock, &body),
// ("Phase0", Phase0(body)) => insert_phase0_body(&mut tables, &clock, &body),
// _ => {}
// }
match (spec.as_str(), body) {
("Deneb", Deneb(body)) => Ok(output_deneb_body(&block, &spec, body, &timestamp)),
("Capella", Capella(body)) => Ok(output_capella_body(&block, &spec, body, &timestamp)),
("Bellatrix", Bellatrix(body)) => Ok(output_bellatrix_body(&block, &spec, body, &timestamp)),
("Altair", Altair(body)) => Ok(output_altair_body(&block, &spec, body, &timestamp)),
("Phase0", Phase0(body)) => Ok(output_phase0_body(&block, &spec, body, &timestamp)),
_ => Ok(Events::default()),
}
}

fn spec_to_string(spec: i32) -> String {
Expand All @@ -73,15 +51,75 @@ fn spec_to_string(spec: i32) -> String {
// insert_voluntary_exits(tables, clock, &body.voluntary_exits);
// }

pub fn collect_deneb_body(clock: &Clock, body: &DenebBody) {}
pub fn output_deneb_body(block: &BeaconBlock, spec: &str, body: &DenebBody, timestamp: &BlockTimestamp) -> Events {
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: collect_blobs(&body.embedded_blobs, &timestamp),
deposits: vec![],
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
proposer_slashings: vec![],
voluntary_exits: vec![],
}
}

pub fn collect_capella_body(clock: &Clock, body: &CapellaBody) {}
pub fn output_capella_body(block: &BeaconBlock, spec: &str, body: &CapellaBody, timestamp: &BlockTimestamp) -> Events {
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
proposer_slashings: vec![],
voluntary_exits: vec![],
}
}

pub fn collect_bellatrix_body(clock: &Clock, body: &BellatrixBody) {}
pub fn output_bellatrix_body(block: &BeaconBlock, spec: &str, body: &BellatrixBody, timestamp: &BlockTimestamp) -> Events {
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
proposer_slashings: vec![],
voluntary_exits: vec![],
}
}

pub fn collect_altair_body(clock: &Clock, body: &AltairBody) {}
pub fn output_altair_body(block: &BeaconBlock, spec: &str, body: &AltairBody, timestamp: &BlockTimestamp) -> Events {
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
proposer_slashings: vec![],
voluntary_exits: vec![],
}
}

pub fn collect_phase0_body(clock: &Clock, body: &Phase0Body) {}
pub fn output_phase0_body(block: &BeaconBlock, spec: &str, body: &Phase0Body, timestamp: &BlockTimestamp) -> Events {
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
proposer_slashings: vec![],
voluntary_exits: vec![],
}
}

// fn insert_capella_body(tables: &mut DatabaseChanges, clock: &Clock, body: &CapellaBody) {
// insert_deposits(tables, clock, &body.deposits);
Expand Down
6 changes: 5 additions & 1 deletion blocks/beacon-parquet/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use common::utils::block_time_to_date;
use common::utils::{block_time_to_date, bytes_to_hex};
use substreams::pb::substreams::Clock;

use crate::structs::BlockTimestamp;
Expand All @@ -14,3 +14,7 @@ pub fn build_timestamp(clock: &Clock) -> BlockTimestamp {
number: clock.number,
}
}

pub fn encode_hex_2d_array(hex_array: &Vec<Vec<u8>>) -> Vec<String> {
hex_array.iter().map(|bytes| bytes_to_hex(bytes)).collect()
}

0 comments on commit e440533

Please sign in to comment.