diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index af7df960ae..0256b77329 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -9,9 +9,11 @@ use derive_more::AsRef; use eyre::Result; use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, - metrics::{AgentMetrics, InstrumentedFallibleTask, Metrics}, - run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, - SequencedDataContractSync, WatermarkContractSync, + metrics::{AgentMetrics, Metrics}, + run_all, + settings::ChainConf, + BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync, + WatermarkContractSync, }; use hyperlane_core::{ metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage, @@ -51,7 +53,7 @@ struct ContextKey { #[derive(AsRef)] pub struct Relayer { origin_chains: HashSet, - destination_chains: HashSet, + destination_chains: HashMap, #[as_ref] core: HyperlaneAgentCore, message_syncs: HashMap>>, @@ -69,6 +71,8 @@ pub struct Relayer { transaction_gas_limit: Option, skip_transaction_gas_limit_for: HashSet, allow_local_checkpoint_syncers: bool, + core_metrics: Arc, + agent_metrics: Metrics, } impl Debug for Relayer { @@ -98,7 +102,7 @@ impl BaseAgent for Relayer { settings: Self::Settings, core_metrics: Arc, agent_metrics: Metrics, - ) -> Result<(Self, Vec>)> + ) -> Result where Self: Sized, { @@ -194,30 +198,10 @@ impl BaseAgent for Relayer { .collect(); let mut msg_ctxs = HashMap::new(); - let mut metrics_fetchers = vec![]; + let mut destination_chains = HashMap::new(); for destination in &settings.destination_chains { let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone(); - let agent_metrics_conf = destination_chain_setup - .agent_metrics_conf(Self::AGENT_NAME.to_string()) - .await?; - let agent_metrics_fetcher = destination_chain_setup - .build_agent_metrics_fetcher() - .await?; - let agent_metrics = AgentMetrics::new( - agent_metrics.clone(), - agent_metrics_conf, - agent_metrics_fetcher, - ); - - let fetcher_task = tokio::spawn(async move { - agent_metrics - .start_updating_on_interval(METRICS_SCRAPE_INTERVAL) - .await; - Ok(()) - }) - .instrument(info_span!("AgentMetricsFetcher")); - metrics_fetchers.push(fetcher_task); - + destination_chains.insert(destination.clone(), destination_chain_setup.clone()); let transaction_gas_limit: Option = if skip_transaction_gas_limit_for.contains(&destination.id()) { None @@ -257,7 +241,7 @@ impl BaseAgent for Relayer { let relayer = Self { dbs, origin_chains: settings.origin_chains, - destination_chains: settings.destination_chains, + destination_chains, msg_ctxs, core, message_syncs, @@ -269,28 +253,48 @@ impl BaseAgent for Relayer { transaction_gas_limit, skip_transaction_gas_limit_for, allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers, + core_metrics, + agent_metrics, }; - Ok((relayer, metrics_fetchers)) + Ok(relayer) } #[allow(clippy::async_yields_async)] - async fn run( - self, - metrics_fetchers: Vec>, - ) -> Instrumented>> { - // The tasks vec is initialized with the metrics fetcher tasks, - // and is then extended with the rest of the tasks. - let mut tasks = metrics_fetchers; + async fn run(self) -> Instrumented>> { + let mut tasks = vec![]; // send channels by destination chain let mut send_channels = HashMap::with_capacity(self.destination_chains.len()); - for destination in &self.destination_chains { + for (dest_domain, dest_conf) in &self.destination_chains { let (send_channel, receive_channel) = mpsc::unbounded_channel::>(); - send_channels.insert(destination.id(), send_channel); + send_channels.insert(dest_domain.id(), send_channel); + + tasks.push(self.run_destination_submitter(dest_domain, receive_channel)); - tasks.push(self.run_destination_submitter(destination, receive_channel)); + let agent_metrics_conf = dest_conf + .agent_metrics_conf(Self::AGENT_NAME.to_string()) + .await + .unwrap(); + let agent_metrics_fetcher = dest_conf + .build_agent_metrics_fetcher(&self.core_metrics) + .await + .unwrap(); + let agent_metrics = AgentMetrics::new( + self.agent_metrics.clone(), + agent_metrics_conf, + agent_metrics_fetcher, + ); + + let fetcher_task = tokio::spawn(async move { + agent_metrics + .start_updating_on_interval(METRICS_SCRAPE_INTERVAL) + .await; + Ok(()) + }) + .instrument(info_span!("AgentMetricsFetcher")); + tasks.push(fetcher_task); } for origin in &self.origin_chains { @@ -364,11 +368,11 @@ impl Relayer { let metrics = MessageProcessorMetrics::new( &self.core.metrics, origin, - self.destination_chains.iter(), + self.destination_chains.keys(), ); let destination_ctxs = self .destination_chains - .iter() + .keys() .filter(|&destination| destination != origin) .map(|destination| { ( diff --git a/rust/agents/scraper/src/agent.rs b/rust/agents/scraper/src/agent.rs index 24549b3765..69b9d750ea 100644 --- a/rust/agents/scraper/src/agent.rs +++ b/rust/agents/scraper/src/agent.rs @@ -3,10 +3,8 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use derive_more::AsRef; use hyperlane_base::{ - metrics::{InstrumentedFallibleTask, Metrics as AgentMetrics}, - run_all, - settings::IndexSettings, - BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, + metrics::Metrics as AgentMetrics, run_all, settings::IndexSettings, BaseAgent, + ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, }; use hyperlane_core::HyperlaneDomain; use tokio::task::JoinHandle; @@ -41,7 +39,7 @@ impl BaseAgent for Scraper { settings: Self::Settings, metrics: Arc, _agent_metrics: AgentMetrics, - ) -> eyre::Result<(Self, Vec>)> + ) -> eyre::Result where Self: Sized, { @@ -76,15 +74,12 @@ impl BaseAgent for Scraper { trace!(domain_count = scrapers.len(), "Created scrapers"); - Ok(( - Self { - core, - metrics, - contract_sync_metrics, - scrapers, - }, - Default::default(), - )) + Ok(Self { + core, + metrics, + contract_sync_metrics, + scrapers, + }) } /// Run the scraper @@ -92,13 +87,8 @@ impl BaseAgent for Scraper { /// * `metrics_fetchers` - A list of metrics fetchers to run. Currently this /// only comprise #[allow(clippy::async_yields_async)] - async fn run( - self, - metrics_fetchers: Vec>, - ) -> Instrumented>> { - // The tasks vec is initialized with the metrics fetcher tasks, - // and is then extended with the rest of the tasks. - let mut tasks = metrics_fetchers; + async fn run(self) -> Instrumented>> { + let mut tasks = Vec::with_capacity(self.scrapers.len()); for domain in self.scrapers.keys() { tasks.push(self.scrape(*domain).await); } diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index 2fd90b4edb..7633decbc2 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -10,7 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, - metrics::{InstrumentedFallibleTask, Metrics as AgentMetrics}, + metrics::Metrics as AgentMetrics, run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync, }; @@ -56,7 +56,7 @@ impl BaseAgent for Validator { settings: Self::Settings, metrics: Arc, _agent_metrics: AgentMetrics, - ) -> Result<(Self, Vec>)> + ) -> Result where Self: Sized, { @@ -108,17 +108,12 @@ impl BaseAgent for Validator { checkpoint_syncer, }; - Ok((validator, Default::default())) + Ok(validator) } #[allow(clippy::async_yields_async)] - async fn run( - mut self, - metrics_fetchers: Vec>, - ) -> Instrumented>> { - // The tasks vec is initialized with the metrics fetcher tasks, - // and is then extended with the rest of the tasks. - let mut tasks = metrics_fetchers; + async fn run(mut self) -> Instrumented>> { + let mut tasks = vec![]; if let Some(signer_instance) = self.signer_instance.take() { tasks.push( diff --git a/rust/chains/hyperlane-cosmos/src/agent_metrics.rs b/rust/chains/hyperlane-cosmos/src/agent_metrics.rs deleted file mode 100644 index 30c15f98f3..0000000000 --- a/rust/chains/hyperlane-cosmos/src/agent_metrics.rs +++ /dev/null @@ -1,54 +0,0 @@ -use async_trait::async_trait; -use hyperlane_core::{ - metrics::agent::AgenMetricsFetcher, ChainResult, ContractLocator, HyperlaneChain, - HyperlaneDomain, HyperlaneProvider, U256, -}; - -use crate::{address::CosmosAddress, ConnectionConf, CosmosProvider}; - -/// Concrete struct for implementing the AgenMetricsFetcher and HyperlaneChain traits for Cosmos -#[derive(Debug)] -pub struct CosmosMetricsFetcher { - address: CosmosAddress, - provider: CosmosProvider, - domain: HyperlaneDomain, -} - -impl CosmosMetricsFetcher { - /// Instiante a new CosmosMetricsFetcher - pub fn new( - conf: ConnectionConf, - locator: ContractLocator, - address: CosmosAddress, - ) -> ChainResult { - let provider = CosmosProvider::new( - locator.domain.clone(), - conf.clone(), - Some(locator.clone()), - None, - )?; - - Ok(Self { - address, - provider, - domain: locator.domain.clone(), - }) - } -} - -impl HyperlaneChain for CosmosMetricsFetcher { - fn domain(&self) -> &HyperlaneDomain { - &self.domain - } - - fn provider(&self) -> Box { - Box::new(self.provider.clone()) - } -} - -#[async_trait] -impl AgenMetricsFetcher for CosmosMetricsFetcher { - async fn get_balance(&self) -> ChainResult { - self.provider.get_balance(self.address.address()).await - } -} diff --git a/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs b/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs index 49e5d60397..a17a4ba3af 100644 --- a/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs @@ -55,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm { } fn provider(&self) -> Box { - Box::new(self.provider.clone()) + self.provider.clone() } } diff --git a/rust/chains/hyperlane-cosmos/src/lib.rs b/rust/chains/hyperlane-cosmos/src/lib.rs index c61f1de261..82a4a0ece1 100644 --- a/rust/chains/hyperlane-cosmos/src/lib.rs +++ b/rust/chains/hyperlane-cosmos/src/lib.rs @@ -5,7 +5,6 @@ // TODO: Remove once we start filling things in #![allow(unused_variables)] -mod agent_metrics; mod aggregation_ism; mod error; mod interchain_gas; @@ -24,8 +23,7 @@ mod utils; mod validator_announce; pub use self::{ - agent_metrics::*, aggregation_ism::*, error::*, interchain_gas::*, - interchain_security_module::*, libs::*, mailbox::*, merkle_tree_hook::*, multisig_ism::*, - providers::*, routing_ism::*, signers::*, trait_builder::*, trait_builder::*, - validator_announce::*, validator_announce::*, + aggregation_ism::*, error::*, interchain_gas::*, interchain_security_module::*, libs::*, + mailbox::*, merkle_tree_hook::*, multisig_ism::*, providers::*, routing_ism::*, signers::*, + trait_builder::*, trait_builder::*, validator_announce::*, validator_announce::*, }; diff --git a/rust/chains/hyperlane-cosmos/src/providers/mod.rs b/rust/chains/hyperlane-cosmos/src/providers/mod.rs index 4bb960b536..66763509b5 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/mod.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/mod.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use hyperlane_core::{ - BlockInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, - TxnInfo, H256, U256, + metrics::agent::AgentMetricsFetcher, BlockInfo, ChainResult, ContractLocator, HyperlaneChain, + HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256, }; use tendermint_rpc::{client::CompatMode, HttpClient}; @@ -71,6 +71,17 @@ impl HyperlaneChain for CosmosProvider { } } +#[async_trait] +impl AgentMetricsFetcher for CosmosProvider { + async fn get_balance(&self, address: String) -> ChainResult { + Ok(self + .grpc_client + .get_balance(address, self.canonical_asset.clone()) + .await? + .into()) + } +} + #[async_trait] impl HyperlaneProvider for CosmosProvider { async fn get_block_by_hash(&self, _hash: &H256) -> ChainResult { @@ -85,12 +96,4 @@ impl HyperlaneProvider for CosmosProvider { // FIXME Ok(true) } - - async fn get_balance(&self, address: String) -> ChainResult { - Ok(self - .grpc_client - .get_balance(address, self.canonical_asset.clone()) - .await? - .into()) - } } diff --git a/rust/chains/hyperlane-ethereum/src/agent_metrics.rs b/rust/chains/hyperlane-ethereum/src/agent_metrics.rs deleted file mode 100644 index f1bd5065ba..0000000000 --- a/rust/chains/hyperlane-ethereum/src/agent_metrics.rs +++ /dev/null @@ -1,12 +0,0 @@ -use async_trait::async_trait; -use hyperlane_core::{metrics::agent::AgenMetricsFetcher, ChainResult, U256}; - -/// Concrete struct for implementing the AgenMetricsFetcher trait for Ethereum -pub struct EthereumMetricsFetcher {} - -#[async_trait] -impl AgenMetricsFetcher for EthereumMetricsFetcher { - async fn get_balance(&self) -> ChainResult { - Ok(0.into()) - } -} diff --git a/rust/chains/hyperlane-ethereum/src/lib.rs b/rust/chains/hyperlane-ethereum/src/lib.rs index 6f3d9b268b..2d42850bc4 100644 --- a/rust/chains/hyperlane-ethereum/src/lib.rs +++ b/rust/chains/hyperlane-ethereum/src/lib.rs @@ -74,11 +74,8 @@ mod signers; #[cfg(not(doctest))] mod singleton_signer; -mod agent_metrics; mod config; -pub use self::agent_metrics::*; - fn extract_fn_map(abi: &'static Lazy) -> HashMap, &'static str> { abi.functions() .map(|f| (f.selector().to_vec(), f.name.as_str())) diff --git a/rust/chains/hyperlane-ethereum/src/provider.rs b/rust/chains/hyperlane-ethereum/src/provider.rs index e23b19a779..620ae9aabc 100644 --- a/rust/chains/hyperlane-ethereum/src/provider.rs +++ b/rust/chains/hyperlane-ethereum/src/provider.rs @@ -6,7 +6,7 @@ use std::time::Duration; use async_trait::async_trait; use derive_new::new; use ethers::prelude::Middleware; -use hyperlane_core::{ethers_core_types, U256}; +use hyperlane_core::{ethers_core_types, metrics::agent::AgentMetricsFetcher, U256}; use tokio::time::sleep; use tracing::instrument; @@ -44,6 +44,22 @@ where } } +#[async_trait] +impl AgentMetricsFetcher for EthereumProvider +where + M: Middleware + 'static, +{ + #[instrument(err, skip(self))] + async fn get_balance(&self, address: String) -> ChainResult { + let balance = self + .provider + .get_balance(address, None) + .await + .map_err(ChainCommunicationError::from_other)?; + Ok(balance.into()) + } +} + #[async_trait] impl HyperlaneProvider for EthereumProvider where @@ -105,10 +121,6 @@ where .map_err(ChainCommunicationError::from_other)?; Ok(!code.is_empty()) } - - async fn get_balance(&self, _address: String) -> ChainResult { - todo!() - } } impl EthereumProvider @@ -149,6 +161,29 @@ impl BuildableWithProvider for HyperlaneProviderBuilder { } } +/// Builder for the Agent Metrics Fetcher. +// TODO: Remove this when trait upcasting is stabilized and Box can be used +// as Box +// Tracking issue: +// https://github.com/rust-lang/rust/issues/65991 +pub struct AgentMetricsFetcherBuilder {} + +#[async_trait] +impl BuildableWithProvider for AgentMetricsFetcherBuilder { + type Output = Box; + + async fn build_with_provider( + &self, + provider: M, + locator: &ContractLocator, + ) -> Self::Output { + Box::new(EthereumProvider::new( + Arc::new(provider), + locator.domain.clone(), + )) + } +} + /// Call a get function that returns a Result> and retry if the inner /// option is None. This can happen because the provider has not discovered the /// object we are looking for yet. diff --git a/rust/chains/hyperlane-fuel/src/provider.rs b/rust/chains/hyperlane-fuel/src/provider.rs index 8048076e04..54c38e850c 100644 --- a/rust/chains/hyperlane-fuel/src/provider.rs +++ b/rust/chains/hyperlane-fuel/src/provider.rs @@ -1,7 +1,8 @@ use async_trait::async_trait; use hyperlane_core::{ - BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256, + metrics::agent::AgentMetricsFetcher, BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, + HyperlaneProvider, TxnInfo, H256, U256, }; /// A wrapper around a fuel provider to get generic blockchain information. @@ -18,6 +19,13 @@ impl HyperlaneChain for FuelProvider { } } +#[async_trait] +impl AgentMetricsFetcher for FuelProvider { + async fn get_balance(&self, address: String) -> ChainResult { + todo!() + } +} + #[async_trait] impl HyperlaneProvider for FuelProvider { async fn get_block_by_hash(&self, hash: &H256) -> ChainResult { @@ -31,8 +39,4 @@ impl HyperlaneProvider for FuelProvider { async fn is_contract(&self, address: &H256) -> ChainResult { todo!() } - - async fn get_balance(&self, address: String) -> ChainResult { - todo!() - } } diff --git a/rust/chains/hyperlane-sealevel/src/provider.rs b/rust/chains/hyperlane-sealevel/src/provider.rs index 3f7449aef2..fdd20795a8 100644 --- a/rust/chains/hyperlane-sealevel/src/provider.rs +++ b/rust/chains/hyperlane-sealevel/src/provider.rs @@ -1,7 +1,8 @@ use async_trait::async_trait; use hyperlane_core::{ - BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256, + metrics::agent::AgentMetricsFetcher, BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, + HyperlaneProvider, TxnInfo, H256, U256, }; /// A wrapper around a Sealevel provider to get generic blockchain information. @@ -29,6 +30,13 @@ impl HyperlaneChain for SealevelProvider { } } +#[async_trait] +impl AgentMetricsFetcher for SealevelProvider { + async fn get_balance(&self, _address: String) -> ChainResult { + todo!() // FIXME + } +} + #[async_trait] impl HyperlaneProvider for SealevelProvider { async fn get_block_by_hash(&self, _hash: &H256) -> ChainResult { @@ -43,8 +51,4 @@ impl HyperlaneProvider for SealevelProvider { // FIXME Ok(true) } - - async fn get_balance(&self, _address: String) -> ChainResult { - todo!() - } } diff --git a/rust/hyperlane-base/src/agent.rs b/rust/hyperlane-base/src/agent.rs index e638108dcf..40fa639051 100644 --- a/rust/hyperlane-base/src/agent.rs +++ b/rust/hyperlane-base/src/agent.rs @@ -8,9 +8,7 @@ use tokio::task::JoinHandle; use tracing::{debug_span, instrument::Instrumented, Instrument}; use crate::{ - metrics::{ - create_agent_metrics, CoreMetrics, InstrumentedFallibleTask, Metrics as AgentMetrics, - }, + metrics::{create_agent_metrics, CoreMetrics, Metrics as AgentMetrics}, settings::Settings, }; @@ -45,16 +43,13 @@ pub trait BaseAgent: Send + Sync + Debug { settings: Self::Settings, metrics: Arc, agent_metrics: AgentMetrics, - ) -> Result<(Self, Vec>)> + ) -> Result where Self: Sized; /// Start running this agent. #[allow(clippy::async_yields_async)] - async fn run( - self, - metrics_fetchers: Vec>, - ) -> Instrumented>>; + async fn run(self) -> Instrumented>>; } /// Call this from `main` to fully initialize and run the agent for its entire @@ -81,11 +76,10 @@ pub async fn agent_main() -> Result<()> { let metrics = settings.as_ref().metrics(A::AGENT_NAME)?; core_settings.tracing.start_tracing(&metrics)?; let agent_metrics = create_agent_metrics(&metrics)?; - let (agent, metrics_fetchers) = - A::from_settings(settings, metrics.clone(), agent_metrics).await?; + let agent = A::from_settings(settings, metrics.clone(), agent_metrics).await?; metrics.run_http_server(); - agent.run(metrics_fetchers).await.await? + agent.run().await.await? } /// Utility to run multiple tasks and shutdown if any one task ends. diff --git a/rust/hyperlane-base/src/metrics/agent_metrics.rs b/rust/hyperlane-base/src/metrics/agent_metrics.rs index 267197e9b4..868de5b619 100644 --- a/rust/hyperlane-base/src/metrics/agent_metrics.rs +++ b/rust/hyperlane-base/src/metrics/agent_metrics.rs @@ -4,11 +4,10 @@ use derive_builder::Builder; use derive_new::new; use eyre::Result; use hyperlane_core::metrics::agent::u256_as_scaled_f64; -use hyperlane_core::{metrics::agent::AgenMetricsFetcher, HyperlaneDomain}; +use hyperlane_core::{metrics::agent::AgentMetricsFetcher, HyperlaneDomain}; use maplit::hashmap; use prometheus::GaugeVec; use tokio::time::MissedTickBehavior; -use tracing::instrument::Instrumented; use tracing::{trace, warn}; use crate::CoreMetrics; @@ -26,9 +25,6 @@ pub const WALLET_BALANCE_LABELS: &[&str] = &[ pub const WALLET_BALANCE_HELP: &str = "Current native token balance for the wallet addresses in the `wallets` set"; -/// Instrumented fallible task alias -pub type InstrumentedFallibleTask = Instrumented>>; - /// Agent-specific metrics #[derive(Clone, Builder)] pub struct Metrics { @@ -76,7 +72,7 @@ pub struct AgentMetricsConf { pub struct AgentMetrics { metrics: Metrics, conf: AgentMetricsConf, - fetcher: Box, + fetcher: Box, } impl AgentMetrics { @@ -90,7 +86,7 @@ impl AgentMetrics { }; let chain = self.conf.domain.name(); - match self.fetcher.get_balance().await { + match self.fetcher.get_balance(wallet_addr.clone()).await { Ok(balance) => { // Okay, so the native type is not a token, but whatever, close enough. // Note: This is ETH for many chains, but not all so that is why we use `N` and `Native` diff --git a/rust/hyperlane-base/src/settings/chains.rs b/rust/hyperlane-base/src/settings/chains.rs index a40abeb531..99add295c2 100644 --- a/rust/hyperlane-base/src/settings/chains.rs +++ b/rust/hyperlane-base/src/settings/chains.rs @@ -1,14 +1,14 @@ use ethers::prelude::Selector; -use h_cosmos::{address::CosmosAddress, CosmosMetricsFetcher}; +use h_cosmos::CosmosProvider; use std::collections::HashMap; use eyre::{eyre, Context, Result}; use ethers_prometheus::middleware::{ChainInfo, ContractInfo, PrometheusMiddlewareConf}; use hyperlane_core::{ - metrics::agent::AgenMetricsFetcher, AggregationIsm, CcipReadIsm, ContractLocator, HyperlaneAbi, - HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneMessage, HyperlaneProvider, IndexMode, - InterchainGasPaymaster, InterchainGasPayment, InterchainSecurityModule, Mailbox, + metrics::agent::AgentMetricsFetcher, AggregationIsm, CcipReadIsm, ContractLocator, + HyperlaneAbi, HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneMessage, HyperlaneProvider, + IndexMode, InterchainGasPaymaster, InterchainGasPayment, InterchainSecurityModule, Mailbox, MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, RoutingIsm, SequenceIndexer, ValidatorAnnounce, H256, }; @@ -22,10 +22,12 @@ use hyperlane_sealevel as h_sealevel; use crate::{ metrics::AgentMetricsConf, - settings::signers::{BuildableWithSignerConf, ChainSigner, SignerConf}, + settings::signers::{BuildableWithSignerConf, SignerConf}, CoreMetrics, }; +use super::ChainSigner; + /// A chain setup is a domain ID, an address on that chain (where the mailbox is /// deployed) and details for connecting to the chain API. #[derive(Clone, Debug)] @@ -595,30 +597,34 @@ impl ChainConf { } /// Try to convert the chain setting into a trait object for fetching agent metrics - pub async fn build_agent_metrics_fetcher(&self) -> Result> { - let ctx = "Building Agent Metrics Fetcher"; - + pub async fn build_agent_metrics_fetcher( + &self, + metrics: &CoreMetrics, + ) -> Result> { match &self.connection { - ChainConnectionConf::Ethereum(_conf) => { - Ok(Box::new(h_eth::EthereumMetricsFetcher {}) as Box) + ChainConnectionConf::Ethereum(conf) => { + let locator = self.locator(H256::zero()); + let provider = self + .build_ethereum( + conf, + &locator, + metrics, + h_eth::AgentMetricsFetcherBuilder {}, + ) + .await?; + Ok(provider) } ChainConnectionConf::Fuel(_) => todo!(), ChainConnectionConf::Sealevel(_) => todo!(), ChainConnectionConf::Cosmos(conf) => { - let signer = self - .cosmos_signer() - .await - .context(ctx)? - .ok_or(eyre!("No signer set")) - .context(ctx)?; - let address = CosmosAddress::from_pubkey(signer.public_key, &conf.get_prefix()) - .context(ctx)?; - let metrics_fetcher = CosmosMetricsFetcher::new( + let locator = self.locator(H256::zero()); + let provider = CosmosProvider::new( + locator.domain.clone(), conf.clone(), - self.locator(self.addresses.mailbox), - address, + Some(locator.clone()), + None, )?; - Ok(Box::new(metrics_fetcher) as Box) + Ok(Box::new(provider) as Box) } } } diff --git a/rust/hyperlane-core/src/metrics/agent.rs b/rust/hyperlane-core/src/metrics/agent.rs index 6134e2b8e5..e7e77c9c6d 100644 --- a/rust/hyperlane-core/src/metrics/agent.rs +++ b/rust/hyperlane-core/src/metrics/agent.rs @@ -11,9 +11,9 @@ pub const METRICS_SCRAPE_INTERVAL: Duration = Duration::from_secs(60); /// Trait to be implemented by all chain-specific agent implementations, /// to support gathering agent metrics. #[async_trait] -pub trait AgenMetricsFetcher: Send + Sync { +pub trait AgentMetricsFetcher: Send + Sync { /// Fetch the balance of the wallet address associated with the chain provider. - async fn get_balance(&self) -> ChainResult; + async fn get_balance(&self, address: String) -> ChainResult; } /// Convert a u256 scaled integer value into the corresponding f64 value. diff --git a/rust/hyperlane-core/src/traits/provider.rs b/rust/hyperlane-core/src/traits/provider.rs index ec8cae7052..4d0d127467 100644 --- a/rust/hyperlane-core/src/traits/provider.rs +++ b/rust/hyperlane-core/src/traits/provider.rs @@ -4,7 +4,9 @@ use async_trait::async_trait; use auto_impl::auto_impl; use thiserror::Error; -use crate::{BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256, U256}; +use crate::{ + metrics::agent::AgentMetricsFetcher, BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256, +}; /// Interface for a provider. Allows abstraction over different provider types /// for different chains. @@ -15,7 +17,7 @@ use crate::{BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256, U256}; /// the context of a contract. #[async_trait] #[auto_impl(&, Box, Arc)] -pub trait HyperlaneProvider: HyperlaneChain + Send + Sync + Debug { +pub trait HyperlaneProvider: HyperlaneChain + AgentMetricsFetcher + Send + Sync + Debug { /// Get block info for a given block hash async fn get_block_by_hash(&self, hash: &H256) -> ChainResult; @@ -24,9 +26,6 @@ pub trait HyperlaneProvider: HyperlaneChain + Send + Sync + Debug { /// Returns whether a contract exists at the provided address async fn is_contract(&self, address: &H256) -> ChainResult; - - /// Returns the native currency balance of the given address - async fn get_balance(&self, address: String) -> ChainResult; } /// Errors when querying for provider information. diff --git a/rust/utils/run-locally/src/cosmos/mod.rs b/rust/utils/run-locally/src/cosmos/mod.rs index 6027611ddc..1bf109e15f 100644 --- a/rust/utils/run-locally/src/cosmos/mod.rs +++ b/rust/utils/run-locally/src/cosmos/mod.rs @@ -25,6 +25,7 @@ use utils::*; use crate::cosmos::link::link_networks; use crate::logging::log; +use crate::metrics::agent_balance_sum; use crate::program::Program; use crate::utils::{as_task, concat_path, stop_child, AgentHandles, TaskHandle}; use crate::{fetch_metric, AGENT_BIN_PATH}; @@ -258,7 +259,6 @@ fn launch_cosmos_validator( .hyp_env("CHECKPOINTSYNCER_PATH", checkpoint_path.to_str().unwrap()) .hyp_env("CHECKPOINTSYNCER_TYPE", "localStorage") .hyp_env("ORIGINCHAINNAME", agent_config.name) - .hyp_env("REORGPERIOD", "1") .hyp_env("DB", validator_base_db.to_str().unwrap()) .hyp_env("METRICSPORT", agent_config.metrics_port.to_string()) .hyp_env("VALIDATOR_SIGNER_TYPE", agent_config.signer.typ) @@ -288,7 +288,6 @@ fn launch_cosmos_relayer( .env("CONFIG_FILES", agent_config_path.to_str().unwrap()) .env("RUST_BACKTRACE", "1") .hyp_env("RELAYCHAINS", relay_chains.join(",")) - .hyp_env("REORGPERIOD", "1") .hyp_env("DB", relayer_base.as_ref().to_str().unwrap()) .hyp_env("ALLOWLOCALCHECKPOINTSYNCERS", "true") .hyp_env("TRACING_LEVEL", if debug { "debug" } else { "info" }) @@ -462,11 +461,9 @@ fn run_locally() { ); // give things a chance to fully start. - println!("sleeping for 100s"); - sleep(Duration::from_secs(100)); - println!("done sleeping for 100s"); + sleep(Duration::from_secs(10)); - let starting_relayer_balance: f64 = relayer_balance_sum(hpl_rly_metrics_port).unwrap(); + let starting_relayer_balance: f64 = agent_balance_sum(hpl_rly_metrics_port).unwrap(); // dispatch messages let mut dispatched_messages = 0; @@ -554,17 +551,6 @@ fn run_locally() { } } -fn relayer_balance_sum(metrics_port: u32) -> eyre::Result { - let balance = fetch_metric( - &metrics_port.to_string(), - "hyperlane_wallet_balance", - &hashmap! {}, - )? - .iter() - .sum(); - Ok(balance) -} - fn termination_invariants_met( relayer_metrics_port: u32, messages_expected: u32, @@ -603,7 +589,7 @@ fn termination_invariants_met( return Ok(false); } - let ending_relayer_balance: f64 = relayer_balance_sum(relayer_metrics_port).unwrap(); + let ending_relayer_balance: f64 = agent_balance_sum(relayer_metrics_port).unwrap(); // Ideally, make sure that the difference is >= gas_per_tx * gas_cost, set here: // https://github.com/hyperlane-xyz/hyperlane-monorepo/blob/c2288eb31734ba1f2f997e2c6ecb30176427bc2c/rust/utils/run-locally/src/cosmos/cli.rs#L55 // What's stopping this is that the format returned by the `uosmo` balance query is a surprisingly low number (0.000003999999995184) diff --git a/rust/utils/run-locally/src/invariants.rs b/rust/utils/run-locally/src/invariants.rs index f1fb725959..4e76c9473c 100644 --- a/rust/utils/run-locally/src/invariants.rs +++ b/rust/utils/run-locally/src/invariants.rs @@ -1,6 +1,7 @@ // use std::path::Path; use crate::config::Config; +use crate::metrics::agent_balance_sum; use maplit::hashmap; use crate::logging::log; @@ -15,6 +16,7 @@ pub const SOL_MESSAGES_EXPECTED: u32 = 0; /// number of messages have been sent. pub fn termination_invariants_met( config: &Config, + starting_relayer_balance: f64, // solana_cli_tools_path: &Path, // solana_config_path: &Path, ) -> eyre::Result { @@ -129,6 +131,18 @@ pub fn termination_invariants_met( return Ok(false); } + let ending_relayer_balance: f64 = agent_balance_sum(9092).unwrap(); + if starting_relayer_balance <= ending_relayer_balance { + // worth retrying this because metrics are polled every + // `METRICS_SCRAPE_INTERVAL` + log!( + "Expected starting relayer balance to be greater than ending relayer balance, but got {} <= {}", + starting_relayer_balance, + ending_relayer_balance + ); + return Ok(false); + } + log!("Termination invariants have been meet"); Ok(true) } diff --git a/rust/utils/run-locally/src/main.rs b/rust/utils/run-locally/src/main.rs index 0910d14710..52d56ed5d4 100644 --- a/rust/utils/run-locally/src/main.rs +++ b/rust/utils/run-locally/src/main.rs @@ -30,6 +30,7 @@ use crate::{ config::Config, ethereum::start_anvil, invariants::termination_invariants_met, + metrics::agent_balance_sum, solana::*, utils::{concat_path, make_static, stop_child, AgentHandles, ArbitraryData, TaskHandle}, }; @@ -390,10 +391,11 @@ fn main() -> ExitCode { // give things a chance to fully start. sleep(Duration::from_secs(10)); let mut failure_occurred = false; + let starting_relayer_balance: f64 = agent_balance_sum(9092).unwrap(); while !SHUTDOWN.load(Ordering::Relaxed) { if config.ci_mode { // for CI we have to look for the end condition. - if termination_invariants_met(&config) + if termination_invariants_met(&config, starting_relayer_balance) // if termination_invariants_met(&config, &solana_path, &solana_config_path) .unwrap_or(false) { diff --git a/rust/utils/run-locally/src/metrics.rs b/rust/utils/run-locally/src/metrics.rs index 9aff78ea3a..aad0f626d9 100644 --- a/rust/utils/run-locally/src/metrics.rs +++ b/rust/utils/run-locally/src/metrics.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, error::Error as StdError, str::FromStr}; use eyre::{eyre, ErrReport, Result}; +use maplit::hashmap; /// Fetch a prometheus format metric, filtering by labels. pub fn fetch_metric(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> Result> @@ -26,3 +27,14 @@ where }) .collect() } + +pub fn agent_balance_sum(metrics_port: u32) -> eyre::Result { + let balance = fetch_metric( + &metrics_port.to_string(), + "hyperlane_wallet_balance", + &hashmap! {}, + )? + .iter() + .sum(); + Ok(balance) +}