Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial attempt to make Source generic over JsonRpcClient #68

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ serde_json = "1.0.104"
thiserror = "1.0.40"
thousands = "0.2.0"
tokio = { version = "1.29.0", features = ["macros", "rt-multi-thread", "sync"] }
url = { version = "2.4", default-features = false }

[profile.dev]
incremental = true
Expand Down
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ serde = { workspace = true }
serde_json = { workspace = true }
thousands = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
67 changes: 49 additions & 18 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ use crate::args::Args;

pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
// parse network info
let rpc_url = parse_rpc_url(args);
let provider =
Provider::<RetryClient<Http>>::new_client(&rpc_url, args.max_retries, args.initial_backoff)
.map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?;
let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
let provider = parse_rpc_url(args).await;
let chain_id: u64;

let rate_limiter = match args.requests_per_second {
Some(rate_limit) => match NonZeroU32::new(rate_limit) {
Expand All @@ -35,18 +32,35 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
let semaphore = tokio::sync::Semaphore::new(max_concurrent_requests as usize);
let semaphore = Some(semaphore);

let fetcher = Fetcher { provider, semaphore, rate_limiter };
let output = Source {
fetcher: Arc::new(fetcher),
chain_id,
inner_request_size: args.inner_request_size,
max_concurrent_chunks,
};
match provider {
RpcProvider::Http(provider) => {
chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
let fetcher = Fetcher { provider, semaphore, rate_limiter };
let output = Source {
fetcher: Arc::new(fetcher),
chain_id,
inner_request_size: args.inner_request_size,
max_concurrent_chunks,
};

Ok(output)
}
RpcProvider::Ws(provider) => {
chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
let fetcher = Fetcher { provider, semaphore, rate_limiter };
let output = Source {
fetcher: Arc::new(fetcher),
chain_id,
inner_request_size: args.inner_request_size,
max_concurrent_chunks,
};

Ok(output)
Ok(output)
}
}
}

fn parse_rpc_url(args: &Args) -> String {
async fn parse_rpc_url(args: &Args) -> RpcProvider {
let mut url = match &args.rpc {
Some(url) => url.clone(),
_ => match env::var("ETH_RPC_URL") {
Expand All @@ -57,8 +71,25 @@ fn parse_rpc_url(args: &Args) -> String {
}
},
};
if !url.starts_with("http") {
url = "http://".to_string() + url.as_str();
};
url
if url.starts_with("wss") {
let ws_provider = Provider::<Ws>::connect(url)
.await
.map_err(|_e| ParseError::ParseError("could not connect to ws_provider".to_string()))
.unwrap();
return RpcProvider::Ws(ws_provider)
} else {
if !url.starts_with("http") {
url = "http://".to_string() + url.as_str();
}
let http_provider =
Provider::<RetryClient<Http>>::new_client(&url, args.max_retries, args.initial_backoff)
.map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))
.unwrap();
return RpcProvider::Http(http_provider)
}
}

/// Transport-specific provider variants returned by `parse_rpc_url`.
///
/// Wraps the two `ethers` provider transports behind one type so the URL
/// parser can hand back either kind: a WebSocket provider when the URL
/// starts with `wss`, otherwise an HTTP provider wrapped in `RetryClient`
/// (configured with `args.max_retries` / `args.initial_backoff`).
enum RpcProvider {
    /// HTTP transport with retry middleware for transient RPC failures.
    Http(Provider<RetryClient<Http>>),
    /// WebSocket transport (selected for `wss`-prefixed URLs).
    Ws(Provider<Ws>),
}
15 changes: 11 additions & 4 deletions crates/freeze/src/collect.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::collections::HashMap;

use ethers::providers::{JsonRpcClient, Provider};
use polars::prelude::*;

use crate::types::{CollectError, Datatype, MultiQuery, SingleQuery, Source};

/// collect data and return as dataframe
pub async fn collect(query: SingleQuery, source: Source) -> Result<DataFrame, CollectError> {
pub async fn collect<P>(query: SingleQuery, source: Source<P>) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
if query.chunks.len() > 1 {
return Err(CollectError::CollectError("can only collect 1 chunk".to_string()))
};
Expand All @@ -18,9 +22,12 @@ pub async fn collect(query: SingleQuery, source: Source) -> Result<DataFrame, Co
}

/// collect data and return as dataframe
pub async fn collect_multiple(
pub async fn collect_multiple<P>(
_query: MultiQuery,
_source: Source,
) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
_source: Source<P>,
) -> Result<HashMap<Datatype, DataFrame>, CollectError>
where
P: JsonRpcClient,
{
todo!()
}
19 changes: 13 additions & 6 deletions crates/freeze/src/datasets/balance_diffs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ethers::providers::JsonRpcClient;
use std::collections::HashMap;

