Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 4, 2024
1 parent ade7419 commit ec0e0b4
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 1 deletion.
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void = "1.0.2"
prometheus-client = { workspace = true }

[dev-dependencies]
async-std = { version = "1.6.3", features = ["unstable"] }
async-std = { version = "1.6.3", features = ["unstable", "attributes"] }
hex = "0.4.2"
libp2p-core = { workspace = true }
libp2p-yamux = { workspace = true }
Expand Down
279 changes: 279 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use async_std::net::Ipv4Addr;
use byteorder::{BigEndian, ByteOrder};
use libp2p_core::ConnectedPoint;
use rand::Rng;
use std::future;
use std::thread::sleep;

#[derive(Default, Debug)]
Expand Down Expand Up @@ -5237,3 +5238,281 @@ fn test_graft_without_subscribe() {
// We unsubscribe from the topic.
let _ = gs.unsubscribe(&Topic::new(topic));
}

#[test]
fn test_all_queues_full() {
let gs_config = ConfigBuilder::default()
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();

let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap();

let topic_hash = Topic::new("Test").hash();
let mut peers = vec![];
let mut topics = BTreeSet::new();
topics.insert(topic_hash.clone());

let peer_id = PeerId::random();
peers.push(peer_id);
gs.connected_peers.insert(
peer_id,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(2),
},
);

let publish_data = vec![0; 42];
gs.publish(topic_hash.clone(), publish_data.clone())
.unwrap();
let publish_data = vec![2; 59];
let err = gs.publish(topic_hash, publish_data).unwrap_err();
assert!(matches!(err, PublishError::AllQueuesFull(f) if f == 1));
}

#[test]
fn test_slow_peer_returns_failed_publish() {
let gs_config = ConfigBuilder::default()
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();

let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap();

let topic_hash = Topic::new("Test").hash();
let mut peers = vec![];
let mut topics = BTreeSet::new();
topics.insert(topic_hash.clone());

let slow_peer_id = PeerId::random();
peers.push(slow_peer_id);
gs.connected_peers.insert(
slow_peer_id,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(2),
},
);
let peer_id = PeerId::random();
peers.push(peer_id);
gs.connected_peers.insert(
peer_id,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(gs.config.connection_handler_queue_len()),
},
);

let publish_data = vec![0; 42];
gs.publish(topic_hash.clone(), publish_data.clone())
.unwrap();
let publish_data = vec![2; 59];
gs.publish(topic_hash.clone(), publish_data).unwrap();
gs.heartbeat();

gs.heartbeat();

let slow_peer_failed_messages = match gs.events.pop_front().unwrap() {
ToSwarm::GenerateEvent(Event::SlowPeer {
peer_id,
failed_messages,
}) if peer_id == slow_peer_id => failed_messages,
_ => panic!("invalid event"),
};

let failed_messages = FailedMessages {
publish: 1,
forward: 0,
priority: 1,
non_priority: 0,
};

assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority);
assert_eq!(
slow_peer_failed_messages.non_priority,
failed_messages.non_priority
);
assert_eq!(slow_peer_failed_messages.publish, failed_messages.publish);
assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward);
}

#[test]
fn test_slow_peer_returns_failed_forward() {
let gs_config = ConfigBuilder::default()
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();

let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap();

let topic_hash = Topic::new("Test").hash();
let mut peers = vec![];
let mut topics = BTreeSet::new();
topics.insert(topic_hash.clone());

let slow_peer_id = PeerId::random();
peers.push(slow_peer_id);
gs.connected_peers.insert(
slow_peer_id,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(2),
},
);
peers.push(slow_peer_id);
let mesh = gs.mesh.entry(topic_hash.clone()).or_default();
mesh.insert(slow_peer_id);

let peer_id = PeerId::random();
peers.push(peer_id);
gs.connected_peers.insert(
peer_id,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(gs.config.connection_handler_queue_len()),
},
);

let publish_data = vec![1; 59];
let transformed = gs
.data_transform
.outbound_transform(&topic_hash, publish_data.clone())
.unwrap();
let raw_message = gs
.build_raw_message(topic_hash.clone(), transformed)
.unwrap();
let msg_id = gs.config.message_id(&Message {
source: raw_message.source,
data: publish_data,
sequence_number: raw_message.sequence_number,
topic: raw_message.topic.clone(),
});

gs.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new());
gs.forward_msg(&msg_id, raw_message, None, HashSet::new());

gs.heartbeat();

let slow_peer_failed_messages = gs
.events
.into_iter()
.find_map(|e| match e {
ToSwarm::GenerateEvent(Event::SlowPeer {
peer_id,
failed_messages,
}) if peer_id == slow_peer_id => Some(failed_messages),
_ => None,
})
.unwrap();

let failed_messages = FailedMessages {
publish: 0,
forward: 1,
priority: 0,
non_priority: 1,
};

assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority);
assert_eq!(
slow_peer_failed_messages.non_priority,
failed_messages.non_priority
);
assert_eq!(slow_peer_failed_messages.publish, failed_messages.publish);
assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward);
}

#[test]
fn test_slow_peer_is_downscored_on_publish() {
let gs_config = ConfigBuilder::default()
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();

let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap();
let slow_peer_params = PeerScoreParams::default();
gs.with_peer_score(slow_peer_params.clone(), PeerScoreThresholds::default())
.unwrap();

let topic_hash = Topic::new("Test").hash();
let mut peers = vec![];
let mut topics = BTreeSet::new();
topics.insert(topic_hash.clone());

let slow_peer_id = PeerId::random();
peers.push(slow_peer_id);
let mesh = gs.mesh.entry(topic_hash.clone()).or_default();
mesh.insert(slow_peer_id);
gs.connected_peers.insert(
slow_peer_id,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(2),
},
);
gs.peer_score.as_mut().unwrap().0.add_peer(slow_peer_id);
let peer_id = PeerId::random();
peers.push(peer_id);
gs.connected_peers.insert(
peer_id,
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(gs.config.connection_handler_queue_len()),
},
);

let publish_data = vec![0; 42];
gs.publish(topic_hash.clone(), publish_data.clone())
.unwrap();
let publish_data = vec![2; 59];
gs.publish(topic_hash.clone(), publish_data).unwrap();
gs.heartbeat();
let slow_peer_score = gs.peer_score(&slow_peer_id).unwrap();
assert_eq!(slow_peer_score, slow_peer_params.slow_peer_weight);
}

#[async_std::test]
async fn test_timedout_messages_are_reported() {
let gs_config = ConfigBuilder::default()
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();

let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap();

let mut sender = RpcSender::new(2);
let topic_hash = Topic::new("Test").hash();
let publish_data = vec![2; 59];
let raw_message = gs.build_raw_message(topic_hash, publish_data).unwrap();

sender
.publish(raw_message, Duration::from_nanos(1), None)
.unwrap();
let mut receiver = sender.new_receiver();
let stale = future::poll_fn(|cx| receiver.poll_stale(cx)).await.unwrap();
assert!(matches!(stale, RpcOut::Publish { .. }));
}

#[test]
fn test_priority_messages_are_always_sent() {
let mut sender = RpcSender::new(2);
let topic_hash = Topic::new("Test").hash();
// Fill the buffer with the first message.
sender.subscribe(topic_hash.clone());
sender.subscribe(topic_hash.clone());
sender.unsubscribe(topic_hash.clone());
}

0 comments on commit ec0e0b4

Please sign in to comment.