diff --git a/Node/Cargo.lock b/Node/Cargo.lock index 4b7d0ed..221e8e8 100644 --- a/Node/Cargo.lock +++ b/Node/Cargo.lock @@ -6847,9 +6847,11 @@ dependencies = [ "beacon-api-client", "bincode", "chrono", + "clap", "dotenv", "ecdsa 0.16.9", "elliptic-curve 0.13.8", + "futures-util", "hex", "jsonrpsee", "k256 0.13.3", diff --git a/Node/Cargo.toml b/Node/Cargo.toml index 69b4548..65c0d66 100644 --- a/Node/Cargo.toml +++ b/Node/Cargo.toml @@ -29,6 +29,8 @@ chrono = "0.4" p2p-network = { path = "../p2pNode/p2pNetwork" } bincode = "1.3" serde_bytes = "0.11" +clap = "4.5" +futures-util = "0.3" [dev-dependencies] mockito = "1.4" diff --git a/Node/src/ethereum_l1/execution_layer.rs b/Node/src/ethereum_l1/execution_layer.rs index 9e4995a..669bb0d 100644 --- a/Node/src/ethereum_l1/execution_layer.rs +++ b/Node/src/ethereum_l1/execution_layer.rs @@ -1,6 +1,7 @@ use super::slot_clock::SlotClock; use crate::utils::config; use alloy::{ + contract::EventPoller, network::{Ethereum, EthereumWallet, NetworkWallet}, primitives::{Address, Bytes, FixedBytes, B256, U256}, providers::ProviderBuilder, @@ -14,6 +15,7 @@ use alloy::{ use anyhow::Error; use beacon_api_client::ProposerDuty; use ecdsa::SigningKey; +use futures_util::StreamExt; use k256::Secp256k1; use rand_core::{OsRng, RngCore}; use std::str::FromStr; @@ -23,7 +25,7 @@ pub struct ExecutionLayer { rpc_url: reqwest::Url, signer: LocalSigner>, wallet: EthereumWallet, - avs_node_address: Address, + preconfer_address: Address, contract_addresses: ContractAddresses, slot_clock: Arc, preconf_registry_expiry_sec: u64, @@ -113,8 +115,8 @@ impl ExecutionLayer { tracing::debug!("Creating ExecutionLayer with RPC URL: {}", rpc_url); let signer = PrivateKeySigner::from_str(avs_node_ecdsa_private_key)?; - let avs_node_address: Address = signer.address(); - tracing::info!("AVS node address: {}", avs_node_address); + let preconfer_address: Address = signer.address(); + tracing::info!("AVS node address: {}", preconfer_address); let wallet = EthereumWallet::from(signer.clone()); @@ -125,7 +127,7 @@ impl ExecutionLayer { rpc_url: rpc_url.parse()?, signer, wallet, - avs_node_address, + preconfer_address, contract_addresses, slot_clock, preconf_registry_expiry_sec, @@ -203,7 +205,7 @@ impl ExecutionLayer { Ok(()) } - pub async fn register(&self) -> Result<(), Error> { + pub async fn register_preconfer(&self) -> Result<(), Error> { let provider = ProviderBuilder::new() .with_recommended_fillers() .wallet(self.wallet.clone()) @@ -240,7 +242,7 @@ impl ExecutionLayer { U256::from(chrono::Utc::now().timestamp() as u64 + self.preconf_registry_expiry_sec); let digest_hash = avs_directory .calculateOperatorAVSRegistrationDigestHash( - self.avs_node_address, + self.preconfer_address, self.contract_addresses.avs.service_manager, salt, expiration_timestamp, @@ -310,7 +312,7 @@ impl ExecutionLayer { let header = PreconfTaskManager::PreconfirmationHeader { blockId: U256::from(block_id), chainId: U256::from(chain_id), - txListHash: B256::from(tx_list_hash), + txListHash: B256::from(tx_list_hash), }; let signature = Bytes::from(signature); let builder = contract.proveIncorrectPreconfirmation(header, signature); @@ -319,6 +321,53 @@ impl ExecutionLayer { Ok(()) } + pub async fn watch_for_registered_event( + &self, + ) -> Result< + EventPoller< + alloy::transports::http::Http, + PreconfRegistry::PreconferRegistered, + >, + Error, + > { + let provider = ProviderBuilder::new() + .with_recommended_fillers() + .wallet(self.wallet.clone()) + .on_http(self.rpc_url.clone()); + + let registry = PreconfRegistry::new(self.contract_addresses.avs.preconf_registry, provider); + let registered_filter = registry.PreconferRegistered_filter().watch().await?; + tracing::debug!("Subscribed to registered event"); + + Ok(registered_filter) + } + + pub async fn wait_for_the_registered_event( + &self, + registered_filter: EventPoller< + alloy::transports::http::Http, + PreconfRegistry::PreconferRegistered, + >, + ) -> Result<(), Error> { + let mut stream = registered_filter.into_stream(); + while let Some(log) = stream.next().await { + match log { + Ok(log) => { + tracing::info!("Received PreconferRegistered for: {}", log.0.preconfer); + if log.0.preconfer == self.preconfer_address { + tracing::info!("Preconfer registered!"); + break; + } + } + Err(e) => { + tracing::error!("Error receiving log: {:?}", e); + } + } + } + + Ok(()) + } + #[cfg(test)] pub fn new_from_pk( rpc_url: reqwest::Url, @@ -332,7 +381,7 @@ impl ExecutionLayer { rpc_url, signer, wallet, - avs_node_address: "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" // some random address for test + preconfer_address: "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" // some random address for test .parse()?, slot_clock: Arc::new(clock), contract_addresses: ContractAddresses { @@ -426,7 +475,7 @@ mod tests { let private_key = anvil.keys()[0].clone(); let el = ExecutionLayer::new_from_pk(rpc_url, private_key).unwrap(); - let result = el.register().await; + let result = el.register_preconfer().await; assert!(result.is_ok(), "Register method failed: {:?}", result.err()); } } diff --git a/Node/src/main.rs b/Node/src/main.rs index 15f64ae..cbc1bde 100644 --- a/Node/src/main.rs +++ b/Node/src/main.rs @@ -2,21 +2,47 @@ mod ethereum_l1; mod mev_boost; mod node; mod p2p_network; +mod registration; mod taiko; mod utils; use anyhow::Error; +use clap::Parser; use node::block_proposed_receiver::BlockProposedEventReceiver; use std::sync::Arc; use tokio::sync::mpsc; const MESSAGE_QUEUE_SIZE: usize = 100; +#[derive(Parser)] +struct Cli { + #[clap(long, help = "Start registration as a preconfer")] + register: bool, +} + #[tokio::main] async fn main() -> Result<(), Error> { init_logging(); + let args = Cli::parse(); let config = utils::config::Config::read_env_variables(); + let ethereum_l1 = ethereum_l1::EthereumL1::new( + &config.mev_boost_url, + &config.avs_node_ecdsa_private_key, + &config.contract_addresses, + &config.l1_beacon_url, + config.l1_slot_duration_sec, + config.l1_slots_per_epoch, + config.preconf_registry_expiry_sec, + ) + .await?; + + if args.register { + let registration = registration::Registration::new(ethereum_l1); + registration.register().await?; + return Ok(()); + } + let (node_to_p2p_tx, node_to_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE); let (p2p_to_node_tx, p2p_to_node_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE); let (node_tx, node_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE); @@ -28,22 +54,12 @@ async fn main() -> Result<(), Error> { config.block_proposed_receiver_timeout_sec, config.taiko_chain_id, )); - let ethereum_l1 = Arc::new( - ethereum_l1::EthereumL1::new( - &config.mev_boost_url, - &config.avs_node_ecdsa_private_key, - &config.contract_addresses, - &config.l1_beacon_url, - config.l1_slot_duration_sec, - config.l1_slots_per_epoch, - config.preconf_registry_expiry_sec, - ) - .await?, - ); + let mev_boost = mev_boost::MevBoost::new(&config.mev_boost_url); let block_proposed_event_checker = BlockProposedEventReceiver::new(taiko.clone(), node_tx.clone()); BlockProposedEventReceiver::start(block_proposed_event_checker).await; + let ethereum_l1 = Arc::new(ethereum_l1); let node = node::Node::new( node_rx, node_to_p2p_tx, diff --git a/Node/src/registration/mod.rs b/Node/src/registration/mod.rs new file mode 100644 index 0000000..87df1c2 --- /dev/null +++ b/Node/src/registration/mod.rs @@ -0,0 +1,32 @@ +use crate::ethereum_l1::EthereumL1; +use anyhow::Error; + +pub struct Registration { + ethereum_l1: EthereumL1, +} + +impl Registration { + pub fn new(ethereum_l1: EthereumL1) -> Self { + Self { ethereum_l1 } + } + + pub async fn register(&self) -> Result<(), Error> { + let registered_filter = self + .ethereum_l1 + .execution_layer + .watch_for_registered_event() + .await?; + + self.ethereum_l1 + .execution_layer + .register_preconfer() + .await?; + + self.ethereum_l1 + .execution_layer + .wait_for_the_registered_event(registered_filter) + .await?; + + Ok(()) + } +}