From 5aec5a5214c3dc4678036b1f39d3db99ca896049 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 24 Oct 2024 10:47:09 -0400 Subject: [PATCH] chore: Refactor Network Subcommands (#108) ### Description Refactors the network subcommands to make them more explicit. Now there is just Discovery subcommand ``` hera disc ``` Gossip subcommand ``` hera gossip ``` --------- Co-authored-by: nicolas <48695862+merklefruit@users.noreply.github.com> --- bin/hera/src/disc.rs | 45 ++++++++++++++++++++++ bin/hera/src/{network.rs => gossip.rs} | 52 ++++++++------------------ bin/hera/src/main.rs | 12 ++++-- crates/net/README.md | 15 ++++---- crates/net/src/builder.rs | 12 +++++- crates/net/src/discovery/driver.rs | 6 ++- crates/net/src/driver.rs | 4 +- crates/net/src/gossip/driver.rs | 10 ++++- crates/net/src/gossip/handler.rs | 10 +++-- 9 files changed, 108 insertions(+), 58 deletions(-) create mode 100644 bin/hera/src/disc.rs rename bin/hera/src/{network.rs => gossip.rs} (52%) diff --git a/bin/hera/src/disc.rs b/bin/hera/src/disc.rs new file mode 100644 index 0000000..2a2aa3d --- /dev/null +++ b/bin/hera/src/disc.rs @@ -0,0 +1,45 @@ +//! Discovery subcommand for Hera. + +use crate::globals::GlobalArgs; +use clap::Args; +use eyre::Result; +use op_net::discovery::builder::DiscoveryBuilder; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +/// The Hera discovery subcommand. +#[derive(Debug, Clone, Args)] +#[non_exhaustive] +pub struct DiscCommand { + /// Port to listen for gossip on. + #[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")] + pub gossip_port: u16, + /// Interval to send discovery packets. + #[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")] + pub interval: u64, +} + +impl DiscCommand { + /// Run the discovery subcommand. + pub async fn run(self, args: &GlobalArgs) -> Result<()> { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port); + tracing::info!("Starting discovery service on {:?}", socket); + + let mut discovery_builder = + DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id); + let mut discovery = discovery_builder.build()?; + discovery.interval = std::time::Duration::from_secs(self.interval); + let mut peer_recv = discovery.start()?; + tracing::info!("Discovery service started, receiving peers."); + + loop { + match peer_recv.recv().await { + Some(peer) => { + tracing::info!("Received peer: {:?}", peer); + } + None => { + tracing::warn!("Failed to receive peer"); + } + } + } + } +} diff --git a/bin/hera/src/network.rs b/bin/hera/src/gossip.rs similarity index 52% rename from bin/hera/src/network.rs rename to bin/hera/src/gossip.rs index b35528d..a55fbee 100644 --- a/bin/hera/src/network.rs +++ b/bin/hera/src/gossip.rs @@ -1,36 +1,27 @@ -//! Networking subcommand for Hera. +//! Gossip subcommand for Hera. use crate::globals::GlobalArgs; use clap::Args; use eyre::Result; -use op_net::{discovery::builder::DiscoveryBuilder, driver::NetworkDriver}; +use op_net::driver::NetworkDriver; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use superchain::ROLLUP_CONFIGS; -/// The Hera network subcommand. +/// The Hera gossip subcommand. #[derive(Debug, Clone, Args)] #[non_exhaustive] -pub struct NetworkCommand { - /// Run the peer discovery service. - #[clap(long, short = 'p', help = "Runs only peer discovery")] - pub only_disc: bool, +pub struct GossipCommand { /// Port to listen for gossip on. #[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")] pub gossip_port: u16, + /// Interval to send discovery packets. + #[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")] + pub interval: u64, } -impl NetworkCommand { - /// Run the network subcommand. +impl GossipCommand { + /// Run the gossip subcommand. pub async fn run(self, args: &GlobalArgs) -> Result<()> { - if self.only_disc { - self.run_discovery(args).await - } else { - self.run_network(args) - } - } - - /// Runs the full network. - pub fn run_network(&self, args: &GlobalArgs) -> Result<()> { let signer = ROLLUP_CONFIGS .get(&args.l2_chain_id) .ok_or(eyre::eyre!("No rollup config found for chain ID"))? @@ -39,15 +30,21 @@ impl NetworkCommand { .as_ref() .ok_or(eyre::eyre!("No system config found for chain ID"))? .batcher_address; + tracing::info!("Gossip configured with signer: {:?}", signer); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port); + tracing::info!("Starting gossip driver on {:?}", socket); + let mut driver = NetworkDriver::builder() .with_chain_id(args.l2_chain_id) .with_unsafe_block_signer(signer) .with_gossip_addr(socket) + .with_interval(std::time::Duration::from_secs(self.interval)) .build()?; let recv = driver.take_unsafe_block_recv().ok_or(eyre::eyre!("No unsafe block receiver"))?; driver.start()?; + tracing::info!("Gossip driver started, receiving blocks."); loop { match recv.recv() { Ok(block) => { @@ -59,23 +56,4 @@ impl NetworkCommand { } } } - - /// Runs only the discovery service. - pub async fn run_discovery(&self, args: &GlobalArgs) -> Result<()> { - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port); - let mut discovery_builder = - DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id); - let discovery = discovery_builder.build()?; - let mut peer_recv = discovery.start()?; - loop { - match peer_recv.recv().await { - Some(peer) => { - tracing::info!("Received peer: {:?}", peer); - } - None => { - tracing::warn!("Failed to receive peer"); - } - } - } - } } diff --git a/bin/hera/src/main.rs b/bin/hera/src/main.rs index 24342e9..0ae1f51 100644 --- a/bin/hera/src/main.rs +++ b/bin/hera/src/main.rs @@ -8,8 +8,9 @@ use clap::{Parser, Subcommand}; use eyre::Result; +mod disc; mod globals; -mod network; +mod gossip; mod node; /// The Hera CLI Arguments. @@ -30,8 +31,10 @@ pub(crate) struct HeraArgs { pub(crate) enum HeraSubcommand { /// Run the standalone Hera node. Node(node::NodeCommand), - /// Networking utility commands. - Network(network::NetworkCommand), + /// Discovery service command. + Disc(disc::DiscCommand), + /// Gossip service command. + Gossip(gossip::GossipCommand), } #[tokio::main] @@ -45,6 +48,7 @@ async fn main() -> Result<()> { // Dispatch on subcommand. match args.subcommand { HeraSubcommand::Node(node) => node.run(&args.global).await, - HeraSubcommand::Network(network) => network.run(&args.global).await, + HeraSubcommand::Disc(disc) => disc.run(&args.global).await, + HeraSubcommand::Gossip(gossip) => gossip.run(&args.global).await, } } diff --git a/crates/net/README.md b/crates/net/README.md index 06087da..20b6527 100644 --- a/crates/net/README.md +++ b/crates/net/README.md @@ -4,6 +4,13 @@ Contains a gossipsub driver to run discv5 peer discovery and block gossip. ### Example +> **Warning** +> +> Notice, the socket address uses `0.0.0.0`. +> If you are experiencing issues connecting to peers for discovery, +> check to make sure you are not using the loopback address, +> `127.0.0.1` aka "localhost", which can prevent outward facing connections. + ```rust,no_run use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use alloy_primitives::address; @@ -25,15 +32,9 @@ driver.start().expect("Failed to start network driver"); println!("NetworkDriver started."); ``` -> [!WARNING] -> -> Notice, the socket address uses `0.0.0.0`. -> If you are experiencing issues connecting to peers for discovery, -> check to make sure you are not using the loopback address, -> `127.0.0.1` aka "localhost", which can prevent outward facing connections. - [!WARNING]: ###example ### Acknowledgements Largely based off [magi](https://github.com/a16z/magi)'s [p2p module](https://github.com/a16z/magi/tree/master/src/network). + diff --git a/crates/net/src/builder.rs b/crates/net/src/builder.rs index b8d6eaf..ba7b92e 100644 --- a/crates/net/src/builder.rs +++ b/crates/net/src/builder.rs @@ -34,7 +34,8 @@ pub struct NetworkDriverBuilder { pub discovery_addr: Option, /// The [GossipConfig] constructs the config for `gossipsub`. pub gossip_config: Option, - + /// The interval to discovery random nodes. + pub interval: Option, /// The [Config] constructs the config for `discv5`. pub discovery_config: Option, /// The [Keypair] for the node. @@ -67,6 +68,12 @@ impl NetworkDriverBuilder { self } + /// Specifies the interval to discovery random nodes. + pub fn with_interval(&mut self, interval: Duration) -> &mut Self { + self.interval = Some(interval); + self + } + /// Specifies the socket address that the gossip service is listening on. pub fn with_gossip_addr(&mut self, socket: SocketAddr) -> &mut Self { self.gossip_addr = Some(socket); @@ -278,7 +285,8 @@ impl NetworkDriverBuilder { discovery_builder = discovery_builder.with_discovery_config(discovery_config); } - let discovery = discovery_builder.build()?; + let mut discovery = discovery_builder.build()?; + discovery.interval = self.interval.unwrap_or(Duration::from_secs(10)); Ok(NetworkDriver { discovery, diff --git a/crates/net/src/discovery/driver.rs b/crates/net/src/discovery/driver.rs index 489ca41..1229557 100644 --- a/crates/net/src/discovery/driver.rs +++ b/crates/net/src/discovery/driver.rs @@ -24,6 +24,8 @@ pub struct DiscoveryDriver { pub disc: Discv5, /// The chain ID of the network. pub chain_id: u64, + /// The interval to discovery random nodes. + pub interval: Duration, } impl DiscoveryDriver { @@ -34,7 +36,7 @@ impl DiscoveryDriver { /// Instantiates a new [DiscoveryDriver]. pub fn new(disc: Discv5, chain_id: u64) -> Self { - Self { disc, chain_id } + Self { disc, chain_id, interval: Duration::from_secs(10) } } /// Spawns a new [Discv5] discovery service in a new tokio task. @@ -108,7 +110,7 @@ impl DiscoveryDriver { } } - sleep(Duration::from_secs(10)).await; + sleep(self.interval).await; } }); diff --git a/crates/net/src/driver.rs b/crates/net/src/driver.rs index a10d81e..efd26fb 100644 --- a/crates/net/src/driver.rs +++ b/crates/net/src/driver.rs @@ -53,9 +53,11 @@ impl NetworkDriver { loop { select! { peer = peer_recv.recv() => { - self.gossip.dial_opt(peer).await; + self.gossip.dial_opt(peer.clone()).await; + tracing::info!("Received peer: {:?} | Connected peers: {:?}", peer, self.gossip.connected_peers()); }, event = self.gossip.select_next_some() => { + tracing::debug!("Received event: {:?}", event); self.gossip.handle_event(event); }, } diff --git a/crates/net/src/gossip/driver.rs b/crates/net/src/gossip/driver.rs index bd47a4a..054f4d5 100644 --- a/crates/net/src/gossip/driver.rs +++ b/crates/net/src/gossip/driver.rs @@ -43,13 +43,19 @@ impl GossipDriver { self.swarm.select_next_some().await } + /// Returns the number of connected peers. + pub fn connected_peers(&self) -> usize { + self.swarm.connected_peers().count() + } + /// Dials the given [`Option`]. pub async fn dial_opt(&mut self, peer: Option>) { let Some(addr) = peer else { return; }; - if let Err(e) = self.dial(addr).await { - error!("Failed to dial peer: {:?}", e); + match self.dial(addr).await { + Ok(_) => info!("Dialed peer"), + Err(e) => error!("Failed to dial peer: {:?}", e), } } diff --git a/crates/net/src/gossip/handler.rs b/crates/net/src/gossip/handler.rs index a204e17..60e80b8 100644 --- a/crates/net/src/gossip/handler.rs +++ b/crates/net/src/gossip/handler.rs @@ -49,10 +49,13 @@ impl Handler for BlockHandler { tracing::debug!("received block"); let decoded = if msg.topic == self.blocks_v1_topic.hash() { + tracing::debug!("received v1 block"); OpNetworkPayloadEnvelope::decode_v1(&msg.data) } else if msg.topic == self.blocks_v2_topic.hash() { + tracing::debug!("received v2 block"); OpNetworkPayloadEnvelope::decode_v2(&msg.data) } else if msg.topic == self.blocks_v3_topic.hash() { + tracing::debug!("received v3 block"); OpNetworkPayloadEnvelope::decode_v3(&msg.data) } else { return MessageAcceptance::Reject; @@ -115,11 +118,12 @@ impl BlockHandler { let msg = envelope.payload_hash.signature_message(self.chain_id); let block_signer = *self.unsafe_signer_recv.borrow(); - let Ok(msg_signer) = envelope.signature.recover_address_from_msg(msg) else { - // TODO: add telemetry here if this happens. + let Ok(msg_signer) = envelope.signature.recover_address_from_prehash(&msg) else { + tracing::warn!("Failed to recover address from message"); return false; }; - time_valid && msg_signer == block_signer + let signer_valid = msg_signer == block_signer; + time_valid && signer_valid } }