Skip to content

Commit

Permalink
P2P network messaging (#69)
Browse files Browse the repository at this point in the history
* Simple p2p network

* Add peer manager

* Implement p2p network

* Upgrade dependencies to latest versions

* Bump libp2p version to 0.54

* Add initial implementation for P2P messaging

* Shift p2p Network implementation to a library

* Remove deprecated files and clean up code

* Add boot_nodes parameter to P2P network config

* Retrieve P2P network config from general node configuration

* Create separate channels for node and P2P network communication
  • Loading branch information
mikhailUshakoff authored Aug 16, 2024
1 parent b1b0ad5 commit 9cd08b1
Show file tree
Hide file tree
Showing 25 changed files with 9,445 additions and 415 deletions.
3,506 changes: 3,177 additions & 329 deletions Node/Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ tiny-keccak = "2.0"
secp256k1 = "0.29"
beacon-api-client = { git = "https://github.com/ralexstokes/ethereum-consensus", package = "beacon-api-client" }
dotenv = "0.15"
p2p-network = { path = "../p2pNode/p2pNetwork" }
bincode = "1.3"
serde_bytes = "0.11"

[dev-dependencies]
mockito = "1.4"
13 changes: 7 additions & 6 deletions Node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use anyhow::Error;
use node::block_proposed_receiver::BlockProposedEventReceiver;
use std::sync::Arc;
use tokio::sync::mpsc;
use utils::node_message::NodeMessage;

const MESSAGE_QUEUE_SIZE: usize = 100;

Expand All @@ -18,10 +17,11 @@ async fn main() -> Result<(), Error> {
init_logging();
let config = utils::config::Config::read_env_variables();

let (avs_p2p_tx, avs_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (node_tx, node_rx) = mpsc::channel::<NodeMessage>(MESSAGE_QUEUE_SIZE);
let p2p = p2p_network::AVSp2p::new(node_tx.clone(), avs_p2p_rx);
p2p.start();
let (node_to_p2p_tx, node_to_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (p2p_to_node_tx, p2p_to_node_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (node_tx, node_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let p2p = p2p_network::AVSp2p::new(p2p_to_node_tx.clone(), node_to_p2p_rx);
p2p.start(config.p2p_network_config).await;
let taiko = Arc::new(taiko::Taiko::new(
&config.taiko_proposer_url,
&config.taiko_driver_url,
Expand All @@ -44,7 +44,8 @@ async fn main() -> Result<(), Error> {
BlockProposedEventReceiver::start(block_proposed_event_checker).await;
let node = node::Node::new(
node_rx,
avs_p2p_tx,
node_to_p2p_tx,
p2p_to_node_rx,
taiko,
ethereum_l1,
mev_boost,
Expand Down
4 changes: 0 additions & 4 deletions Node/src/node/block.rs

This file was deleted.

13 changes: 5 additions & 8 deletions Node/src/node/block_proposed_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use crate::{taiko::Taiko, utils::node_message::NodeMessage};
use crate::taiko::Taiko;
use crate::utils::block_proposed::BlockProposed;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tracing::{error, info};

pub struct BlockProposedEventReceiver {
taiko: Arc<Taiko>,
node_tx: Sender<NodeMessage>,
node_tx: Sender<BlockProposed>,
}

impl BlockProposedEventReceiver {
pub fn new(taiko: Arc<Taiko>, node_tx: Sender<NodeMessage>) -> Self {
pub fn new(taiko: Arc<Taiko>, node_tx: Sender<BlockProposed>) -> Self {
Self { taiko, node_tx }
}

Expand All @@ -28,11 +29,7 @@ impl BlockProposedEventReceiver {
"Received block proposed event for block: {}",
block_proposed.block_id
);
if let Err(e) = self
.node_tx
.send(NodeMessage::BlockProposed(block_proposed))
.await
{
if let Err(e) = self.node_tx.send(block_proposed).await {
error!("Error sending block proposed event by channel: {:?}", e);
}
}
Expand Down
77 changes: 39 additions & 38 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
},
mev_boost::MevBoost,
taiko::Taiko,
utils::{block_proposed::BlockProposed, commit::L2TxListsCommit, node_message::NodeMessage},
utils::{block::Block, block_proposed::BlockProposed, commit::L2TxListsCommit},
};
use anyhow::{anyhow as any_err, Error};
use beacon_api_client::ProposerDuty;
Expand All @@ -16,17 +16,15 @@ use tokio::sync::{
};
use tracing::info;

mod block;
pub use block::Block;

pub mod block_proposed_receiver;

const OLDEST_BLOCK_DISTANCE: u64 = 256;

pub struct Node {
taiko: Arc<Taiko>,
node_rx: Option<Receiver<NodeMessage>>,
avs_p2p_tx: Sender<String>,
node_rx: Option<Receiver<BlockProposed>>,
node_to_p2p_tx: Sender<Vec<u8>>,
p2p_to_node_rx: Option<Receiver<Vec<u8>>>,
gas_used: u64,
ethereum_l1: Arc<EthereumL1>,
_mev_boost: MevBoost, // temporary unused
Expand All @@ -41,8 +39,9 @@ pub struct Node {

impl Node {
pub async fn new(
node_rx: Receiver<NodeMessage>,
avs_p2p_tx: Sender<String>,
node_rx: Receiver<BlockProposed>,
node_to_p2p_tx: Sender<Vec<u8>>,
p2p_to_node_rx: Receiver<Vec<u8>>,
taiko: Arc<Taiko>,
ethereum_l1: Arc<EthereumL1>,
mev_boost: MevBoost,
Expand All @@ -52,7 +51,8 @@ impl Node {
Ok(Self {
taiko,
node_rx: Some(node_rx),
avs_p2p_tx,
node_to_p2p_tx,
p2p_to_node_rx: Some(p2p_to_node_rx),
gas_used: 0,
ethereum_l1,
_mev_boost: mev_boost,
Expand All @@ -79,38 +79,37 @@ impl Node {
let preconfirmed_blocks = self.preconfirmed_blocks.clone();
let ethereum_l1 = self.ethereum_l1.clone();
if let Some(node_rx) = self.node_rx.take() {
let p2p_to_node_rx = self.p2p_to_node_rx.take().unwrap();
tokio::spawn(async move {
Self::handle_incoming_messages(node_rx, preconfirmed_blocks, ethereum_l1).await;
Self::handle_incoming_messages(node_rx, p2p_to_node_rx, preconfirmed_blocks, ethereum_l1).await;
});
} else {
tracing::error!("node_rx has already been moved");
}
}

async fn handle_incoming_messages(
mut node_rx: Receiver<NodeMessage>,
mut node_rx: Receiver<BlockProposed>,
mut p2p_to_node_rx: Receiver<Vec<u8>>,
preconfirmed_blocks: Arc<Mutex<HashMap<u64, Block>>>,
ethereum_l1: Arc<EthereumL1>,
) {
loop {
tokio::select! {
Some(message) = node_rx.recv() => {
match message {
NodeMessage::BlockProposed(block_proposed) => {
tracing::debug!("Node received block proposed event: {:?}", block_proposed);
if let Err(e) = Self::check_preconfirmed_blocks_correctness(&preconfirmed_blocks, &block_proposed, ethereum_l1.clone()).await {
tracing::error!("Failed to check preconfirmed blocks correctness: {}", e);
}

if let Err(e) = Self::clean_old_blocks(&preconfirmed_blocks, block_proposed.block_id).await {
tracing::error!("Failed to clean old blocks: {}", e);
}
}
NodeMessage::P2P(message) => {
tracing::debug!("Node received P2P message: {:?}", message);
}
Some(block_proposed) = node_rx.recv() => {
tracing::debug!("Node received block proposed event: {:?}", block_proposed);
if let Err(e) = Self::check_preconfirmed_blocks_correctness(&preconfirmed_blocks, &block_proposed, ethereum_l1.clone()).await {
tracing::error!("Failed to check preconfirmed blocks correctness: {}", e);
}
if let Err(e) = Self::clean_old_blocks(&preconfirmed_blocks, block_proposed.block_id).await {
tracing::error!("Failed to clean old blocks: {}", e);
}
},
Some(p2p_message) = p2p_to_node_rx.recv() => {
let block: Block = p2p_message.into();
tracing::debug!("Node received message from p2p: {:?}", block);
// TODO: add block to preconfirmation queue
}
}
}
}
Expand Down Expand Up @@ -201,7 +200,12 @@ impl Node {
let new_block_height = pending_tx_lists.parent_block_id + 1;
let commit = L2TxListsCommit::new(&pending_tx_lists, new_block_height);

self.send_preconfirmations_to_the_avs_p2p().await?;
let new_block = Block {
tx_list_hash: commit.hash()?,
signature: [0; 96], // TODO: get the signature from the web3signer
};
self.send_preconfirmations_to_the_avs_p2p(new_block.clone())
.await?;
self.taiko
.advance_head_to_new_l2_block(pending_tx_lists.tx_lists, self.gas_used)
.await?;
Expand All @@ -214,13 +218,10 @@ impl Node {
)
.await?;

self.preconfirmed_blocks.lock().await.insert(
new_block_height,
Block {
tx_list_hash: commit.hash()?,
signature: [0; 96], // TODO: get the signature from the web3signer
},
);
self.preconfirmed_blocks
.lock()
.await
.insert(new_block_height, new_block);

Ok(())
}
Expand All @@ -242,10 +243,10 @@ impl Node {
.map(|duty| duty.slot)
}

async fn send_preconfirmations_to_the_avs_p2p(&self) -> Result<(), Error> {
self.avs_p2p_tx
.send("Hello from node!".to_string())
async fn send_preconfirmations_to_the_avs_p2p(&self, block: Block) -> Result<(), Error> {
self.node_to_p2p_tx
.send(block.into())
.await
.map_err(|e| any_err!("Failed to send message to avs_p2p_tx: {}", e))
.map_err(|e| any_err!("Failed to send message to node_to_p2p_tx: {}", e))
}
}
31 changes: 10 additions & 21 deletions Node/src/p2p_network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,29 @@
use crate::utils::node_message::NodeMessage;
use p2p_network::network::{P2PNetwork, P2PNetworkConfig};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;
use tracing::info;

pub struct AVSp2p {
node_tx: Sender<NodeMessage>,
avs_p2p_rx: Receiver<String>,
node_tx: Sender<Vec<u8>>,
node_to_p2p_rx: Receiver<Vec<u8>>,
}

impl AVSp2p {
pub fn new(node_tx: Sender<NodeMessage>, avs_p2p_rx: Receiver<String>) -> Self {
pub fn new(node_tx: Sender<Vec<u8>>, node_to_p2p_rx: Receiver<Vec<u8>>) -> Self {
AVSp2p {
node_tx,
avs_p2p_rx,
node_to_p2p_rx,
}
}

// Consumes self and fires up threads
pub fn start(mut self) {
pub async fn start(self, config: P2PNetworkConfig) {
info!("Starting P2P network");

//TODO for initial testing
let node_tx = self.node_tx.clone();
tokio::spawn(async move {
loop {
node_tx
.send(NodeMessage::P2P("Hello from avs p2p!".to_string()))
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
});
let mut p2p = P2PNetwork::new(&config, self.node_tx.clone(), self.node_to_p2p_rx).await;

tokio::spawn(async move {
while let Some(message) = self.avs_p2p_rx.recv().await {
tracing::debug!("AVS p2p received: {}", message);
}
task::spawn(async move {
p2p.run(&config).await;
});
}
}
20 changes: 20 additions & 0 deletions Node/src/utils/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Block {
pub tx_list_hash: [u8; 32],
#[serde(with = "serde_bytes")]
pub signature: [u8; 96], // BLS 96 bytes signature
}

impl From<Block> for Vec<u8> {
fn from(val: Block) -> Self {
bincode::serialize(&val).expect("Serialization failed")
}
}

impl From<Vec<u8>> for Block {
fn from(bytes: Vec<u8>) -> Self {
bincode::deserialize(&bytes).expect("Deserialization failed")
}
}
31 changes: 30 additions & 1 deletion Node/src/utils/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use p2p_network::generate_secp256k1;
use p2p_network::network::P2PNetworkConfig;
use tracing::{info, warn};

pub struct Config {
Expand All @@ -12,6 +14,7 @@ pub struct Config {
pub l2_slot_duration_sec: u64,
pub validator_pubkey: String,
pub block_proposed_receiver_timeout_sec: u64,
pub p2p_network_config: P2PNetworkConfig,
}

impl Config {
Expand Down Expand Up @@ -70,6 +73,29 @@ impl Config {
.parse::<u64>()
.expect("BLOCK_PROPOSED_RECEIVER_TIMEOUT_SEC must be a number");

// Load P2P config from env
// Load Ipv4 address from env
let address = std::env::var("ADDRESS").unwrap_or_else(|_| "0.0.0.0".to_string());
let ipv4 = address.parse().unwrap();

// Load boot node from env
let boot_nodes: Option<Vec<String>> =
if let Ok(bootnode_enr) = std::env::var("BOOTNODE_ENR") {
Some(vec![bootnode_enr])
} else {
None
};

// Create P2P network config
let p2p_network_config: P2PNetworkConfig = P2PNetworkConfig {
local_key: generate_secp256k1(),
listen_addr: "/ip4/0.0.0.0/tcp/9000".parse().unwrap(),
ipv4,
udpv4: 9000,
tcpv4: 9000,
boot_nodes,
};

let config = Self {
taiko_proposer_url: std::env::var("TAIKO_PROPOSER_URL")
.unwrap_or_else(|_| "http://127.0.0.1:1234".to_string()),
Expand Down Expand Up @@ -100,6 +126,7 @@ impl Config {
l2_slot_duration_sec,
validator_pubkey,
block_proposed_receiver_timeout_sec,
p2p_network_config,
};

info!(
Expand All @@ -115,6 +142,7 @@ L1 slots per epoch: {}
L2 slot duration: {}
Validator pubkey: {}
Block proposed receiver timeout: {}
p2p_network_config: {}
"#,
config.taiko_proposer_url,
config.taiko_driver_url,
Expand All @@ -125,7 +153,8 @@ Block proposed receiver timeout: {}
config.l1_slots_per_epoch,
config.l2_slot_duration_sec,
config.validator_pubkey,
config.block_proposed_receiver_timeout_sec
config.block_proposed_receiver_timeout_sec,
config.p2p_network_config,
);

config
Expand Down
2 changes: 1 addition & 1 deletion Node/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod block;
pub mod block_proposed;
pub mod commit;
pub mod config;
pub mod node_message;
pub mod rpc_client;
pub mod rpc_server;
7 changes: 0 additions & 7 deletions Node/src/utils/node_message.rs

This file was deleted.

1 change: 1 addition & 0 deletions p2pNode/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target
Loading

0 comments on commit 9cd08b1

Please sign in to comment.