Skip to content

Commit

Permalink
chore: refactor escalator
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Dec 2, 2024
1 parent 1e710da commit 4d75c24
Showing 1 changed file with 105 additions and 88 deletions.
193 changes: 105 additions & 88 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use thiserror::Error;
use tracing::{self, instrument};
use tracing_futures::Instrument;

use ethers_core::types::{transaction::eip2718::TypedTransaction, BlockId, TxHash, U256};
use ethers_core::types::{transaction::eip2718::TypedTransaction, BlockId, TxHash, H256, U256};
use ethers_providers::{interval, FromErr, Middleware, PendingTransaction, StreamExt};

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -246,7 +246,7 @@ where

let esc = EscalationTask { inner, escalator, frequency, txs };

spawn(esc.escalate().instrument(tracing::debug_span!("gas_escalation")));
spawn(esc.monitor().instrument(tracing::debug_span!("gas_escalation")));

Self { inner: this }
}
Expand All @@ -267,13 +267,111 @@ impl<M, E: Clone> EscalationTask<M, E> {
Self { inner, escalator, frequency, txs }
}

async fn escalate(mut self) -> Result<(), GasEscalatorError<M>>
async fn broadcast_tx_if_escalated(
&self,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Result<Option<(H256, Instant)>, GasEscalatorError<M>>
where
M: Middleware,
E: GasEscalator,
{
// gas price wasn't escalated
// keep monitoring the old tx
if old_monitored_tx.inner.eq(&new_tx) {
return Ok(Some((old_monitored_tx.hash, old_monitored_tx.creation_time)));
}

// send a replacement tx with the escalated gas price
match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await {
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
tracing::debug!(
old_tx = ?old_monitored_tx,
new_tx = ?new_tx,
"escalated gas price"
);
// Return the new tx hash to monitor and the time it was created.
// The latter is used to know when to escalate the gas price again
Ok(Some((new_tx_hash, Instant::now())))
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// may happen if we try to broadcast a new, gas-escalated tx when the original tx
// already landed onchain, meaning we no longer need to escalate it
tracing::warn!(err = %err, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
Ok(None)
} else {
tracing::error!(
err = %err,
"Unexpected error. Killing escalator backend."
);
Err(GasEscalatorError::MiddlewareError(err))
}
}
}
}

async fn escalate_stuck_txs(&self) -> Result<(), GasEscalatorError<M>>
where
M: Middleware,
E: GasEscalator,
{
// We take monitored txs out of the mutex, and add them back if they weren't included yet
let monitored_txs: Vec<_> = {
let mut txs = self.txs.lock().await;
std::mem::take(&mut (*txs))
// Lock scope ends
};

if !monitored_txs.is_empty() {
tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs");
}
let mut new_txs_to_monitor = vec![];
for monitored_tx in monitored_txs {
let receipt = self
.inner
.get_transaction_receipt(monitored_tx.hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists");

if receipt.is_some() {
// tx was already included, can drop from escalator
continue;
}
let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else {
tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
continue;
};

let maybe_tx_to_monitor =
self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await?;

if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor {
new_txs_to_monitor.push(MonitoredTransaction {
hash: new_txhash,
inner: new_tx,
creation_time: new_creation_time,
block: monitored_tx.block,
});
}
}
// we add the new txs to monitor back to the list
// we don't replace here, as the vec in the mutex may contain
// items!
self.txs.lock().await.extend(new_txs_to_monitor);
Ok(())
}

async fn monitor(self) -> Result<(), GasEscalatorError<M>>
where
M: Middleware,
E: GasEscalator,
{
// the escalation frequency is either on a per-block basis, or on a duration basis
let watcher: WatcherFuture = match self.frequency {
let escalation_frequency_watcher: WatcherFuture = match self.frequency {
Frequency::PerBlock => Box::pin(
self.inner
.watch_blocks()
Expand All @@ -284,90 +382,9 @@ impl<M, E: Clone> EscalationTask<M, E> {
Frequency::Duration(ms) => Box::pin(interval(std::time::Duration::from_millis(ms))),
};

let mut watcher = watcher.fuse();

while watcher.next().await.is_some() {
// We take the contents of the mutex, and then add them back in
// later.
let mut txs: Vec<_> = {
let mut txs = self.txs.lock().await;
std::mem::take(&mut (*txs))
// Lock scope ends
};

let len = txs.len();
if len > 0 {
tracing::debug!(?txs, "In the escalator watcher loop. Monitoring txs");
}
// Pop all transactions and re-insert those that have not been included yet
for _ in 0..len {
// this must never panic as we're explicitly within bounds
let old_monitored_tx = txs.pop().expect("should have element in vector");

let receipt = self
.inner
.get_transaction_receipt(old_monitored_tx.hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists");

if receipt.is_none() {
let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone())
else {
tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
continue;
};

// gas price wasn't escalated
let (new_txhash, new_creation_time) = if old_monitored_tx.inner.eq(&new_tx) {
(old_monitored_tx.hash, old_monitored_tx.creation_time)
} else {
// the tx hash will be different so we need to update it
match self
.inner
.send_transaction(new_tx.clone(), old_monitored_tx.block)
.await
{
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
tracing::debug!(
old_tx = ?old_monitored_tx,
new_tx = ?new_tx,
"escalated gas price"
);
(new_tx_hash, Instant::now())
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// may happen if we try to broadcast a higher
// gas price tx when one of the previous ones
// was already mined (meaning we also do not
// push it back to the pending txs vector)
tracing::warn!(err = %err, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been mined.");
continue;
} else {
tracing::error!(
err = %err,
"Killing escalator backend"
);
return Err(GasEscalatorError::MiddlewareError(err));
}
}
}
};
txs.push(MonitoredTransaction {
hash: new_txhash,
inner: new_tx,
creation_time: new_creation_time,
block: old_monitored_tx.block,
});
}
}
// after this big ugly loop, we dump everything back in
// we don't replace here, as the vec in the mutex may contain
// items!
self.txs.lock().await.extend(txs);
let mut escalation_frequency_watcher = escalation_frequency_watcher.fuse();
while escalation_frequency_watcher.next().await.is_some() {
self.escalate_stuck_txs().await?;
}
tracing::error!("timing future has gone away");
Ok(())
Expand Down

0 comments on commit 4d75c24

Please sign in to comment.