Skip to content

Commit

Permalink
Fix testing
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Sep 2, 2024
1 parent 9aec288 commit 6c48ada
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ pub enum DialError {
}

/// Dialing resulted in an immediate error before performing any network operations.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Copy, Clone, Eq, PartialEq)]
pub enum ImmediateDialError {
/// The provided address does not include a peer ID.
#[error("`PeerId` missing from the address")]
Expand Down
2 changes: 1 addition & 1 deletion src/mock/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::error::{Error, SubstreamError};
use crate::error::SubstreamError;

use bytes::{Bytes, BytesMut};
use futures::{Sink, Stream};
Expand Down
16 changes: 7 additions & 9 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,9 @@ mod tests {
let mut executor = QueryExecutor::new();
let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
substream.expect_poll_next().times(1).return_once(|_| {
Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed)))
});

executor.read_message(
peer,
Expand Down Expand Up @@ -294,10 +293,9 @@ mod tests {
substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(())));
substream.expect_start_send().times(1).return_once(|_| Ok(()));
substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(())));
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
substream.expect_poll_next().times(1).return_once(|_| {
Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed)))
});

executor.send_request_read_response(
peer,
Expand Down Expand Up @@ -330,7 +328,7 @@ mod tests {
substream
.expect_poll_ready()
.times(1)
.return_once(|_| Poll::Ready(Err(crate::Error::Unknown)));
.return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed)));
substream.expect_poll_close().times(1).return_once(|_| Poll::Ready(Ok(())));

executor.send_request_read_response(
Expand Down
8 changes: 5 additions & 3 deletions src/protocol/notification/negotiation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ mod tests {
use crate::{
mock::substream::{DummySubstream, MockSubstream},
types::SubstreamId,
Error,
};
use futures::StreamExt;

Expand All @@ -344,7 +343,10 @@ mod tests {

let mut substream = MockSubstream::new();
substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(())));
substream.expect_start_send().times(1).return_once(|_| Err(Error::Unknown));
substream
.expect_start_send()
.times(1)
.return_once(|_| Err(crate::error::SubstreamError::ConnectionClosed));

let peer = PeerId::random();
let substream = Substream::new_mock(peer, SubstreamId::from(0usize), Box::new(substream));
Expand Down Expand Up @@ -382,7 +384,7 @@ mod tests {
substream
.expect_poll_flush()
.times(1)
.return_once(|_| Poll::Ready(Err(Error::Unknown)));
.return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed)));

