diff --git a/rust/agents/relayer/src/msg/metadata/base.rs b/rust/agents/relayer/src/msg/metadata/base.rs index 950723e74f..9fb65902e6 100644 --- a/rust/agents/relayer/src/msg/metadata/base.rs +++ b/rust/agents/relayer/src/msg/metadata/base.rs @@ -103,24 +103,47 @@ impl DefaultIsmCache { } } -/// Classifies messages into an app context if they have one. #[derive(Debug)] -pub struct AppContextClassifier { +pub struct IsmAwareAppContextClassifier { default_ism: DefaultIsmCache, - app_matching_lists: Vec<(MatchingList, String)>, + app_context_classifier: AppContextClassifier, } -impl AppContextClassifier { +impl IsmAwareAppContextClassifier { pub fn new( destination_mailbox: Arc, app_matching_lists: Vec<(MatchingList, String)>, ) -> Self { Self { default_ism: DefaultIsmCache::new(destination_mailbox), - app_matching_lists, + app_context_classifier: AppContextClassifier::new(app_matching_lists), } } + pub async fn get_app_context( + &self, + message: &HyperlaneMessage, + root_ism: H256, + ) -> Result> { + if let Some(app_context) = self.app_context_classifier.get_app_context(message).await? { + return Ok(Some(app_context)); + } + + if root_ism == self.default_ism.get().await? { + return Ok(Some("default_ism".to_string())); + } + + Ok(None) + } +} + +/// Classifies messages into an app context if they have one. +#[derive(Debug, new)] +pub struct AppContextClassifier { + app_matching_lists: Vec<(MatchingList, String)>, +} + +impl AppContextClassifier { /// Classifies messages into an app context if they have one, or None /// if they don't. /// An app context is a string that identifies the app that sent the message @@ -128,11 +151,7 @@ impl AppContextClassifier { /// An app context is chosen based on: /// - the first element in `app_matching_lists` that matches the message /// - if the message's ISM is the default ISM, the app context is "default_ism" - pub async fn get_app_context( - &self, - message: &HyperlaneMessage, - root_ism: H256, - ) -> Result> { + pub async fn get_app_context(&self, message: &HyperlaneMessage) -> Result> { // Give priority to the matching list. If the app from the matching list happens // to use the default ISM, it's preferable to use the app context from the matching // list. @@ -142,11 +161,6 @@ impl AppContextClassifier { } } - let default_ism = self.default_ism.get().await?; - if root_ism == default_ism { - return Ok(Some("default_ism".to_string())); - } - Ok(None) } } @@ -264,7 +278,7 @@ pub struct BaseMetadataBuilder { metrics: Arc, db: HyperlaneRocksDB, max_depth: u32, - app_context_classifier: AppContextClassifier, + app_context_classifier: IsmAwareAppContextClassifier, } impl Debug for BaseMetadataBuilder { diff --git a/rust/agents/relayer/src/msg/metadata/mod.rs b/rust/agents/relayer/src/msg/metadata/mod.rs index 2b3c3bf3d7..7ab1d892a6 100644 --- a/rust/agents/relayer/src/msg/metadata/mod.rs +++ b/rust/agents/relayer/src/msg/metadata/mod.rs @@ -7,7 +7,9 @@ mod routing; use aggregation::AggregationIsmMetadataBuilder; pub(crate) use base::MetadataBuilder; -pub(crate) use base::{AppContextClassifier, BaseMetadataBuilder, MessageMetadataBuilder}; +pub(crate) use base::{ + AppContextClassifier, BaseMetadataBuilder, IsmAwareAppContextClassifier, MessageMetadataBuilder, +}; use ccip_read::CcipReadIsmMetadataBuilder; use null_metadata::NullMetadataBuilder; use routing::RoutingIsmMetadataBuilder; diff --git a/rust/agents/relayer/src/msg/pending_message.rs b/rust/agents/relayer/src/msg/pending_message.rs index 1294d45922..ef4b0e4d17 100644 --- a/rust/agents/relayer/src/msg/pending_message.rs +++ b/rust/agents/relayer/src/msg/pending_message.rs @@ -50,6 +50,7 @@ pub struct MessageContext { pub struct PendingMessage { pub message: HyperlaneMessage, ctx: Arc, + app_context: Option, #[new(default)] submitted: bool, #[new(default)] @@ -104,6 +105,10 @@ impl PendingOperation for PendingMessage { self.ctx.destination_mailbox.domain() } + fn app_context(&self) -> Option { + self.app_context.clone() + } + #[instrument] async fn prepare(&mut self) -> PendingOperationResult { make_op_try!(|| self.on_reprepare()); @@ -302,7 +307,7 @@ impl PendingOperation for PendingMessage { } } - fn _next_attempt_after(&self) -> Option { + fn next_attempt_after(&self) -> Option { self.next_attempt_after } @@ -315,8 +320,12 @@ impl PendingOperation for PendingMessage { impl PendingMessage { /// Constructor that tries reading the retry count from the HyperlaneDB in order to recompute the `next_attempt_after`. /// In case of failure, behaves like `Self::new(...)`. - pub fn from_persisted_retries(message: HyperlaneMessage, ctx: Arc) -> Self { - let mut pm = Self::new(message, ctx); + pub fn from_persisted_retries( + message: HyperlaneMessage, + ctx: Arc, + app_context: Option, + ) -> Self { + let mut pm = Self::new(message, ctx, app_context); match pm .ctx .origin_db diff --git a/rust/agents/relayer/src/msg/pending_operation.rs b/rust/agents/relayer/src/msg/pending_operation.rs index 6192cd7cd3..c1a1c46264 100644 --- a/rust/agents/relayer/src/msg/pending_operation.rs +++ b/rust/agents/relayer/src/msg/pending_operation.rs @@ -32,6 +32,16 @@ pub trait PendingOperation { /// The domain this operation will take place on. fn domain(&self) -> &HyperlaneDomain; + /// Label to use for metrics granularity. + fn app_context(&self) -> Option; + + /// Get tuple of labels for metrics. + fn get_operation_labels(&self) -> (String, String) { + let app_context = self.app_context().unwrap_or("Unknown".to_string()); + let destination = self.domain().to_string(); + (destination, app_context) + } + /// Prepare to submit this operation. This will be called before every /// submission and will usually have a very short gap between it and the /// submit call. @@ -50,7 +60,7 @@ pub trait PendingOperation { /// /// This is only used for sorting, the functions are responsible for /// returning `NotReady` if it is too early and matters. - fn _next_attempt_after(&self) -> Option; + fn next_attempt_after(&self) -> Option; #[cfg(test)] /// Set the number of times this operation has been retried. @@ -80,7 +90,7 @@ impl Ord for DynPendingOperation { fn cmp(&self, other: &Self) -> Ordering { use DynPendingOperation::*; use Ordering::*; - match (self._next_attempt_after(), other._next_attempt_after()) { + match (self.next_attempt_after(), other.next_attempt_after()) { (Some(a), Some(b)) => a.cmp(&b), // No time means it should come before (None, Some(_)) => Less, diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 76777181c9..0042c12847 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -14,12 +14,13 @@ use prometheus::IntGauge; use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, trace}; -use super::pending_message::*; +use super::{metadata::AppContextClassifier, pending_message::*}; use crate::msg::pending_operation::DynPendingOperation; use crate::{processor::ProcessorExt, settings::matching_list::MatchingList}; /// Finds unprocessed messages from an origin and submits then through a channel /// for to the appropriate destination. +#[allow(clippy::too_many_arguments)] #[derive(new)] pub struct MessageProcessor { db: HyperlaneRocksDB, @@ -31,6 +32,7 @@ pub struct MessageProcessor { send_channels: HashMap>>, /// Needed context to send a message for each destination chain destination_ctxs: HashMap>, + metric_app_contexts: Vec<(MatchingList, String)>, #[new(default)] message_nonce: u32, } @@ -94,10 +96,15 @@ impl ProcessorExt for MessageProcessor { debug!(%msg, "Sending message to submitter"); + let app_context_classifier = + AppContextClassifier::new(self.metric_app_contexts.clone()); + + let app_context = app_context_classifier.get_app_context(&msg).await?; // Finally, build the submit arg and dispatch it to the submitter. let pending_msg = PendingMessage::from_persisted_retries( msg, self.destination_ctxs[&destination].clone(), + app_context, ); self.send_channels[&destination].send(Box::new(pending_msg.into()))?; self.message_nonce += 1; @@ -184,7 +191,7 @@ mod test { merkle_tree::builder::MerkleTreeBuilder, msg::{ gas_payment::GasPaymentEnforcer, - metadata::{AppContextClassifier, BaseMetadataBuilder}, + metadata::{BaseMetadataBuilder, IsmAwareAppContextClassifier}, pending_operation::PendingOperation, }, processor::Processor, @@ -265,7 +272,7 @@ mod test { Arc::new(core_metrics), db.clone(), 5, - AppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]), + IsmAwareAppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]), ) } @@ -296,6 +303,7 @@ mod test { dummy_processor_metrics(origin_domain.id()), HashMap::from([(destination_domain.id(), send_channel)]), HashMap::from([(destination_domain.id(), message_context)]), + vec![], ), receive_channel, ) @@ -425,7 +433,7 @@ mod test { // Round up the actual backoff because it was calculated with an `Instant::now()` that was a fraction of a second ago let expected_backoff = PendingMessage::calculate_msg_backoff(*expected_retries) .map(|b| b.as_secs_f32().round()); - let actual_backoff = pm._next_attempt_after().map(|instant| { + let actual_backoff = pm.next_attempt_after().map(|instant| { instant.duration_since(Instant::now()).as_secs_f32().round() }); assert_eq!(expected_backoff, actual_backoff); diff --git a/rust/agents/relayer/src/msg/serial_submitter.rs b/rust/agents/relayer/src/msg/serial_submitter.rs index a4fbb13ce0..c342aa8da3 100644 --- a/rust/agents/relayer/src/msg/serial_submitter.rs +++ b/rust/agents/relayer/src/msg/serial_submitter.rs @@ -5,7 +5,7 @@ use std::time::Duration; use derive_new::new; use futures_util::future::try_join_all; -use prometheus::{IntCounter, IntGauge}; +use prometheus::{IntCounter, IntGauge, IntGaugeVec}; use tokio::spawn; use tokio::sync::{ mpsc::{self}, @@ -20,7 +20,43 @@ use hyperlane_core::HyperlaneDomain; use super::pending_operation::*; -type OpQueue = Arc>>>>; +/// Queue of generic operations that can be submitted to a destination chain. +/// Includes logic for maintaining queue metrics by the destination and `app_context` of an operation +#[derive(Debug, Clone, new)] +struct OpQueue { + metrics: IntGaugeVec, + queue_metrics_label: String, + #[new(default)] + queue: Arc>>>>, +} + +impl OpQueue { + /// Push an element onto the queue and update metrics + async fn push(&self, op: Box) { + // increment the metric before pushing onto the queue, because we lose ownership afterwards + self.get_operation_metric(&op).inc(); + + self.queue.lock().await.push(Reverse(op)); + } + + /// Pop an element from the queue and update metrics + async fn pop(&self) -> Option>> { + let op = self.queue.lock().await.pop(); + op.map(|op| { + // even if the metric is decremented here, the operation may fail to process and be re-added to the queue. + // in those cases, the queue length will decrease to zero until the operation is re-added. + self.get_operation_metric(&op.0).dec(); + op + }) + } + + /// Get the metric associated with this operation + fn get_operation_metric(&self, operation: &DynPendingOperation) -> IntGauge { + let (destination, app_context) = operation.get_operation_labels(); + self.metrics + .with_label_values(&[&destination, &self.queue_metrics_label, &app_context]) + } +} /// SerialSubmitter accepts operations over a channel. It is responsible for /// executing the right strategy to deliver those messages to the destination @@ -91,8 +127,14 @@ impl SerialSubmitter { metrics, rx: rx_prepare, } = self; - let prepare_queue: OpQueue = Default::default(); - let confirm_queue: OpQueue = Default::default(); + let prepare_queue = OpQueue::new( + metrics.submitter_queue_length.clone(), + "prepare_queue".to_string(), + ); + let confirm_queue = OpQueue::new( + metrics.submitter_queue_length.clone(), + "confirm_queue".to_string(), + ); // This is a channel because we want to only have a small number of messages // sitting ready to go at a time and this acts as a synchronization tool @@ -149,7 +191,7 @@ async fn receive_task( // make sure things are getting wired up correctly; if this works in testing it // should also be valid in production. debug_assert_eq!(*op.domain(), domain); - prepare_queue.lock().await.push(Reverse(op)); + prepare_queue.push(op).await; } } @@ -162,16 +204,14 @@ async fn prepare_task( ) { loop { // Pick the next message to try preparing. - let next = { - let mut queue = prepare_queue.lock().await; - metrics.prepare_queue_length.set(queue.len() as i64); - queue.pop() - }; + let next = prepare_queue.pop().await; + let Some(Reverse(mut op)) = next else { // queue is empty so give some time before checking again to prevent burning CPU sleep(Duration::from_millis(200)).await; continue; }; + trace!(?op, "Preparing operation"); debug_assert_eq!(*op.domain(), domain); @@ -186,12 +226,12 @@ async fn prepare_task( } PendingOperationResult::NotReady => { // none of the operations are ready yet, so wait for a little bit - prepare_queue.lock().await.push(Reverse(op)); + prepare_queue.push(op).await; sleep(Duration::from_millis(200)).await; } PendingOperationResult::Reprepare => { metrics.ops_failed.inc(); - prepare_queue.lock().await.push(Reverse(op)); + prepare_queue.push(op).await; } PendingOperationResult::Drop => { metrics.ops_dropped.inc(); @@ -216,14 +256,14 @@ async fn submit_task( PendingOperationResult::Success => { debug!(?op, "Operation submitted"); metrics.ops_submitted.inc(); - confirm_queue.lock().await.push(Reverse(op)); + confirm_queue.push(op).await; } PendingOperationResult::NotReady => { panic!("Pending operation was prepared and therefore must be ready") } PendingOperationResult::Reprepare => { metrics.ops_failed.inc(); - prepare_queue.lock().await.push(Reverse(op)); + prepare_queue.push(op).await; } PendingOperationResult::Drop => { metrics.ops_dropped.inc(); @@ -241,15 +281,11 @@ async fn confirm_task( ) { loop { // Pick the next message to try confirming. - let next = { - let mut queue = confirm_queue.lock().await; - metrics.confirm_queue_length.set(queue.len() as i64); - queue.pop() - }; - let Some(Reverse(mut op)) = next else { + let Some(Reverse(mut op)) = confirm_queue.pop().await else { sleep(Duration::from_secs(5)).await; continue; }; + trace!(?op, "Confirming operation"); debug_assert_eq!(*op.domain(), domain); @@ -260,12 +296,12 @@ async fn confirm_task( } PendingOperationResult::NotReady => { // none of the operations are ready yet, so wait for a little bit - confirm_queue.lock().await.push(Reverse(op)); + confirm_queue.push(op).await; sleep(Duration::from_secs(5)).await; } PendingOperationResult::Reprepare => { metrics.ops_reorged.inc(); - prepare_queue.lock().await.push(Reverse(op)); + prepare_queue.push(op).await; } PendingOperationResult::Drop => { metrics.ops_dropped.inc(); @@ -276,9 +312,7 @@ async fn confirm_task( #[derive(Debug, Clone)] pub struct SerialSubmitterMetrics { - prepare_queue_length: IntGauge, - confirm_queue_length: IntGauge, - + submitter_queue_length: IntGaugeVec, ops_prepared: IntCounter, ops_submitted: IntCounter, ops_confirmed: IntCounter, @@ -291,12 +325,7 @@ impl SerialSubmitterMetrics { pub fn new(metrics: &CoreMetrics, destination: &HyperlaneDomain) -> Self { let destination = destination.name(); Self { - prepare_queue_length: metrics - .submitter_queue_length() - .with_label_values(&[destination, "prepare_queue"]), - confirm_queue_length: metrics - .submitter_queue_length() - .with_label_values(&[destination, "confirm_queue"]), + submitter_queue_length: metrics.submitter_queue_length(), ops_prepared: metrics .operations_processed_count() .with_label_values(&["prepared", destination]), diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 035ab66752..d684b18adb 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -32,7 +32,7 @@ use crate::{ merkle_tree::builder::MerkleTreeBuilder, msg::{ gas_payment::GasPaymentEnforcer, - metadata::{AppContextClassifier, BaseMetadataBuilder}, + metadata::{BaseMetadataBuilder, IsmAwareAppContextClassifier}, pending_message::{MessageContext, MessageSubmissionMetrics}, pending_operation::DynPendingOperation, processor::{MessageProcessor, MessageProcessorMetrics}, @@ -73,6 +73,7 @@ pub struct Relayer { transaction_gas_limit: Option, skip_transaction_gas_limit_for: HashSet, allow_local_checkpoint_syncers: bool, + metric_app_contexts: Vec<(MatchingList, String)>, core_metrics: Arc, // TODO: decide whether to consolidate `agent_metrics` and `chain_metrics` into a single struct // or move them in `core_metrics`, like the validator metrics @@ -226,7 +227,7 @@ impl BaseAgent for Relayer { core.metrics.clone(), db, 5, - AppContextClassifier::new( + IsmAwareAppContextClassifier::new( mailboxes[destination].clone(), settings.metric_app_contexts.clone(), ), @@ -264,6 +265,7 @@ impl BaseAgent for Relayer { transaction_gas_limit, skip_transaction_gas_limit_for, allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers, + metric_app_contexts: settings.metric_app_contexts, core_metrics, agent_metrics, chain_metrics, @@ -402,6 +404,7 @@ impl Relayer { metrics, send_channels, destination_ctxs, + self.metric_app_contexts.clone(), ); let span = info_span!("MessageProcessor", origin=%message_processor.domain()); diff --git a/rust/hyperlane-base/src/metrics/core.rs b/rust/hyperlane-base/src/metrics/core.rs index 447c6f2f45..8e16684aa8 100644 --- a/rust/hyperlane-base/src/metrics/core.rs +++ b/rust/hyperlane-base/src/metrics/core.rs @@ -132,7 +132,7 @@ impl CoreMetrics { "Submitter queue length", const_labels_ref ), - &["remote", "queue_name"], + &["remote", "queue_name", "app_context"], registry )?;