Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observability of validators for relayers #3057

Merged
merged 42 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
abfa375
wip
tkporter Nov 26, 2023
c0cebe4
Merge branch 'main' of github.com:abacus-network/abacus-monorepo into…
tkporter Nov 27, 2023
2535b9b
wip
tkporter Nov 27, 2023
b9f66d7
oop
tkporter Nov 27, 2023
b988181
wip
tkporter Nov 27, 2023
2885fbf
Keep getting pulled into other things
tkporter Nov 28, 2023
f391b71
Merge branch 'main' of github.com:abacus-network/abacus-monorepo into…
tkporter Dec 7, 2023
d158749
compiles
tkporter Dec 7, 2023
bf6bf6d
Wip
tkporter Dec 10, 2023
aa96b7f
Super ugly WIP introduction of the MessageBaseMetadataBuilder
tkporter Dec 12, 2023
7dae3c5
moved over to MessageBaseMetadataBuilder
tkporter Dec 12, 2023
b158483
Starting the cleanup process
tkporter Dec 13, 2023
7c67917
more clean
tkporter Dec 13, 2023
dc7ea06
Clean up
tkporter Dec 13, 2023
501f717
Cleanin up
tkporter Dec 14, 2023
8e0fd63
use deref
tkporter Dec 14, 2023
c211744
more cleaning
tkporter Dec 14, 2023
c26fb3f
clean
tkporter Dec 14, 2023
09170d7
Move to base: Arc<BaseMetadataBuilder>
tkporter Dec 14, 2023
a0d75c5
cleaning up metrics
tkporter Dec 14, 2023
1ca15ee
tidying
tkporter Dec 14, 2023
a002391
refactor default ism cache
tkporter Dec 14, 2023
8180f23
fix
tkporter Dec 14, 2023
767e220
comments
tkporter Dec 14, 2023
ca14153
Add ts definition
tkporter Dec 14, 2023
b7ba63a
Merge branch 'main' of github.com:abacus-network/abacus-monorepo into…
tkporter Dec 14, 2023
7448197
No need for the arc around the rwlock
tkporter Dec 14, 2023
cad38f8
reset run-locally changes
tkporter Dec 14, 2023
d892b9c
nit
tkporter Dec 14, 2023
1e5ff5d
nit
tkporter Dec 14, 2023
58c24f8
nit
tkporter Dec 14, 2023
6eda91c
undo hardhat change
tkporter Dec 14, 2023
af5e47a
Merge branch 'main' of github.com:abacus-network/abacus-monorepo into…
tkporter Dec 15, 2023
36801c8
min(1) instead of nonempty
tkporter Dec 15, 2023
88b61e3
Merge branch 'main' of github.com:abacus-network/abacus-monorepo into…
tkporter Dec 20, 2023
5934886
PR comments
tkporter Dec 20, 2023
51d0515
comment about making generic
tkporter Dec 20, 2023
ff8692f
fix default ISM cosmwasm
tkporter Dec 20, 2023
aaa98e5
Merge branch 'main' into trevor/relayer-validator-metrics
tkporter Dec 20, 2023
b92095e
Merge branch 'main' into trevor/relayer-validator-metrics
tkporter Dec 20, 2023
f05ef89
Merge branch 'main' into trevor/relayer-validator-metrics
tkporter Jan 2, 2024
4bb3afd
Merge branch 'main' into trevor/relayer-validator-metrics
tkporter Jan 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/agents/relayer/src/msg/metadata/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use tracing::{info, instrument};

use hyperlane_core::{HyperlaneMessage, InterchainSecurityModule, ModuleType, H256, U256};

use super::{BaseMetadataBuilder, MetadataBuilder};
use super::{MessageMetadataBuilder, MetadataBuilder};

/// Bytes used to store one member of the (start, end) range tuple
/// Copied from `AggregationIsmMetadata.sol`
const METADATA_RANGE_SIZE: usize = 4;

