Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

errors: Introduce immediate dial error and request-response rejection reasons #227

Merged
merged 14 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 62 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub enum Error {
ConnectionDoesntExist(ConnectionId),
#[error("Exceeded connection limits `{0:?}`")]
ConnectionLimit(ConnectionLimitsError),
#[error("Failed to dial peer immediately")]
ImmediateDialError(#[from] ImmediateDialError),
}

/// Error type for address parsing.
Expand All @@ -150,7 +152,7 @@ pub enum AddressError {
InvalidPeerId(Multihash),
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ParseError {
/// The provided probuf message cannot be decoded.
#[error("Failed to decode protobuf message: `{0:?}`")]
Expand Down Expand Up @@ -180,10 +182,16 @@ pub enum ParseError {
InvalidData,
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum SubstreamError {
#[error("Connection closed")]
ConnectionClosed,
#[error("Connection channel clogged")]
ChannelClogged,
#[error("Connection to peer does not exist: `{0}`")]
PeerDoesNotExist(PeerId),
#[error("I/O error: `{0}`")]
IoError(ErrorKind),
#[error("yamux error: `{0}`")]
YamuxError(crate::yamux::ConnectionError, Direction),
#[error("Failed to read from substream, substream id `{0:?}`")]
Expand Down Expand Up @@ -232,6 +240,25 @@ pub enum NegotiationError {
WebSocket(#[from] tokio_tungstenite::tungstenite::error::Error),
}

impl PartialEq for NegotiationError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::MultistreamSelectError(lhs), Self::MultistreamSelectError(rhs)) => lhs == rhs,
(Self::SnowError(lhs), Self::SnowError(rhs)) => lhs == rhs,
(Self::ParseError(lhs), Self::ParseError(rhs)) => lhs == rhs,
(Self::IoError(lhs), Self::IoError(rhs)) => lhs == rhs,
(Self::PeerIdMismatch(lhs, lhs_1), Self::PeerIdMismatch(rhs, rhs_1)) =>
lhs == rhs && lhs_1 == rhs_1,
#[cfg(feature = "quic")]
(Self::Quic(lhs), Self::Quic(rhs)) => lhs == rhs,
#[cfg(feature = "websocket")]
(Self::WebSocket(lhs), Self::WebSocket(rhs)) =>
core::mem::discriminant(lhs) == core::mem::discriminant(rhs),
_ => core::mem::discriminant(self) == core::mem::discriminant(other),
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum NotificationError {
#[error("Peer already exists")]
Expand All @@ -246,7 +273,8 @@ pub enum NotificationError {

/// The error type for dialing a peer.
///
/// This error is reported via the litep2p events.
/// This error is reported via the litep2p events after performing
/// a network dialing operation.
#[derive(Debug, thiserror::Error)]
pub enum DialError {
/// The dialing operation timed out.
Expand All @@ -269,9 +297,32 @@ pub enum DialError {
NegotiationError(#[from] NegotiationError),
}

/// Dialing resulted in an immediate error before performing any network operations.
#[derive(Debug, thiserror::Error, Copy, Clone, Eq, PartialEq)]
pub enum ImmediateDialError {
/// The provided address does not include a peer ID.
#[error("`PeerId` missing from the address")]
PeerIdMissing,
/// The peer ID provided in the address is the same as the local peer ID.
#[error("Tried to dial self")]
TriedToDialSelf,
/// Cannot dial an already connected peer.
#[error("Already connected to peer")]
AlreadyConnected,
/// Cannot dial a peer that does not have any address available.
#[error("No address available for peer")]
NoAddressAvailable,
/// The essential task was closed.
#[error("TaskClosed")]
TaskClosed,
/// The channel is clogged.
#[error("Connection channel clogged")]
ChannelClogged,
}

/// Error during the QUIC transport negotiation.
#[cfg(feature = "quic")]
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum QuicError {
/// The provided certificate is invalid.
#[error("Invalid certificate")]
Expand All @@ -285,7 +336,7 @@ pub enum QuicError {
}

/// Error during DNS resolution.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DnsError {
/// The DNS resolution failed to resolve the provided URL.
#[error("DNS failed to resolve url `{0}`")]
Expand All @@ -309,6 +360,12 @@ impl From<io::Error> for Error {
}
}

impl From<io::Error> for SubstreamError {
fn from(error: io::Error) -> SubstreamError {
SubstreamError::IoError(error.kind())
}
}

impl From<io::Error> for DialError {
fn from(error: io::Error) -> Self {
DialError::NegotiationError(NegotiationError::IoError(error.kind()))
Expand Down
50 changes: 32 additions & 18 deletions src/mock/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::error::Error;
use crate::error::SubstreamError;

use bytes::{Bytes, BytesMut};
use futures::{Sink, Stream};
Expand All @@ -31,15 +31,20 @@ use std::{

/// Trait which describes the behavior of a mock substream.
pub trait Substream:
Debug + Stream<Item = crate::Result<BytesMut>> + Sink<Bytes, Error = Error> + Send + Unpin + 'static
Debug
+ Stream<Item = Result<BytesMut, SubstreamError>>
+ Sink<Bytes, Error = SubstreamError>
+ Send
+ Unpin
+ 'static
{
}

/// Blanket implementation for [`Substream`].
impl<
T: Debug
+ Stream<Item = crate::Result<BytesMut>>
+ Sink<Bytes, Error = Error>
+ Stream<Item = Result<BytesMut, SubstreamError>>
+ Sink<Bytes, Error = SubstreamError>
+ Send
+ Unpin
+ 'static,
Expand All @@ -52,33 +57,33 @@ mockall::mock! {
pub Substream {}

impl Sink<bytes::Bytes> for Substream {
type Error = Error;
type Error = SubstreamError;

fn poll_ready<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;

fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), Error>;
fn start_send(self: Pin<&mut Self>, item: bytes::Bytes) -> Result<(), SubstreamError>;

fn poll_flush<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;

fn poll_close<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Result<(), Error>>;
) -> Poll<Result<(), SubstreamError>>;
}

impl Stream for Substream {
type Item = crate::Result<BytesMut>;
type Item = Result<BytesMut, SubstreamError>;

fn poll_next<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'a>
) -> Poll<Option<crate::Result<BytesMut>>>;
) -> Poll<Option<Result<BytesMut, SubstreamError>>>;
}
}

Expand All @@ -95,32 +100,41 @@ impl DummySubstream {
}

impl Sink<bytes::Bytes> for DummySubstream {
type Error = Error;
type Error = SubstreamError;

fn poll_ready<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_ready<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), Error> {
fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), SubstreamError> {
Ok(())
}

fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_flush<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
fn poll_close<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
Poll::Ready(Ok(()))
}
}

impl Stream for DummySubstream {
type Item = crate::Result<BytesMut>;
type Item = Result<BytesMut, SubstreamError>;

fn poll_next<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Option<crate::Result<BytesMut>>> {
) -> Poll<Option<Result<BytesMut, SubstreamError>>> {
Poll::Pending
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/multistream_select/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ where
}

/// Error that can happen when negotiating a protocol with the remote.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum NegotiationError {
/// A protocol error occurred during the negotiation.
#[error("A protocol error occurred during the negotiation: `{0:?}`")]
Expand Down
9 changes: 9 additions & 0 deletions src/multistream_select/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,15 @@ pub enum ProtocolError {
ProtocolNotSupported,
}

impl PartialEq for ProtocolError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ProtocolError::IoError(lhs), ProtocolError::IoError(rhs)) => lhs.kind() == rhs.kind(),
_ => std::mem::discriminant(self) == std::mem::discriminant(other),
}
}
}

impl From<ProtocolError> for io::Error {
fn from(err: ProtocolError) -> Self {
if let ProtocolError::IoError(e) = err {
Expand Down
12 changes: 6 additions & 6 deletions src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! Connection-related helper code.

use crate::{
error::Error,
error::{Error, SubstreamError},
protocol::protocol_set::ProtocolCommand,
types::{protocol::ProtocolName, ConnectionId, SubstreamId},
};
Expand Down Expand Up @@ -110,11 +110,11 @@ impl ConnectionHandle {
fallback_names: Vec<ProtocolName>,
substream_id: SubstreamId,
permit: Permit,
) -> crate::Result<()> {
) -> Result<(), SubstreamError> {
match &self.connection {
ConnectionType::Active(active) => active.clone(),
ConnectionType::Inactive(inactive) =>
inactive.upgrade().ok_or(Error::ConnectionClosed)?,
inactive.upgrade().ok_or(SubstreamError::ConnectionClosed)?,
}
.try_send(ProtocolCommand::OpenSubstream {
protocol: protocol.clone(),
Expand All @@ -123,8 +123,8 @@ impl ConnectionHandle {
permit,
})
.map_err(|error| match error {
TrySendError::Full(_) => Error::ChannelClogged,
TrySendError::Closed(_) => Error::ConnectionClosed,
TrySendError::Full(_) => SubstreamError::ChannelClogged,
TrySendError::Closed(_) => SubstreamError::ConnectionClosed,
})
}

Expand Down Expand Up @@ -236,7 +236,7 @@ mod tests {
SubstreamId::new(),
permit,
) {
Err(Error::ChannelClogged) => {}
Err(SubstreamError::ChannelClogged) => {}
error => panic!("invalid error: {error:?}"),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl Identify {
return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
substream_id,
)))),
Ok(Some(Err(error))) => return Err(error),
Ok(Some(Err(error))) => return Err(error.into()),
Ok(Some(Ok(payload))) => payload,
};

Expand Down
18 changes: 8 additions & 10 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,9 @@ mod tests {
let mut executor = QueryExecutor::new();
let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
substream.expect_poll_next().times(1).return_once(|_| {
Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed)))
});

executor.read_message(
peer,
Expand Down Expand Up @@ -294,10 +293,9 @@ mod tests {
substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(())));
substream.expect_start_send().times(1).return_once(|_| Ok(()));
substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(())));
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
substream.expect_poll_next().times(1).return_once(|_| {
Poll::Ready(Some(Err(crate::error::SubstreamError::ConnectionClosed)))
});

executor.send_request_read_response(
peer,
Expand Down Expand Up @@ -330,7 +328,7 @@ mod tests {
substream
.expect_poll_ready()
.times(1)
.return_once(|_| Poll::Ready(Err(crate::Error::Unknown)));
.return_once(|_| Poll::Ready(Err(crate::error::SubstreamError::ConnectionClosed)));
substream.expect_poll_close().times(1).return_once(|_| Poll::Ready(Ok(())));

executor.send_request_read_response(
Expand Down Expand Up @@ -393,7 +391,7 @@ mod tests {
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Err(crate::Error::Unknown))));
.return_once(|_| Poll::Ready(Some(Err(crate::error::SubstreamError::ChannelClogged))));

executor.read_message(
peer,
Expand Down
Loading