Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit

Permalink
fix(network): report callback reports the originating peer
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed May 22, 2024
1 parent e15a3aa commit 9869d04
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 12 deletions.
21 changes: 17 additions & 4 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod test;

use std::collections::HashMap;

use futures::channel::mpsc::{Receiver, Sender};
use futures::channel::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender};
use futures::future::pending;
use futures::stream::{self, BoxStream, SelectAll};
use futures::{FutureExt, StreamExt};
Expand Down Expand Up @@ -49,6 +49,9 @@ pub struct GenericNetworkManager<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrai
broadcasted_messages_senders: HashMap<Topic, Sender<(Bytes, ReportCallback)>>,
query_id_to_inbound_session_id: HashMap<QueryId, InboundSessionId>,
outbound_session_id_to_protocol: HashMap<OutboundSessionId, Protocol>,
reported_peer_receiver: UnboundedReceiver<PeerId>,
// We keep this just for giving a clone of it for subscribers.
reported_peer_sender: UnboundedSender<PeerId>,
// Fields for metrics
num_active_inbound_sessions: usize,
num_active_outbound_sessions: usize,
Expand All @@ -65,6 +68,7 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE
.map(|(query_receiver, _)| query_receiver.next().boxed())
.unwrap_or(pending().boxed()) => self.handle_sync_subscriber_query(res),
Some((topic, message)) = self.messages_to_broadcast_receivers.next() => self.broadcast_message(message, topic),
Some(peer_id) = self.reported_peer_receiver.next() => self.swarm.report_peer(peer_id),
}
}
}
Expand All @@ -75,6 +79,7 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE
header_buffer_size: usize,
) -> Self {
gauge!(papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS, 0f64);
let (reported_peer_sender, reported_peer_receiver) = futures::channel::mpsc::unbounded();
Self {
swarm,
db_executor,
Expand All @@ -85,6 +90,8 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE
broadcasted_messages_senders: HashMap::new(),
query_id_to_inbound_session_id: HashMap::new(),
outbound_session_id_to_protocol: HashMap::new(),
reported_peer_sender,
reported_peer_receiver,
num_active_inbound_sessions: 0,
num_active_outbound_sessions: 0,
}
Expand Down Expand Up @@ -333,13 +340,19 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE

fn handle_broadcast_behaviour_event(&mut self, event: broadcast::ExternalEvent) {
match event {
broadcast::ExternalEvent::Received { originated_peer_id: _peer_id, message, topic } => {
broadcast::ExternalEvent::Received { originated_peer_id, message, topic } => {
let Some(sender) = self.broadcasted_messages_senders.get_mut(&topic) else {
error!("Received a message from a topic we're not subscribed to: {topic:?}");
return;
};
// TODO(shahak): Implement the report callback.
let send_result = sender.try_send((message, Box::new(|| {})));
let reported_peer_sender = self.reported_peer_sender.clone();
let send_result = sender.try_send((
message,
Box::new(move || {
// TODO(shahak): Check if we can panic in case of error.
let _ = reported_peer_sender.unbounded_send(originated_peer_id);
}),
));
if let Err(e) = send_result {
if e.is_disconnected() {
panic!("Receiver was dropped. This should never happen.")
Expand Down
7 changes: 7 additions & 0 deletions crates/papyrus_network/src/network_manager/swarm_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm};

use crate::broadcast::Topic;
use crate::peer_manager::ReputationModifier;
use crate::streamed_bytes::behaviour::{PeerNotConnected, SessionIdNotFoundError};
use crate::streamed_bytes::{Bytes, InboundSessionId, OutboundSessionId};
use crate::{mixed_behaviour, Protocol};
Expand Down Expand Up @@ -38,6 +39,8 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
fn add_external_address(&mut self, address: Multiaddr);

fn broadcast_message(&mut self, message: Bytes, topic: Topic);

fn report_peer(&mut self, peer_id: PeerId);
}

impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
Expand Down Expand Up @@ -84,4 +87,8 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
fn broadcast_message(&mut self, message: Bytes, topic: Topic) {
self.behaviour_mut().broadcast.broadcast_message(message, topic);
}

fn report_peer(&mut self, peer_id: PeerId) {
let _ = self.behaviour_mut().peer_manager.report_peer(peer_id, ReputationModifier::Bad {});
}
}
33 changes: 26 additions & 7 deletions crates/papyrus_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct MockSwarm {
pub pending_events: Queue<Event>,
pub sent_queries: Vec<(InternalQuery, PeerId)>,
broadcasted_messages_senders: Vec<UnboundedSender<(Bytes, Topic)>>,
reported_peer_senders: Vec<UnboundedSender<PeerId>>,
inbound_session_id_to_data_sender: HashMap<InboundSessionId, UnboundedSender<Data>>,
next_outbound_session_id: usize,
first_polled_event_notifier: Option<oneshot::Sender<()>>,
Expand Down Expand Up @@ -96,6 +97,12 @@ impl MockSwarm {
receiver
}

pub fn get_reported_peers_stream(&mut self) -> impl Stream<Item = PeerId> {
let (sender, receiver) = unbounded();
self.reported_peer_senders.push(sender);
receiver
}

fn create_received_data_events_for_query(
&self,
query: InternalQuery,
Expand Down Expand Up @@ -200,6 +207,12 @@ impl SwarmTrait for MockSwarm {
sender.unbounded_send((message.clone(), topic.clone())).unwrap();
}
}

fn report_peer(&mut self, peer_id: PeerId) {
for sender in &self.reported_peer_senders {
sender.unbounded_send(peer_id).unwrap();
}
}
}

#[derive(Default)]
Expand Down Expand Up @@ -461,18 +474,20 @@ async fn broadcast_message() {
}

#[tokio::test]
async fn receive_broadcasted_message() {
async fn receive_broadcasted_message_and_report_it() {
let topic = "TOPIC".to_owned();
let message = vec![1u8, 2u8, 3u8];
let originated_peer_id = PeerId::random();

let mock_swarm = MockSwarm::default();
let mut mock_swarm = MockSwarm::default();
mock_swarm.pending_events.push(Event::Behaviour(mixed_behaviour::Event::ExternalEvent(
mixed_behaviour::ExternalEvent::Broadcast(broadcast::ExternalEvent::Received {
originated_peer_id: PeerId::random(),
originated_peer_id,
message: message.clone(),
topic: topic.clone(),
}),
)));
let mut reported_peer_receiver = mock_swarm.get_reported_peers_stream();

let mock_db_executor = MockDBExecutor::default();
let mut network_manager =
Expand All @@ -485,11 +500,15 @@ async fn receive_broadcasted_message() {
tokio::select! {
_ = network_manager.run() => panic!("network manager ended"),
result = tokio::time::timeout(
TIMEOUT, broadcasted_messages_receiver.next()
TIMEOUT, async move {
let (actual_message, report_callback) =
broadcasted_messages_receiver.next().await.unwrap();
assert_eq!(message, actual_message);
report_callback();
assert_eq!(originated_peer_id, reported_peer_receiver.next().await.unwrap());
}
) => {
let (actual_message, _report_callback) = result.unwrap().unwrap();
assert_eq!(message, actual_message);
// TODO(shahak): Call the report callback once it's implemented.
result.unwrap()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ where
})
}

fn report_peer(
pub(crate) fn report_peer(
&mut self,
peer_id: PeerId,
reason: ReputationModifier,
Expand Down

0 comments on commit 9869d04

Please sign in to comment.