Skip to content

Commit

Permalink
make Source<P> generic over P: JsonRpcClient
Browse files Browse the repository at this point in the history
  • Loading branch information
0xprames committed Sep 24, 2023
1 parent 3e14fa3 commit 51c78cd
Show file tree
Hide file tree
Showing 30 changed files with 504 additions and 309 deletions.
17 changes: 10 additions & 7 deletions crates/freeze/src/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use polars::prelude::*;
use crate::types::{CollectError, Datatype, MultiQuery, SingleQuery, Source};

/// collect data and return as dataframe
pub async fn collect(
query: SingleQuery,
source: Source<Provider<impl JsonRpcClient>>,
) -> Result<DataFrame, CollectError> {
pub async fn collect<P>(query: SingleQuery, source: Source<P>) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
if query.chunks.len() > 1 {
return Err(CollectError::CollectError("can only collect 1 chunk".to_string()))
};
Expand All @@ -22,9 +22,12 @@ pub async fn collect(
}

/// collect data and return as dataframe
pub async fn collect_multiple(
pub async fn collect_multiple<P>(
_query: MultiQuery,
_source: Source<Provider<impl JsonRpcClient>>,
) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
_source: Source<P>,
) -> Result<HashMap<Datatype, DataFrame>, CollectError>
where
P: JsonRpcClient,
{
todo!()
}
20 changes: 13 additions & 7 deletions crates/freeze/src/datasets/balance_diffs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ethers::providers::{JsonRpcClient, Provider};
use ethers::providers::JsonRpcClient;
use std::collections::HashMap;

use polars::prelude::*;
Expand Down Expand Up @@ -46,13 +46,16 @@ impl Dataset for BalanceDiffs {
vec!["block_number".to_string(), "transaction_index".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
state_diffs::collect_block_state_diffs(
&Datatype::BalanceDiffs,
chunk,
Expand All @@ -63,13 +66,16 @@ impl Dataset for BalanceDiffs {
.await
}

async fn collect_transaction_chunk(
async fn collect_transaction_chunk<P>(
&self,
chunk: &TransactionChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
state_diffs::collect_transaction_state_diffs(
&Datatype::BalanceDiffs,
chunk,
Expand Down
20 changes: 13 additions & 7 deletions crates/freeze/src/datasets/balances.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// required args:: address

use crate::{types::Balances, ColumnType, Dataset, Datatype};
use ethers::providers::{JsonRpcClient, ProviderError};
use ethers::providers::JsonRpcClient;
use std::collections::HashMap;

use ethers::prelude::*;
Expand Down Expand Up @@ -44,13 +44,16 @@ impl Dataset for Balances {
vec!["block_number".to_string(), "address".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let address_chunks = match filter {
Some(filter) => match &filter.address_chunks {
Some(address_chunks) => address_chunks.clone(),
Expand All @@ -65,11 +68,14 @@ impl Dataset for Balances {

pub(crate) type BlockAddressBalance = (u64, Vec<u8>, U256);

async fn fetch_balances(
async fn fetch_balances<P>(
block_chunks: Vec<&BlockChunk>,
address_chunks: Vec<AddressChunk>,
source: &Source<Provider<impl JsonRpcClient>>,
) -> mpsc::Receiver<Result<BlockAddressBalance, CollectError>> {
source: &Source<P>,
) -> mpsc::Receiver<Result<BlockAddressBalance, CollectError>>
where
P: JsonRpcClient,
{
let (tx, rx) = mpsc::channel(100);

for block_chunk in block_chunks {
Expand Down
36 changes: 24 additions & 12 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,16 @@ impl Dataset for Blocks {
vec!["number".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
_filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let rx = fetch_blocks(chunk, source).await;
let output = blocks_to_dfs(rx, &Some(schema), &None, source.chain_id).await;
match output {
Expand All @@ -73,13 +76,16 @@ impl Dataset for Blocks {
}
}

async fn collect_transaction_chunk(
async fn collect_transaction_chunk<P>(
&self,
chunk: &TransactionChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let block_numbers = match chunk {
TransactionChunk::Values(tx_hashes) => {
fetch_tx_block_numbers(tx_hashes, source).await?
Expand All @@ -91,10 +97,13 @@ impl Dataset for Blocks {
}
}

async fn fetch_tx_block_numbers(
async fn fetch_tx_block_numbers<P>(
tx_hashes: &Vec<Vec<u8>>,
source: &Source<Provider<impl JsonRpcClient>>,
) -> Result<Vec<u64>, CollectError> {
source: &Source<P>,
) -> Result<Vec<u64>, CollectError>
where
P: JsonRpcClient,
{
let mut tasks = Vec::new();
for tx_hash in tx_hashes {
let fetcher = source.fetcher.clone();
Expand Down Expand Up @@ -124,10 +133,13 @@ async fn fetch_tx_block_numbers(
Ok(block_numbers)
}

async fn fetch_blocks(
async fn fetch_blocks<P>(
block_chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
) -> mpsc::Receiver<BlockTxGasTuple<TxHash>> {
source: &Source<P>,
) -> mpsc::Receiver<BlockTxGasTuple<TxHash>>
where
P: JsonRpcClient,
{
let (tx, rx) = mpsc::channel(block_chunk.numbers().len());

for number in block_chunk.numbers() {
Expand Down
20 changes: 13 additions & 7 deletions crates/freeze/src/datasets/blocks_and_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
},
};

use ethers::providers::{JsonRpcClient, ProviderError};
use ethers::providers::JsonRpcClient;

#[async_trait::async_trait]
impl MultiDataset for BlocksAndTransactions {
Expand All @@ -25,13 +25,16 @@ impl MultiDataset for BlocksAndTransactions {
[Datatype::Blocks, Datatype::Transactions].into_iter().collect()
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schemas: HashMap<Datatype, Table>,
_filter: HashMap<Datatype, RowFilter>,
) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
) -> Result<HashMap<Datatype, DataFrame>, CollectError>
where
P: JsonRpcClient,
{
let include_gas_used = match &schemas.get(&Datatype::Transactions) {
Some(table) => table.has_column("gas_used"),
_ => false,
Expand All @@ -57,11 +60,14 @@ impl MultiDataset for BlocksAndTransactions {
}
}

pub(crate) async fn fetch_blocks_and_transactions(
pub(crate) async fn fetch_blocks_and_transactions<P>(
block_chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
include_gas_used: bool,
) -> mpsc::Receiver<blocks::BlockTxGasTuple<Transaction>> {
) -> mpsc::Receiver<blocks::BlockTxGasTuple<Transaction>>
where
P: JsonRpcClient,
{
let (tx, rx) = mpsc::channel(block_chunk.numbers().len());
let source = Arc::new(source.clone());

Expand Down
20 changes: 13 additions & 7 deletions crates/freeze/src/datasets/code_diffs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::types::{
TransactionChunk,
};

use ethers::providers::{JsonRpcClient, Provider};
use ethers::providers::JsonRpcClient;

#[async_trait::async_trait]
impl Dataset for CodeDiffs {
Expand Down Expand Up @@ -47,24 +47,30 @@ impl Dataset for CodeDiffs {
vec!["block_number".to_string(), "transaction_index".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
state_diffs::collect_block_state_diffs(&Datatype::CodeDiffs, chunk, source, schema, filter)
.await
}

async fn collect_transaction_chunk(
async fn collect_transaction_chunk<P>(
&self,
chunk: &TransactionChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
state_diffs::collect_transaction_state_diffs(
&Datatype::CodeDiffs,
chunk,
Expand Down
20 changes: 13 additions & 7 deletions crates/freeze/src/datasets/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ethers::prelude::*;
use polars::prelude::*;
use tokio::{sync::mpsc, task};

use ethers::providers::{JsonRpcClient, ProviderError};
use ethers::providers::JsonRpcClient;

use crate::{
dataframes::SortableDataFrame,
Expand Down Expand Up @@ -44,13 +44,16 @@ impl Dataset for Codes {
vec!["block_number".to_string(), "address".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let address_chunks = match filter {
Some(filter) => match &filter.address_chunks {
Some(address_chunks) => address_chunks.clone(),
Expand All @@ -65,11 +68,14 @@ impl Dataset for Codes {

pub(crate) type BlockAddressCode = (u64, Vec<u8>, Vec<u8>);

async fn fetch_codes(
async fn fetch_codes<P>(
block_chunks: Vec<&BlockChunk>,
address_chunks: Vec<AddressChunk>,
source: &Source<Provider<impl JsonRpcClient>>,
) -> mpsc::Receiver<Result<BlockAddressCode, CollectError>> {
source: &Source<P>,
) -> mpsc::Receiver<Result<BlockAddressCode, CollectError>>
where
P: JsonRpcClient,
{
let (tx, rx) = mpsc::channel(100);

for block_chunk in block_chunks {
Expand Down
23 changes: 13 additions & 10 deletions crates/freeze/src/datasets/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::collections::HashMap;

use ethers::{
prelude::*,
providers::{JsonRpcClient, ProviderError},
};
use ethers::{prelude::*, providers::JsonRpcClient};
use polars::prelude::*;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -61,24 +58,30 @@ impl Dataset for Contracts {
vec!["block_number".to_string(), "create_index".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
_filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let rx = traces::fetch_block_traces(chunk, source);
traces_to_contracts_df(rx, schema, source.chain_id).await
}

async fn collect_transaction_chunk(
async fn collect_transaction_chunk<P>(
&self,
chunk: &TransactionChunk,
source: &Source<Provider<impl JsonRpcClient>>,
source: &Source<P>,
schema: &Table,
_filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let rx = traces::fetch_transaction_traces(chunk, source);
traces_to_contracts_df(rx, schema, source.chain_id).await
}
Expand Down
Loading

0 comments on commit 51c78cd

Please sign in to comment.