Skip to content

Commit

Permalink
Observability of validators for relayers (#3057)
Browse files Browse the repository at this point in the history
### Description

Goal of this was to have insight into validators of important sets being
"up"

Introduces a new metric used by relayers:
`hyperlane_observed_validator_latest_index`, e.g.:

```
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test1",hyperlane_baselib_version="0.1.0",origin="test2",validator="0x9965507d1a55bcc2695c58ba16fb37d819b0a4dc"} 664
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test1",hyperlane_baselib_version="0.1.0",origin="test3",validator="0x976ea74026e726554db657fa54763abd0c3a0aa9"} 641
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test2",hyperlane_baselib_version="0.1.0",origin="test1",validator="0x15d34aaf54267db7d7c367839aaf71a00a2c6a65"} 670
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test2",hyperlane_baselib_version="0.1.0",origin="test3",validator="0x976ea74026e726554db657fa54763abd0c3a0aa9"} 665
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test3",hyperlane_baselib_version="0.1.0",origin="test1",validator="0x15d34aaf54267db7d7c367839aaf71a00a2c6a65"} 652
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test3",hyperlane_baselib_version="0.1.0",origin="test2",validator="0x9965507d1a55bcc2695c58ba16fb37d819b0a4dc"} 664
hyperlane_observed_validator_latest_index{agent="relayer",app_context="testapp",destination="test1",hyperlane_baselib_version="0.1.0",origin="test2",validator="0x9965507d1a55bcc2695c58ba16fb37d819b0a4dc"} 658
hyperlane_observed_validator_latest_index{agent="relayer",app_context="testapp",destination="test1",hyperlane_baselib_version="0.1.0",origin="test3",validator="0x976ea74026e726554db657fa54763abd0c3a0aa9"} 641
```

Tapping into metadata building for multisig ISMs, the relayer will
update the metric with the latest indices for the validators in a set.
In order to prevent the cardinality being ridiculously high, only
certain validator sets are tracked. This is done by introducing an
`app_context` label (I'm very open to other names here, for some reason
whenever idk how to name some kind of identifier I end up calling it a
context 😆)

The app context can either be:
- if a new setting, --metricAppContexts, is specified, a message will be
classified based off the first matching list it matches. E.g.
`--metricAppContexts '[{"name": "testapp", "matchingList":
[{"recipient_address": "0xd84379ceae14aa33c123af12424a37803f885889",
"destination_domain": 13371 }] }]'`. This is nice for e.g. warp route
deployments, where the ISM is maybe not a default ISM, and can be
changed
- if a message doesn't get classified this way, it can also be
classified with the "default_ism" app context, which is just for any
message that happens to use the default ISM as its "root" ISM

This way we have insight in to the default ISM and any
application-specific ISMs.

Some things to note:
- it's possible for a message to actually have more than one validator
set, e.g. if it's using an aggregation ISM. In this case, we'll have
metrics on the union of all validator sets for that app context
- Some effort is required to make sure that metrics don't stick around
for a validator that has actually been removed from the set. To handle
this, we cache the validator set for an app context and clear out the
entire set each time we set the metrics

### Drive-by changes

- Zod's nonempty function for strings is deprecated, moves to `.min(1)`
instead

### Related issues

- Fixes #1762

### Backward compatibility

yes

### Testing

Ran locally - I think i'll probably add something in e2e tests, but
opening now
  • Loading branch information
tkporter authored Jan 2, 2024
1 parent a04454d commit 3f88aa6
Show file tree
Hide file tree
Showing 17 changed files with 542 additions and 209 deletions.
4 changes: 2 additions & 2 deletions rust/agents/relayer/src/msg/metadata/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use tracing::{info, instrument};

use hyperlane_core::{HyperlaneMessage, InterchainSecurityModule, ModuleType, H256, U256};

use super::{BaseMetadataBuilder, MetadataBuilder};
use super::{MessageMetadataBuilder, MetadataBuilder};

/// Bytes used to store one member of the (start, end) range tuple
/// Copied from `AggregationIsmMetadata.sol`
const METADATA_RANGE_SIZE: usize = 4;

#[derive(Clone, Debug, new, Deref)]
pub struct AggregationIsmMetadataBuilder {
base: BaseMetadataBuilder,
base: MessageMetadataBuilder,
}

#[derive(Clone, Debug, new, PartialEq, Eq)]
Expand Down
287 changes: 220 additions & 67 deletions rust/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::{collections::HashMap, fmt::Debug, str::FromStr, sync::Arc};
use std::{
collections::HashMap,
fmt::Debug,
ops::Deref,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

use crate::{
merkle_tree::builder::MerkleTreeBuilder,
Expand All @@ -7,6 +14,7 @@ use crate::{
AggregationIsmMetadataBuilder, CcipReadIsmMetadataBuilder, NullMetadataBuilder,
RoutingIsmMetadataBuilder,
},
settings::matching_list::MatchingList,
};
use async_trait::async_trait;
use derive_new::new;
Expand All @@ -18,9 +26,10 @@ use hyperlane_base::{
};
use hyperlane_core::{
accumulator::merkle::Proof, AggregationIsm, CcipReadIsm, Checkpoint, HyperlaneDomain,
HyperlaneMessage, InterchainSecurityModule, ModuleType, MultisigIsm, RoutingIsm,
HyperlaneMessage, InterchainSecurityModule, Mailbox, ModuleType, MultisigIsm, RoutingIsm,
ValidatorAnnounce, H160, H256,
};

use tokio::sync::RwLock;
use tracing::{debug, info, instrument, warn};

Expand All @@ -40,39 +49,129 @@ pub struct IsmWithMetadataAndType {

#[async_trait]
pub trait MetadataBuilder: Send + Sync {
#[allow(clippy::async_yields_async)]
async fn build(&self, ism_address: H256, message: &HyperlaneMessage)
-> Result<Option<Vec<u8>>>;
}

#[derive(Clone, new)]
pub struct BaseMetadataBuilder {
destination_chain_setup: ChainConf,
origin_prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
origin_validator_announce: Arc<dyn ValidatorAnnounce>,
allow_local_checkpoint_syncers: bool,
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
/// Allows fetching the default ISM, caching the value for a period of time
/// to avoid fetching it all the time.
/// TODO: make this generic
#[derive(Debug)]
pub struct DefaultIsmCache {
value: RwLock<Option<(H256, Instant)>>,
mailbox: Arc<dyn Mailbox>,
}

impl DefaultIsmCache {
/// Time to live for the cached default ISM. 10 mins.
const TTL: Duration = Duration::from_secs(60 * 10);

pub fn new(mailbox: Arc<dyn Mailbox>) -> Self {
Self {
value: RwLock::new(None),
mailbox,
}
}

/// Gets the default ISM, fetching it from onchain if the cached value
/// is stale.
/// TODO: this can and should be made generic eventually
pub async fn get(&self) -> Result<H256> {
// If the duration since the value was last updated does not
// exceed the TTL, return the cached value.
// This is in its own block to avoid holding the lock during the
// async operation to fetch the on-chain default ISM if
// the cached value is stale.
{
let value = self.value.read().await;

if let Some(value) = *value {
if value.1.elapsed() < Self::TTL {
return Ok(value.0);
}
}
}

let default_ism = self.mailbox.default_ism().await?;
// Update the cached value.
{
let mut value = self.value.write().await;
*value = Some((default_ism, Instant::now()));
}

Ok(default_ism)
}
}

/// Classifies messages into an app context if they have one.
#[derive(Debug)]
pub struct AppContextClassifier {
default_ism: DefaultIsmCache,
app_matching_lists: Vec<(MatchingList, String)>,
}

impl AppContextClassifier {
pub fn new(
destination_mailbox: Arc<dyn Mailbox>,
app_matching_lists: Vec<(MatchingList, String)>,
) -> Self {
Self {
default_ism: DefaultIsmCache::new(destination_mailbox),
app_matching_lists,
}
}

/// Classifies messages into an app context if they have one, or None
/// if they don't.
/// An app context is a string that identifies the app that sent the message
/// and exists just for metrics.
/// An app context is chosen based on:
/// - the first element in `app_matching_lists` that matches the message
/// - if the message's ISM is the default ISM, the app context is "default_ism"
pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
) -> Result<Option<String>> {
// Give priority to the matching list. If the app from the matching list happens
// to use the default ISM, it's preferable to use the app context from the matching
// list.
for (matching_list, app_context) in self.app_matching_lists.iter() {
if matching_list.msg_matches(message, false) {
return Ok(Some(app_context.clone()));
}
}

let default_ism = self.default_ism.get().await?;
if root_ism == default_ism {
return Ok(Some("default_ism".to_string()));
}

Ok(None)
}
}

/// Builds metadata for a message.
#[derive(Debug, Clone)]
pub struct MessageMetadataBuilder {
pub base: Arc<BaseMetadataBuilder>,
/// ISMs can be structured recursively. We keep track of the depth
/// of the recursion to avoid infinite loops.
#[new(default)]
depth: u32,
max_depth: u32,
pub depth: u32,
pub app_context: Option<String>,
}

impl Debug for BaseMetadataBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"MetadataBuilder {{ chain_setup: {:?}, validator_announce: {:?} }}",
self.destination_chain_setup, self.origin_validator_announce
)
impl Deref for MessageMetadataBuilder {
type Target = BaseMetadataBuilder;

fn deref(&self) -> &Self::Target {
&self.base
}
}

#[async_trait]
impl MetadataBuilder for BaseMetadataBuilder {
#[instrument(err, skip(self), fields(domain=self.domain().name()))]
impl MetadataBuilder for MessageMetadataBuilder {
#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))]
async fn build(
&self,
ism_address: H256,
Expand All @@ -84,12 +183,24 @@ impl MetadataBuilder for BaseMetadataBuilder {
}
}

impl BaseMetadataBuilder {
pub fn domain(&self) -> &HyperlaneDomain {
&self.destination_chain_setup.domain
impl MessageMetadataBuilder {
pub async fn new(
ism_address: H256,
message: &HyperlaneMessage,
base: Arc<BaseMetadataBuilder>,
) -> Result<Self> {
let app_context = base
.app_context_classifier
.get_app_context(message, ism_address)
.await?;
Ok(Self {
base,
depth: 0,
app_context,
})
}

pub fn clone_with_incremented_depth(&self) -> Result<BaseMetadataBuilder> {
fn clone_with_incremented_depth(&self) -> Result<MessageMetadataBuilder> {
let mut cloned = self.clone();
cloned.depth += 1;
if cloned.depth > cloned.max_depth {
Expand All @@ -99,6 +210,82 @@ impl BaseMetadataBuilder {
}
}

#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))]
pub async fn build_ism_and_metadata(
&self,
ism_address: H256,
message: &HyperlaneMessage,
) -> Result<IsmWithMetadataAndType> {
let ism: Box<dyn InterchainSecurityModule> = self
.build_ism(ism_address)
.await
.context("When building ISM")?;

let module_type = ism
.module_type()
.await
.context("When fetching module type")?;
let cloned = self.clone_with_incremented_depth()?;

let metadata_builder: Box<dyn MetadataBuilder> = match module_type {
ModuleType::MerkleRootMultisig => {
Box::new(MerkleRootMultisigMetadataBuilder::new(cloned))
}
ModuleType::MessageIdMultisig => {
Box::new(MessageIdMultisigMetadataBuilder::new(cloned))
}
ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(cloned)),
ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(cloned)),
ModuleType::Null => Box::new(NullMetadataBuilder::new()),
ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(cloned)),
_ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()),
};
let meta = metadata_builder
.build(ism_address, message)
.await
.context("When building metadata");
Ok(IsmWithMetadataAndType {
ism,
metadata: meta?,
module_type,
})
}
}

/// Base metadata builder with types used by higher level metadata builders.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct BaseMetadataBuilder {
origin_domain: HyperlaneDomain,
destination_chain_setup: ChainConf,
origin_prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
origin_validator_announce: Arc<dyn ValidatorAnnounce>,
allow_local_checkpoint_syncers: bool,
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
max_depth: u32,
app_context_classifier: AppContextClassifier,
}

impl Debug for BaseMetadataBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"BaseMetadataBuilder {{ origin_domain: {:?} destination_chain_setup: {:?}, validator_announce: {:?} }}",
self.origin_domain, self.destination_chain_setup, self.origin_validator_announce
)
}
}

