Skip to content

Commit

Permalink
feat(network): mixed behaviour in network manager (#1921)
Browse files Browse the repository at this point in the history
* feat(network): mixed behaviour in network manager

* feat(network): handle mixed behaviour events in net mngr
  • Loading branch information
nagmo-starkware authored Apr 18, 2024
1 parent c5fe07e commit f09ee9c
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 55 deletions.
11 changes: 6 additions & 5 deletions crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,15 @@ fn dial_if_requested(swarm: &mut Swarm<Behaviour>, args: &Args) {
async fn main() {
let args = Args::parse();

let config = Config {
session_timeout: Duration::from_secs(3600),
supported_inbound_protocols: vec![PROTOCOL_NAME],
};
let mut swarm = build_swarm(
vec![args.listen_address.clone()],
Duration::from_secs(args.idle_connection_timeout),
Behaviour::new(config),
|_| {
Behaviour::new(Config {
session_timeout: Duration::from_secs(3600),
supported_inbound_protocols: vec![PROTOCOL_NAME],
})
},
);

let mut outbound_session_measurements = HashMap::new();
Expand Down
6 changes: 3 additions & 3 deletions crates/papyrus_network/src/bin_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;
use std::time::Duration;

use libp2p::identity::Keypair;
use libp2p::identity::{Keypair, PublicKey};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{noise, yamux, Multiaddr, Swarm, SwarmBuilder};
Expand All @@ -10,7 +10,7 @@ use tracing::debug;
pub fn build_swarm<Behaviour: NetworkBehaviour>(
listen_addresses: Vec<String>,
idle_connection_timeout: Duration,
behaviour: Behaviour,
behaviour: impl Fn(PublicKey) -> Behaviour,
) -> Swarm<Behaviour>
where
{
Expand All @@ -27,7 +27,7 @@ where
.expect("Error building TCP transport")
// TODO: quic transpot does not work (failure appears in the command line when running in debug mode)
// .with_quic()
.with_behaviour(|_| behaviour)
.with_behaviour(|key| behaviour(key.public()))
.expect("Error while building the swarm")
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(idle_connection_timeout))
.build();
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_network/src/discovery/kad_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::discovery;
use crate::main_behaviour::mixed_behaviour;
use crate::main_behaviour::mixed_behaviour::BridgedBehaviour;

#[derive(Debug)]
pub enum KadFromOtherBehaviourEvent {
RequestKadQuery(PeerId),
FoundListenAddresses { peer_id: PeerId, listen_addresses: Vec<Multiaddr> },
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockHash, BlockHeader, BlockNumber, BlockSignature};
use starknet_api::state::ThinStateDiff;

// TODO: add peer manager config to the network config
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct NetworkConfig {
pub tcp_port: u16,
Expand Down
3 changes: 3 additions & 0 deletions crates/papyrus_network/src/main_behaviour/mixed_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ pub struct MixedBehaviour {
pub streamed_bytes: streamed_bytes::Behaviour,
}

#[derive(Debug)]
pub enum Event {
ExternalEvent(ExternalEvent),
#[allow(dead_code)]
InternalEvent(InternalEvent),
}

#[derive(Debug)]
pub enum ExternalEvent {
StreamedBytes(streamed_bytes::behaviour::ExternalEvent),
}

#[derive(Debug)]
pub enum InternalEvent {
NoOp,
NotifyKad(KadFromOtherBehaviourEvent),
Expand Down
122 changes: 97 additions & 25 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use futures::channel::mpsc::{Receiver, Sender};
use futures::future::pending;
use futures::stream::{self, BoxStream, SelectAll};
use futures::{FutureExt, StreamExt};
use libp2p::kad::store::MemoryStore;
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{PeerId, Swarm};
use libp2p::{identify, kad, Multiaddr, PeerId, Swarm};
use metrics::gauge;
use papyrus_common::metrics as papyrus_metrics;
use papyrus_storage::StorageReader;
Expand All @@ -20,9 +21,27 @@ use self::swarm_trait::SwarmTrait;
use crate::bin_utils::build_swarm;
use crate::converters::{Router, RouterError};
use crate::db_executor::{self, BlockHeaderDBExecutor, DBExecutor, Data, QueryId};
use crate::streamed_bytes::behaviour::{Behaviour, Event, ExternalEvent};
use crate::streamed_bytes::{Config, InboundSessionId, OutboundSessionId, SessionId};
use crate::{DataType, NetworkConfig, PeerAddressConfig, Protocol, Query, ResponseReceivers};
use crate::main_behaviour::mixed_behaviour::{self, BridgedBehaviour};
use crate::peer_manager::PeerManagerConfig;
use crate::streamed_bytes::behaviour::SessionError;
use crate::streamed_bytes::{
self,
Config,
GenericEvent,
InboundSessionId,
OutboundSessionId,
SessionId,
};
use crate::{
discovery,
peer_manager,
DataType,
NetworkConfig,
PeerAddressConfig,
Protocol,
Query,
ResponseReceivers,
};

type StreamCollection = SelectAll<BoxStream<'static, (Data, InboundSessionId)>>;
type SubscriberChannels = (Receiver<Query>, Router);
Expand Down Expand Up @@ -101,7 +120,7 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
(sender, response_receiver)
}

fn handle_swarm_event(&mut self, event: SwarmEvent<Event>) {
fn handle_swarm_event(&mut self, event: SwarmEvent<mixed_behaviour::Event>) {
match event {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
self.peer_id = Some(peer_id);
Expand Down Expand Up @@ -177,12 +196,46 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
};
}

fn handle_behaviour_event(&mut self, event: Event) {
let Event::External(event) = event else {
unimplemented!();
};
fn handle_behaviour_event(&mut self, event: mixed_behaviour::Event) {
match event {
mixed_behaviour::Event::ExternalEvent(external_event) => {
self.handle_behaviour_external_event(external_event);
}
mixed_behaviour::Event::InternalEvent(internal_event) => {
self.handle_behaviour_internal_event(internal_event);
}
}
}

fn handle_behaviour_external_event(&mut self, event: mixed_behaviour::ExternalEvent) {
match event {
ExternalEvent::NewInboundSession {
mixed_behaviour::ExternalEvent::StreamedBytes(event) => {
self.handle_stream_bytes_behaviour_event(event);
}
}
}

fn handle_behaviour_internal_event(&mut self, event: mixed_behaviour::InternalEvent) {
match event {
mixed_behaviour::InternalEvent::NoOp => {}
mixed_behaviour::InternalEvent::NotifyKad(_) => {
self.swarm.behaviour_mut().kademlia.on_other_behaviour_event(event)
}
mixed_behaviour::InternalEvent::NotifyDiscovery(_) => {
self.swarm.behaviour_mut().discovery.on_other_behaviour_event(event)
}
mixed_behaviour::InternalEvent::NotifyStreamedBytes(_) => {
self.swarm.behaviour_mut().streamed_bytes.on_other_behaviour_event(event)
}
mixed_behaviour::InternalEvent::NotifyPeerManager(_) => {
self.swarm.behaviour_mut().peer_manager.on_other_behaviour_event(event)
}
}
}

fn handle_stream_bytes_behaviour_event(&mut self, event: GenericEvent<SessionError>) {
match event {
streamed_bytes::behaviour::ExternalEvent::NewInboundSession {
query,
inbound_session_id,
peer_id: _,
Expand Down Expand Up @@ -212,7 +265,10 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
.boxed(),
);
}
ExternalEvent::ReceivedData { outbound_session_id, data } => {
streamed_bytes::behaviour::ExternalEvent::ReceivedData {
outbound_session_id,
data,
} => {
trace!(
"Received data from peer for session id: {outbound_session_id:?}. sending to \
sync subscriber."
Expand Down Expand Up @@ -243,15 +299,17 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
}
}
}
ExternalEvent::SessionFailed { session_id, error } => {
streamed_bytes::behaviour::ExternalEvent::SessionFailed { session_id, error } => {
error!("Session {session_id:?} failed on {error:?}");
self.report_session_removed_to_metrics(session_id);
// TODO: Handle reputation and retry.
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
self.outbound_session_id_to_protocol.remove(&outbound_session_id);
}
}
ExternalEvent::SessionFinishedSuccessfully { session_id } => {
streamed_bytes::behaviour::ExternalEvent::SessionFinishedSuccessfully {
session_id,
} => {
debug!("Session completed successfully. session_id: {session_id:?}");
self.report_session_removed_to_metrics(session_id);
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
Expand Down Expand Up @@ -357,7 +415,8 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
}
}

pub type NetworkManager = GenericNetworkManager<BlockHeaderDBExecutor, Swarm<Behaviour>>;
pub type NetworkManager =
GenericNetworkManager<BlockHeaderDBExecutor, Swarm<mixed_behaviour::MixedBehaviour>>;

impl NetworkManager {
pub fn new(config: NetworkConfig, storage_reader: StorageReader) -> Self {
Expand All @@ -375,17 +434,30 @@ impl NetworkManager {
// format!("/ip4/0.0.0.0/udp/{quic_port}/quic-v1"),
format!("/ip4/0.0.0.0/tcp/{tcp_port}"),
];
let swarm = build_swarm(
listen_addresses,
idle_connection_timeout,
Behaviour::new(Config {
session_timeout,
supported_inbound_protocols: vec![
Protocol::SignedBlockHeader.into(),
Protocol::StateDiff.into(),
],
}),
);
// TODO: get config details from network manager config
// TODO: consider extraction this to a function of mixed_behaviour module
// TODO: change kadimilia protocol name
let behaviour = |key| {
let local_peer_id = PeerId::from_public_key(&key);
mixed_behaviour::MixedBehaviour {
peer_manager: peer_manager::PeerManager::new(PeerManagerConfig::default()),
// TODO: add real bootstrap peer
discovery: discovery::Behaviour::new(PeerId::random(), Multiaddr::empty()),
identify: identify::Behaviour::new(identify::Config::new(
"/staknet/identify/0.1.0-rc.0".to_string(),
key,
)),
kademlia: kad::Behaviour::new(local_peer_id, MemoryStore::new(local_peer_id)),
streamed_bytes: streamed_bytes::Behaviour::new(Config {
session_timeout,
supported_inbound_protocols: vec![
Protocol::SignedBlockHeader.into(),
Protocol::StateDiff.into(),
],
}),
}
};
let swarm = build_swarm(listen_addresses, idle_connection_timeout, behaviour);

let db_executor = BlockHeaderDBExecutor::new(storage_reader);
Self::generic_new(swarm, db_executor, header_buffer_size, peer)
Expand Down
19 changes: 13 additions & 6 deletions crates/papyrus_network/src/network_manager/swarm_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm};

use crate::streamed_bytes::behaviour::{Behaviour, PeerNotConnected, SessionIdNotFoundError};
use crate::main_behaviour::mixed_behaviour;
use crate::streamed_bytes::behaviour::{PeerNotConnected, SessionIdNotFoundError};
use crate::streamed_bytes::{InboundSessionId, OutboundSessionId};
use crate::{PeerAddressConfig, Protocol};

pub type Event = SwarmEvent<<Behaviour as NetworkBehaviour>::ToSwarm>;
pub type Event = SwarmEvent<<mixed_behaviour::MixedBehaviour as NetworkBehaviour>::ToSwarm>;

pub trait SwarmTrait: Stream<Item = Event> + Unpin {
fn send_length_prefixed_data(
Expand All @@ -32,15 +33,17 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
&mut self,
session_id: InboundSessionId,
) -> Result<(), SessionIdNotFoundError>;

fn behaviour_mut(&mut self) -> &mut mixed_behaviour::MixedBehaviour;
}

impl SwarmTrait for Swarm<Behaviour> {
impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
fn send_length_prefixed_data(
&mut self,
data: Vec<u8>,
inbound_session_id: InboundSessionId,
) -> Result<(), SessionIdNotFoundError> {
self.behaviour_mut().send_length_prefixed_data(data, inbound_session_id)
self.behaviour_mut().streamed_bytes.send_length_prefixed_data(data, inbound_session_id)
}

fn send_query(
Expand All @@ -49,7 +52,7 @@ impl SwarmTrait for Swarm<Behaviour> {
peer_id: PeerId,
protocol: Protocol,
) -> Result<OutboundSessionId, PeerNotConnected> {
self.behaviour_mut().send_query(query, peer_id, protocol.into())
self.behaviour_mut().streamed_bytes.send_query(query, peer_id, protocol.into())
}

fn dial(&mut self, peer: PeerAddressConfig) -> Result<(), DialError> {
Expand All @@ -68,6 +71,10 @@ impl SwarmTrait for Swarm<Behaviour> {
&mut self,
session_id: InboundSessionId,
) -> Result<(), SessionIdNotFoundError> {
self.behaviour_mut().close_inbound_session(session_id)
self.behaviour_mut().streamed_bytes.close_inbound_session(session_id)
}

fn behaviour_mut(&mut self) -> &mut mixed_behaviour::MixedBehaviour {
self.behaviour_mut()
}
}
Loading

0 comments on commit f09ee9c

Please sign in to comment.