use polars::prelude::*;
Expand Down Expand Up @@ -45,13 +46,16 @@ impl Dataset for BalanceDiffs {
vec!["block_number".to_string(), "transaction_index".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
state_diffs::collect_block_state_diffs(
&Datatype::BalanceDiffs,
chunk,
Expand All @@ -62,13 +66,16 @@ impl Dataset for BalanceDiffs {
.await
}

async fn collect_transaction_chunk(
async fn collect_transaction_chunk<P>(
&self,
chunk: &TransactionChunk,
source: &Source,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
state_diffs::collect_transaction_state_diffs(
&Datatype::BalanceDiffs,
chunk,
Expand Down
19 changes: 13 additions & 6 deletions crates/freeze/src/datasets/balances.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// required args:: address

use crate::{types::Balances, ColumnType, Dataset, Datatype};
use ethers::providers::JsonRpcClient;
use std::collections::HashMap;

use ethers::prelude::*;
Expand Down Expand Up @@ -43,13 +44,16 @@ impl Dataset for Balances {
vec!["block_number".to_string(), "address".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let address_chunks = match filter {
Some(filter) => match &filter.address_chunks {
Some(address_chunks) => address_chunks.clone(),
Expand All @@ -64,11 +68,14 @@ impl Dataset for Balances {

pub(crate) type BlockAddressBalance = (u64, Vec<u8>, U256);

async fn fetch_balances(
async fn fetch_balances<P>(
block_chunks: Vec<&BlockChunk>,
address_chunks: Vec<AddressChunk>,
source: &Source,
) -> mpsc::Receiver<Result<BlockAddressBalance, CollectError>> {
source: &Source<P>,
) -> mpsc::Receiver<Result<BlockAddressBalance, CollectError>>
where
P: JsonRpcClient,
{
let (tx, rx) = mpsc::channel(100);

for block_chunk in block_chunks {
Expand Down
36 changes: 24 additions & 12 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,16 @@ impl Dataset for Blocks {
vec!["number".to_string()]
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source,
source: &Source<P>,
schema: &Table,
_filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let rx = fetch_blocks(chunk, source).await;
let output = blocks_to_dfs(rx, &Some(schema), &None, source.chain_id).await;
match output {
Expand All @@ -73,13 +76,16 @@ impl Dataset for Blocks {
}
}

async fn collect_transaction_chunk(
async fn collect_transaction_chunk<P>(
&self,
chunk: &TransactionChunk,
source: &Source,
source: &Source<P>,
schema: &Table,
filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
) -> Result<DataFrame, CollectError>
where
P: JsonRpcClient,
{
let block_numbers = match chunk {
TransactionChunk::Values(tx_hashes) => {
fetch_tx_block_numbers(tx_hashes, source).await?
Expand All @@ -91,10 +97,13 @@ impl Dataset for Blocks {
}
}

async fn fetch_tx_block_numbers(
async fn fetch_tx_block_numbers<P>(
tx_hashes: &Vec<Vec<u8>>,
source: &Source,
) -> Result<Vec<u64>, CollectError> {
source: &Source<P>,
) -> Result<Vec<u64>, CollectError>
where
P: JsonRpcClient,
{
let mut tasks = Vec::new();
for tx_hash in tx_hashes {
let fetcher = source.fetcher.clone();
Expand Down Expand Up @@ -124,10 +133,13 @@ async fn fetch_tx_block_numbers(
Ok(block_numbers)
}

async fn fetch_blocks(
async fn fetch_blocks<P>(
block_chunk: &BlockChunk,
source: &Source,
) -> mpsc::Receiver<BlockTxGasTuple<TxHash>> {
source: &Source<P>,
) -> mpsc::Receiver<BlockTxGasTuple<TxHash>>
where
P: JsonRpcClient,
{
let (tx, rx) = mpsc::channel(block_chunk.numbers().len());

for number in block_chunk.numbers() {
Expand Down
20 changes: 14 additions & 6 deletions crates/freeze/src/datasets/blocks_and_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::{
},
};

use ethers::providers::JsonRpcClient;

#[async_trait::async_trait]
impl MultiDataset for BlocksAndTransactions {
fn name(&self) -> &'static str {
Expand All @@ -23,13 +25,16 @@ impl MultiDataset for BlocksAndTransactions {
[Datatype::Blocks, Datatype::Transactions].into_iter().collect()
}

async fn collect_block_chunk(
async fn collect_block_chunk<P>(
&self,
chunk: &BlockChunk,
source: &Source,
source: &Source<P>,
schemas: HashMap<Datatype, Table>,
_filter: HashMap<Datatype, RowFilter>,
) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
) -> Result<HashMap<Datatype, DataFrame>, CollectError>
where
P: JsonRpcClient,
{
let include_gas_used = match &schemas.get(&Datatype::Transactions) {
Some(table) => table.has_column("gas_used"),
_ => false,
Expand All @@ -55,11 +60,14 @@ impl MultiDataset for BlocksAndTransactions {
}
}

pub(crate) async fn fetch_blocks_and_transactions(
pub(crate) async fn fetch_blocks_and_transactions<P>(
block_chunk: &BlockChunk,
source: &Source,
source: &Source<P>,
include_gas_used: bool,
) -> mpsc::Receiver<blocks::BlockTxGasTuple<Transaction>> {
) -> mpsc::Receiver<blocks::BlockTxGasTuple<Transaction>>
where
P: JsonRpcClient,
{
let (tx, rx) = mpsc::channel(block_chunk.numbers().len());
let source = Arc::new(source.clone());

Expand Down
Loading