From 3973631c82293752cca59fd269492d6224bf9184 Mon Sep 17 00:00:00 2001 From: zolting <44510235+zolting@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:34:46 -0500 Subject: [PATCH] Add deposits & withdrawals --- blocks/beacon-parquet/src/deposits.rs | 43 +++++++++++++----------- blocks/beacon-parquet/src/sink.rs | 16 ++++----- blocks/beacon-parquet/src/withdrawals.rs | 36 +++++++++++--------- 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/blocks/beacon-parquet/src/deposits.rs b/blocks/beacon-parquet/src/deposits.rs index dceff71..d0e31ce 100644 --- a/blocks/beacon-parquet/src/deposits.rs +++ b/blocks/beacon-parquet/src/deposits.rs @@ -5,27 +5,30 @@ use common::{ use substreams::pb::substreams::Clock; use substreams_database_change::pb::database::DatabaseChanges; -use crate::{keys::deposit_keys, pb::sf::beacon::r#type::v1::Deposit}; - -pub fn insert_deposits(tables: &mut DatabaseChanges, clock: &Clock, deposits: &Vec) { - for (index, deposit) in deposits.iter().enumerate() { - let proof = hex_array_to_string(&deposit.proof); - let deposit_data = deposit.data.as_ref().unwrap(); - let pubkey = bytes_to_hex(&deposit_data.public_key); - let withdrawal_credentials = bytes_to_hex(&deposit_data.withdrawal_credentials); - let signature = bytes_to_hex(&deposit_data.signature); - let gwei = deposit_data.gwei; - - let keys = deposit_keys(&clock.id, index as u64); +use crate::{ + keys::deposit_keys, + pb::{beacon::rawblocks::Deposit as RawDeposit, sf::beacon::r#type::v1::Deposit}, + structs::BlockTimestamp, + utils::encode_hex_2d_array, +}; - let row = tables - .push_change_composite("deposits", keys, 0, substreams_database_change::pb::database::table_change::Operation::Create) - .change("proof", ("", proof.as_str())) - .change("pubkey", ("", pubkey.as_str())) - .change("withdrawal_credentials", ("", withdrawal_credentials.as_str())) - .change("signature", ("", signature.as_str())) - .change("gwei", ("", gwei.to_string().as_str())); +pub fn collect_deposits(deposits: &Vec, timestamp: &BlockTimestamp) -> Vec { + let mut deposits_vec = Vec::::new(); - insert_timestamp(row, clock, false, true); + for (index, d) in deposits.iter().enumerate() { + deposits_vec.push(RawDeposit { + block_time: Some(timestamp.time), + block_number: timestamp.number, + block_date: timestamp.date.clone(), + block_hash: timestamp.hash.clone(), + index: index as u64, + proof: encode_hex_2d_array(&d.proof), + pubkey: bytes_to_hex(&d.data.as_ref().unwrap().public_key), + withdrawal_credentials: bytes_to_hex(&d.data.as_ref().unwrap().withdrawal_credentials), + signature: bytes_to_hex(&d.data.as_ref().unwrap().signature), + gwei: d.data.as_ref().unwrap().gwei, + }); } + + deposits_vec } diff --git a/blocks/beacon-parquet/src/sink.rs b/blocks/beacon-parquet/src/sink.rs index dc54e19..9b3210c 100644 --- a/blocks/beacon-parquet/src/sink.rs +++ b/blocks/beacon-parquet/src/sink.rs @@ -1,4 +1,4 @@ -use crate::{blobs::collect_blobs, pb::sf::beacon::r#type::v1::block::Body::*, structs::BlockTimestamp}; +use crate::{blobs::collect_blobs, deposits::collect_deposits, pb::sf::beacon::r#type::v1::block::Body::*, structs::BlockTimestamp, withdrawals::collect_withdrawals}; use substreams::{errors::Error, pb::substreams::Clock}; use crate::{ @@ -55,8 +55,8 @@ pub fn output_deneb_body(block: &BeaconBlock, spec: &str, body: &DenebBody, time Events { blocks: collect_blocks(&block, &spec, ×tamp), blobs: collect_blobs(&body.embedded_blobs, ×tamp), - deposits: vec![], - withdrawals: vec![], + deposits: collect_deposits(&body.deposits, ×tamp), + withdrawals: collect_withdrawals(&body.execution_payload.as_ref().unwrap().withdrawals, ×tamp), attestations: vec![], attester_slashings: vec![], bls_to_execution_changes: vec![], @@ -69,8 +69,8 @@ pub fn output_capella_body(block: &BeaconBlock, spec: &str, body: &CapellaBody, Events { blocks: collect_blocks(&block, &spec, ×tamp), blobs: vec![], - deposits: vec![], - withdrawals: vec![], + deposits: collect_deposits(&body.deposits, ×tamp), + withdrawals: collect_withdrawals(&body.execution_payload.as_ref().unwrap().withdrawals, ×tamp), attestations: vec![], attester_slashings: vec![], bls_to_execution_changes: vec![], @@ -83,7 +83,7 @@ pub fn output_bellatrix_body(block: &BeaconBlock, spec: &str, body: &BellatrixBo Events { blocks: collect_blocks(&block, &spec, ×tamp), blobs: vec![], - deposits: vec![], + deposits: collect_deposits(&body.deposits, ×tamp), withdrawals: vec![], attestations: vec![], attester_slashings: vec![], @@ -97,7 +97,7 @@ pub fn output_altair_body(block: &BeaconBlock, spec: &str, body: &AltairBody, ti Events { blocks: collect_blocks(&block, &spec, ×tamp), blobs: vec![], - deposits: vec![], + deposits: collect_deposits(&body.deposits, ×tamp), withdrawals: vec![], attestations: vec![], attester_slashings: vec![], @@ -111,7 +111,7 @@ pub fn output_phase0_body(block: &BeaconBlock, spec: &str, body: &Phase0Body, ti Events { blocks: collect_blocks(&block, &spec, ×tamp), blobs: vec![], - deposits: vec![], + deposits: collect_deposits(&body.deposits, ×tamp), withdrawals: vec![], attestations: vec![], attester_slashings: vec![], diff --git a/blocks/beacon-parquet/src/withdrawals.rs b/blocks/beacon-parquet/src/withdrawals.rs index 7e78858..9e4822a 100644 --- a/blocks/beacon-parquet/src/withdrawals.rs +++ b/blocks/beacon-parquet/src/withdrawals.rs @@ -2,23 +2,27 @@ use common::{blocks::insert_timestamp, utils::bytes_to_hex}; use substreams::pb::substreams::Clock; use substreams_database_change::pb::database::{table_change, DatabaseChanges}; -use crate::{keys::withdrawals_keys, pb::sf::beacon::r#type::v1::Withdrawal}; +use crate::{ + keys::withdrawals_keys, + pb::{beacon::rawblocks::Withdrawal as RawWithdrawal, sf::beacon::r#type::v1::Withdrawal}, + structs::BlockTimestamp, +}; -pub fn insert_withdrawals(tables: &mut DatabaseChanges, clock: &Clock, withdrawals: &Vec) { - for withdrawal in withdrawals { - let index = withdrawal.withdrawal_index; - let validator_index = withdrawal.validator_index; - let address = bytes_to_hex(&withdrawal.address); - let gwei = withdrawal.gwei; +pub fn collect_withdrawals(withdrawals: &Vec, timestamp: &BlockTimestamp) -> Vec { + let mut withdrawals_vec = Vec::::new(); - let keys = withdrawals_keys(&clock.id, index); - - let row = tables - .push_change_composite("withdrawals", keys, 0, table_change::Operation::Create) - .change("validator_index", ("", validator_index.to_string().as_str())) - .change("address", ("", address.as_str())) - .change("gwei", ("", gwei.to_string().as_str())); - - insert_timestamp(row, clock, false, true); + for (index, w) in withdrawals.iter().enumerate() { + withdrawals_vec.push(RawWithdrawal { + block_time: Some(timestamp.time), + block_number: timestamp.number, + block_date: timestamp.date.clone(), + block_hash: timestamp.hash.clone(), + withdrawal_index: index as u64, + validator_index: w.validator_index, + address: bytes_to_hex(&w.address), + gwei: w.gwei, + }); } + + withdrawals_vec }