#[derive(Clone, Debug, new, Deref)]
pub struct AggregationIsmMetadataBuilder {
base: BaseMetadataBuilder,
base: MessageMetadataBuilder,
}

#[derive(Clone, Debug, new, PartialEq, Eq)]
Expand Down
284 changes: 217 additions & 67 deletions rust/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::{collections::HashMap, fmt::Debug, str::FromStr, sync::Arc};
use std::{
collections::HashMap,
fmt::Debug,
ops::Deref,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

use crate::{
merkle_tree::builder::MerkleTreeBuilder,
Expand All @@ -7,6 +14,7 @@ use crate::{
AggregationIsmMetadataBuilder, CcipReadIsmMetadataBuilder, NullMetadataBuilder,
RoutingIsmMetadataBuilder,
},
settings::matching_list::MatchingList,
};
use async_trait::async_trait;
use derive_new::new;
Expand All @@ -18,9 +26,10 @@ use hyperlane_base::{
};
use hyperlane_core::{
accumulator::merkle::Proof, AggregationIsm, CcipReadIsm, Checkpoint, HyperlaneDomain,
HyperlaneMessage, InterchainSecurityModule, ModuleType, MultisigIsm, RoutingIsm,
HyperlaneMessage, InterchainSecurityModule, Mailbox, ModuleType, MultisigIsm, RoutingIsm,
ValidatorAnnounce, H160, H256,
};

use tokio::sync::RwLock;
use tracing::{debug, info, instrument, warn};

Expand All @@ -40,39 +49,126 @@ pub struct IsmWithMetadataAndType {

#[async_trait]
pub trait MetadataBuilder: Send + Sync {
#[allow(clippy::async_yields_async)]
async fn build(&self, ism_address: H256, message: &HyperlaneMessage)
-> Result<Option<Vec<u8>>>;
}

#[derive(Clone, new)]
pub struct BaseMetadataBuilder {
destination_chain_setup: ChainConf,
origin_prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
origin_validator_announce: Arc<dyn ValidatorAnnounce>,
allow_local_checkpoint_syncers: bool,
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
/// Allows fetching the default ISM, caching the value for a period of time
/// to avoid fetching it all the time.
/// TODO: make this generic
#[derive(Debug)]
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
pub struct DefaultIsmCache {
value: RwLock<Option<(H256, Instant)>>,
mailbox: Arc<dyn Mailbox>,
}

impl DefaultIsmCache {
/// Time to live for the cached default ISM. 10 mins.
const TTL: Duration = Duration::from_secs(60 * 10);

pub fn new(mailbox: Arc<dyn Mailbox>) -> Self {
Self {
value: RwLock::new(None),
mailbox,
}
}

pub async fn get(&self) -> Result<H256> {
// If the duration since the value was last updated does not
// exceed the TTL, return the cached value.
// This is in its own block to avoid holding the lock during the
// async operation to fetch the on-chain default ISM if
// the cached value is stale.
{
let value = self.value.read().await;

if let Some(value) = *value {
if value.1.elapsed() < Self::TTL {
return Ok(value.0);
}
}
}

tkporter marked this conversation as resolved.
Show resolved Hide resolved
let default_ism = self.mailbox.default_ism().await?;
// Update the cached value.
{
let mut value = self.value.write().await;
*value = Some((default_ism, Instant::now()));
}

Ok(default_ism)
}
}

/// Classifies messages into an app context if they have one.
#[derive(Debug)]
pub struct AppContextClassifier {
default_ism: DefaultIsmCache,
app_matching_lists: Vec<(MatchingList, String)>,
}

impl AppContextClassifier {
pub fn new(
destination_mailbox: Arc<dyn Mailbox>,
app_matching_lists: Vec<(MatchingList, String)>,
) -> Self {
Self {
default_ism: DefaultIsmCache::new(destination_mailbox),
app_matching_lists,
}
}

/// Classifies messages into an app context if they have one, or None
/// if they don't.
/// An app context is a string that identifies the app that sent the message
/// and exists just for metrics.
/// An app context is chosen based on:
/// - the first element in `app_matching_lists` that matches the message
/// - if the message's ISM is the default ISM, the app context is "default_ism"
pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
) -> Result<Option<String>> {
// Give priority to the matching list. If the app from the matching list happens
// to use the default ISM, it's preferable to use the app context from the matching
// list.
for (matching_list, app_context) in self.app_matching_lists.iter() {
if matching_list.msg_matches(message, false) {
return Ok(Some(app_context.clone()));
}
}

let default_ism = self.default_ism.get().await?;
if root_ism == default_ism {
return Ok(Some("default_ism".to_string()));
}

Ok(None)
}
}

/// Builds metadata for a message.
#[derive(Debug, Clone)]
pub struct MessageMetadataBuilder {
pub base: Arc<BaseMetadataBuilder>,
/// ISMs can be structured recursively. We keep track of the depth
/// of the recursion to avoid infinite loops.
#[new(default)]
depth: u32,
max_depth: u32,
pub depth: u32,
pub app_context: Option<String>,
}

impl Debug for BaseMetadataBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"MetadataBuilder {{ chain_setup: {:?}, validator_announce: {:?} }}",
self.destination_chain_setup, self.origin_validator_announce
)
impl Deref for MessageMetadataBuilder {
type Target = BaseMetadataBuilder;

fn deref(&self) -> &Self::Target {
&self.base
}
}

#[async_trait]
impl MetadataBuilder for BaseMetadataBuilder {
#[instrument(err, skip(self), fields(domain=self.domain().name()))]
impl MetadataBuilder for MessageMetadataBuilder {
#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))]
async fn build(
&self,
ism_address: H256,
Expand All @@ -84,12 +180,24 @@ impl MetadataBuilder for BaseMetadataBuilder {
}
}

impl BaseMetadataBuilder {
pub fn domain(&self) -> &HyperlaneDomain {
&self.destination_chain_setup.domain
impl MessageMetadataBuilder {
pub async fn new(
ism_address: H256,
message: &HyperlaneMessage,
base: Arc<BaseMetadataBuilder>,
) -> Result<Self> {
let app_context = base
.app_context_classifier
.get_app_context(message, ism_address)
.await?;
Ok(Self {
base,
depth: 0,
app_context,
})
}

pub fn clone_with_incremented_depth(&self) -> Result<BaseMetadataBuilder> {
fn clone_with_incremented_depth(&self) -> Result<MessageMetadataBuilder> {
let mut cloned = self.clone();
cloned.depth += 1;
if cloned.depth > cloned.max_depth {
Expand All @@ -99,6 +207,82 @@ impl BaseMetadataBuilder {
}
}

#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))]
pub async fn build_ism_and_metadata(
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
&self,
ism_address: H256,
message: &HyperlaneMessage,
) -> Result<IsmWithMetadataAndType> {
let ism: Box<dyn InterchainSecurityModule> = self
.build_ism(ism_address)
.await
.context("When building ISM")?;

let module_type = ism
.module_type()
.await
.context("When fetching module type")?;
let cloned = self.clone_with_incremented_depth()?;

let metadata_builder: Box<dyn MetadataBuilder> = match module_type {
ModuleType::MerkleRootMultisig => {
Box::new(MerkleRootMultisigMetadataBuilder::new(cloned))
}
ModuleType::MessageIdMultisig => {
Box::new(MessageIdMultisigMetadataBuilder::new(cloned))
}
ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(cloned)),
ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(cloned)),
ModuleType::Null => Box::new(NullMetadataBuilder::new()),
ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(cloned)),
_ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()),
};
let meta = metadata_builder
.build(ism_address, message)
.await
.context("When building metadata");
Ok(IsmWithMetadataAndType {
ism,
metadata: meta?,
module_type,
})
}
}

/// Base metadata builder with types used by higher level metadata builders.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct BaseMetadataBuilder {
origin_domain: HyperlaneDomain,
destination_chain_setup: ChainConf,
origin_prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
origin_validator_announce: Arc<dyn ValidatorAnnounce>,
allow_local_checkpoint_syncers: bool,
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
max_depth: u32,
app_context_classifier: AppContextClassifier,
}

impl Debug for BaseMetadataBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"BaseMetadataBuilder {{ origin_domain: {:?} destination_chain_setup: {:?}, validator_announce: {:?} }}",
self.origin_domain, self.destination_chain_setup, self.origin_validator_announce
)
}
}

