From ec0e0b4c7a3c24fceb69b15e79b95628a0ef08cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 31 Oct 2024 18:38:42 +0000 Subject: [PATCH] add tests --- protocols/gossipsub/Cargo.toml | 2 +- protocols/gossipsub/src/behaviour/tests.rs | 279 +++++++++++++++++++++ 2 files changed, 280 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index dbde81eeee0..b2745aa90e7 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -44,7 +44,7 @@ void = "1.0.2" prometheus-client = { workspace = true } [dev-dependencies] -async-std = { version = "1.6.3", features = ["unstable"] } +async-std = { version = "1.6.3", features = ["unstable", "attributes"] } hex = "0.4.2" libp2p-core = { workspace = true } libp2p-yamux = { workspace = true } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 7b142c47629..0ba23996078 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -28,6 +28,7 @@ use async_std::net::Ipv4Addr; use byteorder::{BigEndian, ByteOrder}; use libp2p_core::ConnectedPoint; use rand::Rng; +use std::future; use std::thread::sleep; #[derive(Default, Debug)] @@ -5237,3 +5238,281 @@ fn test_graft_without_subscribe() { // We unsubscribe from the topic. let _ = gs.unsubscribe(&Topic::new(topic)); } + +#[test] +fn test_all_queues_full() { + let gs_config = ConfigBuilder::default() + .validation_mode(ValidationMode::Permissive) + .build() + .unwrap(); + + let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap(); + + let topic_hash = Topic::new("Test").hash(); + let mut peers = vec![]; + let mut topics = BTreeSet::new(); + topics.insert(topic_hash.clone()); + + let peer_id = PeerId::random(); + peers.push(peer_id); + gs.connected_peers.insert( + peer_id, + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), + sender: RpcSender::new(2), + }, + ); + + let publish_data = vec![0; 42]; + gs.publish(topic_hash.clone(), publish_data.clone()) + .unwrap(); + let publish_data = vec![2; 59]; + let err = gs.publish(topic_hash, publish_data).unwrap_err(); + assert!(matches!(err, PublishError::AllQueuesFull(f) if f == 1)); +} + +#[test] +fn test_slow_peer_returns_failed_publish() { + let gs_config = ConfigBuilder::default() + .validation_mode(ValidationMode::Permissive) + .build() + .unwrap(); + + let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap(); + + let topic_hash = Topic::new("Test").hash(); + let mut peers = vec![]; + let mut topics = BTreeSet::new(); + topics.insert(topic_hash.clone()); + + let slow_peer_id = PeerId::random(); + peers.push(slow_peer_id); + gs.connected_peers.insert( + slow_peer_id, + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), + sender: RpcSender::new(2), + }, + ); + let peer_id = PeerId::random(); + peers.push(peer_id); + gs.connected_peers.insert( + peer_id, + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), + sender: RpcSender::new(gs.config.connection_handler_queue_len()), + }, + ); + + let publish_data = vec![0; 42]; + gs.publish(topic_hash.clone(), publish_data.clone()) + .unwrap(); + let publish_data = vec![2; 59]; + gs.publish(topic_hash.clone(), publish_data).unwrap(); + gs.heartbeat(); + + gs.heartbeat(); + + let slow_peer_failed_messages = match gs.events.pop_front().unwrap() { + ToSwarm::GenerateEvent(Event::SlowPeer { + peer_id, + failed_messages, + }) if peer_id == slow_peer_id => failed_messages, + _ => panic!("invalid event"), + }; + + let failed_messages = FailedMessages { + publish: 1, + forward: 0, + priority: 1, + non_priority: 0, + }; + + assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority); + assert_eq!( + slow_peer_failed_messages.non_priority, + failed_messages.non_priority + ); + assert_eq!(slow_peer_failed_messages.publish, failed_messages.publish); + assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward); +} + +#[test] +fn test_slow_peer_returns_failed_forward() { + let gs_config = ConfigBuilder::default() + .validation_mode(ValidationMode::Permissive) + .build() + .unwrap(); + + let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap(); + + let topic_hash = Topic::new("Test").hash(); + let mut peers = vec![]; + let mut topics = BTreeSet::new(); + topics.insert(topic_hash.clone()); + + let slow_peer_id = PeerId::random(); + peers.push(slow_peer_id); + gs.connected_peers.insert( + slow_peer_id, + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), + sender: RpcSender::new(2), + }, + ); + peers.push(slow_peer_id); + let mesh = gs.mesh.entry(topic_hash.clone()).or_default(); + mesh.insert(slow_peer_id); + + let peer_id = PeerId::random(); + peers.push(peer_id); + gs.connected_peers.insert( + peer_id, + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), + sender: RpcSender::new(gs.config.connection_handler_queue_len()), + }, + ); + + let publish_data = vec![1; 59]; + let transformed = gs + .data_transform + .outbound_transform(&topic_hash, publish_data.clone()) + .unwrap(); + let raw_message = gs + .build_raw_message(topic_hash.clone(), transformed) + .unwrap(); + let msg_id = gs.config.message_id(&Message { + source: raw_message.source, + data: publish_data, + sequence_number: raw_message.sequence_number, + topic: raw_message.topic.clone(), + }); + + gs.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new()); + gs.forward_msg(&msg_id, raw_message, None, HashSet::new()); + + gs.heartbeat(); + + let slow_peer_failed_messages = gs + .events + .into_iter() + .find_map(|e| match e { + ToSwarm::GenerateEvent(Event::SlowPeer { + peer_id, + failed_messages, + }) if peer_id == slow_peer_id => Some(failed_messages), + _ => None, + }) + .unwrap(); + + let failed_messages = FailedMessages { + publish: 0, + forward: 1, + priority: 0, + non_priority: 1, + }; + + assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority); + assert_eq!( + slow_peer_failed_messages.non_priority, + failed_messages.non_priority + ); + assert_eq!(slow_peer_failed_messages.publish, failed_messages.publish); + assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward); +} + +#[test] +fn test_slow_peer_is_downscored_on_publish() { + let gs_config = ConfigBuilder::default() + .validation_mode(ValidationMode::Permissive) + .build() + .unwrap(); + + let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap(); + let slow_peer_params = PeerScoreParams::default(); + gs.with_peer_score(slow_peer_params.clone(), PeerScoreThresholds::default()) + .unwrap(); + + let topic_hash = Topic::new("Test").hash(); + let mut peers = vec![]; + let mut topics = BTreeSet::new(); + topics.insert(topic_hash.clone()); + + let slow_peer_id = PeerId::random(); + peers.push(slow_peer_id); + let mesh = gs.mesh.entry(topic_hash.clone()).or_default(); + mesh.insert(slow_peer_id); + gs.connected_peers.insert( + slow_peer_id, + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), + sender: RpcSender::new(2), + }, + ); + gs.peer_score.as_mut().unwrap().0.add_peer(slow_peer_id); + let peer_id = PeerId::random(); + peers.push(peer_id); + gs.connected_peers.insert( + peer_id, + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new_unchecked(0)], + topics: topics.clone(), + sender: RpcSender::new(gs.config.connection_handler_queue_len()), + }, + ); + + let publish_data = vec![0; 42]; + gs.publish(topic_hash.clone(), publish_data.clone()) + .unwrap(); + let publish_data = vec![2; 59]; + gs.publish(topic_hash.clone(), publish_data).unwrap(); + gs.heartbeat(); + let slow_peer_score = gs.peer_score(&slow_peer_id).unwrap(); + assert_eq!(slow_peer_score, slow_peer_params.slow_peer_weight); +} + +#[async_std::test] +async fn test_timedout_messages_are_reported() { + let gs_config = ConfigBuilder::default() + .validation_mode(ValidationMode::Permissive) + .build() + .unwrap(); + + let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap(); + + let mut sender = RpcSender::new(2); + let topic_hash = Topic::new("Test").hash(); + let publish_data = vec![2; 59]; + let raw_message = gs.build_raw_message(topic_hash, publish_data).unwrap(); + + sender + .publish(raw_message, Duration::from_nanos(1), None) + .unwrap(); + let mut receiver = sender.new_receiver(); + let stale = future::poll_fn(|cx| receiver.poll_stale(cx)).await.unwrap(); + assert!(matches!(stale, RpcOut::Publish { .. })); +} + +#[test] +fn test_priority_messages_are_always_sent() { + let mut sender = RpcSender::new(2); + let topic_hash = Topic::new("Test").hash(); + // Fill the buffer with the first message. + sender.subscribe(topic_hash.clone()); + sender.subscribe(topic_hash.clone()); + sender.unsubscribe(topic_hash.clone()); +}