diff --git a/README.md b/README.md index d15060b..63dc2a3 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ and want to avoid huge memory allocation. Cargo.toml: ```toml [dependencies] -axum-streams = { version = "0.14", features=["json", "csv", "protobuf", "text"] } +axum-streams = { version = "0.15", features=["json", "csv", "protobuf", "text"] } ``` ## Compatibility matrix diff --git a/examples/text-example.rs b/examples/text-example.rs index 52397af..bfe4ea8 100644 --- a/examples/text-example.rs +++ b/examples/text-example.rs @@ -18,7 +18,9 @@ fn source_test_stream() -> impl Stream { } async fn test_text_stream() -> impl IntoResponse { - StreamBodyAs::text(source_test_stream()) + StreamBodyAsOptions::new() + .content_type(HttpHeaderValue::from_static("text/plain; charset=utf-8")) + .text(source_test_stream()); } #[tokio::main] diff --git a/src/arrow_format.rs b/src/arrow_format.rs index 3693193..7395f2d 100644 --- a/src/arrow_format.rs +++ b/src/arrow_format.rs @@ -34,6 +34,7 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, RecordBatch>, + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_batch( ipc_data_gen: &mut IpcDataGenerator, @@ -110,11 +111,13 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { Box::pin(batch_stream.chain(append_stream)) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream") + }), ); Some(header_map) } diff --git a/src/csv_format.rs b/src/csv_format.rs index e62466e..1161758 100644 --- a/src/csv_format.rs +++ b/src/csv_format.rs @@ -98,6 +98,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { let stream_with_header = self.has_headers; let stream_delimiter = self.delimiter; @@ -131,11 +132,14 @@ where }) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("text/csv"), + options + .content_type + .clone() + .unwrap_or_else(|| http::header::HeaderValue::from_static("text/csv")), ); Some(header_map) } diff --git a/src/json_formats.rs b/src/json_formats.rs index aa45624..6e818cf 100644 --- a/src/json_formats.rs +++ b/src/json_formats.rs @@ -42,6 +42,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { let stream_bytes: BoxStream> = Box::pin({ stream.enumerate().map(|(index, obj)| { @@ -115,11 +116,14 @@ where Box::pin(prepend_stream.chain(stream_bytes.chain(append_stream))) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/json"), + options + .content_type + .clone() + .unwrap_or_else(|| http::header::HeaderValue::from_static("application/json")), ); Some(header_map) } @@ -140,6 +144,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { Box::pin({ stream.map(|obj| { @@ -155,7 +160,7 @@ where }) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, @@ -398,12 +403,6 @@ mod tests { my_array: Vec, } - #[derive(Debug, Clone, Serialize)] - struct TestEmptyEnvelopeStructure { - #[serde(skip_serializing_if = "Vec::is_empty")] - my_array: Vec, - } - let test_stream_vec = vec![ TestItemStructure { foo: "bar".to_string() diff --git a/src/lib.rs b/src/lib.rs index 9bb9072..84937f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,7 @@ mod stream_format; pub use stream_format::*; mod stream_body_as; +pub use self::stream_body_as::HttpHeaderValue; pub use self::stream_body_as::StreamBodyAs; pub use self::stream_body_as::StreamBodyAsOptions; diff --git a/src/protobuf_format.rs b/src/protobuf_format.rs index 44ec022..5889f7b 100644 --- a/src/protobuf_format.rs +++ b/src/protobuf_format.rs @@ -21,6 +21,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_protobuf_record(obj: T) -> Result, axum::Error> where @@ -43,11 +44,13 @@ where }) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/x-protobuf-stream"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("application/x-protobuf-stream") + }), ); Some(header_map) } diff --git a/src/stream_body_as.rs b/src/stream_body_as.rs index 6cdba16..3373aa5 100644 --- a/src/stream_body_as.rs +++ b/src/stream_body_as.rs @@ -42,8 +42,8 @@ impl<'a> StreamBodyAs<'a> { S: Stream + 'a + Send, { Self { - stream: Self::create_stream_frames(&stream_format, stream, options), - headers: stream_format.http_response_trailers(), + stream: Self::create_stream_frames(&stream_format, stream, &options), + headers: stream_format.http_response_trailers(&options), } } @@ -65,7 +65,7 @@ impl<'a> StreamBodyAs<'a> { fn create_stream_frames( stream_format: &FMT, stream: S, - options: StreamBodyAsOptions, + options: &StreamBodyAsOptions, ) -> BoxStream<'a, Result, axum::Error>> where FMT: StreamingFormat, @@ -73,7 +73,7 @@ impl<'a> StreamBodyAs<'a> { { match (options.buffering_ready_items, options.buffering_bytes) { (Some(buffering_ready_items), _) => stream_format - .to_bytes_stream(Box::pin(stream)) + .to_bytes_stream(Box::pin(stream), options) .ready_chunks(buffering_ready_items) .map(|chunks| { let mut buf = BytesMut::new(); @@ -84,12 +84,11 @@ impl<'a> StreamBodyAs<'a> { }) .boxed(), (_, Some(buffering_bytes)) => { - let bytes_stream = - stream_format - .to_bytes_stream(Box::pin(stream)) - .chain(futures::stream::once(futures::future::ready(Ok( - bytes::Bytes::new(), - )))); + let bytes_stream = stream_format + .to_bytes_stream(Box::pin(stream), options) + .chain(futures::stream::once(futures::future::ready(Ok( + bytes::Bytes::new(), + )))); bytes_stream .scan( @@ -117,7 +116,7 @@ impl<'a> StreamBodyAs<'a> { .boxed() } (None, None) => stream_format - .to_bytes_stream(Box::pin(stream)) + .to_bytes_stream(Box::pin(stream), options) .map(|res| res.map(Frame::data)) .boxed(), } @@ -147,9 +146,12 @@ impl<'a> HttpBody for StreamBodyAs<'a> { } } +pub type HttpHeaderValue = http::header::HeaderValue; + pub struct StreamBodyAsOptions { pub buffering_ready_items: Option, pub buffering_bytes: Option, + pub content_type: Option, } impl StreamBodyAsOptions { @@ -157,6 +159,7 @@ impl StreamBodyAsOptions { Self { buffering_ready_items: None, buffering_bytes: None, + content_type: None, } } @@ -169,6 +172,11 @@ impl StreamBodyAsOptions { self.buffering_bytes = Some(ready_bytes); self } + + pub fn content_type(mut self, content_type: HttpHeaderValue) -> Self { + self.content_type = Some(content_type); + self + } } #[cfg(test)] diff --git a/src/stream_format.rs b/src/stream_format.rs index e91c789..beb8c33 100644 --- a/src/stream_format.rs +++ b/src/stream_format.rs @@ -1,3 +1,4 @@ +use crate::StreamBodyAsOptions; use futures::stream::BoxStream; use http::HeaderMap; @@ -5,7 +6,8 @@ pub trait StreamingFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + options: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result>; - fn http_response_trailers(&self) -> Option; + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option; } diff --git a/src/text_format.rs b/src/text_format.rs index 766d47f..c519af6 100644 --- a/src/text_format.rs +++ b/src/text_format.rs @@ -18,6 +18,7 @@ impl StreamingFormat for TextStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, String>, + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_text_record(obj: String) -> Result, axum::Error> { let obj_vec = obj.as_bytes().to_vec(); @@ -27,11 +28,13 @@ impl StreamingFormat for TextStreamFormat { Box::pin(stream.map(move |obj| write_text_record(obj).map(|data| data.into()))) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("text/plain; charset=utf-8"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("text/plain; charset=utf-8") + }), ); Some(header_map) }