feat: middleware submission tweaks (#23)
- adds the tx to the gas escalator's watch list regardless of whether the
initial broadcast succeeds. If there's an immediate error when broadcasting,
the escalator will keep trying to resubmit it with a higher gas price
- resyncs the nonce manager's internal nonce with the onchain nonce every 10
txs

Both are short-term fixes for the issues discussed in
https://www.notion.so/hyperlanexyz/State-of-EVM-provider-middleware-16c6d35200d680d8a6b5f0d32cd8c66b?pvs=4
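
To see how these middlewares are typically composed, here is a minimal sketch of a provider stack using the public ethers-rs constructors. The RPC URL, signer address, and escalation parameters are illustrative placeholders, not part of this change.

use std::convert::TryFrom;

use ethers_core::types::Address;
use ethers_middleware::{
    gas_escalator::{Frequency, GasEscalatorMiddleware, GeometricGasPrice},
    NonceManagerMiddleware,
};
use ethers_providers::{Http, Provider};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let provider = Provider::<Http>::try_from("http://localhost:8545")?;

    // Resubmit with a ~12.5% gas-price bump per interval, capped at 2000 gwei.
    let escalator = GeometricGasPrice::new(1.125, 60u64, Some(2_000_000_000_000u128));
    let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::PerBlock);

    // The nonce manager sits on top: it tracks nonces locally and, with this
    // change, resyncs against the onchain nonce every 10 sent txs.
    let signer_address: Address = "0x0000000000000000000000000000000000000000".parse()?;
    let provider = NonceManagerMiddleware::new(provider, signer_address);

    let _ = provider;
    Ok(())
}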
daniel-savu authored Jan 3, 2025
1 parent 31653ac commit f18abe3
Showing 2 changed files with 78 additions and 44 deletions.
96 changes: 52 additions & 44 deletions ethers-middleware/src/gas_escalator/mod.rs
@@ -61,7 +61,7 @@ pub(crate) struct GasEscalatorMiddlewareInternal<M> {

 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct MonitoredTransaction {
-    hash: TxHash,
+    hash: Option<TxHash>,
     inner: TypedTransaction,
     creation_time: Instant,
     block: Option<BlockId>,
@@ -205,23 +205,30 @@ where
     ) -> Result<PendingTransaction<'_, M::Provider>, GasEscalatorError<M>> {
         let tx = tx.into();

-        let pending_tx = self
-            .inner
-            .send_transaction(tx.clone(), block)
-            .await
-            .map_err(GasEscalatorError::MiddlewareError)?;
-
-        tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
-        // insert the tx in the pending txs
-        let mut lock = self.txs.lock().await;
-        lock.push(MonitoredTransaction {
-            hash: *pending_tx,
-            inner: tx,
-            creation_time: Instant::now(),
-            block,
-        });
-
-        Ok(pending_tx)
+        match self.inner.send_transaction(tx.clone(), block).await {
+            Ok(pending_tx) => {
+                tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher");
+                let mut lock = self.txs.lock().await;
+                lock.push(MonitoredTransaction {
+                    hash: Some(*pending_tx),
+                    inner: tx,
+                    creation_time: Instant::now(),
+                    block,
+                });
+                Ok(pending_tx)
+            }
+            Err(err) => {
+                tracing::warn!(tx = ?tx, "Failed to send tx, adding to gas escalator watcher regardless");
+                let mut lock = self.txs.lock().await;
+                lock.push(MonitoredTransaction {
+                    hash: None,
+                    inner: tx,
+                    creation_time: Instant::now(),
+                    block: None,
+                });
+                Err(GasEscalatorError::MiddlewareError(err))
+            }
+        }
     }
 }

@@ -279,11 +286,11 @@ impl<M, E: Clone> EscalationTask<M, E> {
         err_message: String,
         old_monitored_tx: MonitoredTransaction,
         new_tx: TypedTransaction,
-    ) -> Option<(H256, Instant)> {
+    ) -> Option<(Option<H256>, Instant)> {
         if err_message.contains("nonce too low") {
             // may happen if we try to broadcast a new, gas-escalated tx when the original tx
             // already landed onchain, meaning we no longer need to escalate it
-            tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
+            tracing::warn!(err = err_message, ?old_monitored_tx, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
             None
         } else if RETRYABLE_ERRORS.iter().any(|err_msg| err_message.contains(err_msg)) {
             // if the error is one of the known retryable errors, we can keep trying to escalate
@@ -311,21 +318,15 @@ impl<M, E: Clone> EscalationTask<M, E> {
     ///
     /// **Returns** a tx hash to monitor and the time it was created, unless the tx was already
     /// included or an unknown error occurred
-    async fn broadcast_tx_if_escalated(
+    async fn broadcast_tx(
         &self,
         old_monitored_tx: MonitoredTransaction,
         new_tx: TypedTransaction,
-    ) -> Option<(H256, Instant)>
+    ) -> Option<(Option<H256>, Instant)>
     where
         M: Middleware,
         E: GasEscalator,
     {
-        // gas price wasn't escalated
-        // keep monitoring the old tx
-        if old_monitored_tx.inner.eq(&new_tx) {
-            return Some((old_monitored_tx.hash, old_monitored_tx.creation_time));
-        }
-
         // send a replacement tx with the escalated gas price
         match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await {
             Ok(new_tx_hash) => {
@@ -338,7 +339,7 @@ impl<M, E: Clone> EscalationTask<M, E> {
                 );
                 // Return the new tx hash to monitor and the time it was created.
                 // The latter is used to know when to escalate the gas price again
-                Some((new_tx_hash, Instant::now()))
+                Some((Some(new_tx_hash), Instant::now()))
             }
             Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx),
         }
@@ -360,34 +361,41 @@ impl<M, E: Clone> EscalationTask<M, E> {
                 tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs");
             }
             let mut new_txs_to_monitor = vec![];
-            for monitored_tx in monitored_txs {
-                let receipt = self
-                    .inner
-                    .get_transaction_receipt(monitored_tx.hash)
-                    .await
-                    .map_err(GasEscalatorError::MiddlewareError)?;
-
-                tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists");
+            for old_monitored_tx in monitored_txs {
+                let receipt = if let Some(tx_hash) = old_monitored_tx.hash {
+                    tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists");
+                    self.inner
+                        .get_transaction_receipt(tx_hash)
+                        .await
+                        .map_err(GasEscalatorError::MiddlewareError)?
+                } else {
+                    None
+                };

-                if receipt.is_some() {
+                if let Some(receipt) = receipt {
                     // tx was already included, can drop from escalator
-                    tracing::debug!(tx = ?monitored_tx.hash, "Transaction was included onchain, dropping from escalator");
+                    tracing::debug!(tx = ?receipt.transaction_hash, "Transaction was included onchain, dropping from escalator");
                     continue;
                 }
-                let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else {
-                    tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
+                let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) else {
+                    tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
                     continue;
                 };

-                let maybe_tx_to_monitor =
-                    self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await;
+                // gas price wasn't escalated
+                // keep monitoring the old tx
+                let maybe_tx_to_monitor = if old_monitored_tx.inner.eq(&new_tx) {
+                    Some((old_monitored_tx.hash, old_monitored_tx.creation_time))
+                } else {
+                    self.broadcast_tx(old_monitored_tx.clone(), new_tx.clone()).await
+                };

                 if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor {
                     new_txs_to_monitor.push(MonitoredTransaction {
                         hash: new_txhash,
                         inner: new_tx,
                         creation_time: new_creation_time,
-                        block: monitored_tx.block,
+                        block: old_monitored_tx.block,
                     });
                 }
             }
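
Why it is safe to keep a failed broadcast in the watcher: a MonitoredTransaction with hash: None can never match a receipt, so each pass of this loop tries to escalate its gas price and rebroadcast it. The sketch below shows the escalation curve using the crate's GeometricGasPrice; the initial price, coefficient, and interval are illustrative, and the GasEscalator::get_gas_price signature is assumed from the ethers-rs trait rather than quoted from this commit.

use ethers_core::types::U256;
use ethers_middleware::gas_escalator::{GasEscalator, GeometricGasPrice};

fn main() {
    // Bump the price by ~12.5% for every 60s that a tx has been pending,
    // with no upper cap on the replacement price.
    let escalator = GeometricGasPrice::new(1.125, 60u64, None::<u64>);
    let initial = U256::from(10_000_000_000u64); // 10 gwei
    for elapsed in [0u64, 60, 120, 180] {
        let new_price = escalator.get_gas_price(initial, elapsed);
        println!("after {elapsed:>3}s pending: {new_price} wei");
    }
}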
26 changes: 26 additions & 0 deletions ethers-middleware/src/nonce_manager.rs
@@ -5,6 +5,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use thiserror::Error;
 use tracing::instrument;

+const DEFAULT_TX_COUNT_FOR_RESYNC: u64 = 10;
+
 #[derive(Debug)]
 /// Middleware used for calculating nonces locally, useful for signing multiple
 /// consecutive transactions without waiting for them to hit the mempool
@@ -13,6 +15,8 @@ pub struct NonceManagerMiddleware<M> {
     init_guard: futures_locks::Mutex<()>,
     initialized: AtomicBool,
     nonce: AtomicU64,
+    tx_count_for_resync: Option<AtomicU64>,
+    txs_since_resync: AtomicU64,
     address: Address,
 }

@@ -28,6 +32,8 @@ where
             init_guard: Default::default(),
             initialized: Default::default(),
             nonce: Default::default(),
+            tx_count_for_resync: Default::default(),
+            txs_since_resync: 0u64.into(),
             address,
         }
     }
@@ -38,6 +44,13 @@ where
         nonce.into()
     }

+    pub fn get_tx_count_for_resync(&self) -> u64 {
+        self.tx_count_for_resync
+            .as_ref()
+            .map(|count| count.load(Ordering::SeqCst))
+            .unwrap_or(DEFAULT_TX_COUNT_FOR_RESYNC)
+    }
+
     pub async fn initialize_nonce(
         &self,
         block: Option<BlockId>,
@@ -142,7 +155,20 @@ where
         tracing::debug!(?nonce, "Sending transaction");
         match self.inner.send_transaction(tx.clone(), block).await {
             Ok(pending_tx) => {
+                let txs_since_resync = self.txs_since_resync.load(Ordering::SeqCst);
+                let new_txs_since_resync = txs_since_resync + 1;
                 tracing::debug!(?nonce, "Sent transaction");
+                let tx_count_for_resync = self.get_tx_count_for_resync();
+                if new_txs_since_resync >= tx_count_for_resync {
+                    let onchain_nonce = self.get_transaction_count(self.address, block).await?;
+                    self.nonce.store(onchain_nonce.as_u64(), Ordering::SeqCst);
+                    self.txs_since_resync.store(0, Ordering::SeqCst);
+                    tracing::debug!(?nonce, "Resynced internal nonce with onchain nonce");
+                } else {
+                    self.txs_since_resync.store(new_txs_since_resync, Ordering::SeqCst);
+                    let txs_until_resync = tx_count_for_resync - new_txs_since_resync;
+                    tracing::debug!(?txs_until_resync, "Transactions until nonce resync");
+                }
                 Ok(pending_tx)
             }
             Err(err) => {
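
To make the resync cadence concrete, here is a small self-contained model of the counter logic added above; ResyncCounter and on_tx_sent are illustrative names for this sketch, not the crate's API.

use std::sync::atomic::{AtomicU64, Ordering};

const DEFAULT_TX_COUNT_FOR_RESYNC: u64 = 10;

struct ResyncCounter {
    txs_since_resync: AtomicU64,
}

impl ResyncCounter {
    /// Returns true when the caller should refetch the onchain nonce and
    /// overwrite the locally tracked one, mirroring the branch above.
    fn on_tx_sent(&self) -> bool {
        let new_count = self.txs_since_resync.load(Ordering::SeqCst) + 1;
        if new_count >= DEFAULT_TX_COUNT_FOR_RESYNC {
            self.txs_since_resync.store(0, Ordering::SeqCst);
            true
        } else {
            self.txs_since_resync.store(new_count, Ordering::SeqCst);
            false
        }
    }
}

fn main() {
    let counter = ResyncCounter { txs_since_resync: AtomicU64::new(0) };
    // 25 sends at a cadence of 10 trigger a resync on the 10th and 20th tx.
    let resyncs = (1..=25).filter(|_| counter.on_tx_sent()).count();
    assert_eq!(resyncs, 2);
    println!("resyncs: {resyncs}");
}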
