From f18abe381c8f93b6a402111bb976dbd2bae0cedc Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Fri, 3 Jan 2025 15:36:37 +0000 Subject: [PATCH] feat: middleware submission tweaks (#23) - adds tx to gas escalator before any broadcast. If there's an immediate error when broadcasting, the escalator will keep trying to resubmit with a higher gas price - resyncs the nonce manager's internal nonce to the one onchain every 10 txs both are short-term fixes for the issues dicussed in https://www.notion.so/hyperlanexyz/State-of-EVM-provider-middleware-16c6d35200d680d8a6b5f0d32cd8c66b?pvs=4 --- ethers-middleware/src/gas_escalator/mod.rs | 96 ++++++++++++---------- ethers-middleware/src/nonce_manager.rs | 26 ++++++ 2 files changed, 78 insertions(+), 44 deletions(-) diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs index 0bf23744c..9ca6d3531 100644 --- a/ethers-middleware/src/gas_escalator/mod.rs +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -61,7 +61,7 @@ pub(crate) struct GasEscalatorMiddlewareInternal { #[derive(Debug, Clone, PartialEq, Eq)] pub struct MonitoredTransaction { - hash: TxHash, + hash: Option, inner: TypedTransaction, creation_time: Instant, block: Option, @@ -205,23 +205,30 @@ where ) -> Result, GasEscalatorError> { let tx = tx.into(); - let pending_tx = self - .inner - .send_transaction(tx.clone(), block) - .await - .map_err(GasEscalatorError::MiddlewareError)?; - - tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher"); - // insert the tx in the pending txs - let mut lock = self.txs.lock().await; - lock.push(MonitoredTransaction { - hash: *pending_tx, - inner: tx, - creation_time: Instant::now(), - block, - }); - - Ok(pending_tx) + match self.inner.send_transaction(tx.clone(), block).await { + Ok(pending_tx) => { + tracing::debug!(tx = ?tx, tx_hash = ?pending_tx.tx_hash(), "Sent tx, adding to gas escalator watcher"); + let mut lock = self.txs.lock().await; + lock.push(MonitoredTransaction { + hash: Some(*pending_tx), + inner: tx, + creation_time: Instant::now(), + block, + }); + Ok(pending_tx) + } + Err(err) => { + tracing::warn!(tx = ?tx, "Failed to send tx, adding to gas escalator watcher regardless"); + let mut lock = self.txs.lock().await; + lock.push(MonitoredTransaction { + hash: None, + inner: tx, + creation_time: Instant::now(), + block: None, + }); + Err(GasEscalatorError::MiddlewareError(err)) + } + } } } @@ -279,11 +286,11 @@ impl EscalationTask { err_message: String, old_monitored_tx: MonitoredTransaction, new_tx: TypedTransaction, - ) -> Option<(H256, Instant)> { + ) -> Option<(Option, Instant)> { if err_message.contains("nonce too low") { // may happen if we try to broadcast a new, gas-escalated tx when the original tx // already landed onchain, meaning we no longer need to escalate it - tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator"); + tracing::warn!(err = err_message, ?old_monitored_tx, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator"); None } else if RETRYABLE_ERRORS.iter().any(|err_msg| err_message.contains(err_msg)) { // if the error is one of the known retryable errors, we can keep trying to escalate @@ -311,21 +318,15 @@ impl EscalationTask { /// /// **Returns** a tx hash to monitor and the time it was created, unless the tx was already /// included or an unknown error occurred - async fn broadcast_tx_if_escalated( + async fn broadcast_tx( &self, old_monitored_tx: MonitoredTransaction, new_tx: TypedTransaction, - ) -> Option<(H256, Instant)> + ) -> Option<(Option, Instant)> where M: Middleware, E: GasEscalator, { - // gas price wasn't escalated - // keep monitoring the old tx - if old_monitored_tx.inner.eq(&new_tx) { - return Some((old_monitored_tx.hash, old_monitored_tx.creation_time)); - } - // send a replacement tx with the escalated gas price match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await { Ok(new_tx_hash) => { @@ -338,7 +339,7 @@ impl EscalationTask { ); // Return the new tx hash to monitor and the time it was created. // The latter is used to know when to escalate the gas price again - Some((new_tx_hash, Instant::now())) + Some((Some(new_tx_hash), Instant::now())) } Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx), } @@ -360,34 +361,41 @@ impl EscalationTask { tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs"); } let mut new_txs_to_monitor = vec![]; - for monitored_tx in monitored_txs { - let receipt = self - .inner - .get_transaction_receipt(monitored_tx.hash) - .await - .map_err(GasEscalatorError::MiddlewareError)?; - - tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists"); + for old_monitored_tx in monitored_txs { + let receipt = if let Some(tx_hash) = old_monitored_tx.hash { + tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists"); + self.inner + .get_transaction_receipt(tx_hash) + .await + .map_err(GasEscalatorError::MiddlewareError)? + } else { + None + }; - if receipt.is_some() { + if let Some(receipt) = receipt { // tx was already included, can drop from escalator - tracing::debug!(tx = ?monitored_tx.hash, "Transaction was included onchain, dropping from escalator"); + tracing::debug!(tx = ?receipt.transaction_hash, "Transaction was included onchain, dropping from escalator"); continue; } - let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else { - tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator"); + let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) else { + tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator"); continue; }; - let maybe_tx_to_monitor = - self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await; + // gas price wasn't escalated + // keep monitoring the old tx + let maybe_tx_to_monitor = if old_monitored_tx.inner.eq(&new_tx) { + Some((old_monitored_tx.hash, old_monitored_tx.creation_time)) + } else { + self.broadcast_tx(old_monitored_tx.clone(), new_tx.clone()).await + }; if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor { new_txs_to_monitor.push(MonitoredTransaction { hash: new_txhash, inner: new_tx, creation_time: new_creation_time, - block: monitored_tx.block, + block: old_monitored_tx.block, }); } } diff --git a/ethers-middleware/src/nonce_manager.rs b/ethers-middleware/src/nonce_manager.rs index 7905c7b78..aed41490f 100644 --- a/ethers-middleware/src/nonce_manager.rs +++ b/ethers-middleware/src/nonce_manager.rs @@ -5,6 +5,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use thiserror::Error; use tracing::instrument; +const DEFAULT_TX_COUNT_FOR_RESYNC: u64 = 10; + #[derive(Debug)] /// Middleware used for calculating nonces locally, useful for signing multiple /// consecutive transactions without waiting for them to hit the mempool @@ -13,6 +15,8 @@ pub struct NonceManagerMiddleware { init_guard: futures_locks::Mutex<()>, initialized: AtomicBool, nonce: AtomicU64, + tx_count_for_resync: Option, + txs_since_resync: AtomicU64, address: Address, } @@ -28,6 +32,8 @@ where init_guard: Default::default(), initialized: Default::default(), nonce: Default::default(), + tx_count_for_resync: Default::default(), + txs_since_resync: 0u64.into(), address, } } @@ -38,6 +44,13 @@ where nonce.into() } + pub fn get_tx_count_for_resync(&self) -> u64 { + self.tx_count_for_resync + .as_ref() + .map(|count| count.load(Ordering::SeqCst)) + .unwrap_or(DEFAULT_TX_COUNT_FOR_RESYNC) + } + pub async fn initialize_nonce( &self, block: Option, @@ -142,7 +155,20 @@ where tracing::debug!(?nonce, "Sending transaction"); match self.inner.send_transaction(tx.clone(), block).await { Ok(pending_tx) => { + let txs_since_resync = self.txs_since_resync.load(Ordering::SeqCst); + let new_txs_since_resync = txs_since_resync + 1; tracing::debug!(?nonce, "Sent transaction"); + let tx_count_for_resync = self.get_tx_count_for_resync(); + if new_txs_since_resync >= tx_count_for_resync { + let onchain_nonce = self.get_transaction_count(self.address, block).await?; + self.nonce.store(onchain_nonce.as_u64(), Ordering::SeqCst); + self.txs_since_resync.store(0, Ordering::SeqCst); + tracing::debug!(?nonce, "Resynced internal nonce with onchain nonce"); + } else { + self.txs_since_resync.store(new_txs_since_resync, Ordering::SeqCst); + let txs_until_resync = tx_count_for_resync - new_txs_since_resync; + tracing::debug!(?txs_until_resync, "Transactions until nonce resync"); + } Ok(pending_tx) } Err(err) => {