From b74815e860ed48fd8a5ebf4c6a737fe35af6b3b1 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Wed, 20 Nov 2024 01:48:08 +0700 Subject: [PATCH] feat: eip1559 escalator --- ethers-middleware/src/gas_escalator/mod.rs | 127 ++++++++++++++------- ethers-middleware/tests/gas_escalator.rs | 63 +++++++++- 2 files changed, 146 insertions(+), 44 deletions(-) diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs index efc5397fa..71e089790 100644 --- a/ethers-middleware/src/gas_escalator/mod.rs +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -13,15 +13,13 @@ use thiserror::Error; use tracing::{self, instrument}; use tracing_futures::Instrument; -use ethers_core::types::{ - transaction::eip2718::TypedTransaction, BlockId, TransactionRequest, TxHash, U256, -}; +use ethers_core::types::{transaction::eip2718::TypedTransaction, BlockId, TxHash, U256}; use ethers_providers::{interval, FromErr, Middleware, PendingTransaction, StreamExt}; #[cfg(not(target_arch = "wasm32"))] use tokio::spawn; -type ToEscalate = Arc)>>>; +pub type ToEscalate = Arc>>; #[cfg(target_arch = "wasm32")] type WatcherFuture<'a> = Pin + 'a>>; @@ -61,6 +59,57 @@ pub(crate) struct GasEscalatorMiddlewareInternal { pub txs: ToEscalate, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MonitoredTransaction { + hash: TxHash, + inner: TypedTransaction, + creation_time: Instant, + block: Option, +} + +impl MonitoredTransaction { + fn escalate_gas_price(&self, escalator: E) -> Option { + // Get the new gas price based on how much time passed since the + // tx was last broadcast + let time_elapsed = self.creation_time.elapsed().as_secs(); + match self.inner.clone() { + TypedTransaction::Legacy(tx) => { + let Some(gas_price) = tx.gas_price else { + return None; + }; + let new_gas_price = escalator.get_gas_price(gas_price, time_elapsed); + let mut updated_tx = tx.clone(); + updated_tx.gas_price = Some(new_gas_price); + Some(updated_tx.into()) + } + TypedTransaction::Eip2930(tx) => { + let Some(gas_price) = tx.tx.gas_price else { + return None; + }; + let new_gas_price = escalator.get_gas_price(gas_price, time_elapsed); + let mut updated_tx = tx.clone(); + updated_tx.tx.gas_price = Some(new_gas_price); + Some(updated_tx.into()) + } + TypedTransaction::Eip1559(tx) => { + let Some(max_fee_per_gas) = tx.max_fee_per_gas else { + return None; + }; + let Some(max_priority_fee_per_gas) = tx.max_priority_fee_per_gas else { + return None; + }; + let new_max_fee_per_gas = escalator.get_gas_price(max_fee_per_gas, time_elapsed); + let new_max_priority_fee_per_gas = + escalator.get_gas_price(max_priority_fee_per_gas, time_elapsed); + let mut updated_tx = tx.clone(); + updated_tx.max_fee_per_gas = Some(new_max_fee_per_gas); + updated_tx.max_priority_fee_per_gas = Some(new_max_priority_fee_per_gas); + Some(updated_tx.into()) + } + } + } +} + /// A Gas escalator allows bumping transactions' gas price to avoid getting them /// stuck in the memory pool. /// @@ -161,17 +210,14 @@ where .await .map_err(GasEscalatorError::MiddlewareError)?; - let tx = match tx { - TypedTransaction::Legacy(inner) => inner, - TypedTransaction::Eip2930(inner) => inner.tx, - // don't run the escalator for this tx if it's not supported - _ => return Ok(pending_tx), - // _ => return Err(GasEscalatorError::UnsupportedTxType), - }; - // insert the tx in the pending txs let mut lock = self.txs.lock().await; - lock.push((*pending_tx, tx, Instant::now(), block)); + lock.push(MonitoredTransaction { + hash: *pending_tx, + inner: tx, + creation_time: Instant::now(), + block, + }); Ok(pending_tx) } @@ -188,7 +234,7 @@ where #[instrument(skip(inner, escalator, frequency))] pub fn new(inner: M, escalator: E, frequency: Frequency) -> Self where - E: GasEscalator + 'static, + E: GasEscalator + Clone + 'static, M: 'static, { let inner = Arc::new(inner); @@ -216,7 +262,7 @@ pub struct EscalationTask { } #[cfg(not(target_arch = "wasm32"))] -impl EscalationTask { +impl EscalationTask { pub fn new(inner: M, escalator: E, frequency: Frequency, txs: ToEscalate) -> Self { Self { inner, escalator, frequency, txs } } @@ -241,8 +287,6 @@ impl EscalationTask { let mut watcher = watcher.fuse(); while watcher.next().await.is_some() { - let now = Instant::now(); - // We take the contents of the mutex, and then add them back in // later. let mut txs: Vec<_> = { @@ -258,44 +302,38 @@ impl EscalationTask { // Pop all transactions and re-insert those that have not been included yet for _ in 0..len { // this must never panic as we're explicitly within bounds - let (old_tx_hash, mut replacement_tx, old_creation_time, priority) = - txs.pop().expect("should have element in vector"); + let old_monitored_tx = txs.pop().expect("should have element in vector"); let receipt = self .inner - .get_transaction_receipt(old_tx_hash) + .get_transaction_receipt(old_monitored_tx.hash) .await .map_err(GasEscalatorError::MiddlewareError)?; - tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists"); + tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists"); if receipt.is_none() { - let Some(old_gas_price) = replacement_tx.gas_price else { - tracing::error!(tx=?old_tx_hash, "gas price is not set for transaction, dropping from escalator"); + let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) + else { + tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator"); continue; }; - // Get the new gas price based on how much time passed since the - // tx was last broadcast - let new_gas_price = self.escalator.get_gas_price( - old_gas_price, - now.duration_since(old_creation_time).as_secs(), - ); - - let (new_txhash, new_creation_time) = if new_gas_price == old_gas_price { - (old_tx_hash, old_creation_time) - } else { - // bump the gas price - replacement_tx.gas_price = Some(new_gas_price); + // gas price wasn't escalated + let (new_txhash, new_creation_time) = if old_monitored_tx.inner.eq(&new_tx) { + (old_monitored_tx.hash, old_monitored_tx.creation_time) + } else { // the tx hash will be different so we need to update it - match self.inner.send_transaction(replacement_tx.clone(), priority).await { + match self + .inner + .send_transaction(new_tx.clone(), old_monitored_tx.block) + .await + { Ok(new_tx_hash) => { let new_tx_hash = *new_tx_hash; tracing::debug!( - old_tx_hash = ?old_tx_hash, - new_tx_hash = ?new_tx_hash, - old_gas_price = ?old_gas_price, - new_gas_price = ?new_gas_price, + old_tx = ?old_monitored_tx, + new_tx = ?new_tx, "escalated gas price" ); (new_tx_hash, Instant::now()) @@ -306,7 +344,7 @@ impl EscalationTask { // gas price tx when one of the previous ones // was already mined (meaning we also do not // push it back to the pending txs vector) - tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined."); + tracing::warn!(err = %err, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been mined."); continue; } else { tracing::error!( @@ -318,7 +356,12 @@ impl EscalationTask { } } }; - txs.push((new_txhash, replacement_tx, new_creation_time, priority)); + txs.push(MonitoredTransaction { + hash: new_txhash, + inner: new_tx, + creation_time: new_creation_time, + block: old_monitored_tx.block, + }); } } // after this big ugly loop, we dump everything back in diff --git a/ethers-middleware/tests/gas_escalator.rs b/ethers-middleware/tests/gas_escalator.rs index 827af9865..3431bf85a 100644 --- a/ethers-middleware/tests/gas_escalator.rs +++ b/ethers-middleware/tests/gas_escalator.rs @@ -1,7 +1,10 @@ #![cfg(not(target_arch = "wasm32"))] use std::convert::TryFrom; -use ethers_core::{types::*, utils::Anvil}; +use ethers_core::{ + types::{transaction::eip1559, *}, + utils::Anvil, +}; use ethers_middleware::{ gas_escalator::{Frequency, GasEscalatorMiddleware, GeometricGasPrice}, SignerMiddleware, @@ -12,7 +15,7 @@ use instant::Duration; use tokio::time::sleep; #[tokio::test] -async fn gas_escalator_live() { +async fn gas_escalator_legacy_works() { // TODO: show tracing logs in the test let anvil = Anvil::new().port(8545u16).block_time(10u64).spawn(); @@ -57,3 +60,59 @@ async fn gas_escalator_live() { sleep(Duration::from_secs(2)).await; println!("receipt gas price: , hardcoded_gas_price: {}, receipt: {:?}", gas_price, receipt); } + +#[tokio::test] +async fn gas_escalator_1559_works() { + // TODO: show tracing logs in the test + + let anvil = Anvil::new().port(8545u16).block_time(10u64).spawn(); + let chain_id = anvil.chain_id(); + let provider = Provider::::try_from(anvil.endpoint()).unwrap(); + + // wrap with signer + let wallet: LocalWallet = anvil.keys().first().unwrap().clone().into(); + let wallet = wallet.with_chain_id(chain_id); + let address = wallet.address(); + let provider = SignerMiddleware::new(provider, wallet); + + // wrap with escalator + // escalate every 2 seconds. We should only see 4-5 escalations in total + let escalator = GeometricGasPrice::new(1.1, 2u64, Some(2_000_000_000_000u64)); + let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::Duration(300)); + + // set the gas price to 10 gwei, so we need to escalate twice + // this works but the tx still goes through regardless of its gas price for some reason + // reqwest::Client::new() + // .post(&format!("{}/", anvil.endpoint())) + // .json(&json!({ + // "jsonrpc": "2.0", + // "method": "anvil_setMinGasPrice", + // "params": [10_000_000_000u64], + // "id": 1 + // })) + // .send() + // .await + // .unwrap(); + + let nonce = provider.get_transaction_count(address, None).await.unwrap(); + // 1 gwei default base fee + let max_fee_per_gas = U256::from(1_000_000_000_u64); + let max_priority_fee_per_gas = U256::from(500_000_000_u64); + let tx = eip1559::Eip1559TransactionRequest { + to: Some(Address::zero().into()), + value: Some(1u64.into()), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + nonce: Some(nonce), + chain_id: Some(chain_id.into()), + ..Default::default() + }; + + let pending = provider.send_transaction(tx, None).await.expect("could not send"); + let receipt = pending.await; + sleep(Duration::from_secs(2)).await; + println!( + "receipt gas price: , hardcoded_gas_price: {}, receipt: {:?}", + max_fee_per_gas, receipt + ); +}