Skip to content

Commit

Permalink
feat: query evm balance, clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Dec 1, 2023
1 parent 2ff104b commit b1ccae1
Show file tree
Hide file tree
Showing 21 changed files with 211 additions and 238 deletions.
86 changes: 45 additions & 41 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{AgentMetrics, InstrumentedFallibleTask, Metrics},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
metrics::{AgentMetrics, Metrics},
run_all,
settings::ChainConf,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync,
WatermarkContractSync,
};
use hyperlane_core::{
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
Expand Down Expand Up @@ -51,7 +53,7 @@ struct ContextKey {
#[derive(AsRef)]
pub struct Relayer {
origin_chains: HashSet<HyperlaneDomain>,
destination_chains: HashSet<HyperlaneDomain>,
destination_chains: HashMap<HyperlaneDomain, ChainConf>,
#[as_ref]
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<HyperlaneMessage>>>,
Expand All @@ -69,6 +71,8 @@ pub struct Relayer {
transaction_gas_limit: Option<U256>,
skip_transaction_gas_limit_for: HashSet<u32>,
allow_local_checkpoint_syncers: bool,
core_metrics: Arc<CoreMetrics>,
agent_metrics: Metrics,
}

impl Debug for Relayer {
Expand Down Expand Up @@ -98,7 +102,7 @@ impl BaseAgent for Relayer {
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: Metrics,
) -> Result<(Self, Vec<InstrumentedFallibleTask<()>>)>
) -> Result<Self>
where
Self: Sized,
{
Expand Down Expand Up @@ -194,30 +198,10 @@ impl BaseAgent for Relayer {
.collect();

let mut msg_ctxs = HashMap::new();
let mut metrics_fetchers = vec![];
let mut destination_chains = HashMap::new();
for destination in &settings.destination_chains {
let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone();
let agent_metrics_conf = destination_chain_setup
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await?;
let agent_metrics_fetcher = destination_chain_setup
.build_agent_metrics_fetcher()
.await?;
let agent_metrics = AgentMetrics::new(
agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetricsFetcher"));
metrics_fetchers.push(fetcher_task);

destination_chains.insert(destination.clone(), destination_chain_setup.clone());
let transaction_gas_limit: Option<U256> =
if skip_transaction_gas_limit_for.contains(&destination.id()) {
None
Expand Down Expand Up @@ -257,7 +241,7 @@ impl BaseAgent for Relayer {
let relayer = Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
destination_chains,
msg_ctxs,
core,
message_syncs,
Expand All @@ -269,28 +253,48 @@ impl BaseAgent for Relayer {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
core_metrics,
agent_metrics,
};

Ok((relayer, metrics_fetchers))
Ok(relayer)
}

#[allow(clippy::async_yields_async)]
async fn run(
self,
metrics_fetchers: Vec<InstrumentedFallibleTask<()>>,
) -> Instrumented<JoinHandle<Result<()>>> {
// The tasks vec is initialized with the metrics fetcher tasks,
// and is then extended with the rest of the tasks.
let mut tasks = metrics_fetchers;
async fn run(self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];

// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
for destination in &self.destination_chains {
for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) =
mpsc::unbounded_channel::<Box<DynPendingOperation>>();
send_channels.insert(destination.id(), send_channel);
send_channels.insert(dest_domain.id(), send_channel);

tasks.push(self.run_destination_submitter(dest_domain, receive_channel));

tasks.push(self.run_destination_submitter(destination, receive_channel));
let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let agent_metrics_fetcher = dest_conf
.build_agent_metrics_fetcher(&self.core_metrics)
.await
.unwrap();
let agent_metrics = AgentMetrics::new(
self.agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);

let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetricsFetcher"));
tasks.push(fetcher_task);
}

for origin in &self.origin_chains {
Expand Down Expand Up @@ -364,11 +368,11 @@ impl Relayer {
let metrics = MessageProcessorMetrics::new(
&self.core.metrics,
origin,
self.destination_chains.iter(),
self.destination_chains.keys(),
);
let destination_ctxs = self
.destination_chains
.iter()
.keys()
.filter(|&destination| destination != origin)
.map(|destination| {
(
Expand Down
32 changes: 11 additions & 21 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
metrics::{InstrumentedFallibleTask, Metrics as AgentMetrics},
run_all,
settings::IndexSettings,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
metrics::Metrics as AgentMetrics, run_all, settings::IndexSettings, BaseAgent,
ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -41,7 +39,7 @@ impl BaseAgent for Scraper {
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> eyre::Result<(Self, Vec<InstrumentedFallibleTask<()>>)>
) -> eyre::Result<Self>
where
Self: Sized,
{
Expand Down Expand Up @@ -76,29 +74,21 @@ impl BaseAgent for Scraper {

trace!(domain_count = scrapers.len(), "Created scrapers");

Ok((
Self {
core,
metrics,
contract_sync_metrics,
scrapers,
},
Default::default(),
))
Ok(Self {
core,
metrics,
contract_sync_metrics,
scrapers,
})
}

/// Run the scraper
///
/// * `metrics_fetchers` - A list of metrics fetchers to run. Currently this
/// only comprise
#[allow(clippy::async_yields_async)]
async fn run(
self,
metrics_fetchers: Vec<InstrumentedFallibleTask<()>>,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
// The tasks vec is initialized with the metrics fetcher tasks,
// and is then extended with the rest of the tasks.
let mut tasks = metrics_fetchers;
async fn run(self) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);
}
Expand Down
15 changes: 5 additions & 10 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument

use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{InstrumentedFallibleTask, Metrics as AgentMetrics},
metrics::Metrics as AgentMetrics,
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
};
Expand Down Expand Up @@ -56,7 +56,7 @@ impl BaseAgent for Validator {
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> Result<(Self, Vec<InstrumentedFallibleTask<()>>)>
) -> Result<Self>
where
Self: Sized,
{
Expand Down Expand Up @@ -108,17 +108,12 @@ impl BaseAgent for Validator {
checkpoint_syncer,
};

Ok((validator, Default::default()))
Ok(validator)
}

#[allow(clippy::async_yields_async)]
async fn run(
mut self,
metrics_fetchers: Vec<InstrumentedFallibleTask<()>>,
) -> Instrumented<JoinHandle<Result<()>>> {
// The tasks vec is initialized with the metrics fetcher tasks,
// and is then extended with the rest of the tasks.
let mut tasks = metrics_fetchers;
async fn run(mut self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];

if let Some(signer_instance) = self.signer_instance.take() {
tasks.push(
Expand Down
54 changes: 0 additions & 54 deletions rust/chains/hyperlane-cosmos/src/agent_metrics.rs

This file was deleted.

2 changes: 1 addition & 1 deletion rust/chains/hyperlane-cosmos/src/aggregation_ism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm {
}

fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(self.provider.clone())
self.provider.clone()
}
}

Expand Down
8 changes: 3 additions & 5 deletions rust/chains/hyperlane-cosmos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// TODO: Remove once we start filling things in
#![allow(unused_variables)]

mod agent_metrics;
mod aggregation_ism;
mod error;
mod interchain_gas;
Expand All @@ -24,8 +23,7 @@ mod utils;
mod validator_announce;

pub use self::{
agent_metrics::*, aggregation_ism::*, error::*, interchain_gas::*,
interchain_security_module::*, libs::*, mailbox::*, merkle_tree_hook::*, multisig_ism::*,
providers::*, routing_ism::*, signers::*, trait_builder::*, trait_builder::*,
validator_announce::*, validator_announce::*,
aggregation_ism::*, error::*, interchain_gas::*, interchain_security_module::*, libs::*,
mailbox::*, merkle_tree_hook::*, multisig_ism::*, providers::*, routing_ism::*, signers::*,
trait_builder::*, trait_builder::*, validator_announce::*, validator_announce::*,
};
23 changes: 13 additions & 10 deletions rust/chains/hyperlane-cosmos/src/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain, HyperlaneProvider,
TxnInfo, H256, U256,
metrics::agent::AgentMetricsFetcher, BlockInfo, ChainResult, ContractLocator, HyperlaneChain,
HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256,
};
use tendermint_rpc::{client::CompatMode, HttpClient};

Expand Down Expand Up @@ -71,6 +71,17 @@ impl HyperlaneChain for CosmosProvider {
}
}

#[async_trait]
impl AgentMetricsFetcher for CosmosProvider {
async fn get_balance(&self, address: String) -> ChainResult<U256> {
Ok(self
.grpc_client
.get_balance(address, self.canonical_asset.clone())
.await?
.into())
}
}

#[async_trait]
impl HyperlaneProvider for CosmosProvider {
async fn get_block_by_hash(&self, _hash: &H256) -> ChainResult<BlockInfo> {
Expand All @@ -85,12 +96,4 @@ impl HyperlaneProvider for CosmosProvider {
// FIXME
Ok(true)
}

async fn get_balance(&self, address: String) -> ChainResult<U256> {
Ok(self
.grpc_client
.get_balance(address, self.canonical_asset.clone())
.await?
.into())
}
}
12 changes: 0 additions & 12 deletions rust/chains/hyperlane-ethereum/src/agent_metrics.rs

This file was deleted.

Loading

0 comments on commit b1ccae1

Please sign in to comment.