Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit

Permalink
refactor(network): rename streamed bytes to sqmr (#2032)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored May 29, 2024
1 parent 7f9ecc4 commit da9fe10
Show file tree
Hide file tree
Showing 22 changed files with 43 additions and 57 deletions.
12 changes: 3 additions & 9 deletions crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@ use futures::StreamExt;
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, StreamProtocol, Swarm};
use papyrus_network::bin_utils::{build_swarm, dial};
use papyrus_network::streamed_bytes::behaviour::{Behaviour, Event, ExternalEvent, SessionError};
use papyrus_network::streamed_bytes::messages::with_length_prefix;
use papyrus_network::streamed_bytes::{
Bytes,
Config,
InboundSessionId,
OutboundSessionId,
SessionId,
};
use papyrus_network::sqmr::behaviour::{Behaviour, Event, ExternalEvent, SessionError};
use papyrus_network::sqmr::messages::with_length_prefix;
use papyrus_network::sqmr::{Bytes, Config, InboundSessionId, OutboundSessionId, SessionId};

const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/papyrus/bench/1");
const CONST_BYTE: u8 = 1;
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use libp2p::{Multiaddr, PeerId};
use crate::mixed_behaviour;
use crate::mixed_behaviour::BridgedBehaviour;
// TODO(shahak): move Bytes to a more generic file.
use crate::streamed_bytes::Bytes;
use crate::sqmr::Bytes;

pub struct Behaviour;

Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod discovery;
pub mod mixed_behaviour;
pub mod network_manager;
mod peer_manager;
pub mod streamed_bytes;
pub mod sqmr;
#[cfg(test)]
mod test_utils;
mod utils;
Expand Down
12 changes: 6 additions & 6 deletions crates/papyrus_network/src/mixed_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use libp2p::{identify, kad, Multiaddr, PeerId};
use crate::discovery::identify_impl::{IdentifyToOtherBehaviourEvent, IDENTIFY_PROTOCOL_VERSION};
use crate::discovery::kad_impl::KadToOtherBehaviourEvent;
use crate::peer_manager::PeerManagerConfig;
use crate::{broadcast, discovery, peer_manager, streamed_bytes};
use crate::{broadcast, discovery, peer_manager, sqmr};

// TODO: consider reducing the pulicity of all behaviour to pub(crate)
#[derive(NetworkBehaviour)]
Expand All @@ -21,7 +21,7 @@ pub struct MixedBehaviour {
pub identify: identify::Behaviour,
// TODO(shahak): Consider using a different store.
pub kademlia: kad::Behaviour<MemoryStore>,
pub streamed_bytes: streamed_bytes::Behaviour,
pub sqmr: sqmr::Behaviour,
pub broadcast: broadcast::Behaviour,
}