impl BaseMetadataBuilder {
pub fn origin_domain(&self) -> &HyperlaneDomain {
&self.origin_domain
}

pub fn destination_domain(&self) -> &HyperlaneDomain {
&self.destination_chain_setup.domain
}

pub async fn get_proof(&self, leaf_index: u32, checkpoint: Checkpoint) -> Result<Proof> {
const CTX: &str = "When fetching message proof";
let proof = self
Expand Down Expand Up @@ -162,6 +349,7 @@ impl BaseMetadataBuilder {
pub async fn build_checkpoint_syncer(
&self,
validators: &[H256],
app_context: Option<String>,
) -> Result<MultisigCheckpointSyncer> {
let storage_locations = self
.origin_validator_announce
Expand Down Expand Up @@ -221,45 +409,10 @@ impl BaseMetadataBuilder {
}
}
}
Ok(MultisigCheckpointSyncer::new(checkpoint_syncers))
}

#[instrument(err, skip(self), fields(domain=self.domain().name()))]
pub async fn build_ism_and_metadata(
&self,
ism_address: H256,
message: &HyperlaneMessage,
) -> Result<IsmWithMetadataAndType> {
let ism: Box<dyn InterchainSecurityModule> = self
.build_ism(ism_address)
.await
.context("When building ISM")?;

let module_type = ism
.module_type()
.await
.context("When fetching module type")?;
let base = self.clone_with_incremented_depth()?;

let metadata_builder: Box<dyn MetadataBuilder> = match module_type {
ModuleType::MerkleRootMultisig => {
Box::new(MerkleRootMultisigMetadataBuilder::new(base))
}
ModuleType::MessageIdMultisig => Box::new(MessageIdMultisigMetadataBuilder::new(base)),
ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(base)),
ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(base)),
ModuleType::Null => Box::new(NullMetadataBuilder::new()),
ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(base)),
_ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()),
};
let meta = metadata_builder
.build(ism_address, message)
.await
.context("When building metadata");
Ok(IsmWithMetadataAndType {
ism,
metadata: meta?,
module_type,
})
Ok(MultisigCheckpointSyncer::new(
checkpoint_syncers,
self.metrics.clone(),
app_context,
))
}
}
4 changes: 2 additions & 2 deletions rust/agents/relayer/src/msg/metadata/ccip_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{info, instrument};

use super::{BaseMetadataBuilder, MetadataBuilder};
use super::{base::MessageMetadataBuilder, MetadataBuilder};

#[derive(Serialize, Deserialize)]
struct OffchainResponse {
Expand All @@ -20,7 +20,7 @@ struct OffchainResponse {

#[derive(Clone, Debug, new, Deref)]
pub struct CcipReadIsmMetadataBuilder {
base: BaseMetadataBuilder,
base: MessageMetadataBuilder,
}

#[async_trait]
Expand Down
Loading

0 comments on commit 3f88aa6

Please sign in to comment.