From b594ca9425132c58c6a5c5de2d4440abf74ea0d6 Mon Sep 17 00:00:00 2001 From: kanarus Date: Sun, 15 Dec 2024 03:56:23 +0900 Subject: [PATCH] eliminate double-boxing --- ohkami/src/response/mod.rs | 14 ++++++++++---- ohkami/src/sse/mod.rs | 31 +++++++++++++++++++------------ 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/ohkami/src/response/mod.rs b/ohkami/src/response/mod.rs index d0fa064a..3b4db250 100644 --- a/ohkami/src/response/mod.rs +++ b/ohkami/src/response/mod.rs @@ -21,7 +21,7 @@ use ohkami_lib::{CowSlice, Slice}; #[cfg(feature="__rt_native__")] use crate::__rt__::AsyncWrite; #[cfg(feature="sse")] -use crate::{sse, util::StreamExt}; +use crate::{sse, util::{Stream, StreamExt}}; /// # HTTP Response @@ -268,7 +268,7 @@ impl Response { #[inline] pub fn with_stream( mut self, - stream: impl ohkami_lib::Stream + Unpin + Send + 'static + stream: impl Stream + Unpin + Send + 'static ) -> Self { self.set_stream(stream); self @@ -277,10 +277,16 @@ impl Response { #[inline] pub fn set_stream( &mut self, - stream: impl ohkami_lib::Stream + Unpin + Send + 'static + stream: impl Stream + Unpin + Send + 'static ) { - let stream = Box::pin(stream.map(sse::Data::encode)); + self.set_stream_raw(Box::pin(stream.map(sse::Data::encode))); + } + #[inline] + pub fn set_stream_raw( + &mut self, + stream: std::pin::Pin + Send>> + ) { self.headers.set() .ContentType("text/event-stream") .CacheControl("no-cache, must-revalidate") diff --git a/ohkami/src/sse/mod.rs b/ohkami/src/sse/mod.rs index 927e2d86..1340d348 100644 --- a/ohkami/src/sse/mod.rs +++ b/ohkami/src/sse/mod.rs @@ -1,8 +1,8 @@ #![cfg(feature="sse")] -use ohkami_lib::Stream; +use ohkami_lib::{Stream, StreamExt}; use ohkami_lib::stream::impls::{QueueStream, Queue}; -use std::{pin::Pin, future::Future}; +use std::{pin::Pin, future::Future, marker::PhantomData}; /// Streaming response with data of type `T` (default: `String`). /// @@ -36,7 +36,8 @@ use std::{pin::Pin, future::Future}; /// } /// ``` pub struct DataStream( - Pin + Send>> + Pin + Send>>, + PhantomDataT> ); pub trait Data: 'static { @@ -54,7 +55,9 @@ const _: () = { impl crate::IntoResponse for DataStream { #[inline] fn into_response(self) -> crate::Response { - crate::Response::OK().with_stream(self.0) + let mut res = crate::Response::OK(); + res.set_stream_raw(self.0);/* no additional boxing */ + res } } @@ -63,7 +66,7 @@ where S: Stream + Send + 'static { fn from(stream: S) -> Self { - Self(Box::pin(stream)) + Self(Box::pin(stream.map(Data::encode)), PhantomData) } } @@ -96,9 +99,7 @@ impl DataStream { F: FnOnce(handle::Stream) -> Fut + Send + 'static, Fut: Future + Send + 'static, { - Self(Box::pin(QueueStream::new( - |q| f(handle::Stream(q)) - ))) + Self(Box::pin(QueueStream::new(|q| f(handle::Stream::from(q)))), PhantomData) } } @@ -106,12 +107,18 @@ pub mod handle { use super::*; pub struct Stream( - pub(super) Queue + pub(super) Queue, + pub(super) PhantomDataT> ); - impl Stream { - #[inline(always)] + impl From> for self::Stream { + fn from(q: Queue) -> Self { + Self(q, PhantomData) + } + } + impl self::Stream { + #[inline] pub fn send(&mut self, data: impl Into) { - self.0.push(data.into()); + self.0.push(Data::encode(data.into())); } } }