Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use app context classifier in relayer submitter queues #3385

Merged
merged 11 commits into from
Mar 13, 2024
16 changes: 10 additions & 6 deletions rust/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ impl DefaultIsmCache {
/// Classifies messages into an app context if they have one.
#[derive(Debug)]
pub struct AppContextClassifier {
default_ism: DefaultIsmCache,
default_ism: Option<DefaultIsmCache>,
app_matching_lists: Vec<(MatchingList, String)>,
}

impl AppContextClassifier {
pub fn new(
destination_mailbox: Arc<dyn Mailbox>,
destination_mailbox: Option<Arc<dyn Mailbox>>,
app_matching_lists: Vec<(MatchingList, String)>,
) -> Self {
Self {
default_ism: DefaultIsmCache::new(destination_mailbox),
default_ism: destination_mailbox.map(DefaultIsmCache::new),
app_matching_lists,
}
}
Expand All @@ -131,7 +131,7 @@ impl AppContextClassifier {
pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
root_ism: Option<H256>,
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Option<String>> {
// Give priority to the matching list. If the app from the matching list happens
// to use the default ISM, it's preferable to use the app context from the matching
Expand All @@ -142,7 +142,11 @@ impl AppContextClassifier {
}
}

let default_ism = self.default_ism.get().await?;
let (root_ism, default_ism) = match (root_ism, &self.default_ism) {
(Some(root_ism), Some(default_ism)) => (root_ism, default_ism.get().await?),
_ => return Ok(None),
};

if root_ism == default_ism {
return Ok(Some("default_ism".to_string()));
}
Expand Down Expand Up @@ -191,7 +195,7 @@ impl MessageMetadataBuilder {
) -> Result<Self> {
let app_context = base
.app_context_classifier
.get_app_context(message, ism_address)
.get_app_context(message, Some(ism_address))
.await?;
Ok(Self {
base,
Expand Down
20 changes: 17 additions & 3 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct MessageContext {
pub struct PendingMessage {
pub message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
#[new(default)]
submitted: bool,
#[new(default)]
Expand Down Expand Up @@ -104,6 +105,15 @@ impl PendingOperation for PendingMessage {
self.ctx.destination_mailbox.domain()
}

fn app_context(&self) -> Option<String> {
self.app_context.clone()
}

fn destination(&self) -> String {
// TODO: check whether this is the correct way of converting the address to string
self.ctx.destination_mailbox.address().to_string()
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
}

#[instrument]
async fn prepare(&mut self) -> PendingOperationResult {
make_op_try!(|| self.on_reprepare());
Expand Down Expand Up @@ -302,7 +312,7 @@ impl PendingOperation for PendingMessage {
}
}

fn _next_attempt_after(&self) -> Option<Instant> {
fn next_attempt_after(&self) -> Option<Instant> {
self.next_attempt_after
}

Expand All @@ -315,8 +325,12 @@ impl PendingOperation for PendingMessage {
impl PendingMessage {
/// Constructor that tries reading the retry count from the HyperlaneDB in order to recompute the `next_attempt_after`.
/// In case of failure, behaves like `Self::new(...)`.
pub fn from_persisted_retries(message: HyperlaneMessage, ctx: Arc<MessageContext>) -> Self {
let mut pm = Self::new(message, ctx);
pub fn from_persisted_retries(
message: HyperlaneMessage,
ctx: Arc<MessageContext>,
app_context: Option<String>,
) -> Self {
let mut pm = Self::new(message, ctx, app_context);
match pm
.ctx
.origin_db
Expand Down
17 changes: 15 additions & 2 deletions rust/agents/relayer/src/msg/pending_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ pub trait PendingOperation {
/// The domain this operation will take place on.
fn domain(&self) -> &HyperlaneDomain;

/// Label to use for metrics granularity.
fn app_context(&self) -> Option<String>;
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved

/// The destination contract address this operation is meant for.
fn destination(&self) -> String;

/// Get tuple of labels for metrics.
fn get_operation_labels(&self) -> (String, String) {
let app_context = self.app_context().unwrap_or("Unknown".to_string());
let destination = self.destination();
(destination, app_context)
}

/// Prepare to submit this operation. This will be called before every
/// submission and will usually have a very short gap between it and the
/// submit call.
Expand All @@ -50,7 +63,7 @@ pub trait PendingOperation {
///
/// This is only used for sorting, the functions are responsible for
/// returning `NotReady` if it is too early and matters.
fn _next_attempt_after(&self) -> Option<Instant>;
fn next_attempt_after(&self) -> Option<Instant>;

#[cfg(test)]
/// Set the number of times this operation has been retried.
Expand Down Expand Up @@ -80,7 +93,7 @@ impl Ord for DynPendingOperation {
fn cmp(&self, other: &Self) -> Ordering {
use DynPendingOperation::*;
use Ordering::*;
match (self._next_attempt_after(), other._next_attempt_after()) {
match (self.next_attempt_after(), other.next_attempt_after()) {
(Some(a), Some(b)) => a.cmp(&b),
// No time means it should come before
(None, Some(_)) => Less,
Expand Down
24 changes: 20 additions & 4 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, Mailbox};
use prometheus::IntGauge;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, trace};

use super::pending_message::*;
use super::{metadata::AppContextClassifier, pending_message::*};
use crate::msg::pending_operation::DynPendingOperation;
use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};

/// Finds unprocessed messages from an origin and submits then through a channel
/// for to the appropriate destination.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct MessageProcessor {
db: HyperlaneRocksDB,
Expand All @@ -31,6 +32,8 @@ pub struct MessageProcessor {
send_channels: HashMap<u32, UnboundedSender<Box<DynPendingOperation>>>,
/// Needed context to send a message for each destination chain
destination_ctxs: HashMap<u32, Arc<MessageContext>>,
mailboxes: HashMap<HyperlaneDomain, Arc<dyn Mailbox>>,
metric_app_contexts: Vec<(MatchingList, String)>,
#[new(default)]
message_nonce: u32,
}
Expand Down Expand Up @@ -94,10 +97,18 @@ impl ProcessorExt for MessageProcessor {

debug!(%msg, "Sending message to submitter");

let domain: HyperlaneDomain = KnownHyperlaneDomain::try_from(msg.destination)?.into();
let app_context_classifier = AppContextClassifier::new(
self.mailboxes.get(&domain).map(Clone::clone),
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
self.metric_app_contexts.clone(),
);

let app_context = app_context_classifier.get_app_context(&msg, None).await?;
// Finally, build the submit arg and dispatch it to the submitter.
let pending_msg = PendingMessage::from_persisted_retries(
msg,
self.destination_ctxs[&destination].clone(),
app_context,
);
self.send_channels[&destination].send(Box::new(pending_msg.into()))?;
self.message_nonce += 1;
Expand Down Expand Up @@ -265,7 +276,7 @@ mod test {
Arc::new(core_metrics),
db.clone(),
5,
AppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]),
AppContextClassifier::new(Some(Arc::new(MockMailboxContract::default())), vec![]),
)
}

Expand Down Expand Up @@ -296,6 +307,11 @@ mod test {
dummy_processor_metrics(origin_domain.id()),
HashMap::from([(destination_domain.id(), send_channel)]),
HashMap::from([(destination_domain.id(), message_context)]),
HashMap::from([(
destination_domain.clone(),
Arc::new(MockMailboxContract::default()) as Arc<dyn Mailbox>,
)]),
vec![],
),
receive_channel,
)
Expand Down Expand Up @@ -425,7 +441,7 @@ mod test {
// Round up the actual backoff because it was calculated with an `Instant::now()` that was a fraction of a second ago
let expected_backoff = PendingMessage::calculate_msg_backoff(*expected_retries)
.map(|b| b.as_secs_f32().round());
let actual_backoff = pm._next_attempt_after().map(|instant| {
let actual_backoff = pm.next_attempt_after().map(|instant| {
instant.duration_since(Instant::now()).as_secs_f32().round()
});
assert_eq!(expected_backoff, actual_backoff);
Expand Down
Loading
Loading