Skip to content

Commit

Permalink
Merge pull request #132 from propeller-heads/lp/improve-logs
Browse files Browse the repository at this point in the history
fix: skip decode errors on examples and improve logging
  • Loading branch information
louise-poole authored Feb 4, 2025
2 parents 7271ed9 + 473feeb commit b1fefe1
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 17 deletions.
1 change: 1 addition & 0 deletions examples/price_printer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ async fn main() {
Some(uniswap_v4_pool_with_hook_filter),
)
.auth_key(Some(tycho_api_key.clone()))
.skip_state_decode_failures(true)
.set_tokens(all_tokens)
.await
.build()
Expand Down
26 changes: 15 additions & 11 deletions examples/price_printer/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use ratatui::{
DefaultTerminal, Frame,
};
use tokio::{select, sync::mpsc::Receiver};
use tracing::warn;
use tycho_core::Bytes;
use tycho_simulation::protocol::{
models::{BlockUpdate, ProtocolComponent},
Expand Down Expand Up @@ -151,17 +152,20 @@ impl App {
.map(|el| el.spot_price(&comp.tokens[0], &comp.tokens[1]))
.unwrap_or(Ok(0.0));

self.items.push(Data {
component: comp.clone(),
state: update
.states
.get(id)
.unwrap_or_else(|| panic!("Received update for unknown pool {}", comp.address))
.clone(),
name,
tokens,
price: format!("{}", price.expect("Expected f64 as spot price")),
});
match update.states.get(id) {
Some(state) => {
self.items.push(Data {
component: comp.clone(),
state: state.clone(),
name,
tokens,
price: format!("{}", price.expect("Expected f64 as spot price")),
});
}
None => {
warn!("Received update for unknown pool {}", comp.address)
}
};
}

for (address, state) in update.states.iter() {
Expand Down
1 change: 1 addition & 0 deletions examples/quickstart/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn main() {
Some(uniswap_v4_pool_with_hook_filter),
)
.auth_key(Some(tycho_api_key.clone()))
.skip_state_decode_failures(true)
.set_tokens(all_tokens.clone())
.await
.build()
Expand Down
16 changes: 10 additions & 6 deletions src/evm/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use alloy_primitives::Address;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use tycho_client::feed::{synchronizer::ComponentWithState, FeedMessage, Header};
use tycho_core::Bytes;

Expand Down Expand Up @@ -233,15 +233,15 @@ impl TychoStreamDecoder {
.iter()
.map(|(key, value)| (Address::from_slice(&key[..20]), value.clone().into()))
.collect();
info!("Updating engine with snapshot");
info!("Updating engine with {} snapshots", storage_by_address.len());
update_engine(
SHARED_TYCHO_DB.clone(),
block.clone().into(),
Some(storage_by_address),
HashMap::new(),
)
.await;
info!("Engine updated with snapshot");
info!("Engine updated");

let mut new_components = HashMap::new();

Expand Down Expand Up @@ -288,6 +288,7 @@ impl TychoStreamDecoder {
warn!(pool = id, error = %e, "StateDecodingFailure");
continue 'outer;
} else {
error!(pool = id, error = %e, "StateDecodingFailure");
return Err(StreamDecodeError::Fatal(format!("{e}")));
}
}
Expand All @@ -296,14 +297,15 @@ impl TychoStreamDecoder {
warn!(pool = id, "MissingDecoderRegistration");
continue 'outer;
} else {
error!(pool = id, "MissingDecoderRegistration");
return Err(StreamDecodeError::Fatal(format!(
"Missing decoder registration for: {id}"
)));
}
}

if !new_components.is_empty() {
debug!("Decoded {} snapshots for protocol {}", new_components.len(), protocol);
info!("Decoded {} snapshots for protocol {}", new_components.len(), protocol);
}
updated_states.extend(new_components);

Expand All @@ -315,15 +317,15 @@ impl TychoStreamDecoder {
.iter()
.map(|(key, value)| (Address::from_slice(&key[..20]), value.clone().into()))
.collect();
info!("Updating engine with deltas");
info!("Updating engine with {} deltas", deltas.state_updates.len());
update_engine(
SHARED_TYCHO_DB.clone(),
block.clone().into(),
None,
account_update_by_address,
)
.await;
info!("Engine updated with deltas");
info!("Engine updated");

for (id, update) in deltas.state_updates {
match updated_states.entry(id.clone()) {
Expand All @@ -333,6 +335,7 @@ impl TychoStreamDecoder {
state
.delta_transition(update, &state_guard.tokens)
.map_err(|e| {
error!(pool = id, error = ?e, "DeltaTransitionError");
StreamDecodeError::Fatal(format!("TransitionFailure: {e:?}"))
})?;
}
Expand All @@ -345,6 +348,7 @@ impl TychoStreamDecoder {
state
.delta_transition(update, &state_guard.tokens)
.map_err(|e| {
error!(pool = id, error = ?e, "DeltaTransitionError");
StreamDecodeError::Fatal(format!(
"TransitionFailure: {e:?}"
))
Expand Down

0 comments on commit b1fefe1

Please sign in to comment.