Skip to content

Commit

Permalink
Merge pull request #28 from pinax-network/feature/evm-parquet
Browse files Browse the repository at this point in the history
EVM Parquet
  • Loading branch information
DenisCarriere authored Nov 19, 2024
2 parents 87c89fb + 67c8ff3 commit 3b3d16c
Show file tree
Hide file tree
Showing 29 changed files with 1,520 additions and 651 deletions.
2 changes: 2 additions & 0 deletions blocks/evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ common = { path = "../../common" }
substreams-ethereum = { workspace = true }
substreams-database-change = { workspace = true }
substreams = { workspace = true }
prost-types = { workspace = true }
prost = { workspace = true }
40 changes: 7 additions & 33 deletions blocks/evm/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,39 +29,9 @@ run:
gui:
substreams gui -e eth.substreams.pinax.network:443 ch_out -s 20500000 -t 20501001 --production-mode

.PHONY: sql-setup
sql-setup:
# EVM blocks
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/ethtest substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml --cursors-table cursors1
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml --cursors-table cursors2
substreams-sink-sql setup clickhouse://default:default@localhost:9000/eth substreams.yaml --cursors-table cursors3
substreams-sink-sql setup clickhouse://default:default@localhost:9000/base substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/bsc substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/polygon substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/arbone substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/mode substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/zora substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/xai substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/linea substreams.yaml

# testnets
substreams-sink-sql setup clickhouse://default:default@localhost:9000/arbsepolia substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/sepolia substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/chapel substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/holesky substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/amoy substreams.yaml

# not detailed EVM blocks
substreams-sink-sql setup clickhouse://default:default@localhost:9000/avalanche substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/optimism substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/blast substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/boba substreams.yaml
substreams-sink-sql setup clickhouse://default:default@localhost:9000/fantom substreams.yaml

# not compatible with EVM blocks
substreams-sink-sql setup clickhouse://default:default@localhost:9000/injective substreams.yaml
.PHONY: protogen
protogen:
substreams protogen --exclude-paths sf/substreams,google

# EVM blocks
.PHONY: sql-run-eth
Expand Down Expand Up @@ -164,6 +134,10 @@ sql-run-injective:
sql-run-fantom:
substreams-sink-sql run clickhouse://default:default@localhost:9000/fantom substreams.yaml -e fantom.substreams.pinax.network:443 87606376:87714334 --final-blocks-only --undo-buffer-size 1 --on-module-hash-mistmatch=warn --batch-block-flush-interval 100 --development-mode

.PHONY: parquet
parquet:
substreams-sink-files run eth.substreams.pinax.network:443 substreams.yaml map_events './out' 20444295:20444795 --encoder parquet --file-block-count 100 --development-mode

.PHONY: deploy
deploy:
graph build
Expand Down
31 changes: 18 additions & 13 deletions blocks/evm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@
- [x] **Account Creation**
- [x] **Gas Changes**
- [x] **Nonce Changes**
- [x] **Creation Traces**

## Graph

```mermaid
graph TD;
raw[sf.ethereum.type.v2.Block];
raw --> base(BASE)
raw --> extended(EXTENDED);
base --> blocks;
base --> logs;
base --> transactions;
extended --> traces;
extended --> balance_changes;
extended --> storage_changes;
extended --> code_changes;
extended --> account_creations;
extended --> gas_changes;
extended --> nonce_changes;
map_events[map: map_events];
sf.substreams.v1.Clock[source: sf.substreams.v1.Clock] --> map_events;
sf.ethereum.type.v2.Block[source: sf.ethereum.type.v2.Block] --> map_events;
```

## Modules

```bash
Name: map_events
Initial block: 0
Kind: map
Input: source: sf.substreams.v1.Clock
Input: source: sf.ethereum.type.v2.Block
Output Type: proto:evm.Events
Hash: ec09630461ab227a2e7448038cebaa91b49400bf
```
Binary file modified blocks/evm/logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
53 changes: 39 additions & 14 deletions blocks/evm/src/account_creations.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,47 @@
use common::blocks::insert_timestamp;
use common::structs::BlockTimestamp;
use common::utils::bytes_to_hex;
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams_ethereum::pb::eth::v2::AccountCreation;

use crate::keys::block_ordinal_keys;
use substreams_ethereum::pb::eth::v2::{AccountCreation, Block, TransactionTrace};

use crate::pb::evm::AccountCreation as AccountCreationEvent;