Expand All @@ -33,7 +33,7 @@ pub enum Event {

#[derive(Debug)]
pub enum ExternalEvent {
StreamedBytes(streamed_bytes::behaviour::ExternalEvent),
Sqmr(sqmr::behaviour::ExternalEvent),
Broadcast(broadcast::ExternalEvent),
}

Expand All @@ -44,7 +44,7 @@ pub enum ToOtherBehaviourEvent {
Kad(KadToOtherBehaviourEvent),
Discovery(discovery::ToOtherBehaviourEvent),
PeerManager(peer_manager::ToOtherBehaviourEvent),
StreamedBytes(streamed_bytes::ToOtherBehaviourEvent),
Sqmr(sqmr::ToOtherBehaviourEvent),
}

pub trait BridgedBehaviour {
Expand All @@ -57,7 +57,7 @@ impl MixedBehaviour {
pub fn new(
key: PublicKey,
bootstrap_peer_multiaddr: Option<Multiaddr>,
streamed_bytes_config: streamed_bytes::Config,
streamed_bytes_config: sqmr::Config,
) -> Self {
let local_peer_id = PeerId::from_public_key(&key);
Self {
Expand All @@ -78,7 +78,7 @@ impl MixedBehaviour {
)),
// TODO: change kademlia protocol name
kademlia: kad::Behaviour::new(local_peer_id, MemoryStore::new(local_peer_id)),
streamed_bytes: streamed_bytes::Behaviour::new(streamed_bytes_config),
sqmr: sqmr::Behaviour::new(streamed_bytes_config),
broadcast: broadcast::Behaviour::new(),
}
}
Expand Down
28 changes: 10 additions & 18 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use papyrus_common::metrics as papyrus_metrics;
use papyrus_protobuf::protobuf;
use papyrus_storage::StorageReader;
use prost::Message;
use streamed_bytes::Bytes;
use sqmr::Bytes;
use tracing::{debug, error, info, trace};

use self::swarm_trait::SwarmTrait;
Expand All @@ -25,7 +25,7 @@ use crate::broadcast::Topic;
use crate::converters::{Router, RouterError};
use crate::db_executor::{self, DBExecutor, DBExecutorTrait, Data, QueryId};
use crate::mixed_behaviour::{self, BridgedBehaviour};
use crate::streamed_bytes::{self, InboundSessionId, OutboundSessionId, SessionId};
use crate::sqmr::{self, InboundSessionId, OutboundSessionId, SessionId};
use crate::utils::StreamHashMap;
use crate::{broadcast, DataType, NetworkConfig, Protocol, Query, ResponseReceivers};

Expand Down Expand Up @@ -220,7 +220,7 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE

fn handle_behaviour_external_event(&mut self, event: mixed_behaviour::ExternalEvent) {
match event {
mixed_behaviour::ExternalEvent::StreamedBytes(event) => {
mixed_behaviour::ExternalEvent::Sqmr(event) => {
self.handle_stream_bytes_behaviour_event(event);
}
mixed_behaviour::ExternalEvent::Broadcast(event) => {
Expand All @@ -239,17 +239,14 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE
if let Some(discovery) = self.swarm.behaviour_mut().discovery.as_mut() {
discovery.on_other_behaviour_event(&event);
}
self.swarm.behaviour_mut().streamed_bytes.on_other_behaviour_event(&event);
self.swarm.behaviour_mut().sqmr.on_other_behaviour_event(&event);
self.swarm.behaviour_mut().peer_manager.on_other_behaviour_event(&event);
self.swarm.behaviour_mut().broadcast.on_other_behaviour_event(&event);
}

fn handle_stream_bytes_behaviour_event(
&mut self,
event: streamed_bytes::behaviour::ExternalEvent,
) {
fn handle_stream_bytes_behaviour_event(&mut self, event: sqmr::behaviour::ExternalEvent) {
match event {
streamed_bytes::behaviour::ExternalEvent::NewInboundSession {
sqmr::behaviour::ExternalEvent::NewInboundSession {
query,
inbound_session_id,
peer_id: _,
Expand Down Expand Up @@ -279,10 +276,7 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE
.boxed(),
);
}
streamed_bytes::behaviour::ExternalEvent::ReceivedData {
outbound_session_id,
data,
} => {
sqmr::behaviour::ExternalEvent::ReceivedData { outbound_session_id, data } => {
trace!(
"Received data from peer for session id: {outbound_session_id:?}. sending to \
sync subscriber."
Expand Down Expand Up @@ -313,17 +307,15 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE
}
}
}
streamed_bytes::behaviour::ExternalEvent::SessionFailed { session_id, error } => {
sqmr::behaviour::ExternalEvent::SessionFailed { session_id, error } => {
error!("Session {session_id:?} failed on {error:?}");
self.report_session_removed_to_metrics(session_id);
// TODO: Handle reputation and retry.
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
self.outbound_session_id_to_protocol.remove(&outbound_session_id);
}
}
streamed_bytes::behaviour::ExternalEvent::SessionFinishedSuccessfully {
session_id,
} => {
sqmr::behaviour::ExternalEvent::SessionFinishedSuccessfully { session_id } => {
debug!("Session completed successfully. session_id: {session_id:?}");
self.report_session_removed_to_metrics(session_id);
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
Expand Down Expand Up @@ -455,7 +447,7 @@ impl NetworkManager {
mixed_behaviour::MixedBehaviour::new(
key,
bootstrap_peer_multiaddr.clone(),
streamed_bytes::Config {
sqmr::Config {
session_timeout,
supported_inbound_protocols: vec![
Protocol::SignedBlockHeader.into(),
Expand Down
10 changes: 5 additions & 5 deletions crates/papyrus_network/src/network_manager/swarm_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm};

use crate::broadcast::Topic;
use crate::streamed_bytes::behaviour::{PeerNotConnected, SessionIdNotFoundError};
use crate::streamed_bytes::{Bytes, InboundSessionId, OutboundSessionId};
use crate::sqmr::behaviour::{PeerNotConnected, SessionIdNotFoundError};
use crate::sqmr::{Bytes, InboundSessionId, OutboundSessionId};
use crate::{mixed_behaviour, Protocol};

pub type Event = SwarmEvent<<mixed_behaviour::MixedBehaviour as NetworkBehaviour>::ToSwarm>;
Expand Down Expand Up @@ -46,7 +46,7 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
data: Vec<u8>,
inbound_session_id: InboundSessionId,
) -> Result<(), SessionIdNotFoundError> {
self.behaviour_mut().streamed_bytes.send_length_prefixed_data(data, inbound_session_id)
self.behaviour_mut().sqmr.send_length_prefixed_data(data, inbound_session_id)
}

// TODO: change this function signature
Expand All @@ -56,7 +56,7 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
_peer_id: PeerId,
protocol: Protocol,
) -> Result<OutboundSessionId, PeerNotConnected> {
Ok(self.behaviour_mut().streamed_bytes.start_query(query, protocol.into()))
Ok(self.behaviour_mut().sqmr.start_query(query, protocol.into()))
}

fn dial(&mut self, peer_multiaddr: Multiaddr) -> Result<(), DialError> {
Expand All @@ -70,7 +70,7 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
&mut self,
session_id: InboundSessionId,
) -> Result<(), SessionIdNotFoundError> {
self.behaviour_mut().streamed_bytes.close_inbound_session(session_id)
self.behaviour_mut().sqmr.close_inbound_session(session_id)
}

fn behaviour_mut(&mut self) -> &mut mixed_behaviour::MixedBehaviour {
Expand Down
10 changes: 5 additions & 5 deletions crates/papyrus_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use crate::db_executor::{
FetchBlockDataFromDb,
QueryId,
};
use crate::streamed_bytes::behaviour::{PeerNotConnected, SessionIdNotFoundError};
use crate::streamed_bytes::{Bytes, GenericEvent, InboundSessionId, OutboundSessionId};
use crate::sqmr::behaviour::{PeerNotConnected, SessionIdNotFoundError};
use crate::sqmr::{Bytes, GenericEvent, InboundSessionId, OutboundSessionId};
use crate::{broadcast, mixed_behaviour, DataType};

const TIMEOUT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -111,7 +111,7 @@ impl MockSwarm {
let data_bytes =
protobuf::BlockHeadersResponse::from(Some(signed_header)).encode_to_vec();
self.pending_events.push(Event::Behaviour(mixed_behaviour::Event::ExternalEvent(
mixed_behaviour::ExternalEvent::StreamedBytes(GenericEvent::ReceivedData {
mixed_behaviour::ExternalEvent::Sqmr(GenericEvent::ReceivedData {
data: data_bytes,
outbound_session_id,
}),
Expand Down Expand Up @@ -331,7 +331,7 @@ async fn process_incoming_query() {
.encode(&mut query_bytes)
.unwrap();
mock_swarm.pending_events.push(Event::Behaviour(mixed_behaviour::Event::ExternalEvent(
mixed_behaviour::ExternalEvent::StreamedBytes(GenericEvent::NewInboundSession {
mixed_behaviour::ExternalEvent::Sqmr(GenericEvent::NewInboundSession {
query: query_bytes,
inbound_session_id,
peer_id: PeerId::random(),
Expand Down Expand Up @@ -402,7 +402,7 @@ async fn close_inbound_session() {
let inbound_session_id = InboundSessionId { value: 0 };
let _fut = mock_swarm.get_data_sent_to_inbound_session(inbound_session_id);
mock_swarm.pending_events.push(Event::Behaviour(mixed_behaviour::Event::ExternalEvent(
mixed_behaviour::ExternalEvent::StreamedBytes(GenericEvent::NewInboundSession {
mixed_behaviour::ExternalEvent::Sqmr(GenericEvent::NewInboundSession {
query: query_bytes,
inbound_session_id,
peer_id: PeerId::random(),
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/peer_manager/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{debug, error};

use super::peer::PeerTrait;
use super::{PeerManager, PeerManagerError};
use crate::streamed_bytes::OutboundSessionId;
use crate::sqmr::OutboundSessionId;

#[derive(Debug)]
pub enum ToOtherBehaviourEvent {
Expand Down
8 changes: 4 additions & 4 deletions crates/papyrus_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use tracing::info;
pub use self::behaviour_impl::ToOtherBehaviourEvent;
use self::peer::PeerTrait;
use crate::mixed_behaviour::BridgedBehaviour;
use crate::streamed_bytes::OutboundSessionId;
use crate::{mixed_behaviour, streamed_bytes};
use crate::sqmr::OutboundSessionId;
use crate::{mixed_behaviour, sqmr};

pub(crate) mod behaviour_impl;
pub(crate) mod peer;
Expand Down Expand Up @@ -188,8 +188,8 @@ impl From<ToOtherBehaviourEvent> for mixed_behaviour::Event {

impl<P: PeerTrait + 'static> BridgedBehaviour for PeerManager<P> {
fn on_other_behaviour_event(&mut self, event: &mixed_behaviour::ToOtherBehaviourEvent) {
let mixed_behaviour::ToOtherBehaviourEvent::StreamedBytes(
streamed_bytes::ToOtherBehaviourEvent::RequestPeerAssignment { outbound_session_id },
let mixed_behaviour::ToOtherBehaviourEvent::Sqmr(
sqmr::ToOtherBehaviourEvent::RequestPeerAssignment { outbound_session_id },
) = event
else {
return;
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/peer_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::time::sleep;
use super::behaviour_impl::ToOtherBehaviourEvent;
use crate::peer_manager::peer::{MockPeerTrait, Peer, PeerTrait};
use crate::peer_manager::{PeerManager, PeerManagerConfig, ReputationModifier};
use crate::streamed_bytes::OutboundSessionId;
use crate::sqmr::OutboundSessionId;

#[test]
fn peer_assignment_round_robin() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,11 @@ impl From<Event> for mixed_behaviour::Event {
fn from(event: Event) -> Self {
match event {
Event::External(external_event) => {
Self::ExternalEvent(mixed_behaviour::ExternalEvent::StreamedBytes(external_event))
Self::ExternalEvent(mixed_behaviour::ExternalEvent::Sqmr(external_event))
}
Event::ToOtherBehaviourEvent(event) => {
Self::ToOtherBehaviourEvent(mixed_behaviour::ToOtherBehaviourEvent::Sqmr(event))
}
Event::ToOtherBehaviourEvent(event) => Self::ToOtherBehaviourEvent(
mixed_behaviour::ToOtherBehaviourEvent::StreamedBytes(event),
),
}
}
}
4 changes: 2 additions & 2 deletions crates/papyrus_network/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;

use crate::streamed_bytes::Bytes;
use crate::sqmr::Bytes;
use crate::utils::StreamHashMap;

/// Create two streams that are connected to each other. Return them and a join handle for a thread
Expand Down Expand Up @@ -44,7 +44,7 @@ pub(crate) fn dummy_data() -> Vec<Bytes> {
vec![vec![1u8], vec![2u8, 3u8], vec![4u8, 5u8, 6u8]]
}

impl crate::streamed_bytes::Config {
impl crate::sqmr::Config {
pub fn get_test_config() -> Self {
Self {
session_timeout: Duration::MAX,
Expand Down

0 comments on commit da9fe10

Please sign in to comment.