Skip to content

Commit

Permalink
Registration - submit and registered event (#76)
Browse files Browse the repository at this point in the history
* Registration - submit and registered event

* split the watch and stream fetching

* renamed avs_node_address to preconfer_address in EL file
  • Loading branch information
mskrzypkows authored Aug 23, 2024
1 parent bb7aa64 commit 181a57f
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ chrono = "0.4"
p2p-network = { path = "../p2pNode/p2pNetwork" }
bincode = "1.3"
serde_bytes = "0.11"
clap = "4.5"
futures-util = "0.3"

[dev-dependencies]
mockito = "1.4"
67 changes: 58 additions & 9 deletions Node/src/ethereum_l1/execution_layer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::slot_clock::SlotClock;
use crate::utils::config;
use alloy::{
contract::EventPoller,
network::{Ethereum, EthereumWallet, NetworkWallet},
primitives::{Address, Bytes, FixedBytes, B256, U256},
providers::ProviderBuilder,
Expand All @@ -14,6 +15,7 @@ use alloy::{
use anyhow::Error;
use beacon_api_client::ProposerDuty;
use ecdsa::SigningKey;
use futures_util::StreamExt;
use k256::Secp256k1;
use rand_core::{OsRng, RngCore};
use std::str::FromStr;
Expand All @@ -23,7 +25,7 @@ pub struct ExecutionLayer {
rpc_url: reqwest::Url,
signer: LocalSigner<SigningKey<Secp256k1>>,
wallet: EthereumWallet,
avs_node_address: Address,
preconfer_address: Address,
contract_addresses: ContractAddresses,
slot_clock: Arc<SlotClock>,
preconf_registry_expiry_sec: u64,
Expand Down Expand Up @@ -113,8 +115,8 @@ impl ExecutionLayer {
tracing::debug!("Creating ExecutionLayer with RPC URL: {}", rpc_url);

let signer = PrivateKeySigner::from_str(avs_node_ecdsa_private_key)?;
let avs_node_address: Address = signer.address();
tracing::info!("AVS node address: {}", avs_node_address);
let preconfer_address: Address = signer.address();
tracing::info!("AVS node address: {}", preconfer_address);

let wallet = EthereumWallet::from(signer.clone());

Expand All @@ -125,7 +127,7 @@ impl ExecutionLayer {
rpc_url: rpc_url.parse()?,
signer,
wallet,
avs_node_address,
preconfer_address,
contract_addresses,
slot_clock,
preconf_registry_expiry_sec,
Expand Down Expand Up @@ -203,7 +205,7 @@ impl ExecutionLayer {
Ok(())
}

pub async fn register(&self) -> Result<(), Error> {
pub async fn register_preconfer(&self) -> Result<(), Error> {
let provider = ProviderBuilder::new()
.with_recommended_fillers()
.wallet(self.wallet.clone())
Expand Down Expand Up @@ -240,7 +242,7 @@ impl ExecutionLayer {
U256::from(chrono::Utc::now().timestamp() as u64 + self.preconf_registry_expiry_sec);
let digest_hash = avs_directory
.calculateOperatorAVSRegistrationDigestHash(
self.avs_node_address,
self.preconfer_address,
self.contract_addresses.avs.service_manager,
salt,
expiration_timestamp,
Expand Down Expand Up @@ -310,7 +312,7 @@ impl ExecutionLayer {
let header = PreconfTaskManager::PreconfirmationHeader {
blockId: U256::from(block_id),
chainId: U256::from(chain_id),
txListHash: B256::from(tx_list_hash),
txListHash: B256::from(tx_list_hash),
};
let signature = Bytes::from(signature);
let builder = contract.proveIncorrectPreconfirmation(header, signature);
Expand All @@ -319,6 +321,53 @@ impl ExecutionLayer {
Ok(())
}

pub async fn watch_for_registered_event(
&self,
) -> Result<
EventPoller<
alloy::transports::http::Http<reqwest::Client>,
PreconfRegistry::PreconferRegistered,
>,
Error,
> {
let provider = ProviderBuilder::new()
.with_recommended_fillers()
.wallet(self.wallet.clone())
.on_http(self.rpc_url.clone());

let registry = PreconfRegistry::new(self.contract_addresses.avs.preconf_registry, provider);
let registered_filter = registry.PreconferRegistered_filter().watch().await?;
tracing::debug!("Subscribed to registered event");

Ok(registered_filter)
}

pub async fn wait_for_the_registered_event(
&self,
registered_filter: EventPoller<
alloy::transports::http::Http<reqwest::Client>,
PreconfRegistry::PreconferRegistered,
>,
) -> Result<(), Error> {
let mut stream = registered_filter.into_stream();
while let Some(log) = stream.next().await {
match log {
Ok(log) => {
tracing::info!("Received PreconferRegistered for: {}", log.0.preconfer);
if log.0.preconfer == self.preconfer_address {
tracing::info!("Preconfer registered!");
break;
}
}
Err(e) => {
tracing::error!("Error receiving log: {:?}", e);
}
}
}

Ok(())
}

#[cfg(test)]
pub fn new_from_pk(
rpc_url: reqwest::Url,
Expand All @@ -332,7 +381,7 @@ impl ExecutionLayer {
rpc_url,
signer,
wallet,
avs_node_address: "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" // some random address for test
preconfer_address: "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" // some random address for test
.parse()?,
slot_clock: Arc::new(clock),
contract_addresses: ContractAddresses {
Expand Down Expand Up @@ -426,7 +475,7 @@ mod tests {
let private_key = anvil.keys()[0].clone();
let el = ExecutionLayer::new_from_pk(rpc_url, private_key).unwrap();

let result = el.register().await;
let result = el.register_preconfer().await;
assert!(result.is_ok(), "Register method failed: {:?}", result.err());
}
}
40 changes: 28 additions & 12 deletions Node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,47 @@ mod ethereum_l1;
mod mev_boost;
mod node;
mod p2p_network;
mod registration;
mod taiko;
mod utils;

use anyhow::Error;
use clap::Parser;
use node::block_proposed_receiver::BlockProposedEventReceiver;
use std::sync::Arc;
use tokio::sync::mpsc;

const MESSAGE_QUEUE_SIZE: usize = 100;

#[derive(Parser)]
struct Cli {
#[clap(long, help = "Start registration as a preconfer")]
register: bool,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
init_logging();
let args = Cli::parse();
let config = utils::config::Config::read_env_variables();

let ethereum_l1 = ethereum_l1::EthereumL1::new(
&config.mev_boost_url,
&config.avs_node_ecdsa_private_key,
&config.contract_addresses,
&config.l1_beacon_url,
config.l1_slot_duration_sec,
config.l1_slots_per_epoch,
config.preconf_registry_expiry_sec,
)
.await?;

if args.register {
let registration = registration::Registration::new(ethereum_l1);
registration.register().await?;
return Ok(());
}

let (node_to_p2p_tx, node_to_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (p2p_to_node_tx, p2p_to_node_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (node_tx, node_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
Expand All @@ -28,22 +54,12 @@ async fn main() -> Result<(), Error> {
config.block_proposed_receiver_timeout_sec,
config.taiko_chain_id,
));
let ethereum_l1 = Arc::new(
ethereum_l1::EthereumL1::new(
&config.mev_boost_url,
&config.avs_node_ecdsa_private_key,
&config.contract_addresses,
&config.l1_beacon_url,
config.l1_slot_duration_sec,
config.l1_slots_per_epoch,
config.preconf_registry_expiry_sec,
)
.await?,
);

let mev_boost = mev_boost::MevBoost::new(&config.mev_boost_url);
let block_proposed_event_checker =
BlockProposedEventReceiver::new(taiko.clone(), node_tx.clone());
BlockProposedEventReceiver::start(block_proposed_event_checker).await;
let ethereum_l1 = Arc::new(ethereum_l1);
let node = node::Node::new(
node_rx,
node_to_p2p_tx,
Expand Down
32 changes: 32 additions & 0 deletions Node/src/registration/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use crate::ethereum_l1::EthereumL1;
use anyhow::Error;

pub struct Registration {
ethereum_l1: EthereumL1,
}

impl Registration {
pub fn new(ethereum_l1: EthereumL1) -> Self {
Self { ethereum_l1 }
}

pub async fn register(&self) -> Result<(), Error> {
let registered_filter = self
.ethereum_l1
.execution_layer
.watch_for_registered_event()
.await?;

self.ethereum_l1
.execution_layer
.register_preconfer()
.await?;

self.ethereum_l1
.execution_layer
.wait_for_the_registered_event(registered_filter)
.await?;

Ok(())
}
}

0 comments on commit 181a57f

Please sign in to comment.