Skip to content

Commit

Permalink
eliminate double-boxing
Browse files Browse the repository at this point in the history
  • Loading branch information
kanarus committed Dec 14, 2024
1 parent 31a5bfa commit b594ca9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
14 changes: 10 additions & 4 deletions ohkami/src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use ohkami_lib::{CowSlice, Slice};
#[cfg(feature="__rt_native__")]
use crate::__rt__::AsyncWrite;
#[cfg(feature="sse")]
use crate::{sse, util::StreamExt};
use crate::{sse, util::{Stream, StreamExt}};


/// # HTTP Response
Expand Down Expand Up @@ -268,7 +268,7 @@ impl Response {
#[inline]
pub fn with_stream<T: sse::Data>(
mut self,
stream: impl ohkami_lib::Stream<Item = T> + Unpin + Send + 'static
stream: impl Stream<Item = T> + Unpin + Send + 'static
) -> Self {
self.set_stream(stream);
self
Expand All @@ -277,10 +277,16 @@ impl Response {
#[inline]
pub fn set_stream<T: sse::Data>(
&mut self,
stream: impl ohkami_lib::Stream<Item = T> + Unpin + Send + 'static
stream: impl Stream<Item = T> + Unpin + Send + 'static
) {
let stream = Box::pin(stream.map(sse::Data::encode));
self.set_stream_raw(Box::pin(stream.map(sse::Data::encode)));
}

#[inline]
pub fn set_stream_raw(
&mut self,
stream: std::pin::Pin<Box<dyn Stream<Item = String> + Send>>
) {
self.headers.set()
.ContentType("text/event-stream")
.CacheControl("no-cache, must-revalidate")
Expand Down
31 changes: 19 additions & 12 deletions ohkami/src/sse/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![cfg(feature="sse")]

use ohkami_lib::Stream;
use ohkami_lib::{Stream, StreamExt};
use ohkami_lib::stream::impls::{QueueStream, Queue};
use std::{pin::Pin, future::Future};
use std::{pin::Pin, future::Future, marker::PhantomData};

/// Streaming response with data of type `T` (default: `String`).
///
Expand Down Expand Up @@ -36,7 +36,8 @@ use std::{pin::Pin, future::Future};
/// }
/// ```
pub struct DataStream<T: Data = String>(
Pin<Box<dyn Stream<Item = T> + Send>>
Pin<Box<dyn Stream<Item = String> + Send>>,
PhantomData<fn()->T>
);

pub trait Data: 'static {
Expand All @@ -54,7 +55,9 @@ const _: () = {
impl<T: Data> crate::IntoResponse for DataStream<T> {
#[inline]
fn into_response(self) -> crate::Response {
crate::Response::OK().with_stream(self.0)
let mut res = crate::Response::OK();
res.set_stream_raw(self.0);/* no additional boxing */
res
}
}

Expand All @@ -63,7 +66,7 @@ where
S: Stream<Item = T> + Send + 'static
{
fn from(stream: S) -> Self {
Self(Box::pin(stream))
Self(Box::pin(stream.map(Data::encode)), PhantomData)
}
}

Expand Down Expand Up @@ -96,22 +99,26 @@ impl<T: Data + Send + 'static> DataStream<T> {
F: FnOnce(handle::Stream<T>) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
Self(Box::pin(QueueStream::new(
|q| f(handle::Stream(q))
)))
Self(Box::pin(QueueStream::new(|q| f(handle::Stream::from(q)))), PhantomData)
}
}

pub mod handle {
use super::*;

pub struct Stream<T>(
pub(super) Queue<T>
pub(super) Queue<String>,
pub(super) PhantomData<fn()->T>
);
impl<T> Stream<T> {
#[inline(always)]
impl<T> From<Queue<String>> for self::Stream<T> {
fn from(q: Queue<String>) -> Self {
Self(q, PhantomData)
}
}
impl<T: Data> self::Stream<T> {
#[inline]
pub fn send(&mut self, data: impl Into<T>) {
self.0.push(data.into());
self.0.push(Data::encode(data.into()));
}
}
}

0 comments on commit b594ca9

Please sign in to comment.