diff --git a/Cargo.lock b/Cargo.lock
index a1ffd2c8..2c87d29d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1265,6 +1265,7 @@ dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-cast",
+ "arrow-csv",
  "arrow-data",
  "arrow-ipc",
  "arrow-schema",
diff --git a/Cargo.toml b/Cargo.toml
index a061fa80..92500e52 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ categories = ["science::geo"]
 rust-version = "1.80"
 
 [features]
-csv = ["geozero/with-csv"]
+csv = ["dep:arrow-csv", "geozero/with-csv"]
 flatgeobuf = ["dep:flatgeobuf"]
 flatgeobuf_async = [
     "flatgeobuf/http",
@@ -48,6 +48,7 @@ arrow = { version = "53", features = ["ffi"] }
 arrow-array = { version = "53", features = ["chrono-tz"] }
 arrow-buffer = "53"
 arrow-cast = { version = "53" }
+arrow-csv = { version = "53", optional = true }
 arrow-data = "53"
 arrow-ipc = "53"
 arrow-schema = "53"
diff --git a/src/io/csv/writer.rs b/src/io/csv/writer.rs
index f376ba03..c0f31d3d 100644
--- a/src/io/csv/writer.rs
+++ b/src/io/csv/writer.rs
@@ -1,16 +1,52 @@
+use crate::array::NativeArrayDyn;
 use crate::error::Result;
 use crate::io::stream::RecordBatchReader;
-use geozero::csv::CsvWriter;
-use geozero::GeozeroDatasource;
+use crate::io::wkt::ToWKT;
+use crate::{ArrayBase, NativeArray};
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
 use std::io::Write;
+use std::sync::Arc;
+
+// TODO: add CSV writer options
 
 /// Write a Table to CSV
 pub fn write_csv<W: Write, S: Into<RecordBatchReader>>(stream: S, writer: W) -> Result<()> {
-    let mut csv_writer = CsvWriter::new(writer);
-    stream.into().process(&mut csv_writer)?;
+    let mut stream: RecordBatchReader = stream.into();
+    let reader = stream.take().unwrap();
+
+    let mut csv_writer = arrow_csv::Writer::new(writer);
+    for batch in reader {
+        csv_writer.write(&encode_batch(batch?)?)?;
+    }
+
     Ok(())
 }
 
+fn encode_batch(batch: RecordBatch) -> Result<RecordBatch> {
+    let schema = batch.schema();
+    let fields = schema.fields();
+
+    let mut new_fields = Vec::with_capacity(fields.len());
+    let mut new_columns = Vec::with_capacity(fields.len());
+
+    for (field, column) in schema.fields().iter().zip(batch.columns()) {
+        if let Ok(arr) = NativeArrayDyn::from_arrow_array(&column, field) {
+            let wkt_arr = arr.as_ref().to_wkt::<i32>();
+            new_fields.push(wkt_arr.extension_field());
+            new_columns.push(wkt_arr.into_array_ref());
+        } else {
+            new_fields.push(field.clone());
+            new_columns.push(column.clone());
+        }
+    }
+
+    Ok(RecordBatch::try_new(
+        Arc::new(Schema::new(new_fields).with_metadata(schema.metadata().clone())),
+        new_columns,
+    )?)
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
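
This change routes CSV output through `arrow_csv::Writer` instead of geozero's `CsvWriter`: each `RecordBatch` is first passed through `encode_batch`, which rewrites any column recognized as a GeoArrow geometry array into a WKT string column (via `to_wkt`) and leaves every other column untouched, so `arrow-csv` can serialize the whole batch natively. A minimal usage sketch follows; it assumes the crate is consumed as `geoarrow` with `write_csv` and `RecordBatchReader` re-exported at the paths shown, and the `stream_to_csv_string` helper name is hypothetical:

```rust
// Hypothetical helper: serialize any stream convertible into the crate's
// RecordBatchReader to an in-memory CSV string. Geometry columns come out
// as WKT text; all other columns are written by arrow-csv as usual.
use geoarrow::error::Result;
use geoarrow::io::csv::write_csv;
use geoarrow::io::stream::RecordBatchReader;

fn stream_to_csv_string(stream: impl Into<RecordBatchReader>) -> Result<String> {
    // Any `std::io::Write` sink works; `&mut Vec<u8>` implements `Write`.
    let mut buf: Vec<u8> = Vec::new();
    write_csv(stream, &mut buf)?;
    Ok(String::from_utf8(buf).expect("arrow-csv output is valid UTF-8"))
}
```

Note the design choice in `encode_batch`: encoding happens per batch rather than over a materialized table, so peak memory is bounded by one batch plus its WKT copy, and non-geometry columns are passed along via `ArrayRef::clone`, which is a cheap `Arc` clone rather than a data copy.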