Skip to content

Commit

Permalink
feat: eip1559 escalator
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Nov 19, 2024
1 parent 7960ede commit b74815e
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 44 deletions.
127 changes: 85 additions & 42 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ use thiserror::Error;
use tracing::{self, instrument};

Check warning on line 13 in ethers-middleware/src/gas_escalator/mod.rs

View workflow job for this annotation

GitHub Actions / WASM

unused import: `instrument`
use tracing_futures::Instrument;

use ethers_core::types::{
transaction::eip2718::TypedTransaction, BlockId, TransactionRequest, TxHash, U256,
};
use ethers_core::types::{transaction::eip2718::TypedTransaction, BlockId, TxHash, U256};
use ethers_providers::{interval, FromErr, Middleware, PendingTransaction, StreamExt};

Check warning on line 17 in ethers-middleware/src/gas_escalator/mod.rs

View workflow job for this annotation

GitHub Actions / WASM

unused import: `interval`

#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;

type ToEscalate = Arc<Mutex<Vec<(TxHash, TransactionRequest, Instant, Option<BlockId>)>>>;
pub type ToEscalate = Arc<Mutex<Vec<MonitoredTransaction>>>;

#[cfg(target_arch = "wasm32")]
type WatcherFuture<'a> = Pin<Box<dyn futures_util::stream::Stream<Item = ()> + 'a>>;
Expand Down Expand Up @@ -61,6 +59,57 @@ pub(crate) struct GasEscalatorMiddlewareInternal<M> {
pub txs: ToEscalate,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitoredTransaction {
hash: TxHash,
inner: TypedTransaction,
creation_time: Instant,
block: Option<BlockId>,
}

impl MonitoredTransaction {
fn escalate_gas_price<E: GasEscalator>(&self, escalator: E) -> Option<TypedTransaction> {
// Get the new gas price based on how much time passed since the
// tx was last broadcast
let time_elapsed = self.creation_time.elapsed().as_secs();
match self.inner.clone() {
TypedTransaction::Legacy(tx) => {
let Some(gas_price) = tx.gas_price else {
return None;
};
let new_gas_price = escalator.get_gas_price(gas_price, time_elapsed);
let mut updated_tx = tx.clone();
updated_tx.gas_price = Some(new_gas_price);
Some(updated_tx.into())
}
TypedTransaction::Eip2930(tx) => {
let Some(gas_price) = tx.tx.gas_price else {
return None;
};
let new_gas_price = escalator.get_gas_price(gas_price, time_elapsed);
let mut updated_tx = tx.clone();
updated_tx.tx.gas_price = Some(new_gas_price);
Some(updated_tx.into())
}
TypedTransaction::Eip1559(tx) => {
let Some(max_fee_per_gas) = tx.max_fee_per_gas else {
return None;
};
let Some(max_priority_fee_per_gas) = tx.max_priority_fee_per_gas else {
return None;
};
let new_max_fee_per_gas = escalator.get_gas_price(max_fee_per_gas, time_elapsed);
let new_max_priority_fee_per_gas =
escalator.get_gas_price(max_priority_fee_per_gas, time_elapsed);
let mut updated_tx = tx.clone();
updated_tx.max_fee_per_gas = Some(new_max_fee_per_gas);
updated_tx.max_priority_fee_per_gas = Some(new_max_priority_fee_per_gas);
Some(updated_tx.into())
}
}
}
}

/// A Gas escalator allows bumping transactions' gas price to avoid getting them
/// stuck in the memory pool.
///
Expand Down Expand Up @@ -161,17 +210,14 @@ where
.await
.map_err(GasEscalatorError::MiddlewareError)?;

let tx = match tx {
TypedTransaction::Legacy(inner) => inner,
TypedTransaction::Eip2930(inner) => inner.tx,
// don't run the escalator for this tx if it's not supported
_ => return Ok(pending_tx),
// _ => return Err(GasEscalatorError::UnsupportedTxType),
};

// insert the tx in the pending txs
let mut lock = self.txs.lock().await;
lock.push((*pending_tx, tx, Instant::now(), block));
lock.push(MonitoredTransaction {
hash: *pending_tx,
inner: tx,
creation_time: Instant::now(),
block,
});

Ok(pending_tx)
}
Expand All @@ -188,7 +234,7 @@ where
#[instrument(skip(inner, escalator, frequency))]
pub fn new<E>(inner: M, escalator: E, frequency: Frequency) -> Self
where
E: GasEscalator + 'static,
E: GasEscalator + Clone + 'static,
M: 'static,
{
let inner = Arc::new(inner);
Expand Down Expand Up @@ -216,7 +262,7 @@ pub struct EscalationTask<M, E> {
}

#[cfg(not(target_arch = "wasm32"))]
impl<M, E> EscalationTask<M, E> {
impl<M, E: Clone> EscalationTask<M, E> {
pub fn new(inner: M, escalator: E, frequency: Frequency, txs: ToEscalate) -> Self {
Self { inner, escalator, frequency, txs }
}
Expand All @@ -241,8 +287,6 @@ impl<M, E> EscalationTask<M, E> {
let mut watcher = watcher.fuse();

while watcher.next().await.is_some() {
let now = Instant::now();

// We take the contents of the mutex, and then add them back in
// later.
let mut txs: Vec<_> = {
Expand All @@ -258,44 +302,38 @@ impl<M, E> EscalationTask<M, E> {
// Pop all transactions and re-insert those that have not been included yet
for _ in 0..len {
// this must never panic as we're explicitly within bounds
let (old_tx_hash, mut replacement_tx, old_creation_time, priority) =
txs.pop().expect("should have element in vector");
let old_monitored_tx = txs.pop().expect("should have element in vector");

let receipt = self
.inner
.get_transaction_receipt(old_tx_hash)
.get_transaction_receipt(old_monitored_tx.hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists");
tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists");

if receipt.is_none() {
let Some(old_gas_price) = replacement_tx.gas_price else {
tracing::error!(tx=?old_tx_hash, "gas price is not set for transaction, dropping from escalator");
let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone())
else {
tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator");
continue;
};
// Get the new gas price based on how much time passed since the
// tx was last broadcast
let new_gas_price = self.escalator.get_gas_price(
old_gas_price,
now.duration_since(old_creation_time).as_secs(),
);

let (new_txhash, new_creation_time) = if new_gas_price == old_gas_price {
(old_tx_hash, old_creation_time)
} else {
// bump the gas price
replacement_tx.gas_price = Some(new_gas_price);

// gas price wasn't escalated
let (new_txhash, new_creation_time) = if old_monitored_tx.inner.eq(&new_tx) {
(old_monitored_tx.hash, old_monitored_tx.creation_time)
} else {
// the tx hash will be different so we need to update it
match self.inner.send_transaction(replacement_tx.clone(), priority).await {
match self
.inner
.send_transaction(new_tx.clone(), old_monitored_tx.block)
.await
{
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
tracing::debug!(
old_tx_hash = ?old_tx_hash,
new_tx_hash = ?new_tx_hash,
old_gas_price = ?old_gas_price,
new_gas_price = ?new_gas_price,
old_tx = ?old_monitored_tx,
new_tx = ?new_tx,
"escalated gas price"
);
(new_tx_hash, Instant::now())
Expand All @@ -306,7 +344,7 @@ impl<M, E> EscalationTask<M, E> {
// gas price tx when one of the previous ones
// was already mined (meaning we also do not
// push it back to the pending txs vector)
tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined.");
tracing::warn!(err = %err, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been mined.");
continue;
} else {
tracing::error!(
Expand All @@ -318,7 +356,12 @@ impl<M, E> EscalationTask<M, E> {
}
}
};
txs.push((new_txhash, replacement_tx, new_creation_time, priority));
txs.push(MonitoredTransaction {
hash: new_txhash,
inner: new_tx,
creation_time: new_creation_time,
block: old_monitored_tx.block,
});
}
}
// after this big ugly loop, we dump everything back in
Expand Down
63 changes: 61 additions & 2 deletions ethers-middleware/tests/gas_escalator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#![cfg(not(target_arch = "wasm32"))]
use std::convert::TryFrom;

use ethers_core::{types::*, utils::Anvil};
use ethers_core::{
types::{transaction::eip1559, *},
utils::Anvil,
};
use ethers_middleware::{
gas_escalator::{Frequency, GasEscalatorMiddleware, GeometricGasPrice},
SignerMiddleware,
Expand All @@ -12,7 +15,7 @@ use instant::Duration;
use tokio::time::sleep;

#[tokio::test]
async fn gas_escalator_live() {
async fn gas_escalator_legacy_works() {
// TODO: show tracing logs in the test

let anvil = Anvil::new().port(8545u16).block_time(10u64).spawn();
Expand Down Expand Up @@ -57,3 +60,59 @@ async fn gas_escalator_live() {
sleep(Duration::from_secs(2)).await;
println!("receipt gas price: , hardcoded_gas_price: {}, receipt: {:?}", gas_price, receipt);
}

#[tokio::test]
async fn gas_escalator_1559_works() {
// TODO: show tracing logs in the test

let anvil = Anvil::new().port(8545u16).block_time(10u64).spawn();
let chain_id = anvil.chain_id();
let provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();

// wrap with signer
let wallet: LocalWallet = anvil.keys().first().unwrap().clone().into();
let wallet = wallet.with_chain_id(chain_id);
let address = wallet.address();
let provider = SignerMiddleware::new(provider, wallet);

// wrap with escalator
// escalate every 2 seconds. We should only see 4-5 escalations in total
let escalator = GeometricGasPrice::new(1.1, 2u64, Some(2_000_000_000_000u64));
let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::Duration(300));

// set the gas price to 10 gwei, so we need to escalate twice
// this works but the tx still goes through regardless of its gas price for some reason
// reqwest::Client::new()
// .post(&format!("{}/", anvil.endpoint()))
// .json(&json!({
// "jsonrpc": "2.0",
// "method": "anvil_setMinGasPrice",
// "params": [10_000_000_000u64],
// "id": 1
// }))
// .send()
// .await
// .unwrap();

let nonce = provider.get_transaction_count(address, None).await.unwrap();
// 1 gwei default base fee
let max_fee_per_gas = U256::from(1_000_000_000_u64);
let max_priority_fee_per_gas = U256::from(500_000_000_u64);
let tx = eip1559::Eip1559TransactionRequest {
to: Some(Address::zero().into()),
value: Some(1u64.into()),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
nonce: Some(nonce),
chain_id: Some(chain_id.into()),
..Default::default()
};

let pending = provider.send_transaction(tx, None).await.expect("could not send");
let receipt = pending.await;
sleep(Duration::from_secs(2)).await;
println!(
"receipt gas price: , hardcoded_gas_price: {}, receipt: {:?}",
max_fee_per_gas, receipt
);
}

0 comments on commit b74815e

Please sign in to comment.