Skip to content

Commit

Permalink
feat(QoL): some QoL improvments in Status
Browse files Browse the repository at this point in the history
Signed-off-by: gabrik <[email protected]>
  • Loading branch information
gabrik committed Apr 17, 2024
1 parent ac362cf commit e792e1b
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 90 deletions.
43 changes: 13 additions & 30 deletions zrpc-derive/examples/simplified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Hello for MyServer {
}

async fn sub(&self, _request: Request<SubRequest>) -> Result<Response<SubResponse>, Status> {
Err(Status::new(Code::NotImplemented, "Not yet!"))
Err(Status::not_implemented("Not yet!"))
}
}

Expand Down Expand Up @@ -124,8 +124,8 @@ where
}

_ => {
// Box::pin(async move { Err(Status::new(Code::Unavailable, "Unavailable")) })
Err(Status::new(Code::Unavailable, "Unavailable"))
// Box::pin(async move { Err(Status::unavailable "Unavailable")) })
Err(Status::unavailable("Unavailable"))
}
}
}
Expand Down Expand Up @@ -353,20 +353,15 @@ impl<'a> HelloClient<'a> {
.res()
.await
.map_err(|e| {
Status::new(
Code::Unavailable,
format!("Unable to perform liveliness query: {e:?}"),
)
Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;

let ids = res
.into_iter()
.map(|e| {
self.extract_id_from_ke(
&e.sample
.map_err(|_| {
Status::new(Code::Unavailable, "Cannot get value from sample")
})?
.map_err(|_| Status::unavailable("Cannot get value from sample"))?
.key_expr,
)
})
Expand All @@ -382,38 +377,26 @@ impl<'a> HelloClient<'a> {
.map(|m| m.id)
.collect();

ids.pop()
.ok_or(Status::new(Code::Unavailable, "No servers found"))
ids.pop().ok_or(Status::unavailable("No servers found"))
}

fn extract_id_from_ke(&self, ke: &KeyExpr) -> Result<ZenohId, Status> {
let id_str = self
.ke_format
.parse(ke)
.map_err(|e| {
Status::new(
Code::InternalError,
format!("Unable to parse key expression: {e:?}"),
)
})?
.map_err(|e| Status::internal_error(format!("Unable to parse key expression: {e:?}")))?
.get("zid")
.map_err(|e| {
Status::new(
Code::InternalError,
format!("Unable to get server id from key expression: {e:?}"),
)
Status::internal_error(format!(
"Unable to get server id from key expression: {e:?}"
))
})?
.ok_or(Status::new(
Code::Unavailable,
.ok_or(Status::unavailable(
"Unable to get server id from key expression: Option is None",
))?;

ZenohId::from_str(id_str).map_err(|e| {
Status::new(
Code::InternalError,
format!("Unable to convert str to ZenohId: {e:?}"),
)
})
ZenohId::from_str(id_str)
.map_err(|e| Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}")))
}
}

Expand Down
46 changes: 11 additions & 35 deletions zrpc-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl<'a> ServiceGenerator<'a> {
}
}
)*
_ => Err(zrpc::prelude::Status::new(zrpc::prelude::Code::Unavailable, "Unavailable")),
_ => Err(zrpc::prelude::Status::unavailable("Unavailable")),
}
}

