Skip to content

Commit

Permalink
Merge pull request #26 from NethermindEth/KD/node-sustain-async
Browse files Browse the repository at this point in the history
Improve async processing and error handling
  • Loading branch information
smartprogrammer93 authored Jun 21, 2024
2 parents 18582c5 + 42437f2 commit 1d69135
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ tracing-subscriber = "0.3"
jsonrpsee = { version = "0.23", features = ["http-client", "server"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
lazy_static = "1.4"
anyhow = "1.0.86"
6 changes: 4 additions & 2 deletions Node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ mod p2p_network;
mod taiko;
mod utils;

use anyhow::Error;
use tokio::sync::mpsc;

const MESSAGE_QUEUE_SIZE: usize = 100;

#[tokio::main]
async fn main() {
async fn main() -> Result<(), Error> {
init_logging();

let (avs_p2p_tx, avs_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
Expand All @@ -19,7 +20,8 @@ async fn main() {
p2p.start();

let node = node::Node::new(node_rx, avs_p2p_tx);
node.start().await;
node.entrypoint().await?;
Ok(())
}

fn init_logging() {
Expand Down
79 changes: 34 additions & 45 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::taiko::Taiko;
use anyhow::{anyhow as err, Context, Error};
use tokio::sync::mpsc::{Receiver, Sender};

pub struct Node {
taiko: Taiko,
node_rx: Option<Receiver<String>>,
node_rx: Receiver<String>,
avs_p2p_tx: Sender<String>,
}

Expand All @@ -12,67 +13,55 @@ impl Node {
let taiko = Taiko::new("http://127.0.0.1:1234");
Self {
taiko,
node_rx: Some(node_rx),
node_rx,
avs_p2p_tx,
}
}

/// Consumes the Node and starts two loops:
/// one for handling incoming messages and one for the block preconfirmation
pub async fn start(mut self) {
pub async fn entrypoint(mut self) -> Result<(), Error> {
tracing::info!("Starting node");
self.start_new_msg_receiver_thread();
self.main_block_preconfirmation_loop().await;
}

async fn main_block_preconfirmation_loop(&self) {
loop {
let _tx_lists = match self.taiko.get_pending_l2_tx_lists().await {
Ok(lists) => lists,
Err(err) => {
tracing::error!("Failed to get pending l2 tx lists: {}", err);
continue;
}
};
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p();
self.taiko.submit_new_l2_blocks();
if let Err(err) = self.step().await {
tracing::debug!("Node processing step failed: {}", err);
}
}
}

//TODO: remove after implementation of above methods
std::thread::sleep(std::time::Duration::from_secs(1));
async fn step(&mut self) -> Result<(), Error> {
if let Ok(msg) = self.node_rx.try_recv() {
self.process_incoming_message(msg).await?;
} else {
self.main_block_preconfirmation_step().await?;
}
Ok(())
}

fn commit_to_the_tx_lists(&self) {
//TODO: implement
async fn main_block_preconfirmation_step(&self) -> Result<(), Error> {
self.taiko
.get_pending_l2_tx_lists()
.await
.context("Failed to get pending l2 tx lists")?;
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p().await?;
self.taiko.submit_new_l2_blocks();
Ok(())
}

fn send_preconfirmations_to_the_avs_p2p(&self) {
let avs_p2p_tx = self.avs_p2p_tx.clone();
tokio::spawn(async move {
if let Err(e) = avs_p2p_tx.send("Hello from node!".to_string()).await {
tracing::error!("Failed to send message to avs_p2p_tx: {}", e);
}
});
async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> {
tracing::debug!("Node received message: {}", msg);
Ok(())
}

fn start_new_msg_receiver_thread(&mut self) {
if let Some(node_rx) = self.node_rx.take() {
tokio::spawn(async move {
Self::handle_incoming_messages(node_rx).await;
});
} else {
tracing::error!("node_rx has already been moved");
}
fn commit_to_the_tx_lists(&self) {
//TODO: implement
}

async fn handle_incoming_messages(mut node_rx: Receiver<String>) {
loop {
tokio::select! {
Some(message) = node_rx.recv() => {
tracing::debug!("Node received message: {}", message);
}
}
}
async fn send_preconfirmations_to_the_avs_p2p(&self) -> Result<(), Error> {
self.avs_p2p_tx
.send("Hello from node!".to_string())
.await
.map_err(|e| err!("Failed to send message to avs_p2p_tx: {}", e))
}
}
12 changes: 3 additions & 9 deletions Node/src/taiko/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use crate::utils::rpc_client::RpcClient;
use anyhow::Error;
use serde_json::Value;

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("RPC Error {0}")]
RpcError(#[from] Box<dyn std::error::Error>),
}

pub struct Taiko {
rpc_client: RpcClient,
}
Expand All @@ -20,10 +15,9 @@ impl Taiko {

pub async fn get_pending_l2_tx_lists(&self) -> Result<Value, Error> {
tracing::debug!("Getting L2 tx lists");
Ok(self
.rpc_client
self.rpc_client
.call_method("RPC.GetL2TxLists", vec![])
.await?)
.await
}

pub fn submit_new_l2_blocks(&self) {
Expand Down
14 changes: 6 additions & 8 deletions Node/src/utils/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Error;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use serde_json::Value;
use std::error::Error;
use std::time::Duration;

pub struct RpcClient {
Expand All @@ -19,12 +19,10 @@ impl RpcClient {
RpcClient { client }
}

pub async fn call_method(
&self,
method: &str,
params: Vec<Value>,
) -> Result<Value, Box<dyn Error>> {
let response: Value = self.client.request(method, params).await?;
Ok(response)
pub async fn call_method(&self, method: &str, params: Vec<Value>) -> Result<Value, Error> {
self.client
.request(method, params)
.await
.map_err(Error::from)
}
}

0 comments on commit 1d69135

Please sign in to comment.