diff --git a/crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs b/crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs index 03b7737a77..2148d001db 100644 --- a/crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs +++ b/crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs @@ -224,14 +224,15 @@ fn dial_if_requested(swarm: &mut Swarm, args: &Args) { async fn main() { let args = Args::parse(); - let config = Config { - session_timeout: Duration::from_secs(3600), - supported_inbound_protocols: vec![PROTOCOL_NAME], - }; let mut swarm = build_swarm( vec![args.listen_address.clone()], Duration::from_secs(args.idle_connection_timeout), - Behaviour::new(config), + |_| { + Behaviour::new(Config { + session_timeout: Duration::from_secs(3600), + supported_inbound_protocols: vec![PROTOCOL_NAME], + }) + }, ); let mut outbound_session_measurements = HashMap::new(); diff --git a/crates/papyrus_network/src/bin_utils/mod.rs b/crates/papyrus_network/src/bin_utils/mod.rs index a73daa264b..46b7abb1d6 100644 --- a/crates/papyrus_network/src/bin_utils/mod.rs +++ b/crates/papyrus_network/src/bin_utils/mod.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use std::time::Duration; -use libp2p::identity::Keypair; +use libp2p::identity::{Keypair, PublicKey}; use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::NetworkBehaviour; use libp2p::{noise, yamux, Multiaddr, Swarm, SwarmBuilder}; @@ -10,7 +10,7 @@ use tracing::debug; pub fn build_swarm( listen_addresses: Vec, idle_connection_timeout: Duration, - behaviour: Behaviour, + behaviour: impl Fn(PublicKey) -> Behaviour, ) -> Swarm where { @@ -27,7 +27,7 @@ where .expect("Error building TCP transport") // TODO: quic transpot does not work (failure appears in the command line when running in debug mode) // .with_quic() - .with_behaviour(|_| behaviour) + .with_behaviour(|key| behaviour(key.public())) .expect("Error while building the swarm") .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(idle_connection_timeout)) .build(); diff --git a/crates/papyrus_network/src/discovery/kad_impl.rs b/crates/papyrus_network/src/discovery/kad_impl.rs index aeee298cb2..83fe4c2fc3 100644 --- a/crates/papyrus_network/src/discovery/kad_impl.rs +++ b/crates/papyrus_network/src/discovery/kad_impl.rs @@ -5,6 +5,7 @@ use crate::discovery; use crate::main_behaviour::mixed_behaviour; use crate::main_behaviour::mixed_behaviour::BridgedBehaviour; +#[derive(Debug)] pub enum KadFromOtherBehaviourEvent { RequestKadQuery(PeerId), FoundListenAddresses { peer_id: PeerId, listen_addresses: Vec }, diff --git a/crates/papyrus_network/src/lib.rs b/crates/papyrus_network/src/lib.rs index a7ba611a39..18e95ff27d 100644 --- a/crates/papyrus_network/src/lib.rs +++ b/crates/papyrus_network/src/lib.rs @@ -34,6 +34,7 @@ use serde::{Deserialize, Serialize}; use starknet_api::block::{BlockHash, BlockHeader, BlockNumber, BlockSignature}; use starknet_api::state::ThinStateDiff; +// TODO: add peer manager config to the network config #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct NetworkConfig { pub tcp_port: u16, diff --git a/crates/papyrus_network/src/main_behaviour/mixed_behaviour.rs b/crates/papyrus_network/src/main_behaviour/mixed_behaviour.rs index 61a6ee5168..74de5538ea 100644 --- a/crates/papyrus_network/src/main_behaviour/mixed_behaviour.rs +++ b/crates/papyrus_network/src/main_behaviour/mixed_behaviour.rs @@ -17,16 +17,19 @@ pub struct MixedBehaviour { pub streamed_bytes: streamed_bytes::Behaviour, } +#[derive(Debug)] pub enum Event { ExternalEvent(ExternalEvent), #[allow(dead_code)] InternalEvent(InternalEvent), } +#[derive(Debug)] pub enum ExternalEvent { StreamedBytes(streamed_bytes::behaviour::ExternalEvent), } +#[derive(Debug)] pub enum InternalEvent { NoOp, NotifyKad(KadFromOtherBehaviourEvent), diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index ff0a0e45d6..74aaeeab49 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -9,8 +9,9 @@ use futures::channel::mpsc::{Receiver, Sender}; use futures::future::pending; use futures::stream::{self, BoxStream, SelectAll}; use futures::{FutureExt, StreamExt}; +use libp2p::kad::store::MemoryStore; use libp2p::swarm::{DialError, SwarmEvent}; -use libp2p::{PeerId, Swarm}; +use libp2p::{identify, kad, Multiaddr, PeerId, Swarm}; use metrics::gauge; use papyrus_common::metrics as papyrus_metrics; use papyrus_storage::StorageReader; @@ -20,9 +21,27 @@ use self::swarm_trait::SwarmTrait; use crate::bin_utils::build_swarm; use crate::converters::{Router, RouterError}; use crate::db_executor::{self, BlockHeaderDBExecutor, DBExecutor, Data, QueryId}; -use crate::streamed_bytes::behaviour::{Behaviour, Event, ExternalEvent}; -use crate::streamed_bytes::{Config, InboundSessionId, OutboundSessionId, SessionId}; -use crate::{DataType, NetworkConfig, PeerAddressConfig, Protocol, Query, ResponseReceivers}; +use crate::main_behaviour::mixed_behaviour::{self, BridgedBehaviour}; +use crate::peer_manager::PeerManagerConfig; +use crate::streamed_bytes::behaviour::SessionError; +use crate::streamed_bytes::{ + self, + Config, + GenericEvent, + InboundSessionId, + OutboundSessionId, + SessionId, +}; +use crate::{ + discovery, + peer_manager, + DataType, + NetworkConfig, + PeerAddressConfig, + Protocol, + Query, + ResponseReceivers, +}; type StreamCollection = SelectAll>; type SubscriberChannels = (Receiver, Router); @@ -101,7 +120,7 @@ impl GenericNetworkManager) { + fn handle_swarm_event(&mut self, event: SwarmEvent) { match event { SwarmEvent::ConnectionEstablished { peer_id, .. } => { self.peer_id = Some(peer_id); @@ -177,12 +196,46 @@ impl GenericNetworkManager { + self.handle_behaviour_external_event(external_event); + } + mixed_behaviour::Event::InternalEvent(internal_event) => { + self.handle_behaviour_internal_event(internal_event); + } + } + } + + fn handle_behaviour_external_event(&mut self, event: mixed_behaviour::ExternalEvent) { match event { - ExternalEvent::NewInboundSession { + mixed_behaviour::ExternalEvent::StreamedBytes(event) => { + self.handle_stream_bytes_behaviour_event(event); + } + } + } + + fn handle_behaviour_internal_event(&mut self, event: mixed_behaviour::InternalEvent) { + match event { + mixed_behaviour::InternalEvent::NoOp => {} + mixed_behaviour::InternalEvent::NotifyKad(_) => { + self.swarm.behaviour_mut().kademlia.on_other_behaviour_event(event) + } + mixed_behaviour::InternalEvent::NotifyDiscovery(_) => { + self.swarm.behaviour_mut().discovery.on_other_behaviour_event(event) + } + mixed_behaviour::InternalEvent::NotifyStreamedBytes(_) => { + self.swarm.behaviour_mut().streamed_bytes.on_other_behaviour_event(event) + } + mixed_behaviour::InternalEvent::NotifyPeerManager(_) => { + self.swarm.behaviour_mut().peer_manager.on_other_behaviour_event(event) + } + } + } + + fn handle_stream_bytes_behaviour_event(&mut self, event: GenericEvent) { + match event { + streamed_bytes::behaviour::ExternalEvent::NewInboundSession { query, inbound_session_id, peer_id: _, @@ -212,7 +265,10 @@ impl GenericNetworkManager { + streamed_bytes::behaviour::ExternalEvent::ReceivedData { + outbound_session_id, + data, + } => { trace!( "Received data from peer for session id: {outbound_session_id:?}. sending to \ sync subscriber." @@ -243,7 +299,7 @@ impl GenericNetworkManager { + streamed_bytes::behaviour::ExternalEvent::SessionFailed { session_id, error } => { error!("Session {session_id:?} failed on {error:?}"); self.report_session_removed_to_metrics(session_id); // TODO: Handle reputation and retry. @@ -251,7 +307,9 @@ impl GenericNetworkManager { + streamed_bytes::behaviour::ExternalEvent::SessionFinishedSuccessfully { + session_id, + } => { debug!("Session completed successfully. session_id: {session_id:?}"); self.report_session_removed_to_metrics(session_id); if let SessionId::OutboundSessionId(outbound_session_id) = session_id { @@ -357,7 +415,8 @@ impl GenericNetworkManager>; +pub type NetworkManager = + GenericNetworkManager>; impl NetworkManager { pub fn new(config: NetworkConfig, storage_reader: StorageReader) -> Self { @@ -375,17 +434,30 @@ impl NetworkManager { // format!("/ip4/0.0.0.0/udp/{quic_port}/quic-v1"), format!("/ip4/0.0.0.0/tcp/{tcp_port}"), ]; - let swarm = build_swarm( - listen_addresses, - idle_connection_timeout, - Behaviour::new(Config { - session_timeout, - supported_inbound_protocols: vec![ - Protocol::SignedBlockHeader.into(), - Protocol::StateDiff.into(), - ], - }), - ); + // TODO: get config details from network manager config + // TODO: consider extraction this to a function of mixed_behaviour module + // TODO: change kadimilia protocol name + let behaviour = |key| { + let local_peer_id = PeerId::from_public_key(&key); + mixed_behaviour::MixedBehaviour { + peer_manager: peer_manager::PeerManager::new(PeerManagerConfig::default()), + // TODO: add real bootstrap peer + discovery: discovery::Behaviour::new(PeerId::random(), Multiaddr::empty()), + identify: identify::Behaviour::new(identify::Config::new( + "/staknet/identify/0.1.0-rc.0".to_string(), + key, + )), + kademlia: kad::Behaviour::new(local_peer_id, MemoryStore::new(local_peer_id)), + streamed_bytes: streamed_bytes::Behaviour::new(Config { + session_timeout, + supported_inbound_protocols: vec![ + Protocol::SignedBlockHeader.into(), + Protocol::StateDiff.into(), + ], + }), + } + }; + let swarm = build_swarm(listen_addresses, idle_connection_timeout, behaviour); let db_executor = BlockHeaderDBExecutor::new(storage_reader); Self::generic_new(swarm, db_executor, header_buffer_size, peer) diff --git a/crates/papyrus_network/src/network_manager/swarm_trait.rs b/crates/papyrus_network/src/network_manager/swarm_trait.rs index bcf5d2f03f..f943fec633 100644 --- a/crates/papyrus_network/src/network_manager/swarm_trait.rs +++ b/crates/papyrus_network/src/network_manager/swarm_trait.rs @@ -4,11 +4,12 @@ use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent}; use libp2p::{Multiaddr, PeerId, Swarm}; -use crate::streamed_bytes::behaviour::{Behaviour, PeerNotConnected, SessionIdNotFoundError}; +use crate::main_behaviour::mixed_behaviour; +use crate::streamed_bytes::behaviour::{PeerNotConnected, SessionIdNotFoundError}; use crate::streamed_bytes::{InboundSessionId, OutboundSessionId}; use crate::{PeerAddressConfig, Protocol}; -pub type Event = SwarmEvent<::ToSwarm>; +pub type Event = SwarmEvent<::ToSwarm>; pub trait SwarmTrait: Stream + Unpin { fn send_length_prefixed_data( @@ -32,15 +33,17 @@ pub trait SwarmTrait: Stream + Unpin { &mut self, session_id: InboundSessionId, ) -> Result<(), SessionIdNotFoundError>; + + fn behaviour_mut(&mut self) -> &mut mixed_behaviour::MixedBehaviour; } -impl SwarmTrait for Swarm { +impl SwarmTrait for Swarm { fn send_length_prefixed_data( &mut self, data: Vec, inbound_session_id: InboundSessionId, ) -> Result<(), SessionIdNotFoundError> { - self.behaviour_mut().send_length_prefixed_data(data, inbound_session_id) + self.behaviour_mut().streamed_bytes.send_length_prefixed_data(data, inbound_session_id) } fn send_query( @@ -49,7 +52,7 @@ impl SwarmTrait for Swarm { peer_id: PeerId, protocol: Protocol, ) -> Result { - self.behaviour_mut().send_query(query, peer_id, protocol.into()) + self.behaviour_mut().streamed_bytes.send_query(query, peer_id, protocol.into()) } fn dial(&mut self, peer: PeerAddressConfig) -> Result<(), DialError> { @@ -68,6 +71,10 @@ impl SwarmTrait for Swarm { &mut self, session_id: InboundSessionId, ) -> Result<(), SessionIdNotFoundError> { - self.behaviour_mut().close_inbound_session(session_id) + self.behaviour_mut().streamed_bytes.close_inbound_session(session_id) + } + + fn behaviour_mut(&mut self) -> &mut mixed_behaviour::MixedBehaviour { + self.behaviour_mut() } } diff --git a/crates/papyrus_network/src/network_manager/test.rs b/crates/papyrus_network/src/network_manager/test.rs index 08128369d7..2a5fefd129 100644 --- a/crates/papyrus_network/src/network_manager/test.rs +++ b/crates/papyrus_network/src/network_manager/test.rs @@ -31,14 +31,10 @@ use crate::db_executor::{ FetchBlockDataFromDb, QueryId, }; +use crate::main_behaviour::mixed_behaviour; use crate::protobuf_messages::protobuf; -use crate::streamed_bytes::behaviour::{ - Event as StreamedBytesEvent, - ExternalEvent, - PeerNotConnected, - SessionIdNotFoundError, -}; -use crate::streamed_bytes::{InboundSessionId, OutboundSessionId}; +use crate::streamed_bytes::behaviour::{PeerNotConnected, SessionIdNotFoundError}; +use crate::streamed_bytes::{GenericEvent, InboundSessionId, OutboundSessionId}; use crate::{BlockHashOrNumber, DataType, Direction, InternalQuery, PeerAddressConfig, Query}; #[derive(Default)] @@ -110,8 +106,11 @@ impl MockSwarm { ) .encode(&mut data_bytes) .expect("failed to convert data to bytes"); - self.pending_events.push(Event::Behaviour(StreamedBytesEvent::External( - ExternalEvent::ReceivedData { data: data_bytes, outbound_session_id }, + self.pending_events.push(Event::Behaviour(mixed_behaviour::Event::ExternalEvent( + mixed_behaviour::ExternalEvent::StreamedBytes(GenericEvent::ReceivedData { + data: data_bytes, + outbound_session_id, + }), ))); } } @@ -172,6 +171,10 @@ impl SwarmTrait for MockSwarm { } Ok(()) } + + fn behaviour_mut(&mut self) -> &mut mixed_behaviour::MixedBehaviour { + unimplemented!() + } } #[derive(Default)] @@ -312,13 +315,13 @@ async fn process_incoming_query() { } .encode(&mut query_bytes) .unwrap(); - mock_swarm.pending_events.push(Event::Behaviour(StreamedBytesEvent::External( - ExternalEvent::NewInboundSession { + mock_swarm.pending_events.push(Event::Behaviour(mixed_behaviour::Event::ExternalEvent( + mixed_behaviour::ExternalEvent::StreamedBytes(GenericEvent::NewInboundSession { query: query_bytes, inbound_session_id, peer_id: PeerId::random(), protocol_name: crate::Protocol::SignedBlockHeader.into(), - }, + }), ))); // Create a future that will return when Fin is sent with the data sent on the swarm. @@ -417,13 +420,13 @@ async fn close_inbound_session() { let mut mock_swarm = MockSwarm::default(); let inbound_session_id = InboundSessionId { value: 0 }; let _fut = mock_swarm.get_data_sent_to_inbound_session(inbound_session_id); - mock_swarm.pending_events.push(Event::Behaviour(StreamedBytesEvent::External( - ExternalEvent::NewInboundSession { + mock_swarm.pending_events.push(Event::Behaviour(mixed_behaviour::Event::ExternalEvent( + mixed_behaviour::ExternalEvent::StreamedBytes(GenericEvent::NewInboundSession { query: query_bytes, inbound_session_id, peer_id: PeerId::random(), protocol_name: crate::Protocol::SignedBlockHeader.into(), - }, + }), ))); // Initiate swarm notifier to notify upon session closed diff --git a/crates/papyrus_network/src/peer_manager/mod.rs b/crates/papyrus_network/src/peer_manager/mod.rs index 48ed584cfa..45fb3abbfb 100644 --- a/crates/papyrus_network/src/peer_manager/mod.rs +++ b/crates/papyrus_network/src/peer_manager/mod.rs @@ -66,7 +66,7 @@ impl

PeerManager

where P: PeerTrait, { - fn new(config: PeerManagerConfig) -> Self { + pub(crate) fn new(config: PeerManagerConfig) -> Self { let peers = HashMap::new(); Self { peers,