Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid intermediate JSON AST creation #1

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
.vscode/
6 changes: 3 additions & 3 deletions src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ pub struct RequestBody<'a> {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
pub enum Outgoing {
pub enum Outgoing<Resp: Serialize> {
#[serde(rename_all = "camelCase")]
Next { request_id: ReqId, payload: Value },
Next { request_id: ReqId, payload: Resp },
#[serde(rename_all = "camelCase")]
Complete { request_id: ReqId },
#[serde(rename_all = "camelCase")]
Expand All @@ -64,7 +64,7 @@ pub enum ErrorKind {
},
}

impl Outgoing {
impl<Resp: Serialize> Outgoing<Resp> {
#[cfg(test)]
pub fn request_id(&self) -> ReqId {
match self {
Expand Down
117 changes: 66 additions & 51 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const INTER_STREAM_FAIRNESS: u64 = 64;

pub trait Service {
type Req: DeserializeOwned;
type Resp: Serialize + 'static;
type Resp: Serialize + Send + 'static;
type Error: Serialize + 'static;
type Ctx: Clone;

Expand All @@ -48,36 +48,36 @@ pub trait Service {
req: Self::Req,
) -> BoxStream<'static, Result<Self::Resp, Self::Error>>;

fn boxed(self) -> BoxedService<Self::Ctx>
fn boxed(self) -> BoxedService<Self::Resp, Self::Ctx>
where
Self: Send + Sized + Sync + 'static,
{
Box::new(self)
}
}

pub trait WebsocketService<Ctx: Clone> {
pub trait WebsocketService<Resp: Serialize + 'static, Ctx: Clone> {
fn serve_ws(
&self,
ctx: Ctx,
raw_req: Value,
service_id: &str,
) -> BoxStream<'static, Result<Value, ErrorKind>>;
) -> BoxStream<'static, Result<Resp, ErrorKind>>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return the Serializable instead of the AST

}

impl<Req, Resp, Ctx, S> WebsocketService<Ctx> for S
impl<Req, Resp, Ctx, S> WebsocketService<Resp, Ctx> for S
where
S: Service<Req = Req, Resp = Resp, Ctx = Ctx>,
Req: DeserializeOwned,
Resp: Serialize + 'static,
Resp: Serialize + Send + 'static,
Ctx: Clone,
{
fn serve_ws(
&self,
ctx: Ctx,
raw_req: Value,
service_id: &str,
) -> BoxStream<'static, Result<Value, ErrorKind>> {
) -> BoxStream<'static, Result<Resp, ErrorKind>> {
trace!(
"Serving raw request for service {}: {:?}",
service_id,
Expand All @@ -86,16 +86,11 @@ where
match serde_json::from_value(raw_req) {
Ok(req) => self
.serve(ctx, req)
.map(|resp_result| {
resp_result
.map(|resp| {
serde_json::to_value(&resp)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skip this step

.expect("Could not serialize service response")
})
.map_err(|err| ErrorKind::ServiceError {
value: serde_json::to_value(&err)
.expect("Could not serialize service error response"),
})
.map(|res| {
res.map_err(|err| ErrorKind::ServiceError {
value: serde_json::to_value(&err)
.expect("Could not serialize service error response"),
})
})
.boxed(),
Err(cause) => {
Expand All @@ -110,11 +105,11 @@ where
}
}

pub type BoxedService<Ctx> = Box<dyn WebsocketService<Ctx> + Send + Sync>;
pub type BoxedService<Resp, Ctx> = Box<dyn WebsocketService<Resp, Ctx> + Send + Sync>;

pub async fn serve<Ctx: Clone + Send + 'static>(
pub async fn serve<Resp: Serialize + Send + 'static, Ctx: Clone + Send + 'static>(
ws: warp::ws::Ws,
services: Arc<BTreeMap<&'static str, BoxedService<Ctx>>>,
services: Arc<BTreeMap<&'static str, BoxedService<Resp, Ctx>>>,
ctx: Ctx,
) -> Result<impl warp::Reply, warp::Rejection> {
// Set the max frame size to 64 MB (defaults to 16 MB which we have hit at CTA)
Expand All @@ -126,11 +121,10 @@ pub async fn serve<Ctx: Clone + Send + 'static>(
// on_upgrade does not take in errors any longer
}

#[allow(clippy::cognitive_complexity)]
fn client_connected<Ctx: Clone + Send + 'static>(
fn client_connected<Resp: Serialize + Send + 'static, Ctx: Clone + Send + 'static>(
ws: WebSocket,
ctx: Ctx,
services: Arc<BTreeMap<&'static str, BoxedService<Ctx>>>,
services: Arc<BTreeMap<&'static str, BoxedService<Resp, Ctx>>>,
) -> impl Future<Output = Result<(), ()>> {
let (ws_out, ws_in) = ws.split();

Expand Down Expand Up @@ -225,8 +219,6 @@ fn client_connected<Ctx: Clone + Send + 'static>(
})
}

// Wtf, clippy?
#[allow(clippy::cognitive_complexity)]
fn cancel_response_stream(snd_cancel: oneshot::Sender<()>) {
if snd_cancel.is_canceled() {
trace!("Not trying to cancel response stream whose cancel rcv has already dropped")
Expand All @@ -250,44 +242,41 @@ fn cancel_response_streams_close_channel(
mux_in.close_channel();
}

fn serve_request_stream<Ctx: Clone>(
srv: &BoxedService<Ctx>,
fn serve_request_stream<Resp: Serialize + 'static, Ctx: Clone>(
srv: &BoxedService<Resp, Ctx>,
ctx: Ctx,
service_id: &str,
req_id: ReqId,
request_id: ReqId,
payload: Value,
) -> impl Stream<Item = Result<Message, warp::Error>> {
let resp_stream = srv
.serve_ws(ctx, payload, service_id)
.map(move |payload_result| match payload_result {
.map(move |res| match res {
Ok(payload) => Outgoing::Next {
request_id: req_id,
request_id,
payload,
},
Err(kind) => Outgoing::Error {
request_id: req_id,
kind,
},
Err(kind) => Outgoing::Error { request_id, kind },
});

AssertUnwindSafe(resp_stream)
.catch_unwind()
.map(move |msg_result| match msg_result {
.map(move |res| match res {
Ok(msg) => msg,
Err(_) => Outgoing::Error {
request_id: req_id,
request_id,
kind: ErrorKind::InternalError,
},
})
.chain(stream::once(future::ready(Outgoing::Complete {
request_id: req_id,
request_id,
})))
.map(|env| Ok(Message::text(serde_json::to_string(&env).unwrap())))
}

fn serve_request<T: std::fmt::Debug, Ctx: Clone>(
fn serve_request<T: std::fmt::Debug, Resp: Serialize + 'static, Ctx: Clone>(
canceled: oneshot::Receiver<()>,
srv: &BoxedService<Ctx>,
srv: &BoxedService<Resp, Ctx>,
ctx: Ctx,
service_id: &str,
req_id: ReqId,
Expand Down Expand Up @@ -317,7 +306,7 @@ fn serve_error(
error_kind: ErrorKind,
output: impl Sink<Result<Message, warp::Error>>,
) -> impl Future<Output = ()> {
let msg = Outgoing::Error {
let msg: Outgoing<()> = Outgoing::Error {
request_id: req_id,
kind: error_kind,
};
Expand Down Expand Up @@ -361,7 +350,7 @@ mod tests {
bad_field: String,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
struct Response(u64);

struct TestService();
Expand Down Expand Up @@ -403,12 +392,38 @@ mod tests {
}
}

fn test_client<Req: Serialize, Resp: DeserializeOwned>(
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
struct Response2(u64);

struct TestService2();

impl TestService2 {
fn new() -> TestService2 {
TestService2()
}
}

impl Service for TestService2 {
type Req = String;
type Resp = String;
type Error = String;
type Ctx = ();

fn serve(
&self,
_ctx: (),
req: Self::Req,
) -> BoxStream<'static, Result<Self::Resp, String>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like we need to box Self::Resp here but that fails with

the trait formats::_::serde::Serializecannot be made into an objectformats::::_serde::Serialize cannot be made into an objectrustc(E0038)

which https://crates.io/crates/erased-serde might solve.

stream::once(future::ok(req.chars().rev().collect::<String>())).boxed()
}
}

fn test_client<Req: Serialize, Resp: Serialize + DeserializeOwned + Clone>(
addr: SocketAddr,
endpoint: &str,
id: u64,
req: Req,
) -> (Vec<Resp>, Outgoing) {
) -> (Vec<Resp>, Outgoing<Resp>) {
let addr = format!("ws://{}/test_ws", addr);
let client = ClientBuilder::new(&*addr)
.expect("Could not setup client")
Expand All @@ -430,14 +445,14 @@ mod tests {
.send_message(&OwnedMessage::Text(req_env_json))
.expect("Could not send request");

let mut completion: Option<Outgoing> = None;
let mut completion: Option<Outgoing<Resp>> = None;

let msgs = receiver
.incoming_messages()
.filter_map(move |msg| {
let msg_ok = msg.expect("Expected message but got websocket error");
if let OwnedMessage::Text(raw_resp) = msg_ok {
let resp_env: Outgoing = serde_json::from_str(&*raw_resp)
let resp_env: Outgoing<Resp> = serde_json::from_str(&*raw_resp)
.expect("Could not deserialize response envelope");
if resp_env.request_id().0 == id {
Some(resp_env)
Expand All @@ -452,16 +467,13 @@ mod tests {
if let Outgoing::Next { .. } = env {
true
} else {
completion = Some(env.clone());
completion = Some(env.to_owned());
false
}
})
.filter_map(|env| {
if let Outgoing::Next { payload, .. } = env {
Some(
serde_json::from_value::<Resp>(payload)
.expect("Could not deserialize response"),
)
Some(payload)
} else {
None
}
Expand All @@ -471,7 +483,10 @@ mod tests {
}

async fn start_test_service() -> SocketAddr {
let services = Arc::new(maplit::btreemap! {"test" => TestService::new().boxed()});
let services = Arc::new(maplit::btreemap! {
"test" => TestService::new().boxed(),
"test2" => TestService2::new().boxed(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this now fails as the return type of serve differs.

});
let ws = warp::path("test_ws")
.and(warp::ws())
.and(warp::any().map(move || services.clone()))
Expand Down