Skip to content

Commit

Permalink
Add deposits & withdrawals
Browse files Browse the repository at this point in the history
  • Loading branch information
zolting committed Nov 14, 2024
1 parent e440533 commit 3973631
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 44 deletions.
43 changes: 23 additions & 20 deletions blocks/beacon-parquet/src/deposits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,30 @@ use common::{
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::DatabaseChanges;

use crate::{keys::deposit_keys, pb::sf::beacon::r#type::v1::Deposit};

pub fn insert_deposits(tables: &mut DatabaseChanges, clock: &Clock, deposits: &Vec<Deposit>) {
for (index, deposit) in deposits.iter().enumerate() {
let proof = hex_array_to_string(&deposit.proof);
let deposit_data = deposit.data.as_ref().unwrap();
let pubkey = bytes_to_hex(&deposit_data.public_key);
let withdrawal_credentials = bytes_to_hex(&deposit_data.withdrawal_credentials);
let signature = bytes_to_hex(&deposit_data.signature);
let gwei = deposit_data.gwei;

let keys = deposit_keys(&clock.id, index as u64);
use crate::{
keys::deposit_keys,
pb::{beacon::rawblocks::Deposit as RawDeposit, sf::beacon::r#type::v1::Deposit},
structs::BlockTimestamp,
utils::encode_hex_2d_array,
};

let row = tables
.push_change_composite("deposits", keys, 0, substreams_database_change::pb::database::table_change::Operation::Create)
.change("proof", ("", proof.as_str()))
.change("pubkey", ("", pubkey.as_str()))
.change("withdrawal_credentials", ("", withdrawal_credentials.as_str()))
.change("signature", ("", signature.as_str()))
.change("gwei", ("", gwei.to_string().as_str()));
pub fn collect_deposits(deposits: &Vec<Deposit>, timestamp: &BlockTimestamp) -> Vec<RawDeposit> {
let mut deposits_vec = Vec::<RawDeposit>::new();

insert_timestamp(row, clock, false, true);
for (index, d) in deposits.iter().enumerate() {
deposits_vec.push(RawDeposit {
block_time: Some(timestamp.time),
block_number: timestamp.number,
block_date: timestamp.date.clone(),
block_hash: timestamp.hash.clone(),
index: index as u64,
proof: encode_hex_2d_array(&d.proof),
pubkey: bytes_to_hex(&d.data.as_ref().unwrap().public_key),
withdrawal_credentials: bytes_to_hex(&d.data.as_ref().unwrap().withdrawal_credentials),
signature: bytes_to_hex(&d.data.as_ref().unwrap().signature),
gwei: d.data.as_ref().unwrap().gwei,
});
}

deposits_vec
}
16 changes: 8 additions & 8 deletions blocks/beacon-parquet/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{blobs::collect_blobs, pb::sf::beacon::r#type::v1::block::Body::*, structs::BlockTimestamp};
use crate::{blobs::collect_blobs, deposits::collect_deposits, pb::sf::beacon::r#type::v1::block::Body::*, structs::BlockTimestamp, withdrawals::collect_withdrawals};
use substreams::{errors::Error, pb::substreams::Clock};

use crate::{
Expand Down Expand Up @@ -55,8 +55,8 @@ pub fn output_deneb_body(block: &BeaconBlock, spec: &str, body: &DenebBody, time
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: collect_blobs(&body.embedded_blobs, &timestamp),
deposits: vec![],
withdrawals: vec![],
deposits: collect_deposits(&body.deposits, &timestamp),
withdrawals: collect_withdrawals(&body.execution_payload.as_ref().unwrap().withdrawals, &timestamp),
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
Expand All @@ -69,8 +69,8 @@ pub fn output_capella_body(block: &BeaconBlock, spec: &str, body: &CapellaBody,
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
withdrawals: vec![],
deposits: collect_deposits(&body.deposits, &timestamp),
withdrawals: collect_withdrawals(&body.execution_payload.as_ref().unwrap().withdrawals, &timestamp),
attestations: vec![],
attester_slashings: vec![],
bls_to_execution_changes: vec![],
Expand All @@ -83,7 +83,7 @@ pub fn output_bellatrix_body(block: &BeaconBlock, spec: &str, body: &BellatrixBo
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
deposits: collect_deposits(&body.deposits, &timestamp),
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
Expand All @@ -97,7 +97,7 @@ pub fn output_altair_body(block: &BeaconBlock, spec: &str, body: &AltairBody, ti
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
deposits: collect_deposits(&body.deposits, &timestamp),
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
Expand All @@ -111,7 +111,7 @@ pub fn output_phase0_body(block: &BeaconBlock, spec: &str, body: &Phase0Body, ti
Events {
blocks: collect_blocks(&block, &spec, &timestamp),
blobs: vec![],
deposits: vec![],
deposits: collect_deposits(&body.deposits, &timestamp),
withdrawals: vec![],
attestations: vec![],
attester_slashings: vec![],
Expand Down
36 changes: 20 additions & 16 deletions blocks/beacon-parquet/src/withdrawals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@ use common::{blocks::insert_timestamp, utils::bytes_to_hex};
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};

use crate::{keys::withdrawals_keys, pb::sf::beacon::r#type::v1::Withdrawal};
use crate::{
keys::withdrawals_keys,
pb::{beacon::rawblocks::Withdrawal as RawWithdrawal, sf::beacon::r#type::v1::Withdrawal},
structs::BlockTimestamp,
};

pub fn insert_withdrawals(tables: &mut DatabaseChanges, clock: &Clock, withdrawals: &Vec<Withdrawal>) {
for withdrawal in withdrawals {
let index = withdrawal.withdrawal_index;
let validator_index = withdrawal.validator_index;
let address = bytes_to_hex(&withdrawal.address);
let gwei = withdrawal.gwei;
pub fn collect_withdrawals(withdrawals: &Vec<Withdrawal>, timestamp: &BlockTimestamp) -> Vec<RawWithdrawal> {
let mut withdrawals_vec = Vec::<RawWithdrawal>::new();

let keys = withdrawals_keys(&clock.id, index);

let row = tables
.push_change_composite("withdrawals", keys, 0, table_change::Operation::Create)
.change("validator_index", ("", validator_index.to_string().as_str()))
.change("address", ("", address.as_str()))
.change("gwei", ("", gwei.to_string().as_str()));

insert_timestamp(row, clock, false, true);
for (index, w) in withdrawals.iter().enumerate() {
withdrawals_vec.push(RawWithdrawal {
block_time: Some(timestamp.time),
block_number: timestamp.number,
block_date: timestamp.date.clone(),
block_hash: timestamp.hash.clone(),
withdrawal_index: index as u64,
validator_index: w.validator_index,
address: bytes_to_hex(&w.address),
gwei: w.gwei,
});
}

withdrawals_vec
}

0 comments on commit 3973631

Please sign in to comment.