Skip to content

Commit

Permalink
Partially address testing
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Sep 2, 2024
1 parent 042acec commit 9aec288
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 51 deletions.
50 changes: 32 additions & 18 deletions src/mock/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::error::Error;
use crate::error::{Error, SubstreamError};

use bytes::{Bytes, BytesMut};
use futures::{Sink, Stream};
Expand All @@ -31,15 +31,20 @@ use std::{

/// Trait which describes the behavior of a mock substream.
pub trait Substream:
Debug + Stream<Item = crate::Result<BytesMut>> + Sink<Bytes, Error = Error> + Send + Unpin + 'static
Debug
+ Stream<Item = Result<BytesMut, SubstreamError>>
+ Sink<Bytes, Error = SubstreamError>
+ Send
+ Unpin
+ 'static
{
}

/// Blanket implementation for [`Substream`].
impl<
T: Debug
+ Stream<Item = crate::Result<BytesMut>>
+ Sink<Bytes, Error = Error>
+ Stream<Item = Result<BytesMut, SubstreamError>>
+ Sink<Bytes, Error = SubstreamError>
+ Send
+ Unpin
+ 'static,
Expand All @@ -52,33 +57,33 @@ mockall::mock! {
pub Substream {}

impl Sink<bytes::Bytes> for Substream {
type Error = Error;
type Error = SubstreamError;

fn poll_ready<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;

fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), Error>;
fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), SubstreamError>;

fn poll_flush<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;

fn poll_close<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;
}

impl Stream for Substream {
type Item = crate::Result<BytesMut>;
type Item = Result<BytesMut, SubstreamError>;

fn poll_next<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Option<crate::Result<BytesMut>>>;
) -> Poll<Option<Result<BytesMut, SubstreamError>>>;
}
}

Expand All @@ -95,32 +100,41 @@ impl DummySubstream {
}

impl Sink<bytes::Bytes> for DummySubstream {
type Error = Error;
type Error = SubstreamError;

fn poll_ready<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_ready<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), Error> {
fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), SubstreamError> {
Ok(())
}

fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_flush<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_close<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Ready(Ok(()))
}
}

