Skip to content

Commit

Permalink
feat: add tx to escalator before any broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Jan 2, 2025
1 parent d9f822e commit ac7b7c9
Showing 1 changed file with 51 additions and 42 deletions.
93 changes: 51 additions & 42 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(crate) struct GasEscalatorMiddlewareInternal<M> {

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitoredTransaction {
hash: TxHash,
hash: Option<TxHash>,
inner: TypedTransaction,
creation_time: Instant,
block: Option<BlockId>,
Expand Down Expand Up @@ -204,22 +204,30 @@ where
) -> Result<PendingTransaction<'_, M::Provider>, GasEscalatorError<M>> {
let tx = tx.into();

let pending_tx = self
.inner
.send_transaction(tx.clone(), block)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

// insert the tx in the pending txs
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: *pending_tx,
inner: tx,
creation_time: Instant::now(),
block,
});

Ok(pending_tx)
match self.inner.send_transaction(tx.clone(), block).await {
Ok(pending_tx) => {
// insert the tx in the pending txs
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: Some(*pending_tx),
inner: tx,
creation_time: Instant::now(),
block,
});
Ok(pending_tx)
}
Err(err) => {
// insert the tx in the pending txs
let mut lock = self.txs.lock().await;
lock.push(MonitoredTransaction {
hash: None,
inner: tx,
creation_time: Instant::now(),
block: None,
});
Err(GasEscalatorError::MiddlewareError(err))
}
}
}
}

Expand Down Expand Up @@ -274,11 +282,11 @@ impl<M, E: Clone> EscalationTask<M, E> {
err_message: String,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Option<(H256, Instant)> {
) -> Option<(Option<H256>, Instant)> {
if err_message.contains("nonce too low") {
// may happen if we try to broadcast a new, gas-escalated tx when the original tx
// already landed onchain, meaning we no longer need to escalate it
tracing::warn!(err = err_message, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
tracing::warn!(err = err_message, ?old_monitored_tx, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator");
None
} else if err_message.contains("replacement transaction underpriced") {
// the gas escalation wasn't sufficient
Expand Down Expand Up @@ -316,21 +324,15 @@ impl<M, E: Clone> EscalationTask<M, E> {
///
/// **Returns** a tx hash to monitor and the time it was created, unless the tx was already
/// included or an unknown error occurred
async fn broadcast_tx_if_escalated(
async fn broadcast_tx(
&self,
old_monitored_tx: MonitoredTransaction,
new_tx: TypedTransaction,
) -> Option<(H256, Instant)>
) -> Option<(Option<H256>, Instant)>
where
M: Middleware,
E: GasEscalator,
{
// gas price wasn't escalated
// keep monitoring the old tx
if old_monitored_tx.inner.eq(&new_tx) {
return Some((old_monitored_tx.hash, old_monitored_tx.creation_time));
}

// send a replacement tx with the escalated gas price
match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await {
Ok(new_tx_hash) => {
Expand All @@ -343,7 +345,7 @@ impl<M, E: Clone> EscalationTask<M, E> {
);
// Return the new tx hash to monitor and the time it was created.
// The latter is used to know when to escalate the gas price again
Some((new_tx_hash, Instant::now()))
Some((Some(new_tx_hash), Instant::now()))
}
Err(err) => Self::handle_broadcast_error(err.to_string(), old_monitored_tx, new_tx),
}
Expand All @@ -365,34 +367,41 @@ impl<M, E: Clone> EscalationTask<M, E> {
tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs");
}
let mut new_txs_to_monitor = vec![];
for monitored_tx in monitored_txs {
let receipt = self
.inner
.get_transaction_receipt(monitored_tx.hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists");
for old_monitored_tx in monitored_txs {
let receipt = if let Some(tx_hash) = old_monitored_tx.hash {
tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists");
self.inner
.get_transaction_receipt(tx_hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?
} else {
None
};

if receipt.is_some() {
// tx was already included, can drop from escalator
tracing::debug!(tx = ?monitored_tx, "Transaction was included onchain, dropping from escalator");
tracing::debug!(tx = ?old_monitored_tx, "Transaction was included onchain, dropping from escalator");
continue;
}
let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else {
tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) else {
tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
continue;
};

let maybe_tx_to_monitor =
self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await;
// gas price wasn't escalated
// keep monitoring the old tx
let maybe_tx_to_monitor = if old_monitored_tx.inner.eq(&new_tx) {
Some((old_monitored_tx.hash, old_monitored_tx.creation_time))
} else {
self.broadcast_tx(old_monitored_tx.clone(), new_tx.clone()).await
};

if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor {
new_txs_to_monitor.push(MonitoredTransaction {
hash: new_txhash,
inner: new_tx,
creation_time: new_creation_time,
block: monitored_tx.block,
block: old_monitored_tx.block,
});
}
}
Expand Down

0 comments on commit ac7b7c9

Please sign in to comment.