Skip to content

Commit

Permalink
feat: apply account balances to decoded pool states
Browse files Browse the repository at this point in the history
For protocols with specified balance owners with tracked account balances, we should use those balances instead of component balances.
  • Loading branch information
louise-poole committed Feb 6, 2025
1 parent 1800f20 commit fbe8aa6
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 72 deletions.
30 changes: 26 additions & 4 deletions src/evm/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ struct DecoderState {
states: HashMap<String, Box<dyn ProtocolSim>>,
}

type AccountBalances = HashMap<Bytes, HashMap<Bytes, Bytes>>;
type DecodeFut =
Pin<Box<dyn Future<Output = Result<Box<dyn ProtocolSim>, InvalidSnapshotError>> + Send + Sync>>;
type RegistryFn =
dyn Fn(ComponentWithState, Header, Arc<RwLock<DecoderState>>) -> DecodeFut + Send + Sync;
type RegistryFn = dyn Fn(ComponentWithState, Header, AccountBalances, Arc<RwLock<DecoderState>>) -> DecodeFut
+ Send
+ Sync;
type FilterFn = fn(&ComponentWithState) -> bool;

/// A decoder to process raw messages.
Expand Down Expand Up @@ -109,10 +111,11 @@ impl TychoStreamDecoder {
let decoder = Box::new(
move |component: ComponentWithState,
header: Header,
account_balances: HashMap<Bytes, HashMap<Bytes, Bytes>>,
state: Arc<RwLock<DecoderState>>| {
Box::pin(async move {
let guard = state.read().await;
T::try_from_with_block(component, header, &guard.tokens)
T::try_from_with_block(component, header, &account_balances, &guard.tokens)
.await
.map(|c| Box::new(c) as Box<dyn ProtocolSim>)
}) as DecodeFut
Expand Down Expand Up @@ -190,6 +193,7 @@ impl TychoStreamDecoder {
}
}

// Remove components marked as no longer tracked
let state_guard = self.state.read().await;
removed_pairs.extend(
protocol_msg
Expand Down Expand Up @@ -233,6 +237,16 @@ impl TychoStreamDecoder {
.iter()
.map(|(key, value)| (Address::from_slice(&key[..20]), value.clone().into()))
.collect();
let account_balances = protocol_msg
.clone()
.snapshots
.get_vm_storage()
.iter()
.map(|(addr, acc)| {
let balances = acc.token_balances.clone();
(addr.clone(), balances)
})
.collect::<AccountBalances>();
info!("Updating engine with {} snapshots", storage_by_address.len());
update_engine(
SHARED_TYCHO_DB.clone(),
Expand Down Expand Up @@ -279,8 +293,16 @@ impl TychoStreamDecoder {

// Construct state from snapshot
if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
match state_decode_f(snapshot, block.clone(), self.state.clone()).await {
match state_decode_f(
snapshot,
block.clone(),
account_balances.clone(),
self.state.clone(),
)
.await
{
Ok(state) => {
// TODO: Handle account balances
new_components.insert(id.clone(), state);
}
Err(e) => {
Expand Down
24 changes: 17 additions & 7 deletions src/evm/protocol/uniswap_v2/tycho_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl TryFromWithBlock<ComponentWithState> for UniswapV2State {
async fn try_from_with_block(
snapshot: ComponentWithState,
_block: Header,
_account_balances: &HashMap<Bytes, HashMap<Bytes, Bytes>>,
_all_tokens: &HashMap<Bytes, Token>,
) -> Result<Self, Self::Error> {
let reserve0 = U256::from_be_slice(
Expand All @@ -42,13 +43,10 @@ impl TryFromWithBlock<ComponentWithState> for UniswapV2State {

#[cfg(test)]
mod tests {
use std::{collections::HashMap, str::FromStr};
use std::str::FromStr;

use chrono::DateTime;
use tycho_core::{
dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState},
hex_bytes::Bytes,
};
use tycho_core::dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState};

use super::*;

Expand Down Expand Up @@ -97,7 +95,13 @@ mod tests {
component: usv2_component(),
};

let result = UniswapV2State::try_from_with_block(snapshot, header(), &HashMap::new()).await;
let result = UniswapV2State::try_from_with_block(
snapshot,
header(),
&HashMap::new(),
&HashMap::new(),
)
.await;

assert!(result.is_ok());
let res = result.unwrap();
Expand All @@ -120,7 +124,13 @@ mod tests {
component: usv2_component(),
};

let result = UniswapV2State::try_from_with_block(snapshot, header(), &HashMap::new()).await;
let result = UniswapV2State::try_from_with_block(
snapshot,
header(),
&HashMap::new(),
&HashMap::new(),
)
.await;

assert!(result.is_err());

Expand Down
32 changes: 24 additions & 8 deletions src/evm/protocol/uniswap_v3/tycho_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ impl TryFromWithBlock<ComponentWithState> for UniswapV3State {
async fn try_from_with_block(
snapshot: ComponentWithState,
_block: Header,
_account_balances: &HashMap<Bytes, HashMap<Bytes, Bytes>>,
_all_tokens: &HashMap<Bytes, Token>,
) -> Result<Self, Self::Error> {
let liq = snapshot
Expand Down Expand Up @@ -126,14 +127,11 @@ impl TryFromWithBlock<ComponentWithState> for UniswapV3State {

#[cfg(test)]
mod tests {
use std::{collections::HashMap, str::FromStr};
use std::str::FromStr;

use chrono::DateTime;
use rstest::rstest;
use tycho_core::{
dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState},
hex_bytes::Bytes,
};
use tycho_core::dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState};

use super::*;

Expand Down Expand Up @@ -191,7 +189,13 @@ mod tests {
component: usv3_component(),
};

let result = UniswapV3State::try_from_with_block(snapshot, header(), &HashMap::new()).await;
let result = UniswapV3State::try_from_with_block(
snapshot,
header(),
&HashMap::new(),
&HashMap::new(),
)
.await;

assert!(result.is_ok());
let expected = UniswapV3State::new(
Expand Down Expand Up @@ -240,7 +244,13 @@ mod tests {
component,
};

let result = UniswapV3State::try_from_with_block(snapshot, header(), &HashMap::new()).await;
let result = UniswapV3State::try_from_with_block(
snapshot,
header(),
&HashMap::new(),
&HashMap::new(),
)
.await;

assert!(result.is_err());
assert!(matches!(
Expand All @@ -266,7 +276,13 @@ mod tests {
component,
};

let result = UniswapV3State::try_from_with_block(snapshot, header(), &HashMap::new()).await;
let result = UniswapV3State::try_from_with_block(
snapshot,
header(),
&HashMap::new(),
&HashMap::new(),
)
.await;

assert!(result.is_err());
assert!(matches!(
Expand Down
18 changes: 9 additions & 9 deletions src/evm/protocol/uniswap_v4/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,16 +377,12 @@ impl ProtocolSim for UniswapV4State {

#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use std::{collections::HashSet, str::FromStr};

use num_bigint::ToBigUint;
use num_traits::FromPrimitive;
use serde_json::Value;
use tycho_client::feed::synchronizer::ComponentWithState;
use tycho_core::hex_bytes::Bytes;

use super::*;
use crate::protocol::models::TryFromWithBlock;
Expand Down Expand Up @@ -456,10 +452,14 @@ mod tests {
let state: ComponentWithState = serde_json::from_value(data)
.expect("Expected json to match ComponentWithState structure");

let usv4_state =
UniswapV4State::try_from_with_block(state, Default::default(), &Default::default())
.await
.unwrap();
let usv4_state = UniswapV4State::try_from_with_block(
state,
Default::default(),
&Default::default(),
&Default::default(),
)
.await
.unwrap();

let t0 = Token::new(
"0x647e32181a64f4ffd4f0b0b4b052ec05b277729c",
Expand Down
27 changes: 18 additions & 9 deletions src/evm/protocol/uniswap_v4/tycho_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ impl TryFromWithBlock<ComponentWithState> for UniswapV4State {
async fn try_from_with_block(
snapshot: ComponentWithState,
_block: Header,
_account_balances: &HashMap<Bytes, HashMap<Bytes, Bytes>>,
_all_tokens: &HashMap<Bytes, Token>,
) -> Result<Self, Self::Error> {
let liq = snapshot
Expand Down Expand Up @@ -126,14 +127,11 @@ impl TryFromWithBlock<ComponentWithState> for UniswapV4State {

#[cfg(test)]
mod tests {
use std::{collections::HashMap, str::FromStr};
use std::str::FromStr;

use chrono::DateTime;
use rstest::rstest;
use tycho_core::{
dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState},
hex_bytes::Bytes,
};
use tycho_core::dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState};

use super::*;

Expand Down Expand Up @@ -201,9 +199,14 @@ mod tests {
component: usv4_component(),
};

let result = UniswapV4State::try_from_with_block(snapshot, header(), &HashMap::new())
.await
.unwrap();
let result = UniswapV4State::try_from_with_block(
snapshot,
header(),
&HashMap::new(),
&HashMap::new(),
)
.await
.unwrap();

let fees = UniswapV4Fees::new(0, 0, 500);
let expected = UniswapV4State::new(
Expand Down Expand Up @@ -252,7 +255,13 @@ mod tests {
component: usv4_component(),
};

let result = UniswapV4State::try_from_with_block(snapshot, header(), &HashMap::new()).await;
let result = UniswapV4State::try_from_with_block(
snapshot,
header(),
&HashMap::new(),
&HashMap::new(),
)
.await;

assert!(result.is_err());
assert!(matches!(
Expand Down
Loading

0 comments on commit fbe8aa6

Please sign in to comment.