let peer = PeerId::random();
let substream = Substream::new_mock(peer, SubstreamId::from(0usize), Box::new(substream));
Expand Down
3 changes: 1 addition & 2 deletions src/protocol/notification/tests/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
error::Error,
mock::substream::{DummySubstream, MockSubstream},
protocol::{
self,
Expand Down Expand Up @@ -1029,7 +1028,7 @@ async fn second_inbound_substream_opened_while_outbound_substream_was_opening()
substream1
.expect_poll_ready()
.times(1)
.return_once(|_| Poll::Ready(Err(Error::Unknown)));
.return_once(|_| Poll::Ready(Err(SubstreamError::ConnectionClosed)));

notif.peers.insert(
peer,
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/notification/tests/substream_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ async fn accept_fails_due_to_closed_substream() {
substream
.expect_poll_ready()
.times(1)
.return_once(|_| Poll::Ready(Err(Error::SubstreamError(SubstreamError::ConnectionClosed))));
.return_once(|_| Poll::Ready(Err(SubstreamError::ConnectionClosed)));

let (proto_tx, _proto_rx) = channel(256);
tx.send(InnerTransportEvent::ConnectionEstablished {
Expand Down
10 changes: 9 additions & 1 deletion src/protocol/request_response/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ pub enum RequestResponseError {
UnsupportedProtocol,
}

impl std::cmp::PartialEq for RequestResponseError {
fn eq(&self, other: &Self) -> bool {
// We are not interested in the error details of rejections
// for equality checks.
core::mem::discriminant(self) == core::mem::discriminant(other)
}
}

/// The reason why a request was rejected.
#[derive(Debug)]
pub enum RejectReason {
Expand Down Expand Up @@ -162,7 +170,7 @@ impl From<InnerRequestResponseEvent> for RequestResponseEvent {
}

/// Request-response events.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum RequestResponseEvent {
/// Request received from remote
RequestReceived {
Expand Down
5 changes: 3 additions & 2 deletions src/protocol/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::{

use bytes::BytesMut;
use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use handle::RejectReason;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
Expand All @@ -54,7 +53,9 @@ use std::{
};

pub use config::{Config, ConfigBuilder};
pub use handle::{DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle};
pub use handle::{
DialOptions, RejectReason, RequestResponseError, RequestResponseEvent, RequestResponseHandle,
};

mod config;
mod handle;
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/request_response/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn inbound_substream_error() {
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(SubstreamError::Unknown))));
.return_once(|_| Poll::Ready(Some(Err(SubstreamError::ConnectionClosed))));

// register inbound substream from peer
protocol
Expand Down
2 changes: 1 addition & 1 deletion src/transport/manager/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ mod tests {
handle.supported_transport.insert(SupportedTransport::Tcp);

let err = handle.dial(&handle.local_peer_id).unwrap_err();
assert!(matches!(err, ImmediateDialError::NoAddressAvailable));
assert_eq!(err, ImmediateDialError::TriedToDialSelf);

assert!(rx.try_recv().is_err());
}
Expand Down
48 changes: 22 additions & 26 deletions tests/protocol/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use litep2p::{
config::ConfigBuilder as Litep2pConfigBuilder,
crypto::ed25519::Keypair,
protocol::request_response::{
Config as RequestResponseConfig, ConfigBuilder, DialOptions, RequestResponseError,
RequestResponseEvent,
Config as RequestResponseConfig, ConfigBuilder, DialOptions, RejectReason,
RequestResponseError, RequestResponseEvent,
},
transport::tcp::config::Config as TcpConfig,
types::{protocol::ProtocolName, RequestId},
Expand Down Expand Up @@ -314,7 +314,7 @@ async fn reject_request(transport1: Transport, transport2: Transport) {
RequestResponseEvent::RequestFailed {
peer: peer2,
request_id,
error: RequestResponseError::Rejected
error: RequestResponseError::Rejected(RejectReason::ConnectionClosed)
}
);
}
Expand Down Expand Up @@ -789,7 +789,7 @@ async fn connection_close_while_request_is_pending(transport1: Transport, transp
RequestResponseEvent::RequestFailed {
peer: peer2,
request_id,
error: RequestResponseError::Rejected,
error: RequestResponseError::Rejected(RejectReason::ConnectionClosed),
}
);
}
Expand Down Expand Up @@ -1005,7 +1005,7 @@ async fn response_too_big(transport1: Transport, transport2: Transport) {
RequestResponseEvent::RequestFailed {
peer: peer2,
request_id,
error: RequestResponseError::Rejected,
error: RequestResponseError::Rejected(RejectReason::ConnectionClosed),
}
);
}
Expand Down Expand Up @@ -1569,7 +1569,7 @@ async fn dial_peer_but_no_known_address(transport1: Transport, transport2: Trans
RequestResponseEvent::RequestFailed {
peer: peer2,
request_id,
error: RequestResponseError::Rejected,
error: RequestResponseError::Rejected(RejectReason::ConnectionClosed),
}
);
}
Expand Down Expand Up @@ -1916,7 +1916,7 @@ async fn excess_inbound_request_rejected(transport1: Transport, transport2: Tran
RequestResponseEvent::RequestFailed {
peer: peer2,
request_id,
error: RequestResponseError::Rejected
error: RequestResponseError::Rejected(RejectReason::ConnectionClosed)
}
);
}
Expand Down Expand Up @@ -2008,29 +2008,29 @@ async fn feedback_received_for_succesful_response(transport1: Transport, transpo
.await
.unwrap();

assert!(matches!(
assert_eq!(
handle2.next().await.unwrap(),
RequestResponseEvent::RequestReceived {
peer: peer1,
fallback: None,
request_id,
request: vec![1, 3, 3, 7]
},
));
);

// send response with feedback and verify that the response was sent successfully
let (feedback_tx, feedback_rx) = channel::oneshot::channel();
handle2.send_response_with_feedback(request_id, vec![1, 3, 3, 8], feedback_tx);

assert!(matches!(
assert_eq!(
handle1.next().await.unwrap(),
RequestResponseEvent::ResponseReceived {
peer: peer2,
request_id,
response: vec![1, 3, 3, 8],
fallback: None,
}
));
);
assert!(feedback_rx.await.is_ok());
}

Expand Down Expand Up @@ -2350,7 +2350,7 @@ async fn dial_failure(transport: Transport) {
RequestResponseEvent::RequestFailed {
peer,
request_id,
error: RequestResponseError::Rejected
error: RequestResponseError::Rejected(RejectReason::ConnectionClosed)
}
);
}
Expand Down Expand Up @@ -2865,31 +2865,27 @@ async fn binary_incompatible_fallback_two_fallback_protocols_inbound_request(
.await
.unwrap();

match handle1.next().await.unwrap() {
assert_eq!(
handle1.next().await.unwrap(),
RequestResponseEvent::RequestReceived {
peer: peer2,
fallback: Some(ProtocolName::from("/genesis/protocol/1")),
request_id,
request,
} => {
assert_eq!(peer2, peer1);
assert_eq!(request_id, request_id);
assert_eq!(request, vec![1, 2, 3, 4]);
request: vec![1, 2, 3, 4],
}
_ => panic!("unexpected event"),
};
);

handle1.send_response(request_id, vec![1, 3, 3, 7]);

assert!(matches!(
assert_eq!(
handle2.next().await.unwrap(),
RequestResponseEvent::ResponseReceived {
peer: peer1,
request_id,
response: vec![1, 3, 3, 7],
fallback: None,
}
));
);
}

#[tokio::test]
Expand Down Expand Up @@ -2987,25 +2983,25 @@ async fn binary_incompatible_fallback_compatible_nodes(
.await
.unwrap();

assert!(matches!(
assert_eq!(
handle2.next().await.unwrap(),
RequestResponseEvent::RequestReceived {
peer: peer1,
fallback: None,
request_id,
request: vec![1, 2, 3, 4],
}
));
);

handle2.send_response(request_id, vec![1, 3, 3, 7]);

assert!(matches!(
assert_eq!(
handle1.next().await.unwrap(),
RequestResponseEvent::ResponseReceived {
peer: peer2,
request_id,
response: vec![1, 3, 3, 7],
fallback: None,
}
));
);
}
8 changes: 6 additions & 2 deletions tests/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use litep2p::{
codec::ProtocolCodec,
config::ConfigBuilder,
error::SubstreamError,
protocol::{Direction, TransportEvent, TransportService, UserProtocol},
substream::{Substream, SubstreamSet},
transport::tcp::config::Config as TcpConfig,
Expand Down Expand Up @@ -169,7 +170,7 @@ impl UserProtocol for CustomProtocol {
}
Some(mut substream) => {
let payload = Bytes::from(payload);
let res = substream.send(payload).await;
let res = substream.send(payload).await.map_err(Into::into);
tx.send(res).unwrap();
let _ = substream.close().await;
}
Expand Down Expand Up @@ -411,12 +412,15 @@ async fn too_big_identity_payload_sink(transport1: Transport, transport2: Transp
panic!("failed to open substream");
};

// send too large paylod to peer
// send too large payload to peer
let (tx, rx) = oneshot::channel();
tx1.send(Command::SendPayloadSink(peer2, vec![0u8; 16], tx)).await.unwrap();

match rx.await {
Ok(Err(Error::IoError(ErrorKind::PermissionDenied))) => {}
Ok(Err(Error::SubstreamError(SubstreamError::IoError(
ErrorKind::PermissionDenied,
)))) => {}
event => panic!("invalid event received: {event:?}"),
}
}
Expand Down

0 comments on commit 6c48ada

Please sign in to comment.