impl BaseMetadataBuilder {
pub fn origin_domain(&self) -> &HyperlaneDomain {
&self.origin_domain
}

pub fn destination_domain(&self) -> &HyperlaneDomain {
&self.destination_chain_setup.domain
}

pub async fn get_proof(&self, leaf_index: u32, checkpoint: Checkpoint) -> Result<Proof> {
const CTX: &str = "When fetching message proof";
let proof = self
Expand Down Expand Up @@ -162,6 +346,7 @@ impl BaseMetadataBuilder {
pub async fn build_checkpoint_syncer(
&self,
validators: &[H256],
app_context: Option<String>,
) -> Result<MultisigCheckpointSyncer> {
let storage_locations = self
.origin_validator_announce
Expand Down Expand Up @@ -221,45 +406,10 @@ impl BaseMetadataBuilder {
}
}
}
Ok(MultisigCheckpointSyncer::new(checkpoint_syncers))
}

#[instrument(err, skip(self), fields(domain=self.domain().name()))]
pub async fn build_ism_and_metadata(
&self,
ism_address: H256,
message: &HyperlaneMessage,
) -> Result<IsmWithMetadataAndType> {
let ism: Box<dyn InterchainSecurityModule> = self
.build_ism(ism_address)
.await
.context("When building ISM")?;

let module_type = ism
.module_type()
.await
.context("When fetching module type")?;
let base = self.clone_with_incremented_depth()?;

let metadata_builder: Box<dyn MetadataBuilder> = match module_type {
ModuleType::MerkleRootMultisig => {
Box::new(MerkleRootMultisigMetadataBuilder::new(base))
}
ModuleType::MessageIdMultisig => Box::new(MessageIdMultisigMetadataBuilder::new(base)),
ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(base)),
ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(base)),
ModuleType::Null => Box::new(NullMetadataBuilder::new()),
ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(base)),
_ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()),
};
let meta = metadata_builder
.build(ism_address, message)
.await
.context("When building metadata");
Ok(IsmWithMetadataAndType {
ism,
metadata: meta?,
module_type,
})
Ok(MultisigCheckpointSyncer::new(
checkpoint_syncers,
self.metrics.clone(),
app_context,
))
}
}
4 changes: 2 additions & 2 deletions rust/agents/relayer/src/msg/metadata/ccip_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{info, instrument};

use super::{BaseMetadataBuilder, MetadataBuilder};
use super::{base::MessageMetadataBuilder, MetadataBuilder};

#[derive(Serialize, Deserialize)]
struct OffchainResponse {
Expand All @@ -20,7 +20,7 @@ struct OffchainResponse {

#[derive(Clone, Debug, new, Deref)]
pub struct CcipReadIsmMetadataBuilder {
base: BaseMetadataBuilder,
base: MessageMetadataBuilder,
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ mod null_metadata;
mod routing;

use aggregation::AggregationIsmMetadataBuilder;
pub(crate) use base::BaseMetadataBuilder;
pub(crate) use base::MetadataBuilder;
pub(crate) use base::{AppContextClassifier, BaseMetadataBuilder, MessageMetadataBuilder};
use ccip_read::CcipReadIsmMetadataBuilder;
use null_metadata::NullMetadataBuilder;
use routing::RoutingIsmMetadataBuilder;
Loading
Loading