Skip to content

Commit

Permalink
Merge pull request #145 from propeller-heads/lp/update-on-account-cha…
Browse files Browse the repository at this point in the history
…nges

fix: decoder triggers state updates on related account changes
  • Loading branch information
louise-poole authored Feb 17, 2025
2 parents 5f92449 + 33284ad commit e9a6507
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 18 deletions.
73 changes: 72 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ tracing-subscriber = { version = "0.3.17", default-features = false, features =
] }
tempfile = "3.13.0"

# testing
mockall = "0.13"

# price_printer example
clap = { version = "4.5.3", features = ["derive"] }
anyhow = "1.0.79"
Expand Down
162 changes: 152 additions & 10 deletions src/evm/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
future::Future,
pin::Pin,
str::FromStr,
Expand All @@ -11,7 +11,7 @@ use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use tycho_client::feed::{synchronizer::ComponentWithState, FeedMessage, Header};
use tycho_core::Bytes;
use tycho_core::{dto::ProtocolStateDelta, Bytes};

use crate::{
evm::{
Expand All @@ -36,6 +36,8 @@ pub enum StreamDecodeError {
struct DecoderState {
tokens: HashMap<Bytes, Token>,
states: HashMap<String, Box<dyn ProtocolSim>>,
// maps contract address to the pools they affect
contracts_map: HashMap<Bytes, Vec<String>>,
}

type DecodeFut =
Expand Down Expand Up @@ -149,6 +151,7 @@ impl TychoStreamDecoder {
let mut updated_states = HashMap::new();
let mut new_pairs = HashMap::new();
let mut removed_pairs = HashMap::new();
let mut contracts_map = HashMap::new();

let block = msg
.state_msgs
Expand Down Expand Up @@ -190,6 +193,7 @@ impl TychoStreamDecoder {
}
}

// Remove untracked components
let state_guard = self.state.read().await;
removed_pairs.extend(
protocol_msg
Expand Down Expand Up @@ -282,6 +286,20 @@ impl TychoStreamDecoder {
component_tokens,
),
);
// collect contracts:ids mapping for states that should update on contract changes
for component in new_pairs.values() {
if component
.static_attributes
.contains_key("manual_updates")
{
for contract in &component.contract_ids {
contracts_map
.entry(contract.clone())
.or_insert_with(Vec::new)
.push(id.clone());
}
}
}

// Construct state from snapshot
if let Some(state_decode_f) = self.registry.get(protocol.as_str()) {
Expand Down Expand Up @@ -333,6 +351,72 @@ impl TychoStreamDecoder {
.await;
info!("Engine updated");

// update states related to contracts with account deltas
let mut pools_to_update = HashSet::new();
// get pools related to the updated accounts
for (account, _update) in deltas.account_updates {
// get new pools related to the account updated
pools_to_update.extend(
contracts_map
.get(&account)
.cloned()
.unwrap_or_default(),
);
// get existing pools related to the account updated
pools_to_update.extend(
state_guard
.contracts_map
.get(&account)
.cloned()
.unwrap_or_default(),
);
}
// update the pools
for pool in pools_to_update {
match updated_states.entry(pool.clone()) {
Entry::Occupied(mut entry) => {
// if state exists in updated_states, update it
let state: &mut Box<dyn ProtocolSim> = entry.get_mut();
state
.delta_transition(
ProtocolStateDelta::default(),
&state_guard.tokens,
)
.map_err(|e| {
error!(pool = pool, error = ?e, "DeltaTransitionError");
StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
})?;
}
Entry::Vacant(_) => {
match state_guard.states.get(&pool) {
// if state does not exist in updated_states, update the stored
// state
Some(stored_state) => {
let mut state = stored_state.clone();
state
.delta_transition(
ProtocolStateDelta::default(),
&state_guard.tokens,
)
.map_err(|e| {
error!(pool = pool, error = ?e, "DeltaTransitionError");
StreamDecodeError::Fatal(format!(
"TransitionFailure: {e:?}"
))
})?;
updated_states.insert(pool.clone(), state);
}
None => debug!(
pool = pool,
reason = "MissingState",
"DeltaTransitionError"
),
}
}
}
}

// update states with protocol state deltas (attribute changes etc.)
for (id, update) in deltas.state_updates {
match updated_states.entry(id.clone()) {
Entry::Occupied(mut entry) => {
Expand Down Expand Up @@ -372,12 +456,18 @@ impl TychoStreamDecoder {
}
};
}

// Persist the newly added/updated states
let mut state_guard = self.state.write().await;
state_guard
.states
.extend(updated_states.clone().into_iter());
for (key, values) in contracts_map {
state_guard
.contracts_map
.entry(key)
.or_insert_with(Vec::new)
.extend(values);
}

// Send the tick with all updated states
Ok(BlockUpdate::new(block.number, updated_states, new_pairs)
Expand All @@ -389,17 +479,14 @@ impl TychoStreamDecoder {
mod tests {
use std::{fs, path::Path};

use mockall::predicate::*;
use num_bigint::ToBigUint;
use rstest::*;
use tycho_client::feed::FeedMessage;
use tycho_core::Bytes;

use super::*;
use crate::{
evm::{
decoder::{StreamDecodeError, TychoStreamDecoder},
protocol::uniswap_v2::state::UniswapV2State,
},
models::Token,
evm::protocol::uniswap_v2::state::UniswapV2State, models::Token,
protocol::state::MockProtocolSim,
};

async fn setup_decoder(set_tokens: bool) -> TychoStreamDecoder {
Expand Down Expand Up @@ -525,4 +612,59 @@ mod tests {
}
}
}

#[tokio::test]
async fn test_decode_updates_state_on_contract_change() {
let decoder = setup_decoder(true).await;

// Create the mock instances
let mut mock_state = MockProtocolSim::new();

mock_state
.expect_clone_box()
.times(1)
.returning(|| {
let mut cloned_mock_state = MockProtocolSim::new();
// Expect `delta_transition` to be called once with any parameters
cloned_mock_state
.expect_delta_transition()
.times(1)
.returning(|_, _| Ok(()));
cloned_mock_state
.expect_clone_box()
.times(1)
.returning(|| Box::new(MockProtocolSim::new()));
Box::new(cloned_mock_state)
});

// Insert mock state into `updated_states`
let pool_id =
"0x93d199263632a4ef4bb438f1feb99e57b4b5f0bd0000000000000000000005c2".to_string();
decoder
.state
.write()
.await
.states
.insert(pool_id.clone(), Box::new(mock_state) as Box<dyn ProtocolSim>);
decoder
.state
.write()
.await
.contracts_map
.insert(
Bytes::from("0xba12222222228d8ba445958a75a0704d566bf2c8").lpad(20, 0),
vec![pool_id.clone()],
);

// Load a test message containing a contract update
let msg = load_test_msg("balancer_v2_delta");

// Decode the message
let _ = decoder
.decode(msg)
.await
.expect("decode failure");

// The mock framework will assert that `delta_transition` was called exactly once
}
}
Loading

0 comments on commit e9a6507

Please sign in to comment.