From 3e961d8bbbce7f64cf49b9aeba88c141aa41a117 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Thu, 20 Jun 2024 16:06:47 +0200 Subject: [PATCH 1/2] Content-type configuration support for all formats --- src/arrow_format.rs | 3 ++- src/csv_format.rs | 3 ++- src/json_formats.rs | 12 ++++-------- src/protobuf_format.rs | 3 ++- src/stream_body_as.rs | 14 ++++++++------ src/stream_format.rs | 4 +++- src/text_format.rs | 3 ++- 7 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/arrow_format.rs b/src/arrow_format.rs index 3693193..fe04672 100644 --- a/src/arrow_format.rs +++ b/src/arrow_format.rs @@ -34,6 +34,7 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, RecordBatch>, + _: &'a StreamBodyAsOptions ) -> BoxStream<'b, Result> { fn write_batch( ipc_data_gen: &mut IpcDataGenerator, @@ -110,7 +111,7 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { Box::pin(batch_stream.chain(append_stream)) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self,_: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, diff --git a/src/csv_format.rs b/src/csv_format.rs index e62466e..87796f4 100644 --- a/src/csv_format.rs +++ b/src/csv_format.rs @@ -98,6 +98,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions ) -> BoxStream<'b, Result> { let stream_with_header = self.has_headers; let stream_delimiter = self.delimiter; @@ -131,7 +132,7 @@ where }) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, diff --git a/src/json_formats.rs b/src/json_formats.rs index aa45624..61b4750 100644 --- a/src/json_formats.rs +++ b/src/json_formats.rs @@ -42,6 +42,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions ) -> BoxStream<'b, Result> { let stream_bytes: BoxStream> = Box::pin({ stream.enumerate().map(|(index, obj)| { @@ -115,7 +116,7 @@ where Box::pin(prepend_stream.chain(stream_bytes.chain(append_stream))) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, @@ -140,6 +141,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions ) -> BoxStream<'b, Result> { Box::pin({ stream.map(|obj| { @@ -155,7 +157,7 @@ where }) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, @@ -398,12 +400,6 @@ mod tests { my_array: Vec, } - #[derive(Debug, Clone, Serialize)] - struct TestEmptyEnvelopeStructure { - #[serde(skip_serializing_if = "Vec::is_empty")] - my_array: Vec, - } - let test_stream_vec = vec![ TestItemStructure { foo: "bar".to_string() diff --git a/src/protobuf_format.rs b/src/protobuf_format.rs index 44ec022..5b906cc 100644 --- a/src/protobuf_format.rs +++ b/src/protobuf_format.rs @@ -21,6 +21,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + _: &'a StreamBodyAsOptions ) -> BoxStream<'b, Result> { fn write_protobuf_record(obj: T) -> Result, axum::Error> where @@ -43,7 +44,7 @@ where }) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, diff --git a/src/stream_body_as.rs b/src/stream_body_as.rs index 6cdba16..7a75457 100644 --- a/src/stream_body_as.rs +++ b/src/stream_body_as.rs @@ -42,8 +42,8 @@ impl<'a> StreamBodyAs<'a> { S: Stream + 'a + Send, { Self { - stream: Self::create_stream_frames(&stream_format, stream, options), - headers: stream_format.http_response_trailers(), + stream: Self::create_stream_frames(&stream_format, stream, &options), + headers: stream_format.http_response_trailers(&options), } } @@ -65,7 +65,7 @@ impl<'a> StreamBodyAs<'a> { fn create_stream_frames( stream_format: &FMT, stream: S, - options: StreamBodyAsOptions, + options: &StreamBodyAsOptions, ) -> BoxStream<'a, Result, axum::Error>> where FMT: StreamingFormat, @@ -73,7 +73,7 @@ impl<'a> StreamBodyAs<'a> { { match (options.buffering_ready_items, options.buffering_bytes) { (Some(buffering_ready_items), _) => stream_format - .to_bytes_stream(Box::pin(stream)) + .to_bytes_stream(Box::pin(stream), options) .ready_chunks(buffering_ready_items) .map(|chunks| { let mut buf = BytesMut::new(); @@ -86,7 +86,7 @@ impl<'a> StreamBodyAs<'a> { (_, Some(buffering_bytes)) => { let bytes_stream = stream_format - .to_bytes_stream(Box::pin(stream)) + .to_bytes_stream(Box::pin(stream), options) .chain(futures::stream::once(futures::future::ready(Ok( bytes::Bytes::new(), )))); @@ -117,7 +117,7 @@ impl<'a> StreamBodyAs<'a> { .boxed() } (None, None) => stream_format - .to_bytes_stream(Box::pin(stream)) + .to_bytes_stream(Box::pin(stream), options) .map(|res| res.map(Frame::data)) .boxed(), } @@ -150,6 +150,7 @@ impl<'a> HttpBody for StreamBodyAs<'a> { pub struct StreamBodyAsOptions { pub buffering_ready_items: Option, pub buffering_bytes: Option, + pub content_type: Option, } impl StreamBodyAsOptions { @@ -157,6 +158,7 @@ impl StreamBodyAsOptions { Self { buffering_ready_items: None, buffering_bytes: None, + content_type: None, } } diff --git a/src/stream_format.rs b/src/stream_format.rs index e91c789..59be9fd 100644 --- a/src/stream_format.rs +++ b/src/stream_format.rs @@ -1,11 +1,13 @@ use futures::stream::BoxStream; use http::HeaderMap; +use crate::StreamBodyAsOptions; pub trait StreamingFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, + options: &'a StreamBodyAsOptions ) -> BoxStream<'b, Result>; - fn http_response_trailers(&self) -> Option; + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option; } diff --git a/src/text_format.rs b/src/text_format.rs index 766d47f..13ecdd1 100644 --- a/src/text_format.rs +++ b/src/text_format.rs @@ -18,6 +18,7 @@ impl StreamingFormat for TextStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, String>, + _: &'a StreamBodyAsOptions ) -> BoxStream<'b, Result> { fn write_text_record(obj: String) -> Result, axum::Error> { let obj_vec = obj.as_bytes().to_vec(); @@ -27,7 +28,7 @@ impl StreamingFormat for TextStreamFormat { Box::pin(stream.map(move |obj| write_text_record(obj).map(|data| data.into()))) } - fn http_response_trailers(&self) -> Option { + fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, From 17860f16cfbb460196f272594852cc580a7b95a5 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Thu, 20 Jun 2024 16:13:57 +0200 Subject: [PATCH 2/2] Docs and sample update --- README.md | 2 +- examples/text-example.rs | 4 +++- src/arrow_format.rs | 8 +++++--- src/csv_format.rs | 9 ++++++--- src/json_formats.rs | 11 +++++++---- src/lib.rs | 1 + src/protobuf_format.rs | 8 +++++--- src/stream_body_as.rs | 20 +++++++++++++------- src/stream_format.rs | 4 ++-- src/text_format.rs | 8 +++++--- 10 files changed, 48 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index d15060b..63dc2a3 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ and want to avoid huge memory allocation. Cargo.toml: ```toml [dependencies] -axum-streams = { version = "0.14", features=["json", "csv", "protobuf", "text"] } +axum-streams = { version = "0.15", features=["json", "csv", "protobuf", "text"] } ``` ## Compatibility matrix diff --git a/examples/text-example.rs b/examples/text-example.rs index 52397af..bfe4ea8 100644 --- a/examples/text-example.rs +++ b/examples/text-example.rs @@ -18,7 +18,9 @@ fn source_test_stream() -> impl Stream { } async fn test_text_stream() -> impl IntoResponse { - StreamBodyAs::text(source_test_stream()) + StreamBodyAsOptions::new() + .content_type(HttpHeaderValue::from_static("text/plain; charset=utf-8")) + .text(source_test_stream()); } #[tokio::main] diff --git a/src/arrow_format.rs b/src/arrow_format.rs index fe04672..7395f2d 100644 --- a/src/arrow_format.rs +++ b/src/arrow_format.rs @@ -34,7 +34,7 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, RecordBatch>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_batch( ipc_data_gen: &mut IpcDataGenerator, @@ -111,11 +111,13 @@ impl StreamingFormat for ArrowRecordBatchIpcStreamFormat { Box::pin(batch_stream.chain(append_stream)) } - fn http_response_trailers(&self,_: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("application/vnd.apache.arrow.stream") + }), ); Some(header_map) } diff --git a/src/csv_format.rs b/src/csv_format.rs index 87796f4..1161758 100644 --- a/src/csv_format.rs +++ b/src/csv_format.rs @@ -98,7 +98,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { let stream_with_header = self.has_headers; let stream_delimiter = self.delimiter; @@ -132,11 +132,14 @@ where }) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("text/csv"), + options + .content_type + .clone() + .unwrap_or_else(|| http::header::HeaderValue::from_static("text/csv")), ); Some(header_map) } diff --git a/src/json_formats.rs b/src/json_formats.rs index 61b4750..6e818cf 100644 --- a/src/json_formats.rs +++ b/src/json_formats.rs @@ -42,7 +42,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { let stream_bytes: BoxStream> = Box::pin({ stream.enumerate().map(|(index, obj)| { @@ -116,11 +116,14 @@ where Box::pin(prepend_stream.chain(stream_bytes.chain(append_stream))) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/json"), + options + .content_type + .clone() + .unwrap_or_else(|| http::header::HeaderValue::from_static("application/json")), ); Some(header_map) } @@ -141,7 +144,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { Box::pin({ stream.map(|obj| { diff --git a/src/lib.rs b/src/lib.rs index 9bb9072..84937f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,7 @@ mod stream_format; pub use stream_format::*; mod stream_body_as; +pub use self::stream_body_as::HttpHeaderValue; pub use self::stream_body_as::StreamBodyAs; pub use self::stream_body_as::StreamBodyAsOptions; diff --git a/src/protobuf_format.rs b/src/protobuf_format.rs index 5b906cc..5889f7b 100644 --- a/src/protobuf_format.rs +++ b/src/protobuf_format.rs @@ -21,7 +21,7 @@ where fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_protobuf_record(obj: T) -> Result, axum::Error> where @@ -44,11 +44,13 @@ where }) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/x-protobuf-stream"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("application/x-protobuf-stream") + }), ); Some(header_map) } diff --git a/src/stream_body_as.rs b/src/stream_body_as.rs index 7a75457..3373aa5 100644 --- a/src/stream_body_as.rs +++ b/src/stream_body_as.rs @@ -84,12 +84,11 @@ impl<'a> StreamBodyAs<'a> { }) .boxed(), (_, Some(buffering_bytes)) => { - let bytes_stream = - stream_format - .to_bytes_stream(Box::pin(stream), options) - .chain(futures::stream::once(futures::future::ready(Ok( - bytes::Bytes::new(), - )))); + let bytes_stream = stream_format + .to_bytes_stream(Box::pin(stream), options) + .chain(futures::stream::once(futures::future::ready(Ok( + bytes::Bytes::new(), + )))); bytes_stream .scan( @@ -147,10 +146,12 @@ impl<'a> HttpBody for StreamBodyAs<'a> { } } +pub type HttpHeaderValue = http::header::HeaderValue; + pub struct StreamBodyAsOptions { pub buffering_ready_items: Option, pub buffering_bytes: Option, - pub content_type: Option, + pub content_type: Option, } impl StreamBodyAsOptions { @@ -171,6 +172,11 @@ impl StreamBodyAsOptions { self.buffering_bytes = Some(ready_bytes); self } + + pub fn content_type(mut self, content_type: HttpHeaderValue) -> Self { + self.content_type = Some(content_type); + self + } } #[cfg(test)] diff --git a/src/stream_format.rs b/src/stream_format.rs index 59be9fd..beb8c33 100644 --- a/src/stream_format.rs +++ b/src/stream_format.rs @@ -1,12 +1,12 @@ +use crate::StreamBodyAsOptions; use futures::stream::BoxStream; use http::HeaderMap; -use crate::StreamBodyAsOptions; pub trait StreamingFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, T>, - options: &'a StreamBodyAsOptions + options: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result>; fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option; diff --git a/src/text_format.rs b/src/text_format.rs index 13ecdd1..c519af6 100644 --- a/src/text_format.rs +++ b/src/text_format.rs @@ -18,7 +18,7 @@ impl StreamingFormat for TextStreamFormat { fn to_bytes_stream<'a, 'b>( &'a self, stream: BoxStream<'b, String>, - _: &'a StreamBodyAsOptions + _: &'a StreamBodyAsOptions, ) -> BoxStream<'b, Result> { fn write_text_record(obj: String) -> Result, axum::Error> { let obj_vec = obj.as_bytes().to_vec(); @@ -28,11 +28,13 @@ impl StreamingFormat for TextStreamFormat { Box::pin(stream.map(move |obj| write_text_record(obj).map(|data| data.into()))) } - fn http_response_trailers(&self, _: &StreamBodyAsOptions) -> Option { + fn http_response_trailers(&self, options: &StreamBodyAsOptions) -> Option { let mut header_map = HeaderMap::new(); header_map.insert( http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("text/plain; charset=utf-8"), + options.content_type.clone().unwrap_or_else(|| { + http::header::HeaderValue::from_static("text/plain; charset=utf-8") + }), ); Some(header_map) }