Skip to content

Commit

Permalink
feat: error handling overhaul
Browse files Browse the repository at this point in the history
  • Loading branch information
heilhead committed Feb 16, 2024
1 parent 5d8a12a commit 7ecb23f
Show file tree
Hide file tree
Showing 15 changed files with 521 additions and 367 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ url = "2.3"
warp = { version = "0.3", default-features = false }
serde_json = "1.0"
rand = "0.8.5"
futures-util = "0.3"
once_cell = "1.19"

[[example]]
name = "websocket_client"
Expand Down
6 changes: 3 additions & 3 deletions examples/websocket_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
relay_client::{
error::Error,
error::ClientError,
websocket::{Client, CloseFrame, ConnectionHandler, PublishedMessage},
ConnectionOptions,
},
Expand Down Expand Up @@ -49,11 +49,11 @@ impl ConnectionHandler for Handler {
);
}

fn inbound_error(&mut self, error: Error) {
fn inbound_error(&mut self, error: ClientError) {
println!("[{}] inbound error: {error}", self.name);
}

fn outbound_error(&mut self, error: Error) {
fn outbound_error(&mut self, error: ClientError) {
println!("[{}] outbound error: {error}", self.name);
}
}
Expand Down
62 changes: 59 additions & 3 deletions relay_client/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use relay_rpc::rpc::{self, error::ServiceError};

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Errors generated while parsing
Expand All @@ -23,7 +25,7 @@ pub enum RequestBuildError {

/// Possible Relay client errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
pub enum ClientError {
#[error("Failed to build connection request: {0}")]
RequestBuilder(#[from] RequestBuildError),

Expand All @@ -42,15 +44,69 @@ pub enum Error {
#[error("Invalid response ID")]
InvalidResponseId,

#[error("Invalid error response")]
InvalidErrorResponse,

#[error("Serialization failed: {0}")]
Serialization(serde_json::Error),

#[error("Deserialization failed: {0}")]
Deserialization(serde_json::Error),

#[error("RPC error ({code}): {message}")]
Rpc { code: i32, message: String },
#[error("RPC error: code={code} data={data:?} message={message}")]
Rpc {
code: i32,
message: String,
data: Option<String>,
},

#[error("Invalid request type")]
InvalidRequestType,
}

impl From<rpc::ErrorData> for ClientError {
fn from(err: rpc::ErrorData) -> Self {
Self::Rpc {
code: err.code,
message: err.message,
data: err.data,
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum Error<T: ServiceError> {
/// Client errors encountered while performing the request.
#[error(transparent)]
Client(ClientError),

/// Error response received from the relay.
#[error(transparent)]
Response(#[from] rpc::Error<T>),
}

impl<T: ServiceError> From<ClientError> for Error<T> {
fn from(err: ClientError) -> Self {
match err {
ClientError::Rpc {
code,
message,
data,
} => {
let err = rpc::ErrorData {
code,
message,
data,
};

match rpc::Error::try_from(err) {
Ok(err) => Error::Response(err),

Err(_) => Error::Client(ClientError::InvalidErrorResponse),
}
}

_ => Error::Client(err),
}
}
}
80 changes: 47 additions & 33 deletions relay_client/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
error::{BoxError, Error},
error::{BoxError, ClientError, Error},
ConnectionOptions,
MessageIdGenerator,
},
Expand All @@ -9,15 +9,15 @@ use {
auth::ed25519_dalek::SigningKey,
domain::{DecodedClientId, SubscriptionId, Topic},
jwt::{self, JwtError, VerifyableClaims},
rpc::{self, Receipt, RequestPayload},
rpc::{self, Receipt, ServiceRequest},
},
std::{sync::Arc, time::Duration},
url::Url,
};

pub type TransportError = reqwest::Error;
pub type Response<T> = Result<<T as RequestPayload>::Response, Error>;
pub type EmptyResponse = Result<(), Error>;
pub type Response<T> = Result<<T as ServiceRequest>::Response, Error<<T as ServiceRequest>::Error>>;
pub type EmptyResponse<T> = Result<(), Error<<T as ServiceRequest>::Error>>;

#[derive(Debug, thiserror::Error)]
pub enum RequestParamsError {
Expand All @@ -41,9 +41,6 @@ pub enum HttpClientError {

#[error("JWT error: {0}")]
Jwt(#[from] JwtError),

#[error("RPC error: code={} message={}", .0.code, .0.message)]
RpcError(rpc::ErrorData),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -82,7 +79,7 @@ pub struct Client {
}

impl Client {
pub fn new(opts: &ConnectionOptions) -> Result<Self, Error> {
pub fn new(opts: &ConnectionOptions) -> Result<Self, ClientError> {
let mut headers = HeaderMap::new();
opts.update_request_headers(&mut headers)?;

Expand Down Expand Up @@ -111,11 +108,14 @@ impl Client {
tag: u32,
ttl: Duration,
prompt: bool,
) -> EmptyResponse {
) -> EmptyResponse<rpc::Publish> {
let ttl_secs = ttl
.as_secs()
.try_into()
.map_err(|_| HttpClientError::InvalidRequest(RequestParamsError::InvalidTtl.into()))?;
.map_err(|_| {
HttpClientError::InvalidRequest(RequestParamsError::InvalidTtl.into()).into()
})
.map_err(Error::Client)?;

self.request(rpc::Publish {
topic,
Expand Down Expand Up @@ -175,7 +175,8 @@ impl Client {
.ttl
.as_secs()
.try_into()
.map_err(|err| HttpClientError::InvalidRequest(Box::new(err)))?;
.map_err(|err| HttpClientError::InvalidRequest(Box::new(err)).into())
.map_err(Error::Client)?;
let exp = iat + ttl_sec;

let claims = rpc::WatchRegisterClaims {
Expand All @@ -194,7 +195,11 @@ impl Client {
};

let payload = rpc::WatchRegister {
register_auth: claims.encode(keypair).map_err(HttpClientError::Jwt)?,
register_auth: claims
.encode(keypair)
.map_err(HttpClientError::Jwt)
.map_err(ClientError::from)
.map_err(Error::Client)?,
};

self.request(payload).await
Expand Down Expand Up @@ -230,7 +235,11 @@ impl Client {
};

let payload = rpc::WatchUnregister {
unregister_auth: claims.encode(keypair).map_err(HttpClientError::Jwt)?,
unregister_auth: claims
.encode(keypair)
.map_err(HttpClientError::Jwt)
.map_err(ClientError::from)
.map_err(Error::Client)?,
};

self.request(payload).await
Expand Down Expand Up @@ -299,45 +308,50 @@ impl Client {

pub(crate) async fn request<T>(&self, payload: T) -> Response<T>
where
T: RequestPayload,
T: ServiceRequest,
{
let payload = rpc::Payload::Request(rpc::Request {
id: self.id_generator.next(),
jsonrpc: rpc::JSON_RPC_VERSION.clone(),
params: payload.into_params(),
});

let result = self
.client
.post(self.url.clone())
.json(&payload)
.send()
.await
.map_err(HttpClientError::Transport)?;
let response = async {
let result = self
.client
.post(self.url.clone())
.json(&payload)
.send()
.await
.map_err(HttpClientError::Transport)?;

let status = result.status();
let status = result.status();

if !status.is_success() {
let body = result.text().await;
return Err(HttpClientError::InvalidHttpCode(status, body).into());
}
if !status.is_success() {
let body = result.text().await;
return Err(HttpClientError::InvalidHttpCode(status, body));
}

let response = result
.json::<rpc::Payload>()
.await
.map_err(|_| HttpClientError::InvalidResponse)?;
result
.json::<rpc::Payload>()
.await
.map_err(|_| HttpClientError::InvalidResponse)
}
.await
.map_err(ClientError::from)
.map_err(Error::Client)?;

match response {
rpc::Payload::Response(rpc::Response::Success(response)) => {
serde_json::from_value(response.result)
.map_err(|_| HttpClientError::InvalidResponse.into())
.map_err(|_| Error::Client(HttpClientError::InvalidResponse.into()))
}

rpc::Payload::Response(rpc::Response::Error(response)) => {
Err(HttpClientError::RpcError(response.error).into())
Err(ClientError::from(response.error).into())
}

_ => Err(HttpClientError::InvalidResponse.into()),
_ => Err(Error::Client(HttpClientError::InvalidResponse.into())),
}
}
}
2 changes: 1 addition & 1 deletion relay_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::error::{Error, RequestBuildError},
crate::error::{ClientError, RequestBuildError},
::http::HeaderMap,
relay_rpc::{
auth::{SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS},
Expand Down
20 changes: 10 additions & 10 deletions relay_client/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
self::connection::{connection_event_loop, ConnectionControl},
crate::{error::Error, ConnectionOptions},
crate::{error::ClientError, ConnectionOptions},
relay_rpc::{
domain::{MessageId, SubscriptionId, Topic},
rpc::{
Expand Down Expand Up @@ -114,11 +114,11 @@ pub trait ConnectionHandler: Send + 'static {

/// Called when an inbound error occurs, such as data deserialization
/// failure, or an unknown response message ID.
fn inbound_error(&mut self, _error: Error) {}
fn inbound_error(&mut self, _error: ClientError) {}

/// Called when an outbound error occurs, i.e. failed to write to the
/// websocket stream.
fn outbound_error(&mut self, _error: Error) {}
fn outbound_error(&mut self, _error: ClientError) {}
}

/// The Relay WebSocket RPC client.
Expand Down Expand Up @@ -291,7 +291,7 @@ impl Client {
}

/// Opens a connection to the Relay.
pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), Error> {
pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();
let request = opts.as_ws_request()?;

Expand All @@ -300,24 +300,24 @@ impl Client {
.send(ConnectionControl::Connect { request, tx })
.is_ok()
{
rx.await.map_err(|_| Error::ChannelClosed)?
rx.await.map_err(|_| ClientError::ChannelClosed)?
} else {
Err(Error::ChannelClosed)
Err(ClientError::ChannelClosed)
}
}

/// Closes the Relay connection.
pub async fn disconnect(&self) -> Result<(), Error> {
pub async fn disconnect(&self) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();

if self
.control_tx
.send(ConnectionControl::Disconnect { tx })
.is_ok()
{
rx.await.map_err(|_| Error::ChannelClosed)?
rx.await.map_err(|_| ClientError::ChannelClosed)?
} else {
Err(Error::ChannelClosed)
Err(ClientError::ChannelClosed)
}
}

Expand All @@ -330,7 +330,7 @@ impl Client {
unreachable!();
};

request.tx.send(Err(Error::ChannelClosed)).ok();
request.tx.send(Err(ClientError::ChannelClosed)).ok();
}
}
}
Loading

0 comments on commit 7ecb23f

Please sign in to comment.