From 3ae3ea982b136b06fb6196a849078c6aed6c3c39 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Sat, 16 Nov 2024 15:32:27 +0700 Subject: [PATCH] feat: bls transaction batcher --- Cargo.lock | 6 + Cargo.toml | 5 +- bin/odyssey/Cargo.toml | 2 + bin/odyssey/src/main.rs | 24 +++- crates/wallet/Cargo.toml | 6 +- crates/wallet/src/bls_batcher.rs | 231 +++++++++++++++++++++++++++++++ crates/wallet/src/lib.rs | 30 +++- 7 files changed, 296 insertions(+), 8 deletions(-) create mode 100644 crates/wallet/src/bls_batcher.rs diff --git a/Cargo.lock b/Cargo.lock index e8ae4bc..6c5f9ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4507,6 +4507,8 @@ dependencies = [ "reth-optimism-cli", "reth-optimism-node", "reth-provider", + "tokio", + "tokio-stream", "tracing", ] @@ -4552,9 +4554,12 @@ dependencies = [ name = "odyssey-wallet" version = "0.0.0" dependencies = [ + "alloy", "alloy-network", "alloy-primitives", "alloy-rpc-types", + "alloy-rpc-types-beacon", + "futures", "jsonrpsee", "metrics", "metrics-derive", @@ -4566,6 +4571,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "tokio-stream", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index f802366..7549912 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,14 +143,17 @@ alloy = { version = "0.5.3", features = [ "providers", "provider-http", "signers", + "consensus", ] } -alloy-network = { version = "0.5.3" } alloy-primitives = { version = "0.8.7" } +alloy-network = { version = "0.5.3" } alloy-rpc-types = { version = "0.5.3" } +alloy-rpc-types-beacon = { version = "0.5.3" } alloy-signer-local = { version = "0.5.3", features = ["mnemonic"] } # tokio tokio = { version = "1.21", default-features = false } +tokio-stream = "0.1" # reth reth-chainspec = { git = "https://github.com/paradigmxyz/reth.git", rev = "e98a050" } diff --git a/bin/odyssey/Cargo.toml b/bin/odyssey/Cargo.toml index c8fb97c..cf72080 100644 --- a/bin/odyssey/Cargo.toml +++ b/bin/odyssey/Cargo.toml @@ -25,6 +25,8 @@ reth-node-builder.workspace = true reth-optimism-node.workspace = true reth-optimism-cli.workspace = true reth-provider.workspace = true +tokio = { workspace = true, features = ["sync"] } +tokio-stream.workspace = true clap = { workspace = true, features = ["derive"] } [features] diff --git a/bin/odyssey/src/main.rs b/bin/odyssey/src/main.rs index 93ce3bb..e0c1023 100644 --- a/bin/odyssey/src/main.rs +++ b/bin/odyssey/src/main.rs @@ -23,18 +23,22 @@ //! - `min-debug-logs`: Disables all logs below `debug` level. //! - `min-trace-logs`: Disables all logs below `trace` level. +use std::time::Duration; + use alloy_network::EthereumWallet; use alloy_primitives::Address; use alloy_signer_local::PrivateKeySigner; use clap::Parser; use eyre::Context; use odyssey_node::{chainspec::OdysseyChainSpecParser, node::OdysseyNode}; -use odyssey_wallet::{OdysseyWallet, OdysseyWalletApiServer}; +use odyssey_wallet::{BlsTransactionBatcher, OdysseyWallet, OdysseyWalletApiServer}; use odyssey_walltime::{OdysseyWallTime, OdysseyWallTimeRpcApiServer}; use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher}; use reth_optimism_cli::Cli; use reth_optimism_node::{args::RollupArgs, node::OptimismAddOns}; use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{info, warn}; #[global_allocator] @@ -70,13 +74,29 @@ fn main() { .collect::>() .wrap_err("No valid EXP0001 delegations specified")?; + let chain_id = ctx.config().chain.chain().id(); + let (bls_batcher_tx, bls_batcher_rx) = mpsc::unbounded_channel(); + + let batch_gas_limit = 3_500_000; + let wallet_ = wallet.clone(); + let eth_api = ctx.registry.eth_api().clone(); + tokio::task::spawn(BlsTransactionBatcher::new( + chain_id, + wallet_, + Duration::from_millis(100), + batch_gas_limit, + eth_api, + UnboundedReceiverStream::from(bls_batcher_rx), + )); + ctx.modules.merge_configured( OdysseyWallet::new( ctx.provider().clone(), wallet, ctx.registry.eth_api().clone(), - ctx.config().chain.chain().id(), + chain_id, valid_delegations, + bls_batcher_tx, ) .into_rpc(), )?; diff --git a/crates/wallet/Cargo.toml b/crates/wallet/Cargo.toml index 488e6b7..edf15b1 100644 --- a/crates/wallet/Cargo.toml +++ b/crates/wallet/Cargo.toml @@ -10,9 +10,11 @@ keywords.workspace = true categories.workspace = true [dependencies] +alloy.workspace = true alloy-network.workspace = true alloy-primitives.workspace = true alloy-rpc-types.workspace = true +alloy-rpc-types-beacon.workspace = true reth-primitives.workspace = true reth-storage-api.workspace = true @@ -23,7 +25,9 @@ jsonrpsee = { workspace = true, features = ["server", "macros"] } serde = { workspace = true, features = ["derive"] } thiserror.workspace = true tracing.workspace = true -tokio = { workspace = true, features = ["sync"] } +tokio = { workspace = true, features = ["sync", "time"] } +tokio-stream.workspace = true +futures.workspace = true metrics.workspace = true metrics-derive.workspace = true diff --git a/crates/wallet/src/bls_batcher.rs b/crates/wallet/src/bls_batcher.rs new file mode 100644 index 0000000..c7303cd --- /dev/null +++ b/crates/wallet/src/bls_batcher.rs @@ -0,0 +1,231 @@ +use crate::OdysseyWalletError; +use alloy::{sol, sol_types::SolCall}; +use alloy_network::{ + eip2718::Encodable2718, Ethereum, EthereumWallet, NetworkWallet, TransactionBuilder, +}; +use alloy_primitives::{bytes::BytesMut, Address, Bytes, ChainId, TxHash, TxKind}; +use alloy_rpc_types::{BlockId, TransactionInput, TransactionRequest}; +use alloy_rpc_types_beacon::BlsPublicKey; +use futures::{stream::FuturesUnordered, StreamExt}; +use jsonrpsee::core::RpcResult; +use reth_rpc_eth_api::helpers::{EthCall, EthTransactions, FullEthApi, LoadFee, LoadState}; +use std::{ + collections::{HashMap, VecDeque}, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{ + sync::Mutex, + time::{interval, Interval}, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::*; + +const BATCH_DELEGATION_CONTRACT: Address = Address::new([0; 20]); + +sol! { + contract BlsAggregator { + struct CallByUser { + address user; + bytes pubkey; + bytes calls; + } + + function executeAggregated( + bytes calldata signature, + CallByUser[] memory callsByUser + ) external; + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct BlsData { + signature: Bytes, + pubkey: BlsPublicKey, +} + +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct BlsTransactionBatcher { + chain_id: ChainId, + /// Ethereum wallet. Idea is that a wallet used for BLS signature aggregation would be + /// different than the one in the regular flow. + wallet: EthereumWallet, + interval: Interval, + batch_gas_limit: u64, + eth_api: Eth, + transaction_stream: UnboundedReceiverStream<(TransactionRequest, BlsData)>, + batch: VecDeque<(TransactionRequest, BlsData)>, // (sig, validated tx, request) + /// Used to guard tx signing. + permit: Arc>, + /// The pending requests that were sent to the sequencer. + pending_requests: FuturesUnordered> + Send>>>, +} + +impl BlsTransactionBatcher { + /// Create new BLS transaction batcher. + pub fn new( + chain_id: ChainId, + wallet: EthereumWallet, + period: Duration, + batch_gas_limit: u64, + eth_api: Eth, + transaction_stream: UnboundedReceiverStream<(TransactionRequest, BlsData)>, + ) -> Self { + Self { + chain_id, + wallet, + interval: interval(period), + batch_gas_limit, + eth_api, + transaction_stream, + batch: VecDeque::new(), + permit: Arc::>::default(), + pending_requests: FuturesUnordered::new(), + } + } +} + +impl Future for BlsTransactionBatcher +where + Eth: FullEthApi + Clone + Send + Sync + Unpin + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if let Poll::Ready(Some(item)) = this.transaction_stream.poll_next_unpin(cx) { + this.batch.push_back(item); + continue; + } + + if let Poll::Ready(Some(pending)) = this.pending_requests.poll_next_unpin(cx) { + match pending { + Ok(tx_hash) => { + debug!(target: "rpc::wallet::bls_aggregator", %tx_hash, "Batch transaction successfully submitted to the pool"); + } + Err(err) => { + error!(target: "rpc::wallet::bls_aggregator", ?err, "Error sending batch transaction"); + } + } + continue; + } + + if this.interval.poll_tick(cx).is_ready() { + let mut cumulative_gas = 0; + let mut current_batch = Vec::new(); + while let Some((tx, _sig)) = this.batch.front() { + let tx_gas = tx.gas.unwrap(); + if cumulative_gas + tx_gas > this.batch_gas_limit { + break; + } + cumulative_gas += tx_gas; + current_batch.push(this.batch.pop_front().unwrap()); + } + + if !current_batch.is_empty() { + let chain_id = this.chain_id; + let wallet = this.wallet.clone(); + let eth_api = this.eth_api.clone(); + let permit = this.permit.clone(); + this.pending_requests.push(Box::pin(async move { + let permit = permit.lock().await; + construct_and_send_batch_transaction( + chain_id, + wallet, + eth_api, + current_batch, + cumulative_gas, + ) + .await + })); + } + + continue; + } + + return Poll::Pending; + } + } +} + +async fn construct_and_send_batch_transaction( + chain_id: ChainId, + wallet: EthereumWallet, + eth_api: Eth, + batch: Vec<(TransactionRequest, BlsData)>, + cumulative_gas: u64, +) -> RpcResult { + let signature = Bytes::new(); + let mut aggregated = HashMap::>>::default(); + for (tx, bls_data) in batch { + // TODO: aggregate signature + let to_address = *tx.to.unwrap().to().unwrap(); + let input = tx.input.input().take().unwrap().clone(); + aggregated.entry(to_address).or_default().entry(bls_data.pubkey).or_default().push(input); + } + + let mut calls_by_user = Vec::new(); + for (user, by_key) in aggregated { + for (pubkey, inputs) in by_key { + let mut calls = BytesMut::default(); + for input in inputs { + calls.extend(input); + } + calls_by_user.push(BlsAggregator::CallByUser { + user, + pubkey: Bytes::copy_from_slice(pubkey.as_slice()), + calls: calls.freeze().into(), + }); + } + } + let input = Bytes::from( + BlsAggregator::executeAggregatedCall { signature, callsByUser: calls_by_user }.abi_encode(), + ); + + let next_nonce = LoadState::next_available_nonce( + ð_api, + NetworkWallet::::default_signer_address(&wallet), + ) + .await + .map_err(Into::into)?; + let mut request = TransactionRequest { + chain_id: Some(chain_id), + nonce: Some(next_nonce), + to: Some(TxKind::Call(BATCH_DELEGATION_CONTRACT)), + input: TransactionInput::from(input), + gas: Some(cumulative_gas), + ..Default::default() + }; + + let (estimate, base_fee) = tokio::join!( + EthCall::estimate_gas_at(ð_api, request.clone(), BlockId::latest(), None), + LoadFee::eip1559_fees(ð_api, None, None) + ); + let estimate = estimate.map_err(Into::into)?; + let (base_fee, _) = base_fee.map_err(Into::into)?; + + // Finish the request + let max_priority_fee_per_gas = 1_000_000_000; // 1 gwei + request.max_fee_per_gas = Some(base_fee.to::() + max_priority_fee_per_gas); + request.max_priority_fee_per_gas = Some(max_priority_fee_per_gas); + request.gas = Some(estimate.to()); + + // build and sign + let envelope = >::build::( + request, &wallet, + ) + .await + .map_err(|_| OdysseyWalletError::InvalidTransactionRequest)?; + + EthTransactions::send_raw_transaction(ð_api, envelope.encoded_2718().into()) + .await + .inspect_err( + |err| warn!(target: "rpc::wallet::bls_aggregator", ?err, "Error adding batch tx to pool"), + ) + .map_err(Into::into) +} diff --git a/crates/wallet/src/lib.rs b/crates/wallet/src/lib.rs index f771537..bcd3297 100644 --- a/crates/wallet/src/lib.rs +++ b/crates/wallet/src/lib.rs @@ -22,7 +22,7 @@ use alloy_network::{ eip2718::Encodable2718, Ethereum, EthereumWallet, NetworkWallet, TransactionBuilder, }; -use alloy_primitives::{map::HashMap, Address, ChainId, TxHash, TxKind, U256, U64}; +use alloy_primitives::{map::HashMap, Address, Bytes, ChainId, TxHash, TxKind, U256, U64}; use alloy_rpc_types::TransactionRequest; use jsonrpsee::{ core::{async_trait, RpcResult}, @@ -38,7 +38,10 @@ use std::sync::Arc; use tracing::{trace, warn}; use reth_optimism_rpc as _; -use tokio::sync::Mutex; +use tokio::sync::{mpsc::UnboundedSender, Mutex}; + +mod bls_batcher; +pub use bls_batcher::*; /// The capability to perform [EIP-7702][eip-7702] delegations, sponsored by the sequencer. /// @@ -100,7 +103,11 @@ pub trait OdysseyWalletApi { /// [eip-7702]: https://eips.ethereum.org/EIPS/eip-7702 /// [eip-1559]: https://eips.ethereum.org/EIPS/eip-1559 #[method(name = "sendTransaction", aliases = ["odyssey_sendTransaction"])] - async fn send_transaction(&self, request: TransactionRequest) -> RpcResult; + async fn send_transaction( + &self, + request: TransactionRequest, + bls_data: Option, + ) -> RpcResult; } /// Errors returned by the wallet API. @@ -172,6 +179,7 @@ impl OdysseyWallet { eth_api: Eth, chain_id: ChainId, valid_designations: Vec
, + bls_batcher_tx: UnboundedSender<(TransactionRequest, BlsData)>, ) -> Self { let inner = OdysseyWalletInner { provider, @@ -182,6 +190,7 @@ impl OdysseyWallet { U64::from(chain_id), Capabilities { delegation: DelegationCapability { addresses: valid_designations } }, )])), + bls_batcher_tx, permit: Default::default(), metrics: WalletMetrics::default(), }; @@ -204,7 +213,11 @@ where Ok(self.inner.capabilities.clone()) } - async fn send_transaction(&self, mut request: TransactionRequest) -> RpcResult { + async fn send_transaction( + &self, + mut request: TransactionRequest, + bls_data: Option, + ) -> RpcResult { trace!(target: "rpc::wallet", ?request, "Serving odyssey_sendTransaction"); // validate fields common to eip-7702 and eip-1559 @@ -284,6 +297,13 @@ where } request.gas = Some(estimate.to()); + // We send transaction to BLS aggregator only after performing the estimate check + // to ensure that aggregator has enough data to compose the batch. + if let Some(bls_data) = bls_data { + let _ = self.inner.bls_batcher_tx.send((request, bls_data)); + return Ok(TxHash::default()); + } + // set gas price let (base_fee, _) = base_fee.map_err(|_| { self.inner.metrics.invalid_send_transaction_calls.increment(1); @@ -328,6 +348,8 @@ struct OdysseyWalletInner { wallet: EthereumWallet, chain_id: ChainId, capabilities: WalletCapabilities, + /// Used for sending BLS transactions to the aggregator. + bls_batcher_tx: UnboundedSender<(TransactionRequest, BlsData)>, /// Used to guard tx signing permit: Mutex<()>, /// Metrics for the `wallet_` RPC namespace.