diff --git a/src/error.rs b/src/error.rs index 8cfa3181..3bb8a4cb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -279,7 +279,7 @@ pub enum DialError { } /// Dialing resulted in an immediate error before performing any network operations. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, Copy, Clone, Eq, PartialEq)] pub enum ImmediateDialError { /// The provided address does not include a peer ID. #[error("`PeerId` missing from the address")] diff --git a/src/mock/substream.rs b/src/mock/substream.rs index b1308abb..4c52394e 100644 --- a/src/mock/substream.rs +++ b/src/mock/substream.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::error::{Error, SubstreamError}; +use crate::error::SubstreamError; use bytes::{Bytes, BytesMut}; use futures::{Sink, Stream}; diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index a2e99adb..c2a04bf9 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -259,10 +259,9 @@ mod tests { let mut executor = QueryExecutor::new(); let peer = PeerId::random(); let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown)))); + substream.expect_poll_next().times(1).return_once(|_| { + Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed))) + }); executor.read_message( peer, @@ -294,10 +293,9 @@ mod tests { substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(()))); substream.expect_start_send().times(1).return_once(|_| Ok(())); substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown)))); + substream.expect_poll_next().times(1).return_once(|_| { + Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed))) + }); executor.send_request_read_response( peer, @@ -330,7 +328,7 @@ mod tests { substream .expect_poll_ready() .times(1) - .return_once(|_| Poll::Ready(Err(crate::Error::Unknown))); + .return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed))); substream.expect_poll_close().times(1).return_once(|_| Poll::Ready(Ok(()))); executor.send_request_read_response( diff --git a/src/protocol/notification/negotiation.rs b/src/protocol/notification/negotiation.rs index 8d238dfe..9c53c760 100644 --- a/src/protocol/notification/negotiation.rs +++ b/src/protocol/notification/negotiation.rs @@ -328,7 +328,6 @@ mod tests { use crate::{ mock::substream::{DummySubstream, MockSubstream}, types::SubstreamId, - Error, }; use futures::StreamExt; @@ -344,7 +343,10 @@ mod tests { let mut substream = MockSubstream::new(); substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream.expect_start_send().times(1).return_once(|_| Err(Error::Unknown)); + substream + .expect_start_send() + .times(1) + .return_once(|_| Err(crate::error::SubstreamError::ConnectionClosed)); let peer = PeerId::random(); let substream = Substream::new_mock(peer, SubstreamId::from(0usize), Box::new(substream)); @@ -382,7 +384,7 @@ mod tests { substream .expect_poll_flush() .times(1) - .return_once(|_| Poll::Ready(Err(Error::Unknown))); + .return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed))); let peer = PeerId::random(); let substream = Substream::new_mock(peer, SubstreamId::from(0usize), Box::new(substream)); diff --git a/src/protocol/notification/tests/notification.rs b/src/protocol/notification/tests/notification.rs index f766fc44..fc2c1ee7 100644 --- a/src/protocol/notification/tests/notification.rs +++ b/src/protocol/notification/tests/notification.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - error::Error, mock::substream::{DummySubstream, MockSubstream}, protocol::{ self, @@ -1029,7 +1028,7 @@ async fn second_inbound_substream_opened_while_outbound_substream_was_opening() substream1 .expect_poll_ready() .times(1) - .return_once(|_| Poll::Ready(Err(Error::Unknown))); + .return_once(|_| Poll::Ready(Err(SubstreamError::ConnectionClosed))); notif.peers.insert( peer, diff --git a/src/protocol/notification/tests/substream_validation.rs b/src/protocol/notification/tests/substream_validation.rs index cf2d6bb8..b2ac5054 100644 --- a/src/protocol/notification/tests/substream_validation.rs +++ b/src/protocol/notification/tests/substream_validation.rs @@ -256,7 +256,7 @@ async fn accept_fails_due_to_closed_substream() { substream .expect_poll_ready() .times(1) - .return_once(|_| Poll::Ready(Err(Error::SubstreamError(SubstreamError::ConnectionClosed)))); + .return_once(|_| Poll::Ready(Err(SubstreamError::ConnectionClosed))); let (proto_tx, _proto_rx) = channel(256); tx.send(InnerTransportEvent::ConnectionEstablished { diff --git a/src/protocol/request_response/handle.rs b/src/protocol/request_response/handle.rs index 3ffb8d7b..fc3e6b71 100644 --- a/src/protocol/request_response/handle.rs +++ b/src/protocol/request_response/handle.rs @@ -65,6 +65,14 @@ pub enum RequestResponseError { UnsupportedProtocol, } +impl std::cmp::PartialEq for RequestResponseError { + fn eq(&self, other: &Self) -> bool { + // We are not interested in the error details of rejections + // for equality checks. + core::mem::discriminant(self) == core::mem::discriminant(other) + } +} + /// The reason why a request was rejected. #[derive(Debug)] pub enum RejectReason { @@ -162,7 +170,7 @@ impl From for RequestResponseEvent { } /// Request-response events. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum RequestResponseEvent { /// Request received from remote RequestReceived { diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index fd2dd1f1..b717886f 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -34,7 +34,6 @@ use crate::{ use bytes::BytesMut; use futures::{channel, future::BoxFuture, stream::FuturesUnordered, StreamExt}; -use handle::RejectReason; use tokio::{ sync::{ mpsc::{Receiver, Sender}, @@ -54,7 +53,9 @@ use std::{ }; pub use config::{Config, ConfigBuilder}; -pub use handle::{DialOptions, RequestResponseError, RequestResponseEvent, RequestResponseHandle}; +pub use handle::{ + DialOptions, RejectReason, RequestResponseError, RequestResponseEvent, RequestResponseHandle, +}; mod config; mod handle; diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 9c75fcbe..73d8ce86 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -189,7 +189,7 @@ async fn inbound_substream_error() { substream .expect_poll_next() .times(1) - .return_once(|_| Poll::Ready(Some(Err(SubstreamError::Unknown)))); + .return_once(|_| Poll::Ready(Some(Err(SubstreamError::ConnectionClosed)))); // register inbound substream from peer protocol diff --git a/src/transport/manager/handle.rs b/src/transport/manager/handle.rs index f0b42904..0183832b 100644 --- a/src/transport/manager/handle.rs +++ b/src/transport/manager/handle.rs @@ -573,7 +573,7 @@ mod tests { handle.supported_transport.insert(SupportedTransport::Tcp); let err = handle.dial(&handle.local_peer_id).unwrap_err(); - assert!(matches!(err, ImmediateDialError::NoAddressAvailable)); + assert_eq!(err, ImmediateDialError::TriedToDialSelf); assert!(rx.try_recv().is_err()); } diff --git a/tests/protocol/request_response.rs b/tests/protocol/request_response.rs index d34e85c2..ee962b87 100644 --- a/tests/protocol/request_response.rs +++ b/tests/protocol/request_response.rs @@ -22,8 +22,8 @@ use litep2p::{ config::ConfigBuilder as Litep2pConfigBuilder, crypto::ed25519::Keypair, protocol::request_response::{ - Config as RequestResponseConfig, ConfigBuilder, DialOptions, RequestResponseError, - RequestResponseEvent, + Config as RequestResponseConfig, ConfigBuilder, DialOptions, RejectReason, + RequestResponseError, RequestResponseEvent, }, transport::tcp::config::Config as TcpConfig, types::{protocol::ProtocolName, RequestId}, @@ -314,7 +314,7 @@ async fn reject_request(transport1: Transport, transport2: Transport) { RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed) } ); } @@ -789,7 +789,7 @@ async fn connection_close_while_request_is_pending(transport1: Transport, transp RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected, + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed), } ); } @@ -1005,7 +1005,7 @@ async fn response_too_big(transport1: Transport, transport2: Transport) { RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected, + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed), } ); } @@ -1569,7 +1569,7 @@ async fn dial_peer_but_no_known_address(transport1: Transport, transport2: Trans RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected, + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed), } ); } @@ -1916,7 +1916,7 @@ async fn excess_inbound_request_rejected(transport1: Transport, transport2: Tran RequestResponseEvent::RequestFailed { peer: peer2, request_id, - error: RequestResponseError::Rejected + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed) } ); } @@ -2008,7 +2008,7 @@ async fn feedback_received_for_succesful_response(transport1: Transport, transpo .await .unwrap(); - assert!(matches!( + assert_eq!( handle2.next().await.unwrap(), RequestResponseEvent::RequestReceived { peer: peer1, @@ -2016,13 +2016,13 @@ async fn feedback_received_for_succesful_response(transport1: Transport, transpo request_id, request: vec![1, 3, 3, 7] }, - )); + ); // send response with feedback and verify that the response was sent successfully let (feedback_tx, feedback_rx) = channel::oneshot::channel(); handle2.send_response_with_feedback(request_id, vec![1, 3, 3, 8], feedback_tx); - assert!(matches!( + assert_eq!( handle1.next().await.unwrap(), RequestResponseEvent::ResponseReceived { peer: peer2, @@ -2030,7 +2030,7 @@ async fn feedback_received_for_succesful_response(transport1: Transport, transpo response: vec![1, 3, 3, 8], fallback: None, } - )); + ); assert!(feedback_rx.await.is_ok()); } @@ -2350,7 +2350,7 @@ async fn dial_failure(transport: Transport) { RequestResponseEvent::RequestFailed { peer, request_id, - error: RequestResponseError::Rejected + error: RequestResponseError::Rejected(RejectReason::ConnectionClosed) } ); } @@ -2865,23 +2865,19 @@ async fn binary_incompatible_fallback_two_fallback_protocols_inbound_request( .await .unwrap(); - match handle1.next().await.unwrap() { + assert_eq!( + handle1.next().await.unwrap(), RequestResponseEvent::RequestReceived { peer: peer2, fallback: Some(ProtocolName::from("/genesis/protocol/1")), request_id, - request, - } => { - assert_eq!(peer2, peer1); - assert_eq!(request_id, request_id); - assert_eq!(request, vec![1, 2, 3, 4]); + request: vec![1, 2, 3, 4], } - _ => panic!("unexpected event"), - }; + ); handle1.send_response(request_id, vec![1, 3, 3, 7]); - assert!(matches!( + assert_eq!( handle2.next().await.unwrap(), RequestResponseEvent::ResponseReceived { peer: peer1, @@ -2889,7 +2885,7 @@ async fn binary_incompatible_fallback_two_fallback_protocols_inbound_request( response: vec![1, 3, 3, 7], fallback: None, } - )); + ); } #[tokio::test] @@ -2987,7 +2983,7 @@ async fn binary_incompatible_fallback_compatible_nodes( .await .unwrap(); - assert!(matches!( + assert_eq!( handle2.next().await.unwrap(), RequestResponseEvent::RequestReceived { peer: peer1, @@ -2995,11 +2991,11 @@ async fn binary_incompatible_fallback_compatible_nodes( request_id, request: vec![1, 2, 3, 4], } - )); + ); handle2.send_response(request_id, vec![1, 3, 3, 7]); - assert!(matches!( + assert_eq!( handle1.next().await.unwrap(), RequestResponseEvent::ResponseReceived { peer: peer2, @@ -3007,5 +3003,5 @@ async fn binary_incompatible_fallback_compatible_nodes( response: vec![1, 3, 3, 7], fallback: None, } - )); + ); } diff --git a/tests/substream.rs b/tests/substream.rs index 1dde6d3b..dbce163b 100644 --- a/tests/substream.rs +++ b/tests/substream.rs @@ -21,6 +21,7 @@ use litep2p::{ codec::ProtocolCodec, config::ConfigBuilder, + error::SubstreamError, protocol::{Direction, TransportEvent, TransportService, UserProtocol}, substream::{Substream, SubstreamSet}, transport::tcp::config::Config as TcpConfig, @@ -169,7 +170,7 @@ impl UserProtocol for CustomProtocol { } Some(mut substream) => { let payload = Bytes::from(payload); - let res = substream.send(payload).await; + let res = substream.send(payload).await.map_err(Into::into); tx.send(res).unwrap(); let _ = substream.close().await; } @@ -411,12 +412,15 @@ async fn too_big_identity_payload_sink(transport1: Transport, transport2: Transp panic!("failed to open substream"); }; - // send too large paylod to peer + // send too large payload to peer let (tx, rx) = oneshot::channel(); tx1.send(Command::SendPayloadSink(peer2, vec![0u8; 16], tx)).await.unwrap(); match rx.await { Ok(Err(Error::IoError(ErrorKind::PermissionDenied))) => {} + Ok(Err(Error::SubstreamError(SubstreamError::IoError( + ErrorKind::PermissionDenied, + )))) => {} event => panic!("invalid event received: {event:?}"), } }