From 1750dcf57e66915b7a078ca9ba32c3aa8384b089 Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Tue, 11 Feb 2025 12:27:57 +0200 Subject: [PATCH 1/5] feat: track contract to pool state map on tycho decoder This is to allow us to trigger pool state updates on related contract updates --- src/evm/decoder.rs | 31 +++++++++++++++++--- src/evm/protocol/uniswap_v2/tycho_decoder.rs | 6 ++-- src/evm/protocol/uniswap_v3/tycho_decoder.rs | 6 ++-- src/evm/protocol/uniswap_v4/state.rs | 3 +- src/evm/protocol/uniswap_v4/tycho_decoder.rs | 10 +++++-- src/evm/protocol/vm/tycho_decoder.rs | 30 ++++++++++++++----- src/protocol/models.rs | 2 +- 7 files changed, 66 insertions(+), 22 deletions(-) diff --git a/src/evm/decoder.rs b/src/evm/decoder.rs index d2d427ad..047f0e57 100644 --- a/src/evm/decoder.rs +++ b/src/evm/decoder.rs @@ -36,10 +36,18 @@ pub enum StreamDecodeError { struct DecoderState { tokens: HashMap, states: HashMap>, + // maps contract address to the pools they affect + contracts_map: HashMap>, } -type DecodeFut = - Pin, InvalidSnapshotError>> + Send + Sync>>; +type ContractMap = HashMap; +type DecodeFut = Pin< + Box< + dyn Future, ContractMap), InvalidSnapshotError>> + + Send + + Sync, + >, +>; type RegistryFn = dyn Fn(ComponentWithState, Header, Arc>) -> DecodeFut + Send + Sync; type FilterFn = fn(&ComponentWithState) -> bool; @@ -114,7 +122,7 @@ impl TychoStreamDecoder { let guard = state.read().await; T::try_from_with_block(component, header, &guard.tokens) .await - .map(|c| Box::new(c) as Box) + .map(|(c, m)| (Box::new(c) as Box, m)) }) as DecodeFut }, ); @@ -149,6 +157,7 @@ impl TychoStreamDecoder { let mut updated_states = HashMap::new(); let mut new_pairs = HashMap::new(); let mut removed_pairs = HashMap::new(); + let mut contracts_map = HashMap::new(); let block = msg .state_msgs @@ -190,6 +199,7 @@ impl TychoStreamDecoder { } } + // Remove untracked components let state_guard = self.state.read().await; removed_pairs.extend( protocol_msg @@ -286,7 +296,13 @@ impl TychoStreamDecoder { // Construct state from snapshot if let Some(state_decode_f) = self.registry.get(protocol.as_str()) { match state_decode_f(snapshot, block.clone(), self.state.clone()).await { - Ok(state) => { + Ok((state, contracts)) => { + for (key, value) in contracts { + contracts_map + .entry(key) + .or_insert_with(Vec::new) + .push(value); + } new_components.insert(id.clone(), state); } Err(e) => { @@ -378,6 +394,13 @@ impl TychoStreamDecoder { state_guard .states .extend(updated_states.clone().into_iter()); + for (key, values) in contracts_map { + state_guard + .contracts_map + .entry(key) + .or_insert_with(Vec::new) + .extend(values); + } // Send the tick with all updated states Ok(BlockUpdate::new(block.number, updated_states, new_pairs) diff --git a/src/evm/protocol/uniswap_v2/tycho_decoder.rs b/src/evm/protocol/uniswap_v2/tycho_decoder.rs index 78d552d1..b5b7f56f 100644 --- a/src/evm/protocol/uniswap_v2/tycho_decoder.rs +++ b/src/evm/protocol/uniswap_v2/tycho_decoder.rs @@ -19,7 +19,7 @@ impl TryFromWithBlock for UniswapV2State { snapshot: ComponentWithState, _block: Header, _all_tokens: &HashMap, - ) -> Result { + ) -> Result<(Self, HashMap), Self::Error> { let reserve0 = U256::from_be_slice( snapshot .state @@ -36,7 +36,7 @@ impl TryFromWithBlock for UniswapV2State { .ok_or(InvalidSnapshotError::MissingAttribute("reserve1".to_string()))?, ); - Ok(UniswapV2State::new(reserve0, reserve1)) + Ok((UniswapV2State::new(reserve0, reserve1), HashMap::new())) } } @@ -100,7 +100,7 @@ mod tests { let result = UniswapV2State::try_from_with_block(snapshot, header(), &HashMap::new()).await; assert!(result.is_ok()); - let res = result.unwrap(); + let res = result.unwrap().0; assert_eq!(res.reserve0, U256::from_str("100").unwrap()); assert_eq!(res.reserve1, U256::from_str("200").unwrap()); } diff --git a/src/evm/protocol/uniswap_v3/tycho_decoder.rs b/src/evm/protocol/uniswap_v3/tycho_decoder.rs index b3632f8d..8ccd8555 100644 --- a/src/evm/protocol/uniswap_v3/tycho_decoder.rs +++ b/src/evm/protocol/uniswap_v3/tycho_decoder.rs @@ -20,7 +20,7 @@ impl TryFromWithBlock for UniswapV3State { snapshot: ComponentWithState, _block: Header, _all_tokens: &HashMap, - ) -> Result { + ) -> Result<(Self, HashMap), Self::Error> { let liq = snapshot .state .attributes @@ -120,7 +120,7 @@ impl TryFromWithBlock for UniswapV3State { ticks.sort_by_key(|tick| tick.index); - Ok(UniswapV3State::new(liquidity, sqrt_price, fee, tick, ticks)) + Ok((UniswapV3State::new(liquidity, sqrt_price, fee, tick, ticks), HashMap::new())) } } @@ -201,7 +201,7 @@ mod tests { 300, vec![TickInfo::new(60, 400)], ); - assert_eq!(result.unwrap(), expected); + assert_eq!(result.unwrap().0, expected); } #[tokio::test] diff --git a/src/evm/protocol/uniswap_v4/state.rs b/src/evm/protocol/uniswap_v4/state.rs index e0c93930..99e05f91 100644 --- a/src/evm/protocol/uniswap_v4/state.rs +++ b/src/evm/protocol/uniswap_v4/state.rs @@ -459,7 +459,8 @@ mod tests { let usv4_state = UniswapV4State::try_from_with_block(state, Default::default(), &Default::default()) .await - .unwrap(); + .unwrap() + .0; let t0 = Token::new( "0x647e32181a64f4ffd4f0b0b4b052ec05b277729c", diff --git a/src/evm/protocol/uniswap_v4/tycho_decoder.rs b/src/evm/protocol/uniswap_v4/tycho_decoder.rs index 000aea8b..fca89e9e 100644 --- a/src/evm/protocol/uniswap_v4/tycho_decoder.rs +++ b/src/evm/protocol/uniswap_v4/tycho_decoder.rs @@ -23,7 +23,7 @@ impl TryFromWithBlock for UniswapV4State { snapshot: ComponentWithState, _block: Header, _all_tokens: &HashMap, - ) -> Result { + ) -> Result<(Self, HashMap), Self::Error> { let liq = snapshot .state .attributes @@ -120,7 +120,10 @@ impl TryFromWithBlock for UniswapV4State { ticks.sort_by_key(|tick| tick.index); - Ok(UniswapV4State::new(liquidity, sqrt_price, fees, tick, tick_spacing, ticks)) + Ok(( + UniswapV4State::new(liquidity, sqrt_price, fees, tick, tick_spacing, ticks), + HashMap::new(), + )) } } @@ -203,7 +206,8 @@ mod tests { let result = UniswapV4State::try_from_with_block(snapshot, header(), &HashMap::new()) .await - .unwrap(); + .unwrap() + .0; let fees = UniswapV4Fees::new(0, 0, 500); let expected = UniswapV4State::new( diff --git a/src/evm/protocol/vm/tycho_decoder.rs b/src/evm/protocol/vm/tycho_decoder.rs index b785205a..cc0eb647 100644 --- a/src/evm/protocol/vm/tycho_decoder.rs +++ b/src/evm/protocol/vm/tycho_decoder.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, str::FromStr, time::{SystemTime, UNIX_EPOCH}, }; @@ -49,7 +49,7 @@ impl TryFromWithBlock for EVMPoolState { snapshot: ComponentWithState, block: Header, all_tokens: &HashMap, - ) -> Result { + ) -> Result<(Self, HashMap), Self::Error> { let id = snapshot.component.id.clone(); let tokens = snapshot.component.tokens.clone(); @@ -115,7 +115,20 @@ impl TryFromWithBlock for EVMPoolState { .contract_ids .iter() .map(|bytes: &Bytes| Address::from_slice(bytes.as_ref())) - .collect(); + .collect::>(); + + let contracts_map = if manual_updates { + // avoid triggering updates for account changes if manual updates are expected + HashMap::new() + } else { + // enable triggering updates to this pool for all involved contracts + snapshot + .component + .contract_ids + .iter() + .map(|address| (address.clone(), id.clone())) + .collect() + }; let protocol_name = snapshot .component @@ -155,7 +168,7 @@ impl TryFromWithBlock for EVMPoolState { pool_state.set_spot_prices(all_tokens)?; - Ok(pool_state) + Ok((pool_state, contracts_map)) } } @@ -312,14 +325,17 @@ mod tests { .await .unwrap(); + let res_pool = res.0; + + assert_eq!(res.1, HashMap::new()); assert_eq!( - res.get_balance_owner(), + res_pool.get_balance_owner(), Some(Address::from_str("0xBA12222222228d8Ba445958a75a0704d566BF2C8").unwrap()) ); let mut exp_involved_contracts = HashSet::new(); exp_involved_contracts .insert(Address::from_str("0xBA12222222228d8Ba445958a75a0704d566BF2C8").unwrap()); - assert_eq!(res.get_involved_contracts(), exp_involved_contracts); - assert!(res.get_manual_updates()); + assert_eq!(res_pool.get_involved_contracts(), exp_involved_contracts); + assert!(res_pool.get_manual_updates()); } } diff --git a/src/protocol/models.rs b/src/protocol/models.rs index cc13e1b2..54d4ee3c 100644 --- a/src/protocol/models.rs +++ b/src/protocol/models.rs @@ -131,7 +131,7 @@ pub trait TryFromWithBlock { value: T, block: Header, all_tokens: &HashMap, - ) -> impl Future> + Send + Sync + ) -> impl Future), Self::Error>> + Send + Sync where Self: Sized; } From f19e656b538006e0be69787d8306b931780fd66c Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Tue, 11 Feb 2025 15:31:30 +0200 Subject: [PATCH 2/5] feat: trigger pool updates on related contract changes --- src/evm/decoder.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/src/evm/decoder.rs b/src/evm/decoder.rs index 047f0e57..a084880c 100644 --- a/src/evm/decoder.rs +++ b/src/evm/decoder.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, HashSet}, future::Future, pin::Pin, str::FromStr, @@ -11,7 +11,7 @@ use thiserror::Error; use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; use tycho_client::feed::{synchronizer::ComponentWithState, FeedMessage, Header}; -use tycho_core::Bytes; +use tycho_core::{dto::ProtocolStateDelta, Bytes}; use crate::{ evm::{ @@ -349,6 +349,65 @@ impl TychoStreamDecoder { .await; info!("Engine updated"); + // update states related to contracts with account deltas + let mut pools_to_update = HashSet::new(); + // get pools related to the updated accounts + for (account, _update) in deltas.account_updates { + pools_to_update.extend(match contracts_map.get(&account) { + Some(contracts) => contracts.clone(), + None => state_guard + .contracts_map + .get(&account) + .cloned() + .unwrap_or_default(), + }); + } + // update the pools + for pool in pools_to_update { + match updated_states.entry(pool.clone()) { + Entry::Occupied(mut entry) => { + // if state exists in updated_states, update it + let state: &mut Box = entry.get_mut(); + state + .delta_transition( + ProtocolStateDelta::default(), + &state_guard.tokens, + ) + .map_err(|e| { + error!(pool = pool, error = ?e, "DeltaTransitionError"); + StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}")) + })?; + } + Entry::Vacant(_) => { + match state_guard.states.get(&pool) { + // if state does not exist in updated_states, update the stored + // state + Some(stored_state) => { + let mut state = stored_state.clone(); + state + .delta_transition( + ProtocolStateDelta::default(), + &state_guard.tokens, + ) + .map_err(|e| { + error!(pool = pool, error = ?e, "DeltaTransitionError"); + StreamDecodeError::Fatal(format!( + "TransitionFailure: {e:?}" + )) + })?; + updated_states.insert(pool.clone(), state); + } + None => debug!( + pool = pool, + reason = "MissingState", + "DeltaTransitionError" + ), + } + } + } + } + + // update states with protocol state deltas (attribute changes etc.) for (id, update) in deltas.state_updates { match updated_states.entry(id.clone()) { Entry::Occupied(mut entry) => { From 70c6a4b774b63b31cbf15dd7e3f98fe116b3b5f3 Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Tue, 11 Feb 2025 16:47:06 +0200 Subject: [PATCH 3/5] test: test triggering state updates on contract changes --- Cargo.lock | 73 +++++++++++++++++- Cargo.toml | 3 + src/evm/decoder.rs | 67 ++++++++++++++-- src/evm/protocol/vm/tycho_decoder.rs | 2 +- src/protocol/state.rs | 68 ++++++++++++++++ tests/assets/decoder/balancer_v2_delta.json | 77 +++++++++++++++++++ ...napshot.json => balancer_v2_snapshot.json} | 0 .../python/test/test_third_party_pool.py | 1 - 8 files changed, 280 insertions(+), 11 deletions(-) create mode 100644 tests/assets/decoder/balancer_v2_delta.json rename tests/assets/decoder/{balancer_snapshot.json => balancer_v2_snapshot.json} (100%) diff --git a/Cargo.lock b/Cargo.lock index 20e0a5bc..7fbbe3dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,6 +2158,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dunce" version = "1.0.5" @@ -3065,6 +3071,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs4" version = "0.9.1" @@ -4255,6 +4267,32 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "native-tls" version = "0.2.12" @@ -4829,6 +4867,32 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "predicates" +version = "3.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" + +[[package]] +name = "predicates-tree" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_assertions" version = "1.4.1" @@ -6578,6 +6642,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termtree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" + [[package]] name = "thiserror" version = "1.0.69" @@ -7138,6 +7208,7 @@ dependencies = [ "itertools 0.10.5", "lazy_static", "mini-moka", + "mockall", "num-bigint", "num-traits", "ratatui", @@ -7602,7 +7673,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6ecc6ec3..a526614c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,9 @@ tracing-subscriber = { version = "0.3.17", default-features = false, features = ] } tempfile = "3.13.0" +# testing +mockall = "0.13" + # price_printer example clap = { version = "4.5.3", features = ["derive"] } anyhow = "1.0.79" diff --git a/src/evm/decoder.rs b/src/evm/decoder.rs index a084880c..9c8f46ff 100644 --- a/src/evm/decoder.rs +++ b/src/evm/decoder.rs @@ -447,7 +447,6 @@ impl TychoStreamDecoder { } }; } - // Persist the newly added/updated states let mut state_guard = self.state.write().await; state_guard @@ -471,17 +470,14 @@ impl TychoStreamDecoder { mod tests { use std::{fs, path::Path}; + use mockall::predicate::*; use num_bigint::ToBigUint; use rstest::*; - use tycho_client::feed::FeedMessage; - use tycho_core::Bytes; + use super::*; use crate::{ - evm::{ - decoder::{StreamDecodeError, TychoStreamDecoder}, - protocol::uniswap_v2::state::UniswapV2State, - }, - models::Token, + evm::protocol::uniswap_v2::state::UniswapV2State, models::Token, + protocol::state::MockProtocolSim, }; async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder { @@ -607,4 +603,59 @@ mod tests { } } } + + #[tokio::test] + async fn test_decode_updates_state_on_contract_change() { + let decoder = setup_decoder(true).await; + + // Create the mock instances + let mut mock_state = MockProtocolSim::new(); + + mock_state + .expect_clone_box() + .times(1) + .returning(|| { + let mut cloned_mock_state = MockProtocolSim::new(); + // Expect `delta_transition` to be called once with any parameters + cloned_mock_state + .expect_delta_transition() + .times(1) + .returning(|_, _| Ok(())); + cloned_mock_state + .expect_clone_box() + .times(1) + .returning(|| Box::new(MockProtocolSim::new())); + Box::new(cloned_mock_state) + }); + + // Insert mock state into `updated_states` + let pool_id = + "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string(); + decoder + .state + .write() + .await + .states + .insert(pool_id.clone(), Box::new(mock_state) as Box); + decoder + .state + .write() + .await + .contracts_map + .insert( + Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0), + vec![pool_id.clone()], + ); + + // Load a test message containing a contract update + let msg = load_test_msg("balancer_v2_delta"); + + // Decode the message + let _ = decoder + .decode(msg) + .await + .expect("decode failure"); + + // The mock framework will assert that `delta_transition` was called exactly once + } } diff --git a/src/evm/protocol/vm/tycho_decoder.rs b/src/evm/protocol/vm/tycho_decoder.rs index cc0eb647..45e0a53e 100644 --- a/src/evm/protocol/vm/tycho_decoder.rs +++ b/src/evm/protocol/vm/tycho_decoder.rs @@ -241,7 +241,7 @@ mod tests { fn load_balancer_account_data() -> Vec { let project_root = env!("CARGO_MANIFEST_DIR"); let asset_path = - Path::new(project_root).join("tests/assets/decoder/balancer_snapshot.json"); + Path::new(project_root).join("tests/assets/decoder/balancer_v2_snapshot.json"); let json_data = fs::read_to_string(asset_path).expect("Failed to read test asset"); let data: Value = serde_json::from_str(&json_data).expect("Failed to parse JSON"); diff --git a/src/protocol/state.rs b/src/protocol/state.rs index c587b925..a6a87204 100644 --- a/src/protocol/state.rs +++ b/src/protocol/state.rs @@ -46,6 +46,8 @@ //! ``` use std::{any::Any, collections::HashMap}; +#[cfg(test)] +use mockall::mock; use num_bigint::BigUint; use tycho_core::{dto::ProtocolStateDelta, Bytes}; @@ -142,3 +144,69 @@ impl Clone for Box { self.clone_box() } } + +#[cfg(test)] +mock! { + #[derive(Debug)] + pub ProtocolSim { + pub fn fee(&self) -> f64; + pub fn spot_price(&self, base: &Token, quote: &Token) -> Result; + pub fn get_amount_out( + &self, + amount_in: BigUint, + token_in: &Token, + token_out: &Token, + ) -> Result; + pub fn delta_transition( + &mut self, + delta: ProtocolStateDelta, + tokens: &HashMap, + ) -> Result<(), TransitionError>; + pub fn clone_box(&self) -> Box; + pub fn eq(&self, other: &dyn ProtocolSim) -> bool; + } +} + +#[cfg(test)] +impl ProtocolSim for MockProtocolSim { + fn fee(&self) -> f64 { + self.fee() + } + + fn spot_price(&self, base: &Token, quote: &Token) -> Result { + self.spot_price(base, quote) + } + + fn get_amount_out( + &self, + amount_in: BigUint, + token_in: &Token, + token_out: &Token, + ) -> Result { + self.get_amount_out(amount_in, token_in, token_out) + } + + fn delta_transition( + &mut self, + delta: ProtocolStateDelta, + tokens: &HashMap, + ) -> Result<(), TransitionError> { + self.delta_transition(delta, tokens) + } + + fn clone_box(&self) -> Box { + self.clone_box() + } + + fn as_any(&self) -> &dyn Any { + panic!("MockProtocolSim does not support as_any") + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + panic!("MockProtocolSim does not support as_any_mut") + } + + fn eq(&self, other: &dyn ProtocolSim) -> bool { + self.eq(other) + } +} diff --git a/tests/assets/decoder/balancer_v2_delta.json b/tests/assets/decoder/balancer_v2_delta.json new file mode 100644 index 00000000..cc05503e --- /dev/null +++ b/tests/assets/decoder/balancer_v2_delta.json @@ -0,0 +1,77 @@ +{ + "state_msgs": { + "vm:balancer_v2": { + "header": { + "hash": "0x985c985381d51f7768902baa56da51819c843d300d128a505705833ea68dc210", + "number": 21823189, + "parent_hash": "0x298b11c34ed6d8d13f5cdb9b86528a4dd2e12aedd5121ce6dc698761e1b27f6e", + "revert": false + }, + "snapshots": { + "states": {}, + "vm_storage": {} + }, + "deltas": { + "extractor": "vm:balancer_v2", + "chain": "ethereum", + "block": { + "number": 21823189, + "hash": "0x985c985381d51f7768902baa56da51819c843d300d128a505705833ea68dc210", + "parent_hash": "0x298b11c34ed6d8d13f5cdb9b86528a4dd2e12aedd5121ce6dc698761e1b27f6e", + "chain": "ethereum", + "ts": "2025-02-11T12:09:35" + }, + "finalized_block_height": 21823111, + "revert": false, + "new_tokens": {}, + "account_updates": { + "0xba12222222228d8ba445958a75a0704d566bf2c8": { + "address": "0xba12222222228d8ba445958a75a0704d566bf2c8", + "chain": "ethereum", + "slots": { + "0x43f6aa0cddef7ef5e613fa5a2609fdc84dcdcc7deb972c7b99a7def94b47469a": "0x014cfed5000000000000000000000000000000000000008d518759c491e97bb6", + "0x68937bc243fb8cd085e8097a3892e1d1ea282143f39fc35e8cd0c3f7550b8a2a": "0x014cfed500000000000000000000000000000000000001d3eb473a072ae36163" + }, + "balance": null, + "code": null, + "change": "Update" + } + }, + "state_updates": {}, + "new_protocol_components": {}, + "deleted_protocol_components": {}, + "component_balances": { + "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2": { + "0x7f39c581f595b53c5cb19bd0b3f8da6c935e2ca0": { + "token": "0x7f39c581f595b53c5cb19bd0b3f8da6c935e2ca0", + "balance": "0x01d3eb473a072ae36163", + "balance_float": 8.631583065547079e+21, + "modify_tx": "0xecd63d913fd99d950565c9639446ffc9ab354807f0af256e454bc2924b170f60", + "component_id": "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2" + }, + "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2": { + "token": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + "balance": "0x8d518759c491e97bb6", + "balance_float": 2.606865677332771e+21, + "modify_tx": "0xecd63d913fd99d950565c9639446ffc9ab354807f0af256e454bc2924b170f60", + "component_id": "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2" + } + } + }, + "component_tvl": { + "0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2": 12902.997863620163 + } + }, + "removed_components": {} + } + }, + "sync_states": { + "vm:balancer_v2": { + "status": "ready", + "hash": "0x985c985381d51f7768902baa56da51819c843d300d128a505705833ea68dc210", + "number": 21823189, + "parent_hash": "0x298b11c34ed6d8d13f5cdb9b86528a4dd2e12aedd5121ce6dc698761e1b27f6e", + "revert": false + } + } +} \ No newline at end of file diff --git a/tests/assets/decoder/balancer_snapshot.json b/tests/assets/decoder/balancer_v2_snapshot.json similarity index 100% rename from tests/assets/decoder/balancer_snapshot.json rename to tests/assets/decoder/balancer_v2_snapshot.json diff --git a/tycho_simulation_py/python/test/test_third_party_pool.py b/tycho_simulation_py/python/test/test_third_party_pool.py index f53f9207..2c920760 100644 --- a/tycho_simulation_py/python/test/test_third_party_pool.py +++ b/tycho_simulation_py/python/test/test_third_party_pool.py @@ -1,6 +1,5 @@ import json from decimal import Decimal -from pathlib import Path from unittest.mock import patch, call import pytest From 2d529289ee484caa18d26019e40dc753737b9a39 Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Thu, 13 Feb 2025 12:00:35 +0200 Subject: [PATCH 4/5] refactor: use new_pairs to collect contracts to pool_ids mapping And revert try_from_with_block interface change --- src/evm/decoder.rs | 34 +++++++++++--------- src/evm/protocol/uniswap_v2/tycho_decoder.rs | 6 ++-- src/evm/protocol/uniswap_v3/tycho_decoder.rs | 6 ++-- src/evm/protocol/uniswap_v4/state.rs | 3 +- src/evm/protocol/uniswap_v4/tycho_decoder.rs | 10 ++---- src/evm/protocol/vm/tycho_decoder.rs | 20 ++---------- src/protocol/models.rs | 2 +- 7 files changed, 32 insertions(+), 49 deletions(-) diff --git a/src/evm/decoder.rs b/src/evm/decoder.rs index 9c8f46ff..4708c50c 100644 --- a/src/evm/decoder.rs +++ b/src/evm/decoder.rs @@ -40,14 +40,8 @@ struct DecoderState { contracts_map: HashMap>, } -type ContractMap = HashMap; -type DecodeFut = Pin< - Box< - dyn Future, ContractMap), InvalidSnapshotError>> - + Send - + Sync, - >, ->; +type DecodeFut = + Pin, InvalidSnapshotError>> + Send + Sync>>; type RegistryFn = dyn Fn(ComponentWithState, Header, Arc>) -> DecodeFut + Send + Sync; type FilterFn = fn(&ComponentWithState) -> bool; @@ -122,7 +116,7 @@ impl TychoStreamDecoder { let guard = state.read().await; T::try_from_with_block(component, header, &guard.tokens) .await - .map(|(c, m)| (Box::new(c) as Box, m)) + .map(|c| Box::new(c) as Box) }) as DecodeFut }, ); @@ -292,17 +286,25 @@ impl TychoStreamDecoder { component_tokens, ), ); + // collect contracts:ids mapping for states that should update on contract changes + for component in new_pairs.values() { + if component + .static_attributes + .contains_key("manual_updates") + { + for contract in &component.contract_ids { + contracts_map + .entry(contract.clone()) + .or_insert_with(Vec::new) + .push(id.clone()); + } + } + } // Construct state from snapshot if let Some(state_decode_f) = self.registry.get(protocol.as_str()) { match state_decode_f(snapshot, block.clone(), self.state.clone()).await { - Ok((state, contracts)) => { - for (key, value) in contracts { - contracts_map - .entry(key) - .or_insert_with(Vec::new) - .push(value); - } + Ok(state) => { new_components.insert(id.clone(), state); } Err(e) => { diff --git a/src/evm/protocol/uniswap_v2/tycho_decoder.rs b/src/evm/protocol/uniswap_v2/tycho_decoder.rs index b5b7f56f..78d552d1 100644 --- a/src/evm/protocol/uniswap_v2/tycho_decoder.rs +++ b/src/evm/protocol/uniswap_v2/tycho_decoder.rs @@ -19,7 +19,7 @@ impl TryFromWithBlock for UniswapV2State { snapshot: ComponentWithState, _block: Header, _all_tokens: &HashMap, - ) -> Result<(Self, HashMap), Self::Error> { + ) -> Result { let reserve0 = U256::from_be_slice( snapshot .state @@ -36,7 +36,7 @@ impl TryFromWithBlock for UniswapV2State { .ok_or(InvalidSnapshotError::MissingAttribute("reserve1".to_string()))?, ); - Ok((UniswapV2State::new(reserve0, reserve1), HashMap::new())) + Ok(UniswapV2State::new(reserve0, reserve1)) } } @@ -100,7 +100,7 @@ mod tests { let result = UniswapV2State::try_from_with_block(snapshot, header(), &HashMap::new()).await; assert!(result.is_ok()); - let res = result.unwrap().0; + let res = result.unwrap(); assert_eq!(res.reserve0, U256::from_str("100").unwrap()); assert_eq!(res.reserve1, U256::from_str("200").unwrap()); } diff --git a/src/evm/protocol/uniswap_v3/tycho_decoder.rs b/src/evm/protocol/uniswap_v3/tycho_decoder.rs index 8ccd8555..b3632f8d 100644 --- a/src/evm/protocol/uniswap_v3/tycho_decoder.rs +++ b/src/evm/protocol/uniswap_v3/tycho_decoder.rs @@ -20,7 +20,7 @@ impl TryFromWithBlock for UniswapV3State { snapshot: ComponentWithState, _block: Header, _all_tokens: &HashMap, - ) -> Result<(Self, HashMap), Self::Error> { + ) -> Result { let liq = snapshot .state .attributes @@ -120,7 +120,7 @@ impl TryFromWithBlock for UniswapV3State { ticks.sort_by_key(|tick| tick.index); - Ok((UniswapV3State::new(liquidity, sqrt_price, fee, tick, ticks), HashMap::new())) + Ok(UniswapV3State::new(liquidity, sqrt_price, fee, tick, ticks)) } } @@ -201,7 +201,7 @@ mod tests { 300, vec![TickInfo::new(60, 400)], ); - assert_eq!(result.unwrap().0, expected); + assert_eq!(result.unwrap(), expected); } #[tokio::test] diff --git a/src/evm/protocol/uniswap_v4/state.rs b/src/evm/protocol/uniswap_v4/state.rs index 99e05f91..e0c93930 100644 --- a/src/evm/protocol/uniswap_v4/state.rs +++ b/src/evm/protocol/uniswap_v4/state.rs @@ -459,8 +459,7 @@ mod tests { let usv4_state = UniswapV4State::try_from_with_block(state, Default::default(), &Default::default()) .await - .unwrap() - .0; + .unwrap(); let t0 = Token::new( "0x647e32181a64f4ffd4f0b0b4b052ec05b277729c", diff --git a/src/evm/protocol/uniswap_v4/tycho_decoder.rs b/src/evm/protocol/uniswap_v4/tycho_decoder.rs index fca89e9e..000aea8b 100644 --- a/src/evm/protocol/uniswap_v4/tycho_decoder.rs +++ b/src/evm/protocol/uniswap_v4/tycho_decoder.rs @@ -23,7 +23,7 @@ impl TryFromWithBlock for UniswapV4State { snapshot: ComponentWithState, _block: Header, _all_tokens: &HashMap, - ) -> Result<(Self, HashMap), Self::Error> { + ) -> Result { let liq = snapshot .state .attributes @@ -120,10 +120,7 @@ impl TryFromWithBlock for UniswapV4State { ticks.sort_by_key(|tick| tick.index); - Ok(( - UniswapV4State::new(liquidity, sqrt_price, fees, tick, tick_spacing, ticks), - HashMap::new(), - )) + Ok(UniswapV4State::new(liquidity, sqrt_price, fees, tick, tick_spacing, ticks)) } } @@ -206,8 +203,7 @@ mod tests { let result = UniswapV4State::try_from_with_block(snapshot, header(), &HashMap::new()) .await - .unwrap() - .0; + .unwrap(); let fees = UniswapV4Fees::new(0, 0, 500); let expected = UniswapV4State::new( diff --git a/src/evm/protocol/vm/tycho_decoder.rs b/src/evm/protocol/vm/tycho_decoder.rs index 45e0a53e..9c7e5c57 100644 --- a/src/evm/protocol/vm/tycho_decoder.rs +++ b/src/evm/protocol/vm/tycho_decoder.rs @@ -49,7 +49,7 @@ impl TryFromWithBlock for EVMPoolState { snapshot: ComponentWithState, block: Header, all_tokens: &HashMap, - ) -> Result<(Self, HashMap), Self::Error> { + ) -> Result { let id = snapshot.component.id.clone(); let tokens = snapshot.component.tokens.clone(); @@ -117,19 +117,6 @@ impl TryFromWithBlock for EVMPoolState { .map(|bytes: &Bytes| Address::from_slice(bytes.as_ref())) .collect::>(); - let contracts_map = if manual_updates { - // avoid triggering updates for account changes if manual updates are expected - HashMap::new() - } else { - // enable triggering updates to this pool for all involved contracts - snapshot - .component - .contract_ids - .iter() - .map(|address| (address.clone(), id.clone())) - .collect() - }; - let protocol_name = snapshot .component .protocol_system @@ -168,7 +155,7 @@ impl TryFromWithBlock for EVMPoolState { pool_state.set_spot_prices(all_tokens)?; - Ok((pool_state, contracts_map)) + Ok(pool_state) } } @@ -325,9 +312,8 @@ mod tests { .await .unwrap(); - let res_pool = res.0; + let res_pool = res; - assert_eq!(res.1, HashMap::new()); assert_eq!( res_pool.get_balance_owner(), Some(Address::from_str("0xBA12222222228d8Ba445958a75a0704d566BF2C8").unwrap()) diff --git a/src/protocol/models.rs b/src/protocol/models.rs index 54d4ee3c..cc13e1b2 100644 --- a/src/protocol/models.rs +++ b/src/protocol/models.rs @@ -131,7 +131,7 @@ pub trait TryFromWithBlock { value: T, block: Header, all_tokens: &HashMap, - ) -> impl Future), Self::Error>> + Send + Sync + ) -> impl Future> + Send + Sync where Self: Sized; } From 33284ad3b269f7dc24265c91970128a656341489 Mon Sep 17 00:00:00 2001 From: Louise Poole Date: Thu, 13 Feb 2025 12:41:22 +0200 Subject: [PATCH 5/5] fix: update both new and existing pools related to the contract updated --- src/evm/decoder.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/evm/decoder.rs b/src/evm/decoder.rs index 4708c50c..15fd0e13 100644 --- a/src/evm/decoder.rs +++ b/src/evm/decoder.rs @@ -355,14 +355,21 @@ impl TychoStreamDecoder { let mut pools_to_update = HashSet::new(); // get pools related to the updated accounts for (account, _update) in deltas.account_updates { - pools_to_update.extend(match contracts_map.get(&account) { - Some(contracts) => contracts.clone(), - None => state_guard + // get new pools related to the account updated + pools_to_update.extend( + contracts_map + .get(&account) + .cloned() + .unwrap_or_default(), + ); + // get existing pools related to the account updated + pools_to_update.extend( + state_guard .contracts_map .get(&account) .cloned() .unwrap_or_default(), - }); + ); } // update the pools for pool in pools_to_update {