Skip to content

Commit

Permalink
Arrow IPC stream format doesn't require schema anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Aug 14, 2024
1 parent af89c05 commit 2e17ceb
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples/arrow-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn test_text_stream() -> impl IntoResponse {
Field::new("lat", DataType::Float64, false),
Field::new("lng", DataType::Float64, false),
]));
StreamBodyAs::arrow_ipc(schema.clone(), source_test_stream(schema.clone()))
StreamBodyAs::arrow_ipc(Some(schema.clone()), source_test_stream(schema.clone()))
}

#[tokio::main]
Expand Down
91 changes: 77 additions & 14 deletions src/arrow_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ use std::io::Write;
use std::sync::Arc;

pub struct ArrowRecordBatchIpcStreamFormat {
schema: SchemaRef,
schema: Option<SchemaRef>,
options: IpcWriteOptions,
}

impl ArrowRecordBatchIpcStreamFormat {
pub fn new(schema: Arc<Schema>) -> Self {
pub fn new(schema: Option<SchemaRef>) -> Self {
Self::with_options(schema, IpcWriteOptions::default())
}

pub fn with_options(schema: Arc<Schema>, options: IpcWriteOptions) -> Self {
pub fn with_options(schema: Option<SchemaRef>, options: IpcWriteOptions) -> Self {
Self {
schema: schema.clone(),
options: options.clone(),
Expand Down Expand Up @@ -72,7 +72,7 @@ impl StreamingFormat<RecordBatch> for ArrowRecordBatchIpcStreamFormat {
Ok(writer.into_inner().freeze())
}

let batch_schema = self.schema.clone();
let batch_maybe_schema = self.schema.clone();
let batch_options = self.options.clone();

let ipc_data_gen = IpcDataGenerator::default();
Expand All @@ -85,7 +85,9 @@ impl StreamingFormat<RecordBatch> for ArrowRecordBatchIpcStreamFormat {
Err(e) => futures::future::ready(Some(Err(e))),
Ok(batch) => futures::future::ready({
let prepend_schema = if *idx == 0 {
Some(batch_schema.clone())
batch_maybe_schema
.as_ref()
.map(|batch_schema| batch_schema.clone())
} else {
None
};
Expand Down Expand Up @@ -125,7 +127,7 @@ impl StreamingFormat<RecordBatch> for ArrowRecordBatchIpcStreamFormat {
}

impl<'a> crate::StreamBodyAs<'a> {
pub fn arrow_ipc<S>(schema: SchemaRef, stream: S) -> Self
pub fn arrow_ipc<S>(schema: Option<SchemaRef>, stream: S) -> Self
where
S: Stream<Item = RecordBatch> + 'a + Send,
{
Expand All @@ -135,15 +137,19 @@ impl<'a> crate::StreamBodyAs<'a> {
)
}

pub fn arrow_ipc_with_errors<S, E>(schema: SchemaRef, stream: S) -> Self
pub fn arrow_ipc_with_errors<S, E>(schema: Option<SchemaRef>, stream: S) -> Self
where
S: Stream<Item = Result<RecordBatch, E>> + 'a + Send,
E: Into<axum::Error>,
{
Self::new(ArrowRecordBatchIpcStreamFormat::new(schema), stream)
}

pub fn arrow_ipc_with_options<S>(schema: SchemaRef, stream: S, options: IpcWriteOptions) -> Self
pub fn arrow_ipc_with_options<S>(
schema: Option<SchemaRef>,
stream: S,
options: IpcWriteOptions,
) -> Self
where
S: Stream<Item = RecordBatch> + 'a + Send,
{
Expand All @@ -154,7 +160,7 @@ impl<'a> crate::StreamBodyAs<'a> {
}

pub fn arrow_ipc_with_options_errors<S, E>(
schema: SchemaRef,
schema: Option<SchemaRef>,
stream: S,
options: IpcWriteOptions,
) -> Self
Expand All @@ -170,7 +176,7 @@ impl<'a> crate::StreamBodyAs<'a> {
}

impl StreamBodyAsOptions {
pub fn arrow_ipc<'a, S>(self, schema: SchemaRef, stream: S) -> StreamBodyAs<'a>
pub fn arrow_ipc<'a, S>(self, schema: Option<SchemaRef>, stream: S) -> StreamBodyAs<'a>
where
S: Stream<Item = RecordBatch> + 'a + Send,
{
Expand All @@ -181,7 +187,11 @@ impl StreamBodyAsOptions {
)
}

pub fn arrow_ipc_with_errors<'a, S, E>(self, schema: SchemaRef, stream: S) -> StreamBodyAs<'a>
pub fn arrow_ipc_with_errors<'a, S, E>(
self,
schema: Option<SchemaRef>,
stream: S,
) -> StreamBodyAs<'a>
where
S: Stream<Item = Result<RecordBatch, E>> + 'a + Send,
E: Into<axum::Error>,
Expand All @@ -191,7 +201,7 @@ impl StreamBodyAsOptions {

pub fn arrow_ipc_with_options<'a, S>(
self,
schema: SchemaRef,
schema: Option<SchemaRef>,
stream: S,
options: IpcWriteOptions,
) -> StreamBodyAs<'a>
Expand All @@ -207,7 +217,7 @@ impl StreamBodyAsOptions {

pub fn arrow_ipc_with_options_errors<'a, S, E>(
self,
schema: SchemaRef,
schema: Option<SchemaRef>,
stream: S,
options: IpcWriteOptions,
) -> StreamBodyAs<'a>
Expand Down Expand Up @@ -269,7 +279,7 @@ mod tests {
"/",
get(|| async move {
StreamBodyAs::new(
ArrowRecordBatchIpcStreamFormat::new(app_schema.clone()),
ArrowRecordBatchIpcStreamFormat::new(Some(app_schema.clone())),
test_stream.map(Ok::<_, axum::Error>),
)
}),
Expand Down Expand Up @@ -297,4 +307,57 @@ mod tests {
assert_eq!(body.len(), expected_buf.len());
assert_eq!(body, expected_buf);
}

#[tokio::test]
async fn serialize_arrow_stream_format_without_schema() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("city", DataType::Utf8, false),
Field::new("lat", DataType::Float64, false),
Field::new("lng", DataType::Float64, false),
]));

fn create_test_batch(schema_ref: SchemaRef) -> Vec<RecordBatch> {
let vec_schema = schema_ref.clone();
(0i64..10i64)
.map(move |idx| {
RecordBatch::try_new(
vec_schema.clone(),
vec![
Arc::new(Int64Array::from(vec![idx, idx * 2, idx * 3])),
Arc::new(StringArray::from(vec!["New York", "London", "Gothenburg"])),
Arc::new(Float64Array::from(vec![40.7128, 51.5074, 57.7089])),
Arc::new(Float64Array::from(vec![-74.0060, -0.1278, 11.9746])),
],
)
.unwrap()
})
.collect()
}

let test_stream = Box::pin(stream::iter(create_test_batch(schema.clone())));

let app = Router::new().route(
"/",
get(|| async move {
StreamBodyAs::new(
ArrowRecordBatchIpcStreamFormat::new(None),
test_stream.map(Ok::<_, axum::Error>),
)
}),
);

let client = TestClient::new(app).await;

let res = client.get("/").send().await.unwrap();
assert_eq!(
res.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("application/vnd.apache.arrow.stream")
);
let body = res.bytes().await.unwrap().to_vec();

assert!(body.len() > 0);
}
}

0 comments on commit 2e17ceb

Please sign in to comment.