Skip to content

Commit

Permalink
Receiving of the block proposed event from the driver (#61)
Browse files Browse the repository at this point in the history
* block proposed JSON parsing

* block proposed event receiver

* timeout for the error while waiting for the block proposed event

* test fixed
  • Loading branch information
mskrzypkows authored Aug 12, 2024
1 parent 6e3800f commit 9239df4
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 18 deletions.
14 changes: 12 additions & 2 deletions Node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ mod taiko;
mod utils;

use anyhow::Error;
use node::block_proposed_receiver::BlockProposedEventReceiver;
use std::sync::Arc;
use tokio::sync::mpsc;
use utils::node_message::NodeMessage;

const MESSAGE_QUEUE_SIZE: usize = 100;

Expand All @@ -16,10 +19,14 @@ async fn main() -> Result<(), Error> {
let config = utils::config::Config::read_env_variables();

let (avs_p2p_tx, avs_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (node_tx, node_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (node_tx, node_rx) = mpsc::channel::<NodeMessage>(MESSAGE_QUEUE_SIZE);
let p2p = p2p_network::AVSp2p::new(node_tx.clone(), avs_p2p_rx);
p2p.start();
let taiko = taiko::Taiko::new(&config.taiko_proposer_url, &config.taiko_driver_url);
let taiko = Arc::new(taiko::Taiko::new(
&config.taiko_proposer_url,
&config.taiko_driver_url,
config.block_proposed_receiver_timeout_sec,
));
let ethereum_l1 = ethereum_l1::EthereumL1::new(
&config.mev_boost_url,
&config.ethereum_private_key,
Expand All @@ -30,6 +37,9 @@ async fn main() -> Result<(), Error> {
)
.await?;
let mev_boost = mev_boost::MevBoost::new(&config.mev_boost_url);
let block_proposed_event_checker =
BlockProposedEventReceiver::new(taiko.clone(), node_tx.clone());
BlockProposedEventReceiver::start(block_proposed_event_checker).await;
let node = node::Node::new(
node_rx,
avs_p2p_tx,
Expand Down
46 changes: 46 additions & 0 deletions Node/src/node/block_proposed_receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::{taiko::Taiko, utils::node_message::NodeMessage};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tracing::{error, info};

pub struct BlockProposedEventReceiver {
taiko: Arc<Taiko>,
node_tx: Sender<NodeMessage>,
}

impl BlockProposedEventReceiver {
pub fn new(taiko: Arc<Taiko>, node_tx: Sender<NodeMessage>) -> Self {
Self { taiko, node_tx }
}

pub async fn start(receiver: Self) {
tokio::spawn(async move {
receiver.check_for_events().await;
});
}

pub async fn check_for_events(&self) {
loop {
let block_proposed_event = self.taiko.wait_for_block_proposed_event().await;
match block_proposed_event {
Ok(block_proposed) => {
info!(
"Received block proposed event for block: {}",
block_proposed.block_id
);
if let Err(e) = self
.node_tx
.send(NodeMessage::BlockProposed(block_proposed))
.await
{
error!("Error sending block proposed event by channel: {:?}", e);
}
}
Err(e) => {
error!("Error receiving block proposed event: {:?}", e);
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
}
}
}
28 changes: 20 additions & 8 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use crate::{
},
mev_boost::MevBoost,
taiko::Taiko,
utils::node_message::NodeMessage,
};
use anyhow::{anyhow as any_err, Error};
use beacon_api_client::ProposerDuty;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};

pub mod block_proposed_receiver;

pub struct Node {
taiko: Taiko,
node_rx: Option<Receiver<String>>,
taiko: Arc<Taiko>,
node_rx: Option<Receiver<NodeMessage>>,
avs_p2p_tx: Sender<String>,
gas_used: u64,
ethereum_l1: EthereumL1,
Expand All @@ -27,9 +31,9 @@ pub struct Node {

impl Node {
pub async fn new(
node_rx: Receiver<String>,
node_rx: Receiver<NodeMessage>,
avs_p2p_tx: Sender<String>,
taiko: Taiko,
taiko: Arc<Taiko>,
ethereum_l1: EthereumL1,
mev_boost: MevBoost,
l2_slot_duration_sec: u64,
Expand Down Expand Up @@ -70,18 +74,26 @@ impl Node {
}
}

async fn handle_incoming_messages(mut node_rx: Receiver<String>) {
async fn handle_incoming_messages(mut node_rx: Receiver<NodeMessage>) {
loop {
tokio::select! {
Some(message) = node_rx.recv() => {
tracing::debug!("Node received message: {}", message);
match message {
NodeMessage::BlockProposed(block_proposed) => {
tracing::debug!("Node received block proposed event: {:?}", block_proposed);
}
NodeMessage::P2P(message) => {
tracing::debug!("Node received P2P message: {:?}", message);
}
}
},
}
}
}

async fn preconfirmation_loop(&mut self) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(self.l2_slot_duration_sec));
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(self.l2_slot_duration_sec));

loop {
interval.tick().await;
Expand Down Expand Up @@ -169,4 +181,4 @@ impl Node {
.await
.map_err(|e| any_err!("Failed to send message to avs_p2p_tx: {}", e))
}
}
}
7 changes: 4 additions & 3 deletions Node/src/p2p_network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::utils::node_message::NodeMessage;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::info;

pub struct AVSp2p {
node_tx: Sender<String>,
node_tx: Sender<NodeMessage>,
avs_p2p_rx: Receiver<String>,
}

impl AVSp2p {
pub fn new(node_tx: Sender<String>, avs_p2p_rx: Receiver<String>) -> Self {
pub fn new(node_tx: Sender<NodeMessage>, avs_p2p_rx: Receiver<String>) -> Self {
AVSp2p {
node_tx,
avs_p2p_rx,
Expand All @@ -23,7 +24,7 @@ impl AVSp2p {
tokio::spawn(async move {
loop {
node_tx
.send("Hello from avs p2p!".to_string())
.send(NodeMessage::P2P("Hello from avs p2p!".to_string()))
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
Expand Down
22 changes: 20 additions & 2 deletions Node/src/taiko/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use crate::utils::rpc_client::RpcClient;
use crate::utils::{block_proposed, rpc_client::RpcClient};
use anyhow::Error;
use serde_json::Value;
use std::time::Duration;

pub mod l2_tx_lists;

pub struct Taiko {
rpc_proposer: RpcClient,
rpc_driver: RpcClient,
rpc_driver_long_timeout: RpcClient,
}

impl Taiko {
pub fn new(proposer_url: &str, driver_url: &str) -> Self {
pub fn new(proposer_url: &str, driver_url: &str, long_timeout_sec: u64) -> Self {
Self {
rpc_proposer: RpcClient::new(proposer_url),
rpc_driver: RpcClient::new(driver_url),
rpc_driver_long_timeout: RpcClient::new_with_timeout(
driver_url,
Duration::from_secs(long_timeout_sec),
),
}
}

Expand Down Expand Up @@ -62,6 +68,17 @@ impl Taiko {
.call_method("RPC.AdvanceL2ChainHeadWithNewBlocks", vec![payload])
.await
}

pub async fn wait_for_block_proposed_event(
&self,
) -> Result<block_proposed::BlockProposed, Error> {
tracing::debug!("Waiting for block proposed event");
let result = self
.rpc_driver_long_timeout
.call_method("RPC.WaitForBlockProposed", vec![])
.await?;
block_proposed::decompose_block_proposed_json(result)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -138,6 +155,7 @@ mod test {
let taiko = Taiko::new(
&format!("http://127.0.0.1:{}", port),
&format!("http://127.0.0.1:{}", port),
120,
);
(rpc_server, taiko)
}
Expand Down
63 changes: 63 additions & 0 deletions Node/src/utils/block_proposed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use anyhow::Error;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct BlockProposed {
#[serde(rename = "BlockID")]
pub block_id: u64,
pub tx_list_hash: [u8; 32],
#[serde(deserialize_with = "deserialize_proposer")]
pub proposer: [u8; 20],
}

fn deserialize_proposer<'de, D>(deserializer: D) -> Result<[u8; 20], D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let s = s.trim_start_matches("0x");
let bytes = hex::decode(s).map_err(serde::de::Error::custom)?;
if bytes.len() != 20 {
return Err(serde::de::Error::custom(
"Invalid length for proposer address",
));
}
let mut array = [0u8; 20];
array.copy_from_slice(&bytes);
Ok(array)
}

pub fn decompose_block_proposed_json(json_data: Value) -> Result<BlockProposed, Error> {
let block_proposed: BlockProposed = serde_json::from_value(json_data)?;
Ok(block_proposed)
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

#[test]
fn test_decompose_block_proposed_json() {
let json_data = json!({
"BlockID":4321,"TxListHash":[12,34,56,78,90,12,34,56,78,90,12,34,56,78,90,12,34,56,78,90,12,34,56,78,90,12,34,56,78,90,12,34],"Proposer":"0x0000000000000000000000000000000000000008"
});

let result = decompose_block_proposed_json(json_data).unwrap();

assert_eq!(result.block_id, 4321);
assert_eq!(
result.tx_list_hash,
[
12, 34, 56, 78, 90, 12, 34, 56, 78, 90, 12, 34, 56, 78, 90, 12, 34, 56, 78, 90, 12,
34, 56, 78, 90, 12, 34, 56, 78, 90, 12, 34,
]
);
assert_eq!(
result.proposer,
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8,]
);
}
}
12 changes: 11 additions & 1 deletion Node/src/utils/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct Config {
pub l1_slots_per_epoch: u64,
pub l2_slot_duration_sec: u64,
pub validator_pubkey: String,
pub block_proposed_receiver_timeout_sec: u64,
}

impl Config {
Expand Down Expand Up @@ -63,6 +64,12 @@ impl Config {
"0x0".to_string()
});

let block_proposed_receiver_timeout_sec =
std::env::var("BLOCK_PROPOSED_RECEIVER_TIMEOUT_SEC")
.unwrap_or_else(|_| "120".to_string())
.parse::<u64>()
.expect("BLOCK_PROPOSED_RECEIVER_TIMEOUT_SEC must be a number");

let config = Self {
taiko_proposer_url: std::env::var("TAIKO_PROPOSER_URL")
.unwrap_or_else(|_| "http://127.0.0.1:1234".to_string()),
Expand Down Expand Up @@ -92,6 +99,7 @@ impl Config {
l1_slots_per_epoch,
l2_slot_duration_sec,
validator_pubkey,
block_proposed_receiver_timeout_sec,
};

info!(
Expand All @@ -106,6 +114,7 @@ L1 slot duration: {}
L1 slots per epoch: {}
L2 slot duration: {}
Validator pubkey: {}
Block proposed receiver timeout: {}
"#,
config.taiko_proposer_url,
config.taiko_driver_url,
Expand All @@ -115,7 +124,8 @@ Validator pubkey: {}
config.l1_slot_duration_sec,
config.l1_slots_per_epoch,
config.l2_slot_duration_sec,
config.validator_pubkey
config.validator_pubkey,
config.block_proposed_receiver_timeout_sec
);

config
Expand Down
2 changes: 2 additions & 0 deletions Node/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod block_proposed;
pub mod commit;
pub mod config;
pub mod node_message;
pub mod rpc_client;
pub mod rpc_server;
7 changes: 7 additions & 0 deletions Node/src/utils/node_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use super::block_proposed::BlockProposed;

#[derive(Debug)]
pub enum NodeMessage {
BlockProposed(BlockProposed),
P2P(String),
}
6 changes: 4 additions & 2 deletions Node/src/utils/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ pub struct RpcClient {

impl RpcClient {
pub fn new(url: &str) -> Self {
// let client = HttpClientBuilder::default().build(url).unwrap();
Self::new_with_timeout(url, Duration::from_secs(10))
}

pub fn new_with_timeout(url: &str, timeout: Duration) -> Self {
let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(1))
.request_timeout(timeout)
.build(url)
.unwrap();
RpcClient { client }
Expand Down

0 comments on commit 9239df4

Please sign in to comment.