impl Stream for DummySubstream {
type Item = crate::Result<BytesMut>;
type Item = Result<BytesMut, SubstreamError>;

fn poll_next<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Option<crate::Result<BytesMut>>> {
) -> Poll<Option<Result<BytesMut, SubstreamError>>> {
Poll::Pending
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ mod tests {
SubstreamId::new(),
permit,
) {
Err(Error::ChannelClogged) => {}
Err(SubstreamError::ChannelClogged) => {}
error => panic!("invalid error: {error:?}"),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ mod tests {
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
.return_once(|_| Poll::Ready(Some(Err(crate::error::SubstreamError::ChannelClogged))));

executor.read_message(
peer,
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod protocol_set;
mod transport_service;

/// Substream direction.
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum Direction {
/// Substream was opened by the remote peer.
Inbound,
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/request_response/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn inbound_substream_error() {
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(Error::Unknown))));
.return_once(|_| Poll::Ready(Some(Err(SubstreamError::Unknown))));

// register inbound substream from peer
protocol
Expand Down Expand Up @@ -297,7 +297,7 @@ async fn request_failure_reported_once() {
}) => {
assert_eq!(request_peer, peer);
assert_eq!(request_id, RequestId::from(1337usize));
assert_eq!(error, RequestResponseError::Rejected);
assert!(matches!(error, RequestResponseError::Rejected(_)));
}
event => panic!("unexpected event: {event:?}"),
}
Expand Down
4 changes: 2 additions & 2 deletions src/substream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ impl Substream {
match &mut self.substream {
#[cfg(test)]
SubstreamType::Mock(ref mut substream) =>
futures::SinkExt::send(substream, bytes).await,
futures::SinkExt::send(substream, bytes).await.map_err(Into::into),
SubstreamType::Tcp(ref mut substream) => match self.codec {
ProtocolCodec::Unspecified => panic!("codec is unspecified"),
ProtocolCodec::Identity(payload_size) =>
Expand Down Expand Up @@ -942,7 +942,7 @@ mod tests {
assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..]));

match set.next().await {
Some((exited_peer, Err(Error::SubstreamError(SubstreamError::ConnectionClosed)))) => {
Some((exited_peer, Err(SubstreamError::ConnectionClosed))) => {
assert_eq!(peer, exited_peer);
}
_ => panic!("inavlid event received"),
Expand Down
17 changes: 6 additions & 11 deletions src/transport/manager/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ mod tests {
};

match handle.dial(&peer) {
Err(Error::AlreadyConnected) => {}
Err(ImmediateDialError::AlreadyConnected) => {}
_ => panic!("invalid return value"),
}
}
Expand Down Expand Up @@ -518,12 +518,8 @@ mod tests {
peer
};

match handle.dial(&peer) {
Err(Error::NoAddressAvailable(failed_peer)) => {
assert_eq!(failed_peer, peer);
}
_ => panic!("invalid return value"),
}
let err = handle.dial(&peer).unwrap_err();
assert!(matches!(err, ImmediateDialError::NoAddressAvailable));
}

#[tokio::test]
Expand Down Expand Up @@ -576,10 +572,9 @@ mod tests {
let (mut handle, mut rx) = make_transport_manager_handle();
handle.supported_transport.insert(SupportedTransport::Tcp);

match handle.dial(&handle.local_peer_id) {
Err(Error::TriedToDialSelf) => {}
_ => panic!("invalid return value"),
}
let err = handle.dial(&handle.local_peer_id).unwrap_err();
assert!(matches!(err, ImmediateDialError::NoAddressAvailable));

assert!(rx.try_recv().is_err());
}

Expand Down
32 changes: 18 additions & 14 deletions tests/protocol/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2008,29 +2008,29 @@ async fn feedback_received_for_succesful_response(transport1: Transport, transpo
.await
.unwrap();

assert_eq!(
assert!(matches!(
handle2.next().await.unwrap(),
RequestResponseEvent::RequestReceived {
peer: peer1,
fallback: None,
request_id,
request: vec![1, 3, 3, 7]
},
);
));

// send response with feedback and verify that the response was sent successfully
let (feedback_tx, feedback_rx) = channel::oneshot::channel();
handle2.send_response_with_feedback(request_id, vec![1, 3, 3, 8], feedback_tx);

assert_eq!(
assert!(matches!(
handle1.next().await.unwrap(),
RequestResponseEvent::ResponseReceived {
peer: peer2,
request_id,
response: vec![1, 3, 3, 8],
fallback: None,
}
);
));
assert!(feedback_rx.await.is_ok());
}

Expand Down Expand Up @@ -2865,27 +2865,31 @@ async fn binary_incompatible_fallback_two_fallback_protocols_inbound_request(
.await
.unwrap();

assert_eq!(
handle1.next().await.unwrap(),
match handle1.next().await.unwrap() {
RequestResponseEvent::RequestReceived {
peer: peer2,
fallback: Some(ProtocolName::from("/genesis/protocol/1")),
request_id,
request: vec![1, 2, 3, 4],
request,
} => {
assert_eq!(peer2, peer1);
assert_eq!(request_id, request_id);
assert_eq!(request, vec![1, 2, 3, 4]);
}
);
_ => panic!("unexpected event"),
};

handle1.send_response(request_id, vec![1, 3, 3, 7]);

assert_eq!(
assert!(matches!(
handle2.next().await.unwrap(),
RequestResponseEvent::ResponseReceived {
peer: peer1,
request_id,
response: vec![1, 3, 3, 7],
fallback: None,
}
);
));
}

#[tokio::test]
Expand Down Expand Up @@ -2983,25 +2987,25 @@ async fn binary_incompatible_fallback_compatible_nodes(
.await
.unwrap();

assert_eq!(
assert!(matches!(
handle2.next().await.unwrap(),
RequestResponseEvent::RequestReceived {
peer: peer1,
fallback: None,
request_id,
request: vec![1, 2, 3, 4],
}
);
));

handle2.send_response(request_id, vec![1, 3, 3, 7]);

assert_eq!(
assert!(matches!(
handle1.next().await.unwrap(),
RequestResponseEvent::ResponseReceived {
peer: peer2,
request_id,
response: vec![1, 3, 3, 7],
fallback: None,
}
);
));
}
2 changes: 1 addition & 1 deletion tests/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl UserProtocol for CustomProtocol {
}
Some(mut substream) => {
let payload = Bytes::from(payload);
let res = substream.send_framed(payload).await;
let res = substream.send_framed(payload).await.map_err(Into::into);
tx.send(res).unwrap();
let _ = substream.close().await;
}
Expand Down

0 comments on commit 9aec288

Please sign in to comment.