From 3e14fa36aa28d5f09184191495d2bb048f957869 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Sun, 24 Sep 2023 13:18:45 +0530 Subject: [PATCH 1/3] Initial attempt to make Source generic over Provider --- Cargo.lock | 1 + Cargo.toml | 1 + crates/cli/Cargo.toml | 1 + crates/cli/src/parse/source.rs | 58 +++++++++++++++---- crates/freeze/src/collect.rs | 8 ++- crates/freeze/src/datasets/balance_diffs.rs | 5 +- crates/freeze/src/datasets/balances.rs | 5 +- crates/freeze/src/datasets/blocks.rs | 8 +-- .../src/datasets/blocks_and_transactions.rs | 6 +- crates/freeze/src/datasets/code_diffs.rs | 6 +- crates/freeze/src/datasets/codes.rs | 6 +- crates/freeze/src/datasets/contracts.rs | 9 ++- crates/freeze/src/datasets/erc20_balances.rs | 12 ++-- crates/freeze/src/datasets/erc20_metadata.rs | 9 ++- crates/freeze/src/datasets/erc20_supplies.rs | 7 ++- crates/freeze/src/datasets/erc20_transfers.rs | 9 ++- .../freeze/src/datasets/erc721_transfers.rs | 9 ++- crates/freeze/src/datasets/eth_calls.rs | 9 ++- crates/freeze/src/datasets/logs.rs | 13 +++-- .../freeze/src/datasets/native_transfers.rs | 9 ++- crates/freeze/src/datasets/nonce_diffs.rs | 5 +- crates/freeze/src/datasets/nonces.rs | 9 ++- crates/freeze/src/datasets/state_diffs.rs | 21 ++++--- crates/freeze/src/datasets/storage_diffs.rs | 5 +- crates/freeze/src/datasets/storages.rs | 9 ++- crates/freeze/src/datasets/trace_calls.rs | 9 ++- crates/freeze/src/datasets/traces.rs | 13 +++-- .../src/datasets/transaction_addresses.rs | 17 +++--- crates/freeze/src/datasets/transactions.rs | 6 +- crates/freeze/src/datasets/vm_traces.rs | 13 +++-- crates/freeze/src/freeze.rs | 7 ++- crates/freeze/src/types/datatypes/multi.rs | 10 ++-- crates/freeze/src/types/datatypes/scalar.rs | 9 +-- crates/freeze/src/types/sources.rs | 4 +- 34 files changed, 217 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2824dd19..961e8e73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -870,6 +870,7 @@ dependencies = [ "serde_json", "thousands", "tokio", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 29742dde..08ec8a94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ serde_json = "1.0.104" thiserror = "1.0.40" thousands = "0.2.0" tokio = { version = "1.29.0", features = ["macros", "rt-multi-thread", "sync"] } +url = { version = "2.4", default-features = false } [profile.dev] incremental = true diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 8e136a09..6780b683 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -36,3 +36,4 @@ serde = { workspace = true } serde_json = { workspace = true } thousands = { workspace = true } tokio = { workspace = true } +url = { workspace = true } diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index 10cbd0a8..d8a110d1 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -11,11 +11,13 @@ use crate::args::Args; pub(crate) async fn parse_source(args: &Args) -> Result { // parse network info - let rpc_url = parse_rpc_url(args); - let provider = - Provider::>::new_client(&rpc_url, args.max_retries, args.initial_backoff) - .map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?; - let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); + let provider = parse_rpc_url(args).await; + let chain_id: u64; + let ws_url = "wss://eth-mainnet.ws.alchemyapi.io/ws/demo"; + + if let RpcProvider::Http(provider) | RpcProvider::Ws(provider) = provider { + chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); + } let rate_limiter = match args.requests_per_second { Some(rate_limit) => match NonZeroU32::new(rate_limit) { @@ -35,6 +37,25 @@ pub(crate) async fn parse_source(args: &Args) -> Result { let semaphore = tokio::sync::Semaphore::new(max_concurrent_requests as usize); let semaphore = Some(semaphore); + match provider { + RpcProvider::Http(provider) => { + chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); + Fetcher { provider, semaphore, rate_limiter } + } + RpcProvider::Ws(provider) => { + chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); + let fetcher = Fetcher { provider, semaphore, rate_limiter }; + let output = Source { + fetcher: Arc::new(fetcher), + chain_id, + inner_request_size: args.inner_request_size, + max_concurrent_chunks, + }; + + Ok(output) + } + } + let fetcher = Fetcher { provider, semaphore, rate_limiter }; let output = Source { fetcher: Arc::new(fetcher), @@ -46,7 +67,7 @@ pub(crate) async fn parse_source(args: &Args) -> Result { Ok(output) } -fn parse_rpc_url(args: &Args) -> String { +async fn parse_rpc_url(args: &Args) -> RpcProvider { let mut url = match &args.rpc { Some(url) => url.clone(), _ => match env::var("ETH_RPC_URL") { @@ -57,8 +78,25 @@ fn parse_rpc_url(args: &Args) -> String { } }, }; - if !url.starts_with("http") { - url = "http://".to_string() + url.as_str(); - }; - url + if url.starts_with("wss") { + let ws_provider = Provider::::connect(url) + .await + .map_err(|_e| ParseError::ParseError("could not connect to ws_provider".to_string())) + .unwrap(); + return RpcProvider::Ws(ws_provider) + } else { + if !url.starts_with("http") { + url = "http://".to_string() + url.as_str(); + } + let http_provider = + Provider::>::new_client(&url, args.max_retries, args.initial_backoff) + .map_err(|_e| ParseError::ParseError("could not connect to provider".to_string())) + .unwrap(); + return RpcProvider::Http(http_provider) + } +} + +enum RpcProvider { + Http(Provider>), + Ws(Provider), } diff --git a/crates/freeze/src/collect.rs b/crates/freeze/src/collect.rs index 956a1045..af8c2758 100644 --- a/crates/freeze/src/collect.rs +++ b/crates/freeze/src/collect.rs @@ -1,11 +1,15 @@ use std::collections::HashMap; +use ethers::providers::{JsonRpcClient, Provider}; use polars::prelude::*; use crate::types::{CollectError, Datatype, MultiQuery, SingleQuery, Source}; /// collect data and return as dataframe -pub async fn collect(query: SingleQuery, source: Source) -> Result { +pub async fn collect( + query: SingleQuery, + source: Source>, +) -> Result { if query.chunks.len() > 1 { return Err(CollectError::CollectError("can only collect 1 chunk".to_string())) }; @@ -20,7 +24,7 @@ pub async fn collect(query: SingleQuery, source: Source) -> Result>, ) -> Result, CollectError> { todo!() } diff --git a/crates/freeze/src/datasets/balance_diffs.rs b/crates/freeze/src/datasets/balance_diffs.rs index 7809f25b..709c2c2b 100644 --- a/crates/freeze/src/datasets/balance_diffs.rs +++ b/crates/freeze/src/datasets/balance_diffs.rs @@ -1,3 +1,4 @@ +use ethers::providers::{JsonRpcClient, Provider}; use std::collections::HashMap; use polars::prelude::*; @@ -48,7 +49,7 @@ impl Dataset for BalanceDiffs { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -65,7 +66,7 @@ impl Dataset for BalanceDiffs { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/balances.rs b/crates/freeze/src/datasets/balances.rs index 97fa9320..5da219ba 100644 --- a/crates/freeze/src/datasets/balances.rs +++ b/crates/freeze/src/datasets/balances.rs @@ -1,6 +1,7 @@ // required args:: address use crate::{types::Balances, ColumnType, Dataset, Datatype}; +use ethers::providers::{JsonRpcClient, ProviderError}; use std::collections::HashMap; use ethers::prelude::*; @@ -46,7 +47,7 @@ impl Dataset for Balances { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -67,7 +68,7 @@ pub(crate) type BlockAddressBalance = (u64, Vec, U256); async fn fetch_balances( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(100); diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index d8e462be..2a85f26c 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -60,7 +60,7 @@ impl Dataset for Blocks { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -76,7 +76,7 @@ impl Dataset for Blocks { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -93,7 +93,7 @@ impl Dataset for Blocks { async fn fetch_tx_block_numbers( tx_hashes: &Vec>, - source: &Source, + source: &Source>, ) -> Result, CollectError> { let mut tasks = Vec::new(); for tx_hash in tx_hashes { @@ -126,7 +126,7 @@ async fn fetch_tx_block_numbers( async fn fetch_blocks( block_chunk: &BlockChunk, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); diff --git a/crates/freeze/src/datasets/blocks_and_transactions.rs b/crates/freeze/src/datasets/blocks_and_transactions.rs index 2391acd7..4c0a93bb 100644 --- a/crates/freeze/src/datasets/blocks_and_transactions.rs +++ b/crates/freeze/src/datasets/blocks_and_transactions.rs @@ -13,6 +13,8 @@ use crate::{ }, }; +use ethers::providers::{JsonRpcClient, ProviderError}; + #[async_trait::async_trait] impl MultiDataset for BlocksAndTransactions { fn name(&self) -> &'static str { @@ -26,7 +28,7 @@ impl MultiDataset for BlocksAndTransactions { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { @@ -57,7 +59,7 @@ impl MultiDataset for BlocksAndTransactions { pub(crate) async fn fetch_blocks_and_transactions( block_chunk: &BlockChunk, - source: &Source, + source: &Source>, include_gas_used: bool, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); diff --git a/crates/freeze/src/datasets/code_diffs.rs b/crates/freeze/src/datasets/code_diffs.rs index 2dbf7579..74f475ef 100644 --- a/crates/freeze/src/datasets/code_diffs.rs +++ b/crates/freeze/src/datasets/code_diffs.rs @@ -8,6 +8,8 @@ use crate::types::{ TransactionChunk, }; +use ethers::providers::{JsonRpcClient, Provider}; + #[async_trait::async_trait] impl Dataset for CodeDiffs { fn datatype(&self) -> Datatype { @@ -48,7 +50,7 @@ impl Dataset for CodeDiffs { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -59,7 +61,7 @@ impl Dataset for CodeDiffs { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/codes.rs b/crates/freeze/src/datasets/codes.rs index 4452471f..d2c379cb 100644 --- a/crates/freeze/src/datasets/codes.rs +++ b/crates/freeze/src/datasets/codes.rs @@ -7,6 +7,8 @@ use ethers::prelude::*; use polars::prelude::*; use tokio::{sync::mpsc, task}; +use ethers::providers::{JsonRpcClient, ProviderError}; + use crate::{ dataframes::SortableDataFrame, types::{ @@ -45,7 +47,7 @@ impl Dataset for Codes { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -66,7 +68,7 @@ pub(crate) type BlockAddressCode = (u64, Vec, Vec); async fn fetch_codes( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(100); diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs index 24ef274e..7788a04f 100644 --- a/crates/freeze/src/datasets/contracts.rs +++ b/crates/freeze/src/datasets/contracts.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::sync::mpsc; @@ -61,7 +64,7 @@ impl Dataset for Contracts { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -72,7 +75,7 @@ impl Dataset for Contracts { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/erc20_balances.rs b/crates/freeze/src/datasets/erc20_balances.rs index f6cc3ab1..6e1591e6 100644 --- a/crates/freeze/src/datasets/erc20_balances.rs +++ b/crates/freeze/src/datasets/erc20_balances.rs @@ -7,10 +7,6 @@ use crate::{types::Erc20Balances, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::prelude::*; -use polars::prelude::*; -use tokio::sync::mpsc; - use crate::{ dataframes::SortableDataFrame, types::{ @@ -19,6 +15,12 @@ use crate::{ }, with_series, with_series_binary, with_series_u256, ColumnEncoding, }; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; +use polars::prelude::*; +use tokio::sync::mpsc; use super::eth_calls; use crate::U256Type; @@ -54,7 +56,7 @@ impl Dataset for Erc20Balances { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/erc20_metadata.rs b/crates/freeze/src/datasets/erc20_metadata.rs index daeac909..f68b947a 100644 --- a/crates/freeze/src/datasets/erc20_metadata.rs +++ b/crates/freeze/src/datasets/erc20_metadata.rs @@ -4,7 +4,10 @@ use crate::{conversions::ToVecHex, ColumnType, Dataset, Datatype}; use std::collections::HashMap; use tokio::{sync::mpsc, task}; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use crate::{ @@ -45,7 +48,7 @@ impl Dataset for Erc20Metadata { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -64,7 +67,7 @@ type MetadataOutput = (u32, Vec, (Option, Option, Option, address_chunks: Vec, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(100); diff --git a/crates/freeze/src/datasets/erc20_supplies.rs b/crates/freeze/src/datasets/erc20_supplies.rs index a7467889..cd1df213 100644 --- a/crates/freeze/src/datasets/erc20_supplies.rs +++ b/crates/freeze/src/datasets/erc20_supplies.rs @@ -6,7 +6,10 @@ use crate::{ use std::collections::HashMap; use tokio::sync::mpsc; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use crate::{ @@ -47,7 +50,7 @@ impl Dataset for Erc20Supplies { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/erc20_transfers.rs b/crates/freeze/src/datasets/erc20_transfers.rs index 3b06adb9..08b7eca6 100644 --- a/crates/freeze/src/datasets/erc20_transfers.rs +++ b/crates/freeze/src/datasets/erc20_transfers.rs @@ -10,7 +10,10 @@ use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::sync::mpsc; @@ -72,7 +75,7 @@ impl Dataset for Erc20Transfers { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -84,7 +87,7 @@ impl Dataset for Erc20Transfers { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/erc721_transfers.rs b/crates/freeze/src/datasets/erc721_transfers.rs index 9e2a42fe..8b1c6326 100644 --- a/crates/freeze/src/datasets/erc721_transfers.rs +++ b/crates/freeze/src/datasets/erc721_transfers.rs @@ -13,7 +13,10 @@ use crate::{types::Erc721Transfers, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::sync::mpsc; @@ -75,7 +78,7 @@ impl Dataset for Erc721Transfers { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -87,7 +90,7 @@ impl Dataset for Erc721Transfers { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/eth_calls.rs b/crates/freeze/src/datasets/eth_calls.rs index dab469ea..24eedd2c 100644 --- a/crates/freeze/src/datasets/eth_calls.rs +++ b/crates/freeze/src/datasets/eth_calls.rs @@ -2,7 +2,10 @@ use crate::{conversions::ToVecHex, types::EthCalls, ColumnType, Dataset, Datatyp use std::collections::HashMap; use tokio::{sync::mpsc, task}; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use crate::{ @@ -55,7 +58,7 @@ impl Dataset for EthCalls { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -75,7 +78,7 @@ pub(crate) async fn fetch_eth_calls( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, call_data_chunks: Vec, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(100); diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index a5e16be8..caacf5c8 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -1,6 +1,9 @@ use std::collections::{HashMap, HashSet}; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use ethers_core::abi::{AbiEncode, EventParam, HumanReadableParser, ParamType, RawLog, Token}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -62,7 +65,7 @@ impl Dataset for Logs { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -73,7 +76,7 @@ impl Dataset for Logs { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -89,7 +92,7 @@ impl Dataset for Logs { pub(crate) async fn fetch_block_logs( block_chunk: &BlockChunk, - source: &Source, + source: &Source>, filter: Option<&RowFilter>, ) -> mpsc::Receiver, CollectError>> { // todo: need to modify these functions so they turn a result @@ -126,7 +129,7 @@ pub(crate) async fn fetch_block_logs( pub(crate) async fn fetch_transaction_logs( transaction_chunk: &TransactionChunk, - source: &Source, + source: &Source>, _filter: Option<&RowFilter>, ) -> mpsc::Receiver, CollectError>> { match transaction_chunk { diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index a66b0742..b6018dd3 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::sync::mpsc; @@ -55,7 +58,7 @@ impl Dataset for NativeTransfers { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -66,7 +69,7 @@ impl Dataset for NativeTransfers { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/nonce_diffs.rs b/crates/freeze/src/datasets/nonce_diffs.rs index 27ec40d7..3af31326 100644 --- a/crates/freeze/src/datasets/nonce_diffs.rs +++ b/crates/freeze/src/datasets/nonce_diffs.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use ethers::providers::{JsonRpcClient, Provider}; use polars::prelude::*; use super::state_diffs; @@ -48,7 +49,7 @@ impl Dataset for NonceDiffs { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -59,7 +60,7 @@ impl Dataset for NonceDiffs { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/nonces.rs b/crates/freeze/src/datasets/nonces.rs index cb072824..301fdf5e 100644 --- a/crates/freeze/src/datasets/nonces.rs +++ b/crates/freeze/src/datasets/nonces.rs @@ -3,7 +3,10 @@ use crate::{types::Nonces, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -45,7 +48,7 @@ impl Dataset for Nonces { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -66,7 +69,7 @@ pub(crate) type BlockAddressNonce = (u64, Vec, u32); async fn fetch_nonces( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(100); diff --git a/crates/freeze/src/datasets/state_diffs.rs b/crates/freeze/src/datasets/state_diffs.rs index aebfab1f..65d913ef 100644 --- a/crates/freeze/src/datasets/state_diffs.rs +++ b/crates/freeze/src/datasets/state_diffs.rs @@ -1,6 +1,9 @@ use std::collections::{HashMap, HashSet}; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::sync::mpsc; @@ -32,7 +35,7 @@ impl MultiDataset for StateDiffs { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { @@ -43,7 +46,7 @@ impl MultiDataset for StateDiffs { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { @@ -56,7 +59,7 @@ impl MultiDataset for StateDiffs { pub(crate) async fn collect_block_state_diffs( datatype: &Datatype, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -80,7 +83,7 @@ pub(crate) async fn collect_block_state_diffs( pub(crate) async fn collect_transaction_state_diffs( datatype: &Datatype, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -106,7 +109,7 @@ pub(crate) async fn collect_transaction_state_diffs( pub(crate) async fn fetch_block_traces( block_chunk: &BlockChunk, trace_types: &[TraceType], - source: &Source, + source: &Source>, ) -> mpsc::Receiver { let (tx, rx) = mpsc::channel(block_chunk.size() as usize); for number in block_chunk.numbers() { @@ -145,7 +148,7 @@ pub(crate) async fn fetch_block_traces( pub(crate) async fn fetch_transaction_traces( transaction_chunk: &TransactionChunk, trace_types: &[TraceType], - source: &Source, + source: &Source>, include_indices: bool, ) -> mpsc::Receiver { match transaction_chunk { @@ -215,14 +218,14 @@ pub(crate) async fn fetch_transaction_traces( pub(crate) async fn fetch_block_state_diffs( chunk: &BlockChunk, - source: &Source, + source: &Source>, ) -> mpsc::Receiver { fetch_block_traces(chunk, &[TraceType::StateDiff], source).await } pub(crate) async fn fetch_transaction_state_diffs( chunk: &TransactionChunk, - source: &Source, + source: &Source>, include_indices: bool, ) -> mpsc::Receiver { fetch_transaction_traces(chunk, &[TraceType::StateDiff], source, include_indices).await diff --git a/crates/freeze/src/datasets/storage_diffs.rs b/crates/freeze/src/datasets/storage_diffs.rs index 9ca0a0fd..9c1fb89b 100644 --- a/crates/freeze/src/datasets/storage_diffs.rs +++ b/crates/freeze/src/datasets/storage_diffs.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use ethers::providers::{JsonRpcClient, Provider}; use polars::prelude::*; use super::state_diffs; @@ -50,7 +51,7 @@ impl Dataset for StorageDiffs { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -67,7 +68,7 @@ impl Dataset for StorageDiffs { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/datasets/storages.rs b/crates/freeze/src/datasets/storages.rs index 66efbc09..bf38e2c9 100644 --- a/crates/freeze/src/datasets/storages.rs +++ b/crates/freeze/src/datasets/storages.rs @@ -3,7 +3,10 @@ use crate::{types::Storages, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -47,7 +50,7 @@ impl Dataset for Storages { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -66,7 +69,7 @@ async fn fetch_slots( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, slot_chunks: Vec, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(100); diff --git a/crates/freeze/src/datasets/trace_calls.rs b/crates/freeze/src/datasets/trace_calls.rs index 71885216..cfa305f9 100644 --- a/crates/freeze/src/datasets/trace_calls.rs +++ b/crates/freeze/src/datasets/trace_calls.rs @@ -8,7 +8,10 @@ use std::collections::HashMap; use crate::{conversions::ToVecHex, types::conversions::ToVecU8}; use tokio::{sync::mpsc, task}; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use super::traces; @@ -46,7 +49,7 @@ impl Dataset for TraceCalls { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -66,7 +69,7 @@ async fn fetch_trace_calls( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, call_data_chunks: Vec, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(100); diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index b8e0b6e8..3c54d2b7 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -81,7 +84,7 @@ impl Dataset for Traces { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -92,7 +95,7 @@ impl Dataset for Traces { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -103,7 +106,7 @@ impl Dataset for Traces { pub(crate) fn fetch_block_traces( block_chunk: &BlockChunk, - source: &Source, + source: &Source>, ) -> mpsc::Receiver, CollectError>> { let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); @@ -127,7 +130,7 @@ pub(crate) fn fetch_block_traces( pub(crate) fn fetch_transaction_traces( transaction_chunk: &TransactionChunk, - source: &Source, + source: &Source>, ) -> mpsc::Receiver, CollectError>> { match transaction_chunk { TransactionChunk::Values(tx_hashes) => { diff --git a/crates/freeze/src/datasets/transaction_addresses.rs b/crates/freeze/src/datasets/transaction_addresses.rs index 131d33c8..40482839 100644 --- a/crates/freeze/src/datasets/transaction_addresses.rs +++ b/crates/freeze/src/datasets/transaction_addresses.rs @@ -1,7 +1,10 @@ use crate::{types::TransactionAddresses, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, ProviderError}, +}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -56,7 +59,7 @@ impl Dataset for TransactionAddresses { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -67,7 +70,7 @@ impl Dataset for TransactionAddresses { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -80,7 +83,7 @@ type BlockLogTraces = (Block, Vec, Vec); pub(crate) async fn fetch_block_tx_addresses( block_chunk: &BlockChunk, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); @@ -103,7 +106,7 @@ pub(crate) async fn fetch_block_tx_addresses( async fn fetch_transaction_tx_addresses( transaction_chunk: &TransactionChunk, - source: &Source, + source: &Source>, ) -> mpsc::Receiver> { match transaction_chunk { TransactionChunk::Values(tx_hashes) => { @@ -144,7 +147,7 @@ async fn fetch_transaction_tx_addresses( async fn get_block_block_logs_traces( number: u64, - source: &Source, + source: &Source>, ) -> Result { let block_number: BlockNumber = number.into(); @@ -173,7 +176,7 @@ async fn get_block_block_logs_traces( async fn get_tx_block_logs_traces( tx_hash: H256, - source: &Source, + source: &Source>, ) -> Result { let tx_data = source.fetcher.get_transaction(tx_hash).await?.ok_or_else(|| { diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 18ed3610..29feae92 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -71,7 +71,7 @@ impl Dataset for Transactions { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -90,7 +90,7 @@ impl Dataset for Transactions { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -102,7 +102,7 @@ impl Dataset for Transactions { async fn fetch_transactions( transaction_chunk: &TransactionChunk, - source: &Source, + source: &Source>, include_gas_used: bool, ) -> mpsc::Receiver), CollectError>> { let (tx, rx) = mpsc::channel(1); diff --git a/crates/freeze/src/datasets/vm_traces.rs b/crates/freeze/src/datasets/vm_traces.rs index 480e524a..faf43d01 100644 --- a/crates/freeze/src/datasets/vm_traces.rs +++ b/crates/freeze/src/datasets/vm_traces.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; -use ethers::prelude::*; +use ethers::{ + prelude::*, + providers::{JsonRpcClient, Provider, ProviderError}, +}; use polars::prelude::*; use tokio::sync::mpsc; @@ -52,7 +55,7 @@ impl Dataset for VmTraces { async fn collect_block_chunk( &self, chunk: &BlockChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -63,7 +66,7 @@ impl Dataset for VmTraces { async fn collect_transaction_chunk( &self, chunk: &TransactionChunk, - source: &Source, + source: &Source>, schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -75,14 +78,14 @@ impl Dataset for VmTraces { async fn fetch_block_vm_traces( block_chunk: &BlockChunk, - source: &Source, + source: &Source>, ) -> mpsc::Receiver { state_diffs::fetch_block_traces(block_chunk, &[TraceType::VmTrace], source).await } async fn fetch_transaction_vm_traces( chunk: &TransactionChunk, - source: &Source, + source: &Source>, include_indices: bool, ) -> mpsc::Receiver { state_diffs::fetch_transaction_traces(chunk, &[TraceType::VmTrace], source, include_indices) diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index 7d6bee29..2e134046 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, path::Path, sync::Arc}; +use ethers::providers::{JsonRpcClient, Provider}; use futures::future::join_all; use indicatif::ProgressBar; use tokio::sync::Semaphore; @@ -12,7 +13,7 @@ use crate::types::{ /// perform a bulk data extraction of multiple datatypes over multiple block chunks pub async fn freeze( query: &MultiQuery, - source: &Source, + source: &Source>, sink: &FileOutput, bar: Arc, ) -> Result { @@ -76,7 +77,7 @@ async fn freeze_datatype_chunk( datatype: Datatype, sem: Arc, query: Arc, - source: Arc, + source: Arc>>, sink: Arc, bar: Arc, ) -> FreezeChunkSummary { @@ -126,7 +127,7 @@ async fn freeze_multi_datatype_chunk( mdt: MultiDatatype, sem: Arc, query: Arc, - source: Arc, + source: Arc>>, sink: Arc, bar: Arc, ) -> FreezeChunkSummary { diff --git a/crates/freeze/src/types/datatypes/multi.rs b/crates/freeze/src/types/datatypes/multi.rs index 018fdfe0..7a749ca7 100644 --- a/crates/freeze/src/types/datatypes/multi.rs +++ b/crates/freeze/src/types/datatypes/multi.rs @@ -3,6 +3,8 @@ use std::collections::{HashMap, HashSet}; use async_trait; use polars::prelude::*; +use ethers::providers::{JsonRpcClient, Provider}; + use crate::types::{ AddressChunk, BlockChunk, Chunk, CollectError, Dataset, Datatype, RowFilter, Source, Table, TransactionChunk, @@ -57,7 +59,7 @@ pub trait MultiDataset: Sync + Send { async fn collect_chunk( &self, chunk: &Chunk, - source: &Source, + source: &Source>, schemas: HashMap, filter: HashMap, ) -> Result, CollectError> { @@ -76,7 +78,7 @@ pub trait MultiDataset: Sync + Send { async fn collect_block_chunk( &self, _chunk: &BlockChunk, - _source: &Source, + _source: &Source>, _schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { @@ -87,7 +89,7 @@ pub trait MultiDataset: Sync + Send { async fn collect_transaction_chunk( &self, _chunk: &TransactionChunk, - _source: &Source, + _source: &Source>, _schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { @@ -98,7 +100,7 @@ pub trait MultiDataset: Sync + Send { async fn collect_address_chunk( &self, _chunk: &AddressChunk, - _source: &Source, + _source: &Source>, _schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { diff --git a/crates/freeze/src/types/datatypes/scalar.rs b/crates/freeze/src/types/datatypes/scalar.rs index a83a4f55..c0f52a92 100644 --- a/crates/freeze/src/types/datatypes/scalar.rs +++ b/crates/freeze/src/types/datatypes/scalar.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait; +use ethers::providers::{JsonRpcClient, Provider}; use polars::prelude::*; use crate::types::{ @@ -176,7 +177,7 @@ pub trait Dataset: Sync + Send { async fn collect_chunk( &self, chunk: &Chunk, - source: &Source, + source: &Source>, schema: &Table, filter: Option<&RowFilter>, ) -> Result { @@ -195,7 +196,7 @@ pub trait Dataset: Sync + Send { async fn collect_block_chunk( &self, _chunk: &BlockChunk, - _source: &Source, + _source: &Source>, _schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -206,7 +207,7 @@ pub trait Dataset: Sync + Send { async fn collect_transaction_chunk( &self, _chunk: &TransactionChunk, - _source: &Source, + _source: &Source>, _schema: &Table, _filter: Option<&RowFilter>, ) -> Result { @@ -217,7 +218,7 @@ pub trait Dataset: Sync + Send { async fn collect_address_chunk( &self, _chunk: &AddressChunk, - _source: &Source, + _source: &Source>, _schema: &Table, _filter: Option<&RowFilter>, ) -> Result { diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index cfbdf00f..ffd1ffc3 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -15,9 +15,9 @@ pub type RateLimiter = governor::RateLimiter { /// Shared provider for rpc data - pub fetcher: Arc>>, + pub fetcher: Arc>, /// chain_id of network pub chain_id: u64, /// number of blocks per log request From 077a64d0e9ab97c71ac8f29d7da47b9c2a280797 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Sun, 24 Sep 2023 15:19:37 +0530 Subject: [PATCH 2/3] make Source

generic over P: JsonRpcClient --- crates/cli/src/parse/source.rs | 13 ++-- crates/freeze/src/collect.rs | 17 ++-- crates/freeze/src/datasets/balance_diffs.rs | 20 +++-- crates/freeze/src/datasets/balances.rs | 20 +++-- crates/freeze/src/datasets/blocks.rs | 36 ++++++--- .../src/datasets/blocks_and_transactions.rs | 20 +++-- crates/freeze/src/datasets/code_diffs.rs | 20 +++-- crates/freeze/src/datasets/codes.rs | 20 +++-- crates/freeze/src/datasets/contracts.rs | 23 +++--- crates/freeze/src/datasets/erc20_balances.rs | 14 ++-- crates/freeze/src/datasets/erc20_metadata.rs | 23 +++--- crates/freeze/src/datasets/erc20_supplies.rs | 14 ++-- crates/freeze/src/datasets/erc20_transfers.rs | 23 +++--- .../freeze/src/datasets/erc721_transfers.rs | 23 +++--- crates/freeze/src/datasets/eth_calls.rs | 23 +++--- crates/freeze/src/datasets/logs.rs | 41 ++++++---- .../freeze/src/datasets/native_transfers.rs | 23 +++--- crates/freeze/src/datasets/nonce_diffs.rs | 20 +++-- crates/freeze/src/datasets/nonces.rs | 23 +++--- crates/freeze/src/datasets/state_diffs.rs | 77 ++++++++++++------- crates/freeze/src/datasets/storage_diffs.rs | 20 +++-- crates/freeze/src/datasets/storages.rs | 23 +++--- crates/freeze/src/datasets/trace_calls.rs | 23 +++--- crates/freeze/src/datasets/traces.rs | 41 ++++++---- .../src/datasets/transaction_addresses.rs | 59 ++++++++------ crates/freeze/src/datasets/transactions.rs | 27 ++++--- crates/freeze/src/datasets/vm_traces.rs | 36 ++++++--- crates/freeze/src/freeze.rs | 27 ++++--- crates/freeze/src/types/datatypes/multi.rs | 40 ++++++---- crates/freeze/src/types/datatypes/scalar.rs | 36 ++++++--- crates/freeze/src/types/sources.rs | 5 +- 31 files changed, 514 insertions(+), 316 deletions(-) diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index d8a110d1..daaf3a04 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -13,11 +13,6 @@ pub(crate) async fn parse_source(args: &Args) -> Result { // parse network info let provider = parse_rpc_url(args).await; let chain_id: u64; - let ws_url = "wss://eth-mainnet.ws.alchemyapi.io/ws/demo"; - - if let RpcProvider::Http(provider) | RpcProvider::Ws(provider) = provider { - chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); - } let rate_limiter = match args.requests_per_second { Some(rate_limit) => match NonZeroU32::new(rate_limit) { @@ -41,6 +36,14 @@ pub(crate) async fn parse_source(args: &Args) -> Result { RpcProvider::Http(provider) => { chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); Fetcher { provider, semaphore, rate_limiter } + let output = Source { + fetcher: Arc::new(fetcher), + chain_id, + inner_request_size: args.inner_request_size, + max_concurrent_chunks, + }; + + Ok(output) } RpcProvider::Ws(provider) => { chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); diff --git a/crates/freeze/src/collect.rs b/crates/freeze/src/collect.rs index af8c2758..4670f750 100644 --- a/crates/freeze/src/collect.rs +++ b/crates/freeze/src/collect.rs @@ -6,10 +6,10 @@ use polars::prelude::*; use crate::types::{CollectError, Datatype, MultiQuery, SingleQuery, Source}; /// collect data and return as dataframe -pub async fn collect( - query: SingleQuery, - source: Source>, -) -> Result { +pub async fn collect

(query: SingleQuery, source: Source

) -> Result +where + P: JsonRpcClient, +{ if query.chunks.len() > 1 { return Err(CollectError::CollectError("can only collect 1 chunk".to_string())) }; @@ -22,9 +22,12 @@ pub async fn collect( } /// collect data and return as dataframe -pub async fn collect_multiple( +pub async fn collect_multiple

( _query: MultiQuery, - _source: Source>, -) -> Result, CollectError> { + _source: Source

, +) -> Result, CollectError> +where + P: JsonRpcClient, +{ todo!() } diff --git a/crates/freeze/src/datasets/balance_diffs.rs b/crates/freeze/src/datasets/balance_diffs.rs index 709c2c2b..ba614442 100644 --- a/crates/freeze/src/datasets/balance_diffs.rs +++ b/crates/freeze/src/datasets/balance_diffs.rs @@ -1,4 +1,4 @@ -use ethers::providers::{JsonRpcClient, Provider}; +use ethers::providers::JsonRpcClient; use std::collections::HashMap; use polars::prelude::*; @@ -46,13 +46,16 @@ impl Dataset for BalanceDiffs { vec!["block_number".to_string(), "transaction_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_block_state_diffs( &Datatype::BalanceDiffs, chunk, @@ -63,13 +66,16 @@ impl Dataset for BalanceDiffs { .await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_transaction_state_diffs( &Datatype::BalanceDiffs, chunk, diff --git a/crates/freeze/src/datasets/balances.rs b/crates/freeze/src/datasets/balances.rs index 5da219ba..85cbd134 100644 --- a/crates/freeze/src/datasets/balances.rs +++ b/crates/freeze/src/datasets/balances.rs @@ -1,7 +1,7 @@ // required args:: address use crate::{types::Balances, ColumnType, Dataset, Datatype}; -use ethers::providers::{JsonRpcClient, ProviderError}; +use ethers::providers::JsonRpcClient; use std::collections::HashMap; use ethers::prelude::*; @@ -44,13 +44,16 @@ impl Dataset for Balances { vec!["block_number".to_string(), "address".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let address_chunks = match filter { Some(filter) => match &filter.address_chunks { Some(address_chunks) => address_chunks.clone(), @@ -65,11 +68,14 @@ impl Dataset for Balances { pub(crate) type BlockAddressBalance = (u64, Vec, U256); -async fn fetch_balances( +async fn fetch_balances

( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(100); for block_chunk in block_chunks { diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index 2a85f26c..664a7ce6 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -57,13 +57,16 @@ impl Dataset for Blocks { vec!["number".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = fetch_blocks(chunk, source).await; let output = blocks_to_dfs(rx, &Some(schema), &None, source.chain_id).await; match output { @@ -73,13 +76,16 @@ impl Dataset for Blocks { } } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let block_numbers = match chunk { TransactionChunk::Values(tx_hashes) => { fetch_tx_block_numbers(tx_hashes, source).await? @@ -91,10 +97,13 @@ impl Dataset for Blocks { } } -async fn fetch_tx_block_numbers( +async fn fetch_tx_block_numbers

( tx_hashes: &Vec>, - source: &Source>, -) -> Result, CollectError> { + source: &Source

, +) -> Result, CollectError> +where + P: JsonRpcClient, +{ let mut tasks = Vec::new(); for tx_hash in tx_hashes { let fetcher = source.fetcher.clone(); @@ -124,10 +133,13 @@ async fn fetch_tx_block_numbers( Ok(block_numbers) } -async fn fetch_blocks( +async fn fetch_blocks

( block_chunk: &BlockChunk, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); for number in block_chunk.numbers() { diff --git a/crates/freeze/src/datasets/blocks_and_transactions.rs b/crates/freeze/src/datasets/blocks_and_transactions.rs index 4c0a93bb..c32953fd 100644 --- a/crates/freeze/src/datasets/blocks_and_transactions.rs +++ b/crates/freeze/src/datasets/blocks_and_transactions.rs @@ -13,7 +13,7 @@ use crate::{ }, }; -use ethers::providers::{JsonRpcClient, ProviderError}; +use ethers::providers::JsonRpcClient; #[async_trait::async_trait] impl MultiDataset for BlocksAndTransactions { @@ -25,13 +25,16 @@ impl MultiDataset for BlocksAndTransactions { [Datatype::Blocks, Datatype::Transactions].into_iter().collect() } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schemas: HashMap, _filter: HashMap, - ) -> Result, CollectError> { + ) -> Result, CollectError> + where + P: JsonRpcClient, + { let include_gas_used = match &schemas.get(&Datatype::Transactions) { Some(table) => table.has_column("gas_used"), _ => false, @@ -57,11 +60,14 @@ impl MultiDataset for BlocksAndTransactions { } } -pub(crate) async fn fetch_blocks_and_transactions( +pub(crate) async fn fetch_blocks_and_transactions

( block_chunk: &BlockChunk, - source: &Source>, + source: &Source

, include_gas_used: bool, -) -> mpsc::Receiver> { +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); let source = Arc::new(source.clone()); diff --git a/crates/freeze/src/datasets/code_diffs.rs b/crates/freeze/src/datasets/code_diffs.rs index 74f475ef..524b0025 100644 --- a/crates/freeze/src/datasets/code_diffs.rs +++ b/crates/freeze/src/datasets/code_diffs.rs @@ -8,7 +8,7 @@ use crate::types::{ TransactionChunk, }; -use ethers::providers::{JsonRpcClient, Provider}; +use ethers::providers::JsonRpcClient; #[async_trait::async_trait] impl Dataset for CodeDiffs { @@ -47,24 +47,30 @@ impl Dataset for CodeDiffs { vec!["block_number".to_string(), "transaction_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_block_state_diffs(&Datatype::CodeDiffs, chunk, source, schema, filter) .await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_transaction_state_diffs( &Datatype::CodeDiffs, chunk, diff --git a/crates/freeze/src/datasets/codes.rs b/crates/freeze/src/datasets/codes.rs index d2c379cb..8bd355eb 100644 --- a/crates/freeze/src/datasets/codes.rs +++ b/crates/freeze/src/datasets/codes.rs @@ -7,7 +7,7 @@ use ethers::prelude::*; use polars::prelude::*; use tokio::{sync::mpsc, task}; -use ethers::providers::{JsonRpcClient, ProviderError}; +use ethers::providers::JsonRpcClient; use crate::{ dataframes::SortableDataFrame, @@ -44,13 +44,16 @@ impl Dataset for Codes { vec!["block_number".to_string(), "address".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let address_chunks = match filter { Some(filter) => match &filter.address_chunks { Some(address_chunks) => address_chunks.clone(), @@ -65,11 +68,14 @@ impl Dataset for Codes { pub(crate) type BlockAddressCode = (u64, Vec, Vec); -async fn fetch_codes( +async fn fetch_codes

( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(100); for block_chunk in block_chunks { diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs index 7788a04f..59a5f375 100644 --- a/crates/freeze/src/datasets/contracts.rs +++ b/crates/freeze/src/datasets/contracts.rs @@ -1,9 +1,6 @@ use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::sync::mpsc; @@ -61,24 +58,30 @@ impl Dataset for Contracts { vec!["block_number".to_string(), "create_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = traces::fetch_block_traces(chunk, source); traces_to_contracts_df(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = traces::fetch_transaction_traces(chunk, source); traces_to_contracts_df(rx, schema, source.chain_id).await } diff --git a/crates/freeze/src/datasets/erc20_balances.rs b/crates/freeze/src/datasets/erc20_balances.rs index 6e1591e6..fa42a3c8 100644 --- a/crates/freeze/src/datasets/erc20_balances.rs +++ b/crates/freeze/src/datasets/erc20_balances.rs @@ -15,10 +15,7 @@ use crate::{ }, with_series, with_series_binary, with_series_u256, ColumnEncoding, }; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::sync::mpsc; @@ -53,13 +50,16 @@ impl Dataset for Erc20Balances { vec!["block_number".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let (contract_chunks, call_data_chunks) = match filter { Some(filter) => { (filter.contract_chunks()?, create_balance_of_call_datas(filter.address_chunks()?)?) diff --git a/crates/freeze/src/datasets/erc20_metadata.rs b/crates/freeze/src/datasets/erc20_metadata.rs index f68b947a..b0918a13 100644 --- a/crates/freeze/src/datasets/erc20_metadata.rs +++ b/crates/freeze/src/datasets/erc20_metadata.rs @@ -4,10 +4,7 @@ use crate::{conversions::ToVecHex, ColumnType, Dataset, Datatype}; use std::collections::HashMap; use tokio::{sync::mpsc, task}; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use crate::{ @@ -45,13 +42,16 @@ impl Dataset for Erc20Metadata { vec!["symbol".to_string(), "block_number".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let contract_chunks = match filter { Some(filter) => filter.contract_chunks()?, _ => return Err(CollectError::CollectError("must specify RowFilter".to_string())), @@ -64,11 +64,14 @@ impl Dataset for Erc20Metadata { type MetadataOutput = (u32, Vec, (Option, Option, Option)); -pub(crate) async fn fetch_metadata_calls( +pub(crate) async fn fetch_metadata_calls

( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(100); let name_call_data: Vec = prefix_hex::decode("0x06fdde03").expect("Decoding failed"); diff --git a/crates/freeze/src/datasets/erc20_supplies.rs b/crates/freeze/src/datasets/erc20_supplies.rs index cd1df213..ccbae952 100644 --- a/crates/freeze/src/datasets/erc20_supplies.rs +++ b/crates/freeze/src/datasets/erc20_supplies.rs @@ -6,10 +6,7 @@ use crate::{ use std::collections::HashMap; use tokio::sync::mpsc; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use crate::{ @@ -47,13 +44,16 @@ impl Dataset for Erc20Supplies { vec!["erc20".to_string(), "block_number".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let contract_chunks = match filter { Some(filter) => filter.contract_chunks()?, _ => return Err(CollectError::CollectError("must specify RowFilter".to_string())), diff --git a/crates/freeze/src/datasets/erc20_transfers.rs b/crates/freeze/src/datasets/erc20_transfers.rs index 08b7eca6..5dad6962 100644 --- a/crates/freeze/src/datasets/erc20_transfers.rs +++ b/crates/freeze/src/datasets/erc20_transfers.rs @@ -10,10 +10,7 @@ use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::sync::mpsc; @@ -72,25 +69,31 @@ impl Dataset for Erc20Transfers { vec!["block_number".to_string(), "log_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let filter = get_row_filter(filter); let rx = logs::fetch_block_logs(chunk, source, Some(&filter)).await; logs_to_erc20_transfers(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let filter = get_row_filter(filter); let rx = logs::fetch_transaction_logs(chunk, source, Some(&filter)).await; logs_to_erc20_transfers(rx, schema, source.chain_id).await diff --git a/crates/freeze/src/datasets/erc721_transfers.rs b/crates/freeze/src/datasets/erc721_transfers.rs index 8b1c6326..9d8d8730 100644 --- a/crates/freeze/src/datasets/erc721_transfers.rs +++ b/crates/freeze/src/datasets/erc721_transfers.rs @@ -13,10 +13,7 @@ use crate::{types::Erc721Transfers, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::sync::mpsc; @@ -75,25 +72,31 @@ impl Dataset for Erc721Transfers { vec!["block_number".to_string(), "log_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let filter = erc20_transfers::get_row_filter(filter); let rx = logs::fetch_block_logs(chunk, source, Some(&filter)).await; logs_to_erc721_transfers(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let filter = erc20_transfers::get_row_filter(filter); let rx = logs::fetch_transaction_logs(chunk, source, Some(&filter)).await; logs_to_erc721_transfers(rx, schema, source.chain_id).await diff --git a/crates/freeze/src/datasets/eth_calls.rs b/crates/freeze/src/datasets/eth_calls.rs index 24eedd2c..9b8c7c92 100644 --- a/crates/freeze/src/datasets/eth_calls.rs +++ b/crates/freeze/src/datasets/eth_calls.rs @@ -2,10 +2,7 @@ use crate::{conversions::ToVecHex, types::EthCalls, ColumnType, Dataset, Datatyp use std::collections::HashMap; use tokio::{sync::mpsc, task}; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use crate::{ @@ -55,13 +52,16 @@ impl Dataset for EthCalls { .collect() } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let (address_chunks, call_data_chunks) = match filter { Some(filter) => (filter.address_chunks()?, filter.call_data_chunks()?), _ => return Err(CollectError::CollectError("must specify RowFilter".to_string())), @@ -74,12 +74,15 @@ impl Dataset for EthCalls { // block, address, call_data, output pub(crate) type CallDataOutput = (u64, Vec, Vec, Bytes); -pub(crate) async fn fetch_eth_calls( +pub(crate) async fn fetch_eth_calls

( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, call_data_chunks: Vec, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(100); for block_chunk in block_chunks { diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index caacf5c8..2a4a2788 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -1,9 +1,6 @@ use std::collections::{HashMap, HashSet}; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use ethers_core::abi::{AbiEncode, EventParam, HumanReadableParser, ParamType, RawLog, Token}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -62,24 +59,30 @@ impl Dataset for Logs { vec!["block_number".to_string(), "log_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = fetch_block_logs(chunk, source, filter).await; logs_to_df(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { // if let Some(_filter) = filter { // return Err(CollectError::CollectError( // "filters not supported when using --txs".to_string(), @@ -90,11 +93,14 @@ impl Dataset for Logs { } } -pub(crate) async fn fetch_block_logs( +pub(crate) async fn fetch_block_logs

( block_chunk: &BlockChunk, - source: &Source>, + source: &Source

, filter: Option<&RowFilter>, -) -> mpsc::Receiver, CollectError>> { +) -> mpsc::Receiver, CollectError>> +where + P: JsonRpcClient, +{ // todo: need to modify these functions so they turn a result let request_chunks = block_chunk.to_log_filter_options(&source.inner_request_size); let (tx, rx) = mpsc::channel(request_chunks.len()); @@ -127,11 +133,14 @@ pub(crate) async fn fetch_block_logs( rx } -pub(crate) async fn fetch_transaction_logs( +pub(crate) async fn fetch_transaction_logs

( transaction_chunk: &TransactionChunk, - source: &Source>, + source: &Source

, _filter: Option<&RowFilter>, -) -> mpsc::Receiver, CollectError>> { +) -> mpsc::Receiver, CollectError>> +where + P: JsonRpcClient, +{ match transaction_chunk { TransactionChunk::Values(tx_hashes) => { let (tx, rx) = mpsc::channel(tx_hashes.len() * 200); diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index b6018dd3..9e12d439 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -1,9 +1,6 @@ use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::sync::mpsc; @@ -55,24 +52,30 @@ impl Dataset for NativeTransfers { vec!["block_number".to_string(), "transfer_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = traces::fetch_block_traces(chunk, source); traces_to_native_transfers_df(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = traces::fetch_transaction_traces(chunk, source); traces_to_native_transfers_df(rx, schema, source.chain_id).await } diff --git a/crates/freeze/src/datasets/nonce_diffs.rs b/crates/freeze/src/datasets/nonce_diffs.rs index 3af31326..e33e74bb 100644 --- a/crates/freeze/src/datasets/nonce_diffs.rs +++ b/crates/freeze/src/datasets/nonce_diffs.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use ethers::providers::{JsonRpcClient, Provider}; +use ethers::providers::JsonRpcClient; use polars::prelude::*; use super::state_diffs; @@ -46,24 +46,30 @@ impl Dataset for NonceDiffs { vec!["block_number".to_string(), "transaction_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_block_state_diffs(&Datatype::NonceDiffs, chunk, source, schema, filter) .await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_transaction_state_diffs( &Datatype::NonceDiffs, chunk, diff --git a/crates/freeze/src/datasets/nonces.rs b/crates/freeze/src/datasets/nonces.rs index 301fdf5e..cd4c87c8 100644 --- a/crates/freeze/src/datasets/nonces.rs +++ b/crates/freeze/src/datasets/nonces.rs @@ -3,10 +3,7 @@ use crate::{types::Nonces, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -45,13 +42,16 @@ impl Dataset for Nonces { vec!["block_number".to_string(), "address".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let address_chunks = match filter { Some(filter) => match &filter.address_chunks { Some(address_chunks) => address_chunks.clone(), @@ -66,11 +66,14 @@ impl Dataset for Nonces { pub(crate) type BlockAddressNonce = (u64, Vec, u32); -async fn fetch_nonces( +async fn fetch_nonces

( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(100); for block_chunk in block_chunks { diff --git a/crates/freeze/src/datasets/state_diffs.rs b/crates/freeze/src/datasets/state_diffs.rs index 65d913ef..91f6b877 100644 --- a/crates/freeze/src/datasets/state_diffs.rs +++ b/crates/freeze/src/datasets/state_diffs.rs @@ -1,9 +1,6 @@ use std::collections::{HashMap, HashSet}; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::sync::mpsc; @@ -32,37 +29,46 @@ impl MultiDataset for StateDiffs { .collect() } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schemas: HashMap, _filter: HashMap, - ) -> Result, CollectError> { + ) -> Result, CollectError> + where + P: JsonRpcClient, + { let rx = fetch_block_state_diffs(chunk, source).await; state_diffs_to_df(rx, &schemas, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schemas: HashMap, _filter: HashMap, - ) -> Result, CollectError> { + ) -> Result, CollectError> + where + P: JsonRpcClient, + { let include_indices = schemas.values().any(|schema| schema.has_column("block_number")); let rx = fetch_transaction_state_diffs(chunk, source, include_indices).await; state_diffs_to_df(rx, &schemas, source.chain_id).await } } -pub(crate) async fn collect_block_state_diffs( +pub(crate) async fn collect_block_state_diffs

( datatype: &Datatype, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, -) -> Result { +) -> Result +where + P: JsonRpcClient, +{ let rx = fetch_block_state_diffs(chunk, source).await; let mut schemas: HashMap = HashMap::new(); schemas.insert(*datatype, schema.clone()); @@ -80,13 +86,16 @@ pub(crate) async fn collect_block_state_diffs( df.sort_by_schema(schema) } -pub(crate) async fn collect_transaction_state_diffs( +pub(crate) async fn collect_transaction_state_diffs

( datatype: &Datatype, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, -) -> Result { +) -> Result +where + P: JsonRpcClient, +{ let include_indices = schema.has_column("block_number"); let chain_id = source.chain_id; let rx = fetch_transaction_state_diffs(chunk, source, include_indices).await; @@ -106,11 +115,14 @@ pub(crate) async fn collect_transaction_state_diffs( df.sort_by_schema(schema) } -pub(crate) async fn fetch_block_traces( +pub(crate) async fn fetch_block_traces

( block_chunk: &BlockChunk, trace_types: &[TraceType], - source: &Source>, -) -> mpsc::Receiver { + source: &Source

, +) -> mpsc::Receiver +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(block_chunk.size() as usize); for number in block_chunk.numbers() { if number == 0 { @@ -145,12 +157,15 @@ pub(crate) async fn fetch_block_traces( rx } -pub(crate) async fn fetch_transaction_traces( +pub(crate) async fn fetch_transaction_traces

( transaction_chunk: &TransactionChunk, trace_types: &[TraceType], - source: &Source>, + source: &Source

, include_indices: bool, -) -> mpsc::Receiver { +) -> mpsc::Receiver +where + P: JsonRpcClient, +{ match transaction_chunk { TransactionChunk::Values(tx_hashes) => { let (tx, rx) = mpsc::channel(tx_hashes.len()); @@ -216,18 +231,24 @@ pub(crate) async fn fetch_transaction_traces( } } -pub(crate) async fn fetch_block_state_diffs( +pub(crate) async fn fetch_block_state_diffs

( chunk: &BlockChunk, - source: &Source>, -) -> mpsc::Receiver { + source: &Source

, +) -> mpsc::Receiver +where + P: JsonRpcClient, +{ fetch_block_traces(chunk, &[TraceType::StateDiff], source).await } -pub(crate) async fn fetch_transaction_state_diffs( +pub(crate) async fn fetch_transaction_state_diffs

( chunk: &TransactionChunk, - source: &Source>, + source: &Source

, include_indices: bool, -) -> mpsc::Receiver { +) -> mpsc::Receiver +where + P: JsonRpcClient, +{ fetch_transaction_traces(chunk, &[TraceType::StateDiff], source, include_indices).await } diff --git a/crates/freeze/src/datasets/storage_diffs.rs b/crates/freeze/src/datasets/storage_diffs.rs index 9c1fb89b..70619ba7 100644 --- a/crates/freeze/src/datasets/storage_diffs.rs +++ b/crates/freeze/src/datasets/storage_diffs.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use ethers::providers::{JsonRpcClient, Provider}; +use ethers::providers::JsonRpcClient; use polars::prelude::*; use super::state_diffs; @@ -48,13 +48,16 @@ impl Dataset for StorageDiffs { vec!["block_number".to_string(), "transaction_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_block_state_diffs( &Datatype::StorageDiffs, chunk, @@ -65,13 +68,16 @@ impl Dataset for StorageDiffs { .await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { state_diffs::collect_transaction_state_diffs( &Datatype::StorageDiffs, chunk, diff --git a/crates/freeze/src/datasets/storages.rs b/crates/freeze/src/datasets/storages.rs index bf38e2c9..eb617386 100644 --- a/crates/freeze/src/datasets/storages.rs +++ b/crates/freeze/src/datasets/storages.rs @@ -3,10 +3,7 @@ use crate::{types::Storages, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -47,13 +44,16 @@ impl Dataset for Storages { vec!["block_number".to_string(), "address".to_string(), "slot".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let (address_chunks, slot_chunks) = match filter { Some(filter) => (filter.address_chunks()?, filter.slot_chunks()?), _ => return Err(CollectError::CollectError("must specify RowFilter".to_string())), @@ -65,12 +65,15 @@ impl Dataset for Storages { pub(crate) type BlockAddressSlot = (u64, Vec, Vec, Vec); -async fn fetch_slots( +async fn fetch_slots

( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, slot_chunks: Vec, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(100); for block_chunk in block_chunks { diff --git a/crates/freeze/src/datasets/trace_calls.rs b/crates/freeze/src/datasets/trace_calls.rs index cfa305f9..66ecb18f 100644 --- a/crates/freeze/src/datasets/trace_calls.rs +++ b/crates/freeze/src/datasets/trace_calls.rs @@ -8,10 +8,7 @@ use std::collections::HashMap; use crate::{conversions::ToVecHex, types::conversions::ToVecU8}; use tokio::{sync::mpsc, task}; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use super::traces; @@ -46,13 +43,16 @@ impl Dataset for TraceCalls { Traces.default_sort() } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let (address_chunks, call_data_chunks) = match filter { Some(filter) => (filter.address_chunks()?, filter.call_data_chunks()?), _ => return Err(CollectError::CollectError("must specify RowFilter".to_string())), @@ -65,12 +65,15 @@ impl Dataset for TraceCalls { // block, address, call_data, BlockTrace type TraceCallOutput = (u64, Vec, Vec, BlockTrace); -async fn fetch_trace_calls( +async fn fetch_trace_calls

( block_chunks: Vec<&BlockChunk>, address_chunks: Vec, call_data_chunks: Vec, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(100); for block_chunk in block_chunks { diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index 3c54d2b7..b8a7d4dd 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -1,9 +1,6 @@ use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -81,33 +78,42 @@ impl Dataset for Traces { vec!["block_number".to_string(), "transaction_position".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = fetch_block_traces(chunk, source); traces_to_df(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = fetch_transaction_traces(chunk, source); traces_to_df(rx, schema, source.chain_id).await } } -pub(crate) fn fetch_block_traces( +pub(crate) fn fetch_block_traces

( block_chunk: &BlockChunk, - source: &Source>, -) -> mpsc::Receiver, CollectError>> { + source: &Source

, +) -> mpsc::Receiver, CollectError>> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); for number in block_chunk.numbers() { @@ -128,10 +134,13 @@ pub(crate) fn fetch_block_traces( rx } -pub(crate) fn fetch_transaction_traces( +pub(crate) fn fetch_transaction_traces

( transaction_chunk: &TransactionChunk, - source: &Source>, -) -> mpsc::Receiver, CollectError>> { + source: &Source

, +) -> mpsc::Receiver, CollectError>> +where + P: JsonRpcClient, +{ match transaction_chunk { TransactionChunk::Values(tx_hashes) => { let (tx, rx) = mpsc::channel(tx_hashes.len()); diff --git a/crates/freeze/src/datasets/transaction_addresses.rs b/crates/freeze/src/datasets/transaction_addresses.rs index 40482839..221c27ed 100644 --- a/crates/freeze/src/datasets/transaction_addresses.rs +++ b/crates/freeze/src/datasets/transaction_addresses.rs @@ -1,10 +1,7 @@ use crate::{types::TransactionAddresses, ColumnType, Dataset, Datatype}; use std::collections::HashMap; -use ethers::{ - prelude::*, - providers::{JsonRpcClient, ProviderError}, -}; +use ethers::{prelude::*, providers::JsonRpcClient}; use polars::prelude::*; use tokio::{sync::mpsc, task}; @@ -56,24 +53,30 @@ impl Dataset for TransactionAddresses { ] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = fetch_block_tx_addresses(chunk, source).await; traces_to_addresses_df(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = fetch_transaction_tx_addresses(chunk, source).await; traces_to_addresses_df(rx, schema, source.chain_id).await } @@ -81,10 +84,13 @@ impl Dataset for TransactionAddresses { type BlockLogTraces = (Block, Vec, Vec); -pub(crate) async fn fetch_block_tx_addresses( +pub(crate) async fn fetch_block_tx_addresses

( block_chunk: &BlockChunk, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(block_chunk.numbers().len()); for number in block_chunk.numbers() { @@ -104,10 +110,13 @@ pub(crate) async fn fetch_block_tx_addresses( rx } -async fn fetch_transaction_tx_addresses( +async fn fetch_transaction_tx_addresses

( transaction_chunk: &TransactionChunk, - source: &Source>, -) -> mpsc::Receiver> { + source: &Source

, +) -> mpsc::Receiver> +where + P: JsonRpcClient, +{ match transaction_chunk { TransactionChunk::Values(tx_hashes) => { let (tx, rx) = mpsc::channel(tx_hashes.len()); @@ -145,10 +154,13 @@ async fn fetch_transaction_tx_addresses( } } -async fn get_block_block_logs_traces( +async fn get_block_block_logs_traces

( number: u64, - source: &Source>, -) -> Result { + source: &Source

, +) -> Result +where + P: JsonRpcClient, +{ let block_number: BlockNumber = number.into(); // block @@ -174,10 +186,13 @@ async fn get_block_block_logs_traces( Ok((block_result, log_result, traces_result)) } -async fn get_tx_block_logs_traces( +async fn get_tx_block_logs_traces

( tx_hash: H256, - source: &Source>, -) -> Result { + source: &Source

, +) -> Result +where + P: JsonRpcClient, +{ let tx_data = source.fetcher.get_transaction(tx_hash).await?.ok_or_else(|| { CollectError::CollectError("could not find transaction data".to_string()) diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 29feae92..419544fa 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -68,13 +68,16 @@ impl Dataset for Transactions { vec!["block_number".to_string(), "transaction_index".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let include_gas_used = schema.has_column("gas_used"); let rx = blocks_and_transactions::fetch_blocks_and_transactions(chunk, source, include_gas_used) @@ -87,24 +90,30 @@ impl Dataset for Transactions { } } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let include_gas_used = schema.has_column("gas_used"); let rx = fetch_transactions(chunk, source, include_gas_used).await; transactions_to_df(rx, schema, source.chain_id).await } } -async fn fetch_transactions( +async fn fetch_transactions

( transaction_chunk: &TransactionChunk, - source: &Source>, + source: &Source

, include_gas_used: bool, -) -> mpsc::Receiver), CollectError>> { +) -> mpsc::Receiver), CollectError>> +where + P: JsonRpcClient, +{ let (tx, rx) = mpsc::channel(1); match transaction_chunk { diff --git a/crates/freeze/src/datasets/vm_traces.rs b/crates/freeze/src/datasets/vm_traces.rs index faf43d01..fa132b8f 100644 --- a/crates/freeze/src/datasets/vm_traces.rs +++ b/crates/freeze/src/datasets/vm_traces.rs @@ -52,42 +52,54 @@ impl Dataset for VmTraces { vec!["block_number".to_string(), "transaction_position".to_string(), "used".to_string()] } - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, chunk: &BlockChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let rx = fetch_block_vm_traces(chunk, source).await; vm_traces_to_df(rx, schema, source.chain_id).await } - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, chunk: &TransactionChunk, - source: &Source>, + source: &Source

, schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { let include_indices = schema.has_column("block_number"); let rx = fetch_transaction_vm_traces(chunk, source, include_indices).await; vm_traces_to_df(rx, schema, source.chain_id).await } } -async fn fetch_block_vm_traces( +async fn fetch_block_vm_traces

( block_chunk: &BlockChunk, - source: &Source>, -) -> mpsc::Receiver { + source: &Source

, +) -> mpsc::Receiver +where + P: JsonRpcClient, +{ state_diffs::fetch_block_traces(block_chunk, &[TraceType::VmTrace], source).await } -async fn fetch_transaction_vm_traces( +async fn fetch_transaction_vm_traces

( chunk: &TransactionChunk, - source: &Source>, + source: &Source

, include_indices: bool, -) -> mpsc::Receiver { +) -> mpsc::Receiver +where + P: JsonRpcClient, +{ state_diffs::fetch_transaction_traces(chunk, &[TraceType::VmTrace], source, include_indices) .await } diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index 2e134046..04d5a00c 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -11,12 +11,15 @@ use crate::types::{ }; /// perform a bulk data extraction of multiple datatypes over multiple block chunks -pub async fn freeze( +pub async fn freeze

( query: &MultiQuery, - source: &Source>, + source: &Source

, sink: &FileOutput, bar: Arc, -) -> Result { +) -> Result +where + P: JsonRpcClient, +{ // freeze chunks concurrently let (datatypes, multi_datatypes) = cluster_datatypes(query.schemas.keys().collect()); let sem = Arc::new(Semaphore::new(source.max_concurrent_chunks as usize)); @@ -72,15 +75,18 @@ fn cluster_datatypes(dts: Vec<&Datatype>) -> (Vec, Vec) (other_dts, mdts) } -async fn freeze_datatype_chunk( +async fn freeze_datatype_chunk

( chunk: (Chunk, Option), datatype: Datatype, sem: Arc, query: Arc, - source: Arc>>, + source: Arc>, sink: Arc, bar: Arc, -) -> FreezeChunkSummary { +) -> FreezeChunkSummary +where + P: JsonRpcClient, +{ let _permit = sem.acquire().await.expect("Semaphore acquire"); let ds = datatype.dataset(); @@ -122,15 +128,18 @@ async fn freeze_datatype_chunk( FreezeChunkSummary::success(paths) } -async fn freeze_multi_datatype_chunk( +async fn freeze_multi_datatype_chunk

( chunk: (Chunk, Option), mdt: MultiDatatype, sem: Arc, query: Arc, - source: Arc>>, + source: Arc>, sink: Arc, bar: Arc, -) -> FreezeChunkSummary { +) -> FreezeChunkSummary +where + P: JsonRpcClient, +{ let _permit = sem.acquire().await.expect("Semaphore acquire"); // create paths diff --git a/crates/freeze/src/types/datatypes/multi.rs b/crates/freeze/src/types/datatypes/multi.rs index 7a749ca7..5273f6cb 100644 --- a/crates/freeze/src/types/datatypes/multi.rs +++ b/crates/freeze/src/types/datatypes/multi.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet}; use async_trait; use polars::prelude::*; -use ethers::providers::{JsonRpcClient, Provider}; +use ethers::providers::JsonRpcClient; use crate::types::{ AddressChunk, BlockChunk, Chunk, CollectError, Dataset, Datatype, RowFilter, Source, Table, @@ -33,7 +33,7 @@ impl MultiDatatype { } /// return MultiDataset corresponding to MultiDatatype - pub fn multi_dataset(&self) -> Box { + pub fn multi_dataset(&self) -> Box { match self { MultiDatatype::BlocksAndTransactions => Box::new(BlocksAndTransactions), MultiDatatype::StateDiffs => Box::new(StateDiffs), @@ -56,13 +56,16 @@ pub trait MultiDataset: Sync + Send { } /// collect dataset for a particular chunk - async fn collect_chunk( + async fn collect_chunk

( &self, chunk: &Chunk, - source: &Source>, + source: &Source

, schemas: HashMap, filter: HashMap, - ) -> Result, CollectError> { + ) -> Result, CollectError> + where + P: JsonRpcClient, + { match chunk { Chunk::Block(chunk) => self.collect_block_chunk(chunk, source, schemas, filter).await, Chunk::Transaction(chunk) => { @@ -75,35 +78,44 @@ pub trait MultiDataset: Sync + Send { } /// collect dataset for a particular block chunk - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, _chunk: &BlockChunk, - _source: &Source>, + _source: &Source

, _schemas: HashMap, _filter: HashMap, - ) -> Result, CollectError> { + ) -> Result, CollectError> + where + P: JsonRpcClient, + { panic!("block_chunk collection not implemented for {}", self.name()) } /// collect dataset for a particular transaction chunk - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, _chunk: &TransactionChunk, - _source: &Source>, + _source: &Source

, _schemas: HashMap, _filter: HashMap, - ) -> Result, CollectError> { + ) -> Result, CollectError> + where + P: JsonRpcClient, + { panic!("transaction_chunk collection not implemented for {}", self.name()) } /// collect dataset for a particular transaction chunk - async fn collect_address_chunk( + async fn collect_address_chunk

( &self, _chunk: &AddressChunk, - _source: &Source>, + _source: &Source

, _schemas: HashMap, _filter: HashMap, - ) -> Result, CollectError> { + ) -> Result, CollectError> + where + P: JsonRpcClient, + { panic!("transaction_chunk collection not implemented for {}", self.name()) } } diff --git a/crates/freeze/src/types/datatypes/scalar.rs b/crates/freeze/src/types/datatypes/scalar.rs index c0f52a92..d6e0acb8 100644 --- a/crates/freeze/src/types/datatypes/scalar.rs +++ b/crates/freeze/src/types/datatypes/scalar.rs @@ -174,13 +174,16 @@ pub trait Dataset: Sync + Send { } /// collect dataset for a particular chunk - async fn collect_chunk( + async fn collect_chunk

( &self, chunk: &Chunk, - source: &Source>, + source: &Source

, schema: &Table, filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { match chunk { Chunk::Block(chunk) => self.collect_block_chunk(chunk, source, schema, filter).await, Chunk::Transaction(chunk) => { @@ -193,35 +196,44 @@ pub trait Dataset: Sync + Send { } /// collect dataset for a particular block chunk - async fn collect_block_chunk( + async fn collect_block_chunk

( &self, _chunk: &BlockChunk, - _source: &Source>, + _source: &Source

, _schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { panic!("block_chunk collection not implemented for {}", self.name()) } /// collect dataset for a particular transaction chunk - async fn collect_transaction_chunk( + async fn collect_transaction_chunk

( &self, _chunk: &TransactionChunk, - _source: &Source>, + _source: &Source

, _schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { panic!("transaction_chunk collection not implemented for {}", self.name()) } /// collect dataset for a particular transaction chunk - async fn collect_address_chunk( + async fn collect_address_chunk

( &self, _chunk: &AddressChunk, - _source: &Source>, + _source: &Source

, _schema: &Table, _filter: Option<&RowFilter>, - ) -> Result { + ) -> Result + where + P: JsonRpcClient, + { panic!("transaction_chunk collection not implemented for {}", self.name()) } } diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index ffd1ffc3..cbfc9c1c 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -15,7 +15,10 @@ pub type RateLimiter = governor::RateLimiter { +pub struct Source

+where + P: JsonRpcClient, +{ /// Shared provider for rpc data pub fetcher: Arc>, /// chain_id of network From 79ca39bc218d46e66c9e79e6085a4a809c5326c5 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Sun, 24 Sep 2023 15:33:57 +0530 Subject: [PATCH 3/3] fix parse_source, Source<> parametrized correctly now --- crates/cli/src/parse/source.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index daaf3a04..48a33924 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -35,7 +35,7 @@ pub(crate) async fn parse_source(args: &Args) -> Result { match provider { RpcProvider::Http(provider) => { chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); - Fetcher { provider, semaphore, rate_limiter } + let fetcher = Fetcher { provider, semaphore, rate_limiter }; let output = Source { fetcher: Arc::new(fetcher), chain_id, @@ -58,16 +58,6 @@ pub(crate) async fn parse_source(args: &Args) -> Result { Ok(output) } } - - let fetcher = Fetcher { provider, semaphore, rate_limiter }; - let output = Source { - fetcher: Arc::new(fetcher), - chain_id, - inner_request_size: args.inner_request_size, - max_concurrent_chunks, - }; - - Ok(output) } async fn parse_rpc_url(args: &Args) -> RpcProvider {