Expand Down Expand Up @@ -543,23 +543,15 @@ impl<'a> ServiceGenerator<'a> {
.res()
.await
.map_err(|e| {
zrpc::prelude::Status::new(
zrpc::prelude::Code::Unavailable,
format!("Unable to perform liveliness query: {e:?}"),
)
zrpc::prelude::Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;();

let ids = res
.into_iter()
.map(|e| {
self.extract_id_from_ke(
&e.sample
.map_err(|_| {
zrpc::prelude::Status::new(
zrpc::prelude::Code::Unavailable,
format!("Cannot get value from sample"),
)
})?
.map_err(|_| zrpc::prelude::Status::unavailable("Cannot get value from sample"))?
.key_expr,
)
})
Expand All @@ -574,38 +566,22 @@ impl<'a> ServiceGenerator<'a> {
.collect();

ids.pop()
.ok_or(Status::new(Code::Unavailable, "No servers found"))
.ok_or(zrpc::prelude::Status::unavailable("No servers found"))
}

fn extract_id_from_ke(&self, ke: &zenoh::key_expr::KeyExpr) -> std::result::Result<zenoh::prelude::ZenohId, zrpc::prelude::Status> {
use std::str::FromStr;
let id_str = self
.ke_format
.parse(ke)
.map_err(|e| {
zrpc::prelude::Status::new(
zrpc::prelude::Code::InternalError,
format!("Unable to parse key expression: {e:?}"),
)
})?
.map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to parse key expression: {e:?}")))?
.get("zid")
.map_err(|e| {
zrpc::prelude::Status::new(
zrpc::prelude::Code::InternalError,
format!("Unable to get server id from key expression: {e:?}"),
)
})?
.ok_or(zrpc::prelude::Status::new(
zrpc::prelude::Code::Unavailable,
"Unable to get server id from key expression: Option is None",
))?;

zenoh::prelude::ZenohId::from_str(id_str).map_err(|e| {
zrpc::prelude::Status::new(
zrpc::prelude::Code::InternalError,
format!("Unable to convert str to ZenohId: {e:?}"),
)
})
.map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to get server id from key expression: {e:?}")))?
.ok_or(zrpc::prelude::Status::unavailable( "Unable to get server id from key expression: Option is None"))?;

zenoh::prelude::ZenohId::from_str(id_str)
.map_err(|e| zrpc::prelude::Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}")))

}
}
}
Expand Down
32 changes: 15 additions & 17 deletions zrpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,10 @@ impl Server {
self.tokens.lock().await.extend(tokens);

loop {
let query = queryable.recv_async().await.map_err(|e| {
Status::new(Code::InternalError, format!("Cannot receive query: {e:?}"))
})?;
let query = queryable
.recv_async()
.await
.map_err(|e| Status::internal_error(format!("Cannot receive query: {e:?}")))?;

// the query for RPC is is in the format: @rpc/<server id>/service/<service-name>/<method-name>
// everything is sent as payload of the query
Expand Down Expand Up @@ -207,49 +208,46 @@ impl Server {
log::trace!("Service response: {msg:?}");
let wmsg = WireMessage {
payload: Some(msg.body),
status: Status::new(Code::Ok, ""),
status: Status::ok(""),
};

serialize(&wmsg).map_err(|e| {
Status::new(Code::InternalError, format!("Serialization error: {e:?}"))
})
serialize(&wmsg)
.map_err(|e| Status::internal_error(format!("Serialization error: {e:?}")))
}
Err(e) => {
log::trace!("Service error is : {e:?}");
let wmsg = WireMessage {
payload: None,
status: e,
};
serialize(&wmsg).map_err(|e| {
Status::new(Code::InternalError, format!("Serialization error: {e:?}"))
})
serialize(&wmsg)
.map_err(|e| Status::internal_error(format!("Serialization error: {e:?}")))
}
}
}

async fn server_metadata(labels: HashSet<String>, id: ZenohId) -> Result<Vec<u8>, Status> {
let metadata = ServerMetadata { labels, id };
let serialized_metadata = serialize(&metadata)
.map_err(|e| Status::new(Code::InternalError, format!("Serialization error: {e:?}")))?;
.map_err(|e| Status::internal_error(format!("Serialization error: {e:?}")))?;

let wmsg = WireMessage {
payload: Some(serialized_metadata),
status: Status::new(Code::Ok, ""),
status: Status::ok(""),
};

serialize(&wmsg)
.map_err(|e| Status::new(Code::InternalError, format!("Serialization error: {e:?}")))
serialize(&wmsg).map_err(|e| Status::internal_error(format!("Serialization error: {e:?}")))
}

async fn create_error() -> Result<Vec<u8>, Status> {
Err(Status::new(Code::Unavailable, "Unavailable"))
Err(Status::unavailable("Unavailable"))
}

fn get_service_name<'a>(ke: &'a KeyExpr) -> Result<&'a str, Status> {
Self::get_token(ke, 3).ok_or(Status::new(Code::InternalError, "Cannot get service name"))
Self::get_token(ke, 3).ok_or(Status::internal_error("Cannot get service name"))
}
fn get_method_name<'a>(ke: &'a KeyExpr) -> Result<&'a str, Status> {
Self::get_token(ke, 4).ok_or(Status::new(Code::InternalError, "Cannot get method name"))
Self::get_token(ke, 4).ok_or(Status::internal_error("Cannot get method name"))
}

fn get_token<'a>(ke: &'a KeyExpr, index: usize) -> Option<&'a str> {
Expand Down
70 changes: 70 additions & 0 deletions zrpc/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,76 @@ impl Status {
pub fn message(&self) -> &String {
&self.message
}

pub fn ok<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::Ok, message)
}

pub fn created<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::Created, message)
}

pub fn bad_request<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::BadRequest, message)
}

pub fn unauthorized<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::Unauthorized, message)
}

pub fn forbidden<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::Forbidden, message)
}

pub fn not_found<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::NotFound, message)
}

pub fn timeout<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::Timeout, message)
}

pub fn internal_error<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::InternalError, message)
}

pub fn not_implemented<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::NotImplemented, message)
}

pub fn unavailable<IntoString>(message: IntoString) -> Self
where
IntoString: Into<String>,
{
Self::new(Code::Unavailable, message)
}
}

/// Zenoh-RPC status codes
Expand Down
11 changes: 3 additions & 8 deletions zrpc/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@
* ZettaScale Zenoh Team, <[email protected]>
*********************************************************************************/

use crate::{
request::Request,
response::Response,
serialize::serialize,
status::{Code, Status},
};
use crate::{request::Request, response::Response, serialize::serialize, status::Status};
use futures::Future;
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -79,7 +74,7 @@ where
method: "".into(),
body: serialize(value.get_ref()).unwrap(),
metadata: value.get_metadata().clone(),
status: Status::new(Code::Ok, ""),
status: Status::ok(""),
}
}
}
Expand All @@ -94,7 +89,7 @@ where
method: "".into(),
body: serialize(value.get_ref()).unwrap(),
metadata: value.get_metadata().clone(),
status: Status::new(Code::Ok, ""),
status: Status::ok(""),
}
}
}
Expand Down

0 comments on commit e792e1b

Please sign in to comment.