diff --git a/Node/src/node/mod.rs b/Node/src/node/mod.rs index 4646cc4..2a92e5b 100644 --- a/Node/src/node/mod.rs +++ b/Node/src/node/mod.rs @@ -1,10 +1,10 @@ use crate::taiko::Taiko; -use anyhow::{anyhow as err, Context, Error}; +use anyhow::{anyhow as err, Context, Error, Ok}; use tokio::sync::mpsc::{Receiver, Sender}; pub struct Node { taiko: Taiko, - node_rx: Receiver, + node_rx: Option>, avs_p2p_tx: Sender, } @@ -13,7 +13,7 @@ impl Node { let taiko = Taiko::new("http://127.0.0.1:1234"); Self { taiko, - node_rx, + node_rx: Some(node_rx), avs_p2p_tx, } } @@ -22,20 +22,41 @@ impl Node { /// one for handling incoming messages and one for the block preconfirmation pub async fn entrypoint(mut self) -> Result<(), Error> { tracing::info!("Starting node"); + self.start_new_msg_receiver_thread(); + self.preconfirmation_loop().await; + Ok(()) + } + + fn start_new_msg_receiver_thread(&mut self) { + if let Some(node_rx) = self.node_rx.take() { + tokio::spawn(async move { + Self::handle_incoming_messages(node_rx).await; + }); + } else { + tracing::error!("node_rx has already been moved"); + } + } + + async fn handle_incoming_messages(mut node_rx: Receiver) { loop { - if let Err(err) = self.step().await { - tracing::error!("Node processing step failed: {}", err); - } + tokio::select! { + Some(message) = node_rx.recv() => { + tracing::debug!("Node received message: {}", message); + }, + } } } - async fn step(&mut self) -> Result<(), Error> { - if let Ok(msg) = self.node_rx.try_recv() { - self.process_incoming_message(msg).await?; - } else { - self.main_block_preconfirmation_step().await?; + async fn preconfirmation_loop(&self) { + loop { + let start_time = tokio::time::Instant::now(); + if let Err(err) = self.main_block_preconfirmation_step().await { + tracing::error!("Failed to execute main block preconfirmation step: {}", err); + } + let elapsed = start_time.elapsed(); + let sleep_duration = std::time::Duration::from_secs(4).saturating_sub(elapsed); + tokio::time::sleep(sleep_duration).await; } - Ok(()) } async fn main_block_preconfirmation_step(&self) -> Result<(), Error> { @@ -50,11 +71,6 @@ impl Node { Ok(()) } - async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> { - tracing::debug!("Node received message: {}", msg); - Ok(()) - } - fn commit_to_the_tx_lists(&self) { //TODO: implement }