Skip to content

Commit

Permalink
Fix loops in Node/src/node/mod.rs to be consistent (#38)
Browse files Browse the repository at this point in the history
every 4 seconds and not blocked by channel
processing

Co-authored-by: Ahmad Bitar <[email protected]>
  • Loading branch information
smartprogrammer93 and Ahmad Bitar authored Jun 26, 2024
1 parent 45e2032 commit 3cbe0d5
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::taiko::Taiko;
use anyhow::{anyhow as err, Context, Error};
use anyhow::{anyhow as err, Context, Error, Ok};
use tokio::sync::mpsc::{Receiver, Sender};

pub struct Node {
taiko: Taiko,
node_rx: Receiver<String>,
node_rx: Option<Receiver<String>>,
avs_p2p_tx: Sender<String>,
}

Expand All @@ -13,7 +13,7 @@ impl Node {
let taiko = Taiko::new("http://127.0.0.1:1234");
Self {
taiko,
node_rx,
node_rx: Some(node_rx),
avs_p2p_tx,
}
}
Expand All @@ -22,20 +22,41 @@ impl Node {
/// one for handling incoming messages and one for the block preconfirmation
pub async fn entrypoint(mut self) -> Result<(), Error> {
tracing::info!("Starting node");
self.start_new_msg_receiver_thread();
self.preconfirmation_loop().await;
Ok(())
}

fn start_new_msg_receiver_thread(&mut self) {
if let Some(node_rx) = self.node_rx.take() {
tokio::spawn(async move {
Self::handle_incoming_messages(node_rx).await;
});
} else {
tracing::error!("node_rx has already been moved");
}
}

async fn handle_incoming_messages(mut node_rx: Receiver<String>) {
loop {
if let Err(err) = self.step().await {
tracing::error!("Node processing step failed: {}", err);
}
tokio::select! {
Some(message) = node_rx.recv() => {
tracing::debug!("Node received message: {}", message);
},
}
}
}

async fn step(&mut self) -> Result<(), Error> {
if let Ok(msg) = self.node_rx.try_recv() {
self.process_incoming_message(msg).await?;
} else {
self.main_block_preconfirmation_step().await?;
async fn preconfirmation_loop(&self) {
loop {
let start_time = tokio::time::Instant::now();
if let Err(err) = self.main_block_preconfirmation_step().await {
tracing::error!("Failed to execute main block preconfirmation step: {}", err);
}
let elapsed = start_time.elapsed();
let sleep_duration = std::time::Duration::from_secs(4).saturating_sub(elapsed);
tokio::time::sleep(sleep_duration).await;
}
Ok(())
}

async fn main_block_preconfirmation_step(&self) -> Result<(), Error> {
Expand All @@ -50,11 +71,6 @@ impl Node {
Ok(())
}

async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> {
tracing::debug!("Node received message: {}", msg);
Ok(())
}

fn commit_to_the_tx_lists(&self) {
//TODO: implement
}
Expand Down

0 comments on commit 3cbe0d5

Please sign in to comment.