// https://github.com/streamingfast/firehose-ethereum/blob/1bcb32a8eb3e43347972b6b5c9b1fcc4a08c751e/proto/sf/ethereum/type/v2/type.proto#L736
// DetailLevel: EXTENDED
pub fn insert_account_creation(tables: &mut DatabaseChanges, clock: &Clock, account_creation: &AccountCreation) {
let account = bytes_to_hex(&account_creation.account);
let ordinal = account_creation.ordinal;
pub fn collect_account_creations(block: &Block, timestamp: &BlockTimestamp) -> Vec<AccountCreationEvent> {
let mut account_creations: Vec<AccountCreationEvent> = vec![];

// Collect account creations from system calls
for call in &block.system_calls {
for account_creation in &call.account_creations {
account_creations.push(parse_account_creation(account_creation, &TransactionTrace::default(), timestamp));
}
}

// Collect account creations from transaction traces
for transaction in &block.transaction_traces {
for call in &transaction.calls {
for account_creation in &call.account_creations {
account_creations.push(parse_account_creation(account_creation, transaction, timestamp));
}
}
}

account_creations
}

pub fn parse_account_creation(account_creation: &AccountCreation, transaction: &TransactionTrace, timestamp: &BlockTimestamp) -> AccountCreationEvent {
AccountCreationEvent {
// block
block_time: Some(timestamp.time),
block_number: timestamp.number,
block_hash: timestamp.hash.clone(),
block_date: timestamp.date.clone(),

let keys = block_ordinal_keys(&clock, &ordinal);
let row = tables
.push_change_composite("account_creations", keys, 0, table_change::Operation::Create)
.change("account", ("", account.as_str()))
.change("ordinal", ("", ordinal.to_string().as_str()));
// transaction
tx_hash: Some(bytes_to_hex(&transaction.hash)),

insert_timestamp(row, clock, false, true);
// account creation
account: bytes_to_hex(&account_creation.account),
ordinal: account_creation.ordinal,
}
}
72 changes: 44 additions & 28 deletions blocks/evm/src/balance_changes.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use common::blocks::insert_timestamp;
use common::utils::{bytes_to_hex, optional_bigint_to_decimal};
use common::structs::BlockTimestamp;
use common::utils::optional_bigint_to_string;
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::{table_change, DatabaseChanges, TableChange};
use substreams_ethereum::pb::eth::v2::BalanceChange;
use common::utils::{bytes_to_hex, optional_bigint_to_decimal};
use substreams_ethereum::pb::eth::v2::{BalanceChange, Block, TransactionTrace};

use crate::keys::block_ordinal_keys;
use crate::pb::evm::BalanceChange as BalanceChangeEvent;

pub fn balance_change_reason_to_string(reason: i32) -> String {
match reason {
Expand Down Expand Up @@ -37,29 +35,47 @@ pub fn balance_change_reason_to_string(reason: i32) -> String {

// https://github.com/streamingfast/firehose-ethereum/blob/1bcb32a8eb3e43347972b6b5c9b1fcc4a08c751e/proto/sf/ethereum/type/v2/type.proto#L658
// DetailLevel: EXTENDED
pub fn insert_balance_change_row(row: &mut TableChange, balance_change: &BalanceChange) {
let address = bytes_to_hex(&balance_change.address);
let new_balance = optional_bigint_to_string(&balance_change.new_value.clone(), "0");
let old_balance = optional_bigint_to_string(&balance_change.old_value.clone(), "0");
let amount = optional_bigint_to_decimal(balance_change.new_value.clone()) - optional_bigint_to_decimal(balance_change.old_value.clone());
let ordinal = balance_change.ordinal;
let reason_code = balance_change.reason;
let reason = balance_change_reason_to_string(reason_code);
pub fn collect_balance_changes(block: &Block, timestamp: &BlockTimestamp) -> Vec<BalanceChangeEvent> {
let mut balance_changes: Vec<BalanceChangeEvent> = vec![];

row.change("address", ("", address.as_str()))
.change("new_balance", ("", new_balance.as_str()))
.change("old_balance", ("", old_balance.as_str()))
.change("amount", ("", amount.to_string().as_str()))
.change("ordinal", ("", ordinal.to_string().as_str()))
.change("reason", ("", reason.as_str()))
.change("reason_code", ("", reason_code.to_string().as_str()));
// Collect balance changes from system calls
for call in &block.system_calls {
for balance_change in &call.balance_changes {
balance_changes.push(parse_balance_change(balance_change, &TransactionTrace::default(), timestamp));
}
}

// Collect balance changes from transaction traces
for transaction in &block.transaction_traces {
for call in &transaction.calls {
for balance_change in &call.balance_changes {
balance_changes.push(parse_balance_change(balance_change, transaction, timestamp));
}
}
}

balance_changes
}

pub fn insert_balance_change(tables: &mut DatabaseChanges, clock: &Clock, balance_change: &BalanceChange) {
let ordinal = balance_change.ordinal;
let keys = block_ordinal_keys(&clock, &ordinal);
let row = tables.push_change_composite("balance_changes", keys, 0, table_change::Operation::Create);
pub fn parse_balance_change(balance_change: &BalanceChange, transaction: &TransactionTrace, timestamp: &BlockTimestamp) -> BalanceChangeEvent {
let amount = optional_bigint_to_decimal(balance_change.new_value.clone()) - optional_bigint_to_decimal(balance_change.old_value.clone());
BalanceChangeEvent {
// block
block_time: Some(timestamp.time),
block_number: timestamp.number,
block_hash: timestamp.hash.clone(),
block_date: timestamp.date.clone(),

// transaction
tx_hash: Some(bytes_to_hex(&transaction.hash)),

insert_balance_change_row(row, balance_change);
insert_timestamp(row, clock, false, true);
}
// balance changes
address: bytes_to_hex(&balance_change.address),
new_balance: optional_bigint_to_string(&balance_change.new_value, "0"),
old_balance: optional_bigint_to_string(&balance_change.old_value, "0"),
amount: amount.to_string(),
ordinal: balance_change.ordinal,
reason: balance_change_reason_to_string(balance_change.reason),
reason_code: balance_change.reason as u32,
}
}
134 changes: 47 additions & 87 deletions blocks/evm/src/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use common::blocks::insert_timestamp;
use common::utils::{bytes_to_hex, optional_u64_to_string};
use common::{keys::blocks_keys, utils::optional_bigint_to_string};
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use common::structs::BlockTimestamp;
use common::utils::optional_bigint_to_string;
use common::utils::{bytes_to_hex, optional_bigint_to_u64, optional_u64_to_string};

use substreams_ethereum::pb::eth::v2::Block;

use crate::balance_changes::insert_balance_change;
use crate::code_changes::insert_code_change;
use crate::size::insert_size;
use crate::traces::insert_system_trace;
use crate::transactions::insert_transaction;
use crate::pb::evm::Block as BlockHeader;

pub fn block_detail_to_string(detail_level: i32) -> String {
match detail_level {
Expand All @@ -22,86 +17,51 @@ pub fn block_detail_to_string(detail_level: i32) -> String {

// https://github.com/streamingfast/firehose-ethereum/blob/develop/proto/sf/ethereum/type/v2/type.proto
// DetailLevel: BASE
pub fn insert_blocks(tables: &mut DatabaseChanges, clock: &Clock, block: &Block) {
let header: substreams_ethereum::pb::eth::v2::BlockHeader = block.header.clone().unwrap_or_default();
let parent_hash = bytes_to_hex(&header.parent_hash);
let nonce = header.nonce;
let ommers_hash = bytes_to_hex(&header.uncle_hash);
let logs_bloom = bytes_to_hex(&header.logs_bloom);
let transactions_root = bytes_to_hex(&header.transactions_root);
let state_root = bytes_to_hex(&header.state_root);
let receipts_root = bytes_to_hex(&header.receipt_root);
let miner = bytes_to_hex(&header.coinbase); // EVM Address
let mix_hash = bytes_to_hex(&header.mix_hash);
let extra_data = bytes_to_hex(&header.extra_data.clone());
let extra_data_utf8 = String::from_utf8(header.extra_data.clone()).unwrap_or_default();
let gas_limit = header.gas_limit;
let gas_used = header.gas_used;
let difficulty = optional_bigint_to_string(&header.difficulty, "0"); // UInt64
let total_difficulty = optional_bigint_to_string(&header.total_difficulty.clone(), "0"); // UInt256

// block detail levels
// https://streamingfastio.medium.com/new-block-model-to-accelerate-chain-integration-9f65126e5425
let detail_level_code = block.detail_level;
let detail_level = block_detail_to_string(detail_level_code);

// forks
let withdrawals_root = bytes_to_hex(&header.withdrawals_root); // EIP-4895 (Shangai Fork)
let parent_beacon_root = bytes_to_hex(&header.parent_beacon_root); // EIP-4788 (Dencun Fork)
let base_fee_per_gas = optional_bigint_to_string(&header.base_fee_per_gas, ""); // UInt256 - EIP-1559 (London Fork)
let excess_blob_gas = optional_u64_to_string(&header.excess_blob_gas, ""); // UInt64 - EIP-4844 (Dencun Fork)
let blob_gas_used = optional_u64_to_string(&header.blob_gas_used, ""); // UInt64 - EIP-4844 (Dencun Fork)
pub fn collect_block(block: &Block, timestamp: &BlockTimestamp) -> BlockHeader {
let header = block.header.as_ref().unwrap();

// blocks
let keys = blocks_keys(&clock, true);
let row = tables
.push_change_composite("blocks", keys, 0, table_change::Operation::Create)
.change("parent_hash", ("", parent_hash.as_str()))
.change("nonce", ("", nonce.to_string().as_str()))
.change("ommers_hash", ("", ommers_hash.as_str()))
.change("logs_bloom", ("", logs_bloom.as_str()))
.change("transactions_root", ("", transactions_root.as_str()))
.change("state_root", ("", state_root.as_str()))
.change("receipts_root", ("", receipts_root.as_str()))
.change("miner", ("", miner.as_str()))
.change("mix_hash", ("", mix_hash.as_str()))
.change("extra_data", ("", extra_data.as_str()))
.change("extra_data_utf8", ("", extra_data_utf8.as_str()))
.change("gas_limit", ("", gas_limit.to_string().as_str()))
.change("gas_used", ("", gas_used.to_string().as_str()))
.change("difficulty", ("", difficulty.as_str()))
.change("total_difficulty", ("", total_difficulty.as_str()))
// EIP-1559 (London Fork)
.change("base_fee_per_gas", ("", base_fee_per_gas.as_str()))
// EIP-4895 (Shangai Fork)
.change("withdrawals_root", ("", withdrawals_root.as_str()))
// EIP-4844 & EIP-4788 (Dencun Fork)
.change("parent_beacon_root", ("", parent_beacon_root.as_str()))
.change("excess_blob_gas", ("", excess_blob_gas.as_str()))
.change("blob_gas_used", ("", blob_gas_used.as_str()))
let total_transactions = block.transaction_traces.len() as u64;
let successful_transactions = block.transaction_traces.iter().filter(|t| t.status == 1).count() as u64;
let failed_transactions = total_transactions - successful_transactions;
let total_withdrawals = block.balance_changes.iter().filter(|t| t.reason == 16).count() as u64;

// block detail levels
.change("detail_level", ("", detail_level.as_str()))
.change("detail_level_code", ("", detail_level_code.to_string().as_str()))
;
BlockHeader {
// clock
time: Some(timestamp.time),
number: header.number,
date: timestamp.date.clone(),
hash: bytes_to_hex(&block.hash),

insert_timestamp(row, clock, true, true);
insert_size(row, &block);
// header
parent_hash: bytes_to_hex(&header.parent_hash),
nonce: header.nonce,
ommers_hash: bytes_to_hex(&header.uncle_hash),
logs_bloom: bytes_to_hex(&header.logs_bloom),
transactions_root: bytes_to_hex(&header.transactions_root),
state_root: bytes_to_hex(&header.state_root),
receipts_root: bytes_to_hex(&header.receipt_root),
withdrawals_root: bytes_to_hex(&header.withdrawals_root),
parent_beacon_root: bytes_to_hex(&header.parent_beacon_root),
miner: bytes_to_hex(&header.coinbase),
difficulty: optional_bigint_to_u64(&header.difficulty),
total_difficulty: optional_bigint_to_string(&header.total_difficulty, "0"),
mix_hash: bytes_to_hex(&header.mix_hash),
extra_data: bytes_to_hex(&header.extra_data),
extra_data_utf8: String::from_utf8(header.extra_data.clone()).unwrap_or_default(),
gas_limit: header.gas_limit,
gas_used: header.gas_used,
base_fee_per_gas: optional_bigint_to_string(&header.base_fee_per_gas, ""),
blob_gas_used: optional_u64_to_string(&header.blob_gas_used, ""),
excess_blob_gas: optional_u64_to_string(&header.excess_blob_gas, ""),

// TABLE::code_changes
for code_change in block.code_changes.iter() {
insert_code_change(tables, clock, code_change);
}
// TABLE::traces
for system_call in block.system_calls.iter() {
insert_system_trace(tables, clock, system_call);
}
// TABLE::balance_changes
for balance_change in block.balance_changes.iter() {
insert_balance_change(tables, clock, balance_change);
}
// TABLE::transactions
for transaction in block.transaction_traces.iter() {
insert_transaction( tables, clock, &transaction, &header, &detail_level);
// counters
size: block.size,
total_transactions: block.transaction_traces.len() as u64,
successful_transactions,
failed_transactions,
total_balance_changes: block.balance_changes.len() as u64,
total_withdrawals,
detail_level: block_detail_to_string(block.detail_level),
detail_level_code: block.detail_level as u32,
}
}
Loading

0 comments on commit 3b3d16c

Please sign in to comment.