diff --git a/Cargo.lock b/Cargo.lock index eeabf7dc..4dc43fcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4521,6 +4521,16 @@ dependencies = [ "pin-utils", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.0" @@ -4753,6 +4763,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.13.2" @@ -4950,12 +4966,12 @@ dependencies = [ [[package]] name = "petgraph" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 1.9.3", + "indexmap 2.2.6", ] [[package]] @@ -7050,8 +7066,10 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", + "native-tls", "rustls 0.21.12", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tungstenite 0.20.1", "webpki-roots 0.25.2", @@ -7215,6 +7233,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -7232,14 +7261,17 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "serde", "serde_json", "sharded-slab", + "smallvec", "thread_local", "tracing", "tracing-core", + "tracing-log", "tracing-serde", ] @@ -7281,6 +7313,7 @@ dependencies = [ "http 0.2.9", "httparse", "log", + "native-tls", "rand", "rustls 0.21.12", "sha1", @@ -7309,10 +7342,25 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tutorial" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "ethers", + "protosim", + "tokio", + "tracing", + "tracing-subscriber", + "tycho-client", + "tycho-core", +] + [[package]] name = "tycho-client" -version = "0.34.0" -source = "git+ssh://github.com/propeller-heads/tycho-indexer.git?tag=0.34.0#1e2ae39125e2b457055869fe0e8309535e0f1c7f" +version = "0.41.0" +source = "git+ssh://github.com/propeller-heads/tycho-indexer.git?tag=0.41.0#a54f7bb98388390f564cd4148c03dc82fd8a7a4e" dependencies = [ "anyhow", "async-trait", @@ -7337,8 +7385,8 @@ dependencies = [ [[package]] name = "tycho-core" -version = "0.34.0" -source = "git+ssh://github.com/propeller-heads/tycho-indexer.git?tag=0.34.0#1e2ae39125e2b457055869fe0e8309535e0f1c7f" +version = "0.41.0" +source = "git+ssh://github.com/propeller-heads/tycho-indexer.git?tag=0.41.0#a54f7bb98388390f564cd4148c03dc82fd8a7a4e" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 6f796799..89392b81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,10 @@ version = "0.32.0" edition = "2021" [workspace] -members = ["protosim_py"] +members = ["protosim_py", "tutorial"] [dependencies] -ethabi = "13.0" +ethabi = "13.0" ethers = "2.0.13" serde_json = "1.0.105" serde = { version = "1.0", features = ["rc"] } @@ -28,8 +28,8 @@ uuid = { version = "1.4.1", features = [ ] } num-traits = "0.2.17" dotenv = "0.15.0" -tycho-core = { git = 
"ssh://github.com/propeller-heads/tycho-indexer.git", package = "tycho-core", tag = "0.34.0" } -tycho-client = { git = "ssh://github.com/propeller-heads/tycho-indexer.git", package = "tycho-client", tag = "0.34.0" } +tycho-core = { git = "ssh://github.com/propeller-heads/tycho-indexer.git", package = "tycho-core", tag = "0.41.0" } +tycho-client = { git = "ssh://github.com/propeller-heads/tycho-indexer.git", package = "tycho-client", tag = "0.41.0" } foundry-config = { git = "https://github.com/foundry-rs/foundry", rev = "2544793" } foundry-evm = { git = "https://github.com/foundry-rs/foundry", rev = "2544793" } revm-inspectors = { version = "0.5", features = ["serde"] } @@ -47,7 +47,6 @@ tracing-subscriber = { version = "0.3.17", default-features = false, features = "fmt", ] } tempfile = "3.13.0" - [features] default = [] network_tests = [] diff --git a/tutorial/Cargo.toml b/tutorial/Cargo.toml new file mode 100644 index 00000000..215b314d --- /dev/null +++ b/tutorial/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tutorial" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "tutorial" +path = "src/main.rs" + +[dependencies] +protosim = { path = ".." 
} +tokio = { version = "1", features = ["full"] } +tracing = "0.1" +tracing-subscriber = "0.3" +ethers = "2.0.13" +clap = { version = "4.5.3", features = ["derive"] } +anyhow = "1.0.79" +tycho-core = { git = "ssh://github.com/propeller-heads/tycho-indexer.git", package = "tycho-core", tag = "0.41.0" } +tycho-client = { git = "ssh://github.com/propeller-heads/tycho-indexer.git", package = "tycho-client", tag = "0.41.0" } diff --git a/tutorial/src/data_feed/mod.rs b/tutorial/src/data_feed/mod.rs new file mode 100644 index 00000000..bfcba7ec --- /dev/null +++ b/tutorial/src/data_feed/mod.rs @@ -0,0 +1,2 @@ +pub mod state; +pub mod tycho; diff --git a/tutorial/src/data_feed/state.rs b/tutorial/src/data_feed/state.rs new file mode 100644 index 00000000..1584b9ed --- /dev/null +++ b/tutorial/src/data_feed/state.rs @@ -0,0 +1,36 @@ +//! Message structs for state updates +//! +//! A BlockState typically groups changes together based on the latency of the data source, +//! for example, on the Ethereum network, a BlockState is emitted every block and contains +//! all the changes from that block. 
+use std::collections::HashMap;
+
+use ethers::types::H160;
+
+use protosim::protocol::{models::ProtocolComponent, state::ProtocolSim};
+
+#[derive(Debug)]
+pub struct BlockState {
+    pub time: u64,
+    /// The current state of all pools
+    pub states: HashMap<H160, Box<dyn ProtocolSim>>,
+    /// The new pairs that were added in this block
+    pub new_pairs: HashMap<H160, ProtocolComponent>,
+    /// The pairs that were removed in this block
+    pub removed_pairs: HashMap<H160, ProtocolComponent>,
+}
+
+impl BlockState {
+    pub fn new(
+        time: u64,
+        states: HashMap<H160, Box<dyn ProtocolSim>>,
+        new_pairs: HashMap<H160, ProtocolComponent>,
+    ) -> Self {
+        BlockState { time, states, new_pairs, removed_pairs: HashMap::new() }
+    }
+
+    pub fn set_removed_pairs(mut self, pairs: HashMap<H160, ProtocolComponent>) -> Self {
+        self.removed_pairs = pairs;
+        self
+    }
+}
diff --git a/tutorial/src/data_feed/tycho.rs b/tutorial/src/data_feed/tycho.rs
new file mode 100644
index 00000000..4cdb730b
--- /dev/null
+++ b/tutorial/src/data_feed/tycho.rs
@@ -0,0 +1,307 @@
+use ethers::types::H160;
+use std::{
+    collections::{hash_map::Entry, HashMap},
+    str::FromStr,
+    sync::mpsc::Sender,
+};
+use tracing::{debug, info, warn};
+
+use tycho_client::{
+    feed::component_tracker::ComponentFilter, rpc::RPCClient, stream::TychoStreamBuilder,
+    HttpRPCClient,
+};
+use tycho_core::dto::Chain;
+
+use protosim::{
+    models::ERC20Token,
+    protocol::{
+        models::ProtocolComponent, state::ProtocolSim, uniswap_v2::state::UniswapV2State,
+        uniswap_v3::state::UniswapV3State, BytesConvertible,
+    },
+};
+
+use crate::data_feed::state::BlockState;
+
+/// Consumes the Tycho stream and forwards one [`BlockState`] per message over `state_tx`.
+///
+/// Subscribes to the `uniswap_v2` and `uniswap_v3` extractors, decodes snapshots into
+/// simulation states, applies deltas on top of the latest known state of each component
+/// and keeps the resulting states between messages.
+///
+/// # Panics
+///
+/// Panics on unrecoverable feed errors, e.g. if the stream cannot be built, a component id
+/// is not a valid address, a delta cannot be applied, an extractor falls more than 10
+/// blocks behind, or the receiving end of `state_tx` has been dropped.
+// TODO: Make extractors configurable
+async fn process_messages(
+    tycho_url: String,
+    auth_key: Option<String>,
+    state_tx: Sender<BlockState>,
+    tvl_threshold: f64,
+) {
+    // Connect to Tycho
+    let (jh, mut tycho_stream) = TychoStreamBuilder::new(&tycho_url, Chain::Ethereum)
+        .exchange("uniswap_v2", ComponentFilter::with_tvl_range(tvl_threshold, tvl_threshold))
+        .exchange("uniswap_v3", ComponentFilter::with_tvl_range(tvl_threshold, tvl_threshold))
+        .auth_key(auth_key.clone())
+        .build()
+        .await
+        .expect("Failed to build tycho stream");
+
+    let mut all_tokens = load_all_tokens(tycho_url.as_str(), auth_key.as_deref()).await;
+
+    // maps protocols to the last block we've seen a message for it
+    let mut active_protocols: HashMap<String, u64> = HashMap::new();
+
+    // persist all protocol states between messages
+    // note - the current tick implementation expects addresses (H160) as component ids
+    let mut stored_states: HashMap<H160, Box<dyn ProtocolSim>> = HashMap::new();
+
+    // Loop through tycho messages
+    while let Some(msg) = tycho_stream.recv().await {
+        // stores all states updated in this tick/msg
+        let mut updated_states = HashMap::new();
+        let mut new_pairs = HashMap::new();
+        let mut removed_pairs = HashMap::new();
+
+        let block_id = msg
+            .state_msgs
+            .values()
+            .next()
+            .expect("Missing sync messages!")
+            .header
+            .number;
+
+        for (protocol, protocol_msg) in msg.state_msgs.iter() {
+            if let Some(deltas) = protocol_msg.deltas.as_ref() {
+                deltas
+                    .new_tokens
+                    .iter()
+                    .for_each(|(addr, token)| {
+                        if token.quality >= 51 {
+                            all_tokens
+                                .entry(H160::from_bytes(addr))
+                                .or_insert_with(|| {
+                                    token
+                                        .clone()
+                                        .try_into()
+                                        .unwrap_or_else(|_| {
+                                            panic!("Couldn't convert {:x} into ERC20 token.", addr)
+                                        })
+                                });
+                        }
+                    });
+            }
+
+            removed_pairs.extend(
+                protocol_msg
+                    .removed_components
+                    .iter()
+                    .flat_map(|(id, comp)| {
+                        let tokens = comp
+                            .tokens
+                            .iter()
+                            .flat_map(|addr| {
+                                all_tokens
+                                    .get(&H160::from_bytes(addr))
+                                    .cloned()
+                            })
+                            .collect::<Vec<_>>();
+                        let id = H160::from_str(id.as_ref()).unwrap_or_else(|_| {
+                            panic!("Failed parsing H160 from id string {}", id)
+                        });
+                        if tokens.len() == comp.tokens.len() {
+                            Some((id, ProtocolComponent::new(id, tokens)))
+                        } else {
+                            None
+                        }
+                    }),
+            );
+
+            let mut new_components = HashMap::new();
+
+            // PROCESS SNAPSHOTS
+
+            for (id, snapshot) in protocol_msg
+                .snapshots
+                .get_states()
+                .clone()
+            {
+                let id = H160::from_str(id.as_ref())
+                    .unwrap_or_else(|_| panic!("Failed parsing H160 from id string {}", id));
+                let mut pair_tokens = Vec::new();
+                let mut skip_pool = false;
+
+                for token in snapshot.component.tokens.clone() {
+                    match all_tokens.get(&H160::from_bytes(&token)) {
+                        Some(token) => pair_tokens.push(token.clone()),
+                        None => {
+                            debug!(
+                                "Token not found in all_tokens {}, ignoring pool {:x?}",
+                                token, id
+                            );
+                            skip_pool = true;
+                            break;
+                        }
+                    }
+                }
+
+                if !skip_pool {
+                    new_pairs.insert(id, ProtocolComponent::new(id, pair_tokens));
+                }
+
+                let state: Box<dyn ProtocolSim> = match protocol.as_str() {
+                    "uniswap_v3" => match UniswapV3State::try_from(snapshot) {
+                        Ok(state) => Box::new(state),
+                        Err(e) => {
+                            debug!("Failed parsing uniswap-v3 snapshot! {} for pool {:x?}", e, id);
+                            continue;
+                        }
+                    },
+                    "uniswap_v2" => match UniswapV2State::try_from(snapshot) {
+                        Ok(state) => Box::new(state),
+                        Err(e) => {
+                            warn!("Failed parsing uniswap-v2 snapshot! {} for pool {:x?}", e, id);
+                            continue;
+                        }
+                    },
+                    _ => panic!("VM snapshot not supported!"),
+                };
+                new_components.insert(id, state);
+            }
+
+            if !new_components.is_empty() {
+                info!("Decoded {} snapshots for protocol {}", new_components.len(), protocol);
+            }
+            updated_states.extend(new_components);
+
+            // PROCESS DELTAS
+
+            if let Some(deltas) = protocol_msg.deltas.clone() {
+                for (id, update) in deltas.state_updates {
+                    let id = H160::from_str(id.as_ref())
+                        .unwrap_or_else(|_| panic!("Failed parsing H160 from id string {}", id));
+                    match updated_states.entry(id) {
+                        Entry::Occupied(mut entry) => {
+                            // if state exists in updated_states, apply the delta to it
+                            let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
+                            state
+                                .delta_transition(update)
+                                .expect("Failed applying state update!");
+                        }
+                        Entry::Vacant(_) => {
+                            match stored_states.get(&id) {
+                                // if state does not exist in updated_states, apply the delta to the stored state
+                                Some(stored_state) => {
+                                    let mut state = stored_state.clone();
+                                    state
+                                        .delta_transition(update)
+                                        .expect("Failed applying state update!");
+                                    updated_states.insert(id, state);
+                                }
+                                None => warn!(
+                                    "Update could not be applied: missing stored state for id: {:x?}",
+                                    id
+                                ),
+                            }
+                        }
+                    }
+                }
+            };
+
+            // update active protocols
+            active_protocols
+                .entry(protocol.clone())
+                .and_modify(|block| *block = block_id)
+                .or_insert(block_id);
+        }
+
+        // checks all registered extractors have sent a message in the last 10 blocks
+        active_protocols
+            .iter()
+            .for_each(|(protocol, last_block)| {
+                if *last_block > block_id {
+                    // old block message received - likely caused by a tycho-client restart. We don't skip processing the message
+                    // as the restart provides a clean slate of new snapshots and corresponding deltas
+                    warn!("Extractor {} sent an old block message. Last message at block {}, current block {}", protocol, last_block, block_id)
+                } else if block_id - last_block > 10 {
+                    panic!("Extractor {} has not sent a message in the last 10 blocks! Last message at block {}, current block {}", protocol, last_block, block_id);
+                }
+            });
+
+        // Persist the newly added/updated states
+        stored_states.extend(
+            updated_states
+                .iter()
+                .map(|(id, state)| (*id, state.clone())),
+        );
+
+        // Send the tick with all updated states
+        let state =
+            BlockState::new(block_id, updated_states, new_pairs).set_removed_pairs(removed_pairs);
+
+        state_tx
+            .send(state)
+            .expect("Sending tick failed!")
+    }
+
+    jh.await.unwrap();
+}
+
+/// Loads the tokens known to Tycho on Ethereum, keyed by token address.
+///
+/// The RPC endpoint is reached at `https://{tycho_url}`.
+///
+/// # Panics
+///
+/// Panics if the RPC client cannot be created, the request fails, or a returned token
+/// cannot be converted into an [`ERC20Token`].
+pub async fn load_all_tokens(tycho_url: &str, auth_key: Option<&str>) -> HashMap<H160, ERC20Token> {
+    let rpc_url = format!("https://{tycho_url}");
+    let rpc_client = HttpRPCClient::new(rpc_url.as_str(), auth_key).unwrap();
+
+    // NOTE(review): arguments presumably are min quality, traded-within days, chunk size.
+    #[allow(clippy::mutable_key_type)]
+    rpc_client
+        .get_all_tokens(Chain::Ethereum, Some(100), Some(42), 3_000)
+        .await
+        .expect("Unable to load tokens")
+        .into_iter()
+        .map(|token| {
+            let token_clone = token.clone();
+            (
+                H160::from_bytes(&token.address),
+                token.try_into().unwrap_or_else(|_| {
+                    panic!("Couldn't convert {:?} into ERC20 token.", token_clone)
+                }),
+            )
+        })
+        .collect::<HashMap<_, _>>()
+}
+
+/// Runs the Tycho data feed to completion on a dedicated Tokio runtime.
+///
+/// Blocks the calling thread until the feed task finishes.
+///
+/// # Panics
+///
+/// Panics if the runtime cannot be created or if the feed task panics.
+pub fn start(
+    tycho_url: String,
+    auth_key: Option<String>,
+    state_tx: Sender<BlockState>,
+    tvl_threshold: f64,
+) {
+    let rt = tokio::runtime::Runtime::new().unwrap();
+
+    info!("Starting tycho data feed...");
+
+    rt.block_on(async {
+        tokio::spawn(async move {
+            process_messages(tycho_url, auth_key, state_tx, tvl_threshold).await;
+        })
+        .await
+        .unwrap();
+    });
+}
diff --git a/tutorial/src/lib.rs b/tutorial/src/lib.rs
new file mode 100644
index 00000000..95c7e26a
--- /dev/null
+++ b/tutorial/src/lib.rs
@@ -0,0 +1 @@
+pub mod data_feed;
diff --git a/tutorial/src/main.rs b/tutorial/src/main.rs
new file mode 100644
index 00000000..40248eb0
--- /dev/null
+++ b/tutorial/src/main.rs
@@ -0,0 +1,203 @@
+use clap::Parser;
+use ethers::types::{H160, U256};
+use protosim::protocol::state::ProtocolSim;
+use std::collections::HashMap;
+use std::{
+    env,
+    panic::{self, AssertUnwindSafe},
+    process,
+    sync::mpsc,
+    thread,
+};
+use tracing::{debug, error, info};
+use tracing_subscriber::{fmt, EnvFilter};
+
+use protosim::models::ERC20Token;
+
+use tutorial::data_feed::{state::BlockState, tycho};
+
+/// Graph based solver
+#[derive(Parser)]
+struct Cli {
+    /// The tvl threshold to filter the graph by
+    #[arg(short, long, default_value_t = 10.0)]
+    tvl_threshold: f64,
+}
+
+/// Receives block updates from the data feed and keeps the USDC-WETH pool graph current.
+///
+/// Returns once the sending half of the channel has been dropped, since no further
+/// updates can arrive after that.
+pub fn process_ticks(rx: mpsc::Receiver<BlockState>) {
+    let mut pool_graph: HashMap<H160, Box<dyn ProtocolSim>> = HashMap::new();
+
+    loop {
+        match rx.recv() {
+            Ok(state) => handle_state(state, &mut pool_graph),
+            // `recv` only fails once every sender has been dropped, so retrying would spin
+            // forever: report the error once and stop.
+            Err(e) => {
+                error!("Error receiving tick: {:?}", e);
+                break;
+            }
+        }
+    }
+}
+
+/// Applies one block update to `pool_graph` and logs the USDC-WETH spot price range.
+///
+/// Newly announced pools are tracked only when they are USDC-WETH pools with a decoded
+/// state, tracked pools are refreshed with their updated state, and pools reported as
+/// removed are dropped from the graph.
+fn handle_state(block_state: BlockState, pool_graph: &mut HashMap<H160, Box<dyn ProtocolSim>>) {
+    let usdc =
+        ERC20Token::new("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", 6, "USDC", U256::from(10000));
+
+    let weth = ERC20Token::new(
+        "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
+        18,
+        "WETH",
+        U256::from(15000),
+    );
+
+    info!("Received block update: {:?}", block_state.time);
+    info!(
+        "Found {:?} new pairs. Adding to the graph if they match the criteria",
+        block_state.new_pairs.len()
+    );
+    for (address, component) in block_state.new_pairs {
+        let Some(state) = block_state.states.get(&address) else {
+            debug!("State not found for new pair: {:?}", address);
+            continue;
+        };
+        // Track the pool only if its first two tokens are USDC and WETH, in that order.
+        // Using `first`/`get` also skips components with fewer than two tokens instead of
+        // panicking on an out-of-bounds index.
+        let is_usdc_weth = matches!(
+            (component.tokens.first(), component.tokens.get(1)),
+            (Some(token0), Some(token1))
+                if token0.address == usdc.address && token1.address == weth.address
+        );
+        if is_usdc_weth {
+            debug!("Found USDC-WETH pair: {:?}", address);
+            pool_graph.insert(address, state.clone());
+        }
+    }
+
+    info!(
+        "{:?} uniswap_v2 and uniswap_v3 pairs were updated on this block",
+        block_state.states.len()
+    );
+    for (address, state) in block_state.states {
+        // Only pools already in the graph are refreshed; the owned state is moved in.
+        if let Some(tracked) = pool_graph.get_mut(&address) {
+            info!(
+                "USDC-WETH pair: {:?} price has changed on block: {:?}",
+                address, block_state.time
+            );
+            *tracked = state;
+        }
+    }
+
+    // Pools reported as removed must not keep contributing stale prices.
+    for address in block_state.removed_pairs.keys() {
+        if pool_graph.remove(address).is_some() {
+            debug!("Removed USDC-WETH pair: {:?}", address);
+        }
+    }
+
+    info!("");
+    info!("Found {} direct USDC-WETH pairs", pool_graph.len());
+
+    let (mut best_price, mut worst_price) = (None, None);
+
+    for (id, pair) in pool_graph.iter() {
+        info!("USDC-WETH pair: {:?}", id);
+        let spot_price = pair.spot_price(&weth, &usdc);
+        info!("Price: {:?}", spot_price);
+
+        best_price = Some(best_price.map_or(spot_price, |bp: f64| bp.max(spot_price)));
+        worst_price = Some(worst_price.map_or(spot_price, |wp: f64| wp.min(spot_price)));
+    }
+
+    info!("");
+    // Both bounds stay `None` until at least one USDC-WETH pool is tracked, so they cannot
+    // be unwrapped unconditionally.
+    match (best_price, worst_price) {
+        (Some(best), Some(worst)) => {
+            info!("Best spot price: {:?}", best);
+            info!("Worst spot price: {:?}", worst);
+        }
+        _ => info!("No USDC-WETH pairs tracked yet, spot price unavailable"),
+    }
+    info!("----------------------------------------------------");
+}
+
+/// Wires up the data feed and graph threads and blocks until one of them stops.
+///
+/// Reads the Tycho endpoint from `TYCHO_URL` and the API key from `TYCHO_API_KEY`. The
+/// process exits with status 1 as soon as either worker thread signals termination.
+///
+/// # Panics
+///
+/// Panics if `TYCHO_URL` or `TYCHO_API_KEY` is not set.
+pub async fn start_app() {
+    // Parse command-line arguments into a Cli struct
+    let cli = Cli::parse();
+
+    let tycho_url = env::var("TYCHO_URL").expect("Please set 'TYCHO_URL' env variable!");
+    let tycho_api_key: String =
+        env::var("TYCHO_API_KEY").expect("Please set 'TYCHO_API_KEY' env variable!");
+
+    // Create communication channels for inter-thread communication
+    let (ctrl_tx, ctrl_rx) = mpsc::channel::<()>();
+    let (tick_tx, tick_rx) = mpsc::channel::<BlockState>();
+
+    // Spawn a new thread to process data feeds
+    let feed_ctrl_tx = ctrl_tx.clone();
+    let _feed_handler = thread::spawn(move || {
+        info!("Starting data feed thread...");
+        let _ = panic::catch_unwind(AssertUnwindSafe(move || {
+            tycho::start(tycho_url, Some(tycho_api_key), tick_tx, cli.tvl_threshold);
+        }));
+        if feed_ctrl_tx.send(()).is_err() {
+            error!("Fatal feed thread panicked and failed trying to communicate with main thread.");
+            process::exit(1);
+        }
+    });
+
+    let _graph_handler = thread::spawn(move || {
+        info!("Starting graph thread...");
+        let _ = panic::catch_unwind(AssertUnwindSafe(move || {
+            process_ticks(tick_rx);
+        }));
+        if ctrl_tx.send(()).is_err() {
+            error!("Fatal graph thread panicked and failed trying to communicate with main thread.");
+            process::exit(1);
+        }
+    });
+
+    // Wait for termination: If any of the threads panic and exit, the application will terminate
+    if ctrl_rx.recv().is_ok() {
+        process::exit(1);
+    }
+}
+
+/// Configures logging from `RUST_LOG` and runs the application.
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+    let format = fmt::format()
+        .with_level(true) // Show log levels
+        .with_target(false) // Hide module paths
+        .compact(); // Use a compact format
+
+    fmt()
+        .event_format(format)
+        .with_env_filter(EnvFilter::from_default_env()) // Use RUST_LOG for log levels
+        .init();
+
+    info!("Starting application...");
+
+    start_app().await;
+    Ok(())
+}