diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs
index 97783d5d9319..4a359fe3c699 100644
--- a/crates/polars-io/src/json/mod.rs
+++ b/crates/polars-io/src/json/mod.rs
@@ -71,6 +71,7 @@ use polars_core::error::to_compute_err;
 use polars_core::prelude::*;
 use polars_core::utils::try_get_supertype;
 use polars_json::json::infer;
+use polars_json::json::write::FallibleStreamingIterator;
 use simd_json::BorrowedValue;
 
 use crate::mmap::{MmapBytesReader, ReaderBytes};
@@ -153,6 +154,34 @@ where
     }
 }
 
+pub struct BatchedWriter<W: Write> {
+    writer: W,
+}
+
+impl<W> BatchedWriter<W>
+where
+    W: Write,
+{
+    pub fn new(writer: W) -> Self {
+        BatchedWriter { writer }
+    }
+    /// Write a batch to the json writer.
+    ///
+    /// # Panics
+    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
+    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
+        let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>();
+        let chunks = df.iter_chunks();
+        let batches =
+            chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
+        let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
+        while let Some(block) = serializer.next()? {
+            self.writer.write_all(block)?;
+        }
+        Ok(())
+    }
+}
+
 /// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
 #[must_use]
 pub struct JsonReader<'a, R>
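The new `BatchedWriter` mirrors the existing batched CSV/IPC writers in `polars-io`: each `write_batch` call serializes the chunks of one `DataFrame` as newline-delimited JSON and appends them to the wrapped `Write`. A minimal sketch of driving it by hand — not part of this diff; the column names and the `Vec<u8>` target are made up for illustration:

```rust
use polars_core::df;
use polars_core::prelude::*;
use polars_io::json::BatchedWriter;

fn write_two_batches() -> PolarsResult<String> {
    // Any `std::io::Write` target works; a Vec<u8> keeps the sketch self-contained.
    let mut buf: Vec<u8> = Vec::new();
    {
        let mut writer = BatchedWriter::new(&mut buf);
        // Each batch lands as one NDJSON record per row, appended in call order.
        writer.write_batch(&df!("category" => ["vegetables", "seafood"], "calories" => [45, 150])?)?;
        writer.write_batch(&df!("category" => ["meat"], "calories" => [100])?)?;
    }
    Ok(String::from_utf8(buf).expect("NDJSON output is valid UTF-8"))
}
```

Because NDJSON needs no footer, the output is complete as soon as the last batch is written (see the no-op `_finish` in the `SinkWriter` impl further down).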
"parquet", feature = "csv")))] + #[cfg(not(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + )))] { None } @@ -669,23 +689,14 @@ impl LazyFrame { /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "parquet")] - pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { + pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { + self.sink( + SinkType::File { path: Arc::new(path), file_type: FileType::Parquet(options), }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_parquet()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + "collect().write_parquet()", + ) } /// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit @@ -694,70 +705,81 @@ impl LazyFrame { /// streaming fashion. #[cfg(all(feature = "cloud_write", feature = "parquet"))] pub fn sink_parquet_cloud( - mut self, + self, uri: String, cloud_options: Option, parquet_options: ParquetWriteOptions, ) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::Cloud { + self.sink( + SinkType::Cloud { uri: Arc::new(uri), cloud_options, file_type: FileType::Parquet(parquet_options), }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + "collect().write_parquet()", + ) } /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "ipc")] - pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { + pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { + self.sink( + SinkType::File { path: Arc::new(path), file_type: FileType::Ipc(options), }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_ipc()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + "collect().write_ipc()", + ) } /// Stream a query result into an csv file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. 
#[cfg(feature = "csv")] - pub fn sink_csv(mut self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { + pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { + self.sink( + SinkType::File { path: Arc::new(path), file_type: FileType::Csv(options), }, + "collect().write_csv()", + ) + } + + /// Stream a query result into a json file. This is useful if the final result doesn't fit + /// into memory. This methods will return an error if the query cannot be completely done in a + /// streaming fashion. + #[cfg(feature = "json")] + pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Json(options), + }, + "collect().write_ndjson()` or `collect().write_json()", + ) + } + + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "cloud_write", + feature = "csv", + feature = "json", + ))] + fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::Sink { + input: Box::new(self.logical_plan), + payload, }; let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; polars_ensure!( is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_csv()` instead" + ComputeError: format!("cannot run the whole query in a streaming order; \ + use `{msg_alternative}` instead", msg_alternative=msg_alternative) ); let _ = physical_plan.execute(&mut state)?; Ok(()) diff --git a/crates/polars-lazy/src/physical_plan/file_cache.rs b/crates/polars-lazy/src/physical_plan/file_cache.rs index 24ce3c1fb873..bf2b281fe3b2 100644 --- a/crates/polars-lazy/src/physical_plan/file_cache.rs +++ b/crates/polars-lazy/src/physical_plan/file_cache.rs @@ -1,7 +1,12 @@ use std::sync::Mutex; use polars_core::prelude::*; -#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))] +#[cfg(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" +))] use polars_plan::logical_plan::FileFingerPrint; use crate::prelude::*; diff --git a/crates/polars-lazy/src/physical_plan/mod.rs b/crates/polars-lazy/src/physical_plan/mod.rs index 082efd21a110..7c7f51f6a02e 100644 --- a/crates/polars-lazy/src/physical_plan/mod.rs +++ b/crates/polars-lazy/src/physical_plan/mod.rs @@ -2,7 +2,12 @@ pub mod executors; #[cfg(any(feature = "list_eval", feature = "pivot"))] pub(crate) mod exotic; pub mod expressions; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" +))] mod file_cache; mod node_timer; pub mod planner; diff --git a/crates/polars-lazy/src/physical_plan/state.rs b/crates/polars-lazy/src/physical_plan/state.rs index c8db863a126e..00b923781741 100644 --- a/crates/polars-lazy/src/physical_plan/state.rs +++ b/crates/polars-lazy/src/physical_plan/state.rs @@ -8,10 +8,20 @@ use polars_core::config::verbose; use polars_core::frame::group_by::GroupsProxy; use polars_core::prelude::*; use polars_ops::prelude::ChunkJoinOptIds; -#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))] +#[cfg(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" +))] use polars_plan::logical_plan::FileFingerPrint; 
-#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" +))] use super::file_cache::FileCache; use crate::physical_plan::node_timer::NodeTimer; @@ -65,7 +75,12 @@ pub struct ExecutionState { // cached by a `.cache` call and kept in memory for the duration of the plan. df_cache: Arc>>>>, // cache file reads until all branches got there file, then we delete it - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] pub(crate) file_cache: FileCache, pub(super) schema_cache: RwLock>, /// Used by Window Expression to prevent redundant grouping @@ -110,7 +125,12 @@ impl ExecutionState { pub(super) fn split(&self) -> Self { Self { df_cache: self.df_cache.clone(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: self.file_cache.clone(), schema_cache: Default::default(), group_tuples: Default::default(), @@ -126,7 +146,12 @@ impl ExecutionState { pub(super) fn clone(&self) -> Self { Self { df_cache: self.df_cache.clone(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: self.file_cache.clone(), schema_cache: self.schema_cache.read().unwrap().clone().into(), group_tuples: self.group_tuples.clone(), @@ -138,16 +163,31 @@ impl ExecutionState { } } - #[cfg(not(any(feature = "parquet", feature = "csv", feature = "ipc")))] + #[cfg(not(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" + )))] pub(crate) fn with_finger_prints(_finger_prints: Option) -> Self { Self::new() } - #[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))] + #[cfg(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" + ))] pub(crate) fn with_finger_prints(finger_prints: Option>) -> Self { Self { df_cache: Arc::new(Mutex::new(PlHashMap::default())), schema_cache: Default::default(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: FileCache::new(finger_prints), group_tuples: Arc::new(RwLock::new(PlHashMap::default())), join_tuples: Arc::new(Mutex::new(PlHashMap::default())), @@ -166,7 +206,12 @@ impl ExecutionState { Self { df_cache: Default::default(), schema_cache: Default::default(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: FileCache::new(None), group_tuples: Default::default(), join_tuples: Default::default(), diff --git a/crates/polars-lazy/src/prelude.rs b/crates/polars-lazy/src/prelude.rs index a5baeda1ea78..964ec7894a2d 100644 --- a/crates/polars-lazy/src/prelude.rs +++ b/crates/polars-lazy/src/prelude.rs @@ -8,6 +8,8 @@ pub use polars_plan::logical_plan::{ pub use polars_plan::prelude::CsvWriterOptions; #[cfg(feature = "ipc")] pub use polars_plan::prelude::IpcWriterOptions; +#[cfg(feature = "json")] +pub use polars_plan::prelude::JsonWriterOptions; #[cfg(feature = "parquet")] pub use polars_plan::prelude::ParquetWriteOptions; pub(crate) use polars_plan::prelude::*; diff --git a/crates/polars-pipe/Cargo.toml 
index aede1207e55f..d59ea7510d65 100644
--- a/crates/polars-pipe/Cargo.toml
+++ b/crates/polars-pipe/Cargo.toml
@@ -35,6 +35,7 @@ csv = ["polars-plan/csv", "polars-io/csv"]
 cloud = ["async", "polars-io/cloud", "polars-plan/cloud", "tokio", "futures"]
 parquet = ["polars-plan/parquet", "polars-io/parquet", "polars-io/async"]
 ipc = ["polars-plan/ipc", "polars-io/ipc"]
+json = ["polars-plan/json", "polars-io/json"]
 async = ["polars-plan/async", "polars-io/async"]
 nightly = ["polars-core/nightly", "polars-utils/nightly", "hashbrown/nightly"]
 cross_join = ["polars-ops/cross_join"]
diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs
index 6cc723be10fe..feb8942a1b34 100644
--- a/crates/polars-pipe/src/executors/sinks/file_sink.rs
+++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs
@@ -6,6 +6,8 @@ use crossbeam_channel::{bounded, Receiver, Sender};
 use polars_core::prelude::*;
 #[cfg(feature = "csv")]
 use polars_io::csv::CsvWriter;
+#[cfg(feature = "json")]
+use polars_io::json::BatchedWriter;
 #[cfg(feature = "parquet")]
 use polars_io::parquet::ParquetWriter;
 #[cfg(feature = "ipc")]
@@ -17,7 +19,12 @@ use polars_plan::prelude::*;
 use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
 use crate::pipeline::morsels_per_sink;
 
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 trait SinkWriter {
     fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>;
     fn _finish(&mut self) -> PolarsResult<()>;
 }
@@ -58,6 +65,17 @@ impl SinkWriter for polars_io::csv::BatchedWriter<std::fs::File> {
     }
 }
 
+#[cfg(feature = "json")]
+impl SinkWriter for BatchedWriter<std::fs::File> {
+    fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
+        self.write_batch(df)
+    }
+
+    fn _finish(&mut self) -> PolarsResult<()> {
+        Ok(())
+    }
+}
+
 #[cfg(feature = "parquet")]
 pub struct ParquetSink {}
 #[cfg(feature = "parquet")]
@@ -215,7 +233,45 @@ impl CsvSink {
     }
 }
 
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
+#[cfg(feature = "json")]
+pub struct JsonSink {}
+#[cfg(feature = "json")]
+impl JsonSink {
+    #[allow(clippy::new_ret_no_self)]
+    pub fn new(
+        path: &Path,
+        options: JsonWriterOptions,
+        _schema: &Schema,
+    ) -> PolarsResult<FilesSink> {
+        let file = std::fs::File::create(path)?;
+        let writer = BatchedWriter::new(file);
+
+        let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
+
+        let morsels_per_sink = morsels_per_sink();
+        let backpressure = morsels_per_sink * 2;
+        let (sender, receiver) = bounded(backpressure);
+
+        let io_thread_handle = Arc::new(Some(init_writer_thread(
+            receiver,
+            writer,
+            options.maintain_order,
+            morsels_per_sink,
+        )));
+
+        Ok(FilesSink {
+            sender,
+            io_thread_handle,
+        })
+    }
+}
+
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 fn init_writer_thread(
     receiver: Receiver<Option<DataChunk>>,
     mut writer: Box<dyn SinkWriter + Send>,
     maintain_order: bool,
@@ -262,13 +318,23 @@ fn init_writer_thread(
 // Ensure the data is return in the order it was streamed
 #[derive(Clone)]
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 pub struct FilesSink {
     sender: Sender<Option<DataChunk>>,
     io_thread_handle: Arc<Option<JoinHandle<()>>>,
 }
 
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 impl Sink for FilesSink {
     fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
         // don't add empty dataframes
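`JsonSink` only differs from the parquet/ipc/csv sinks in how the boxed writer is built; the channel and IO thread are shared. Roughly, the thread set up by `init_writer_thread` does the following — a simplified sketch, not code from this crate (the real loop also groups morsels per sink and honours `maintain_order`), with the private trait re-declared locally so the snippet is self-contained:

```rust
use crossbeam_channel::Receiver;
use polars_core::prelude::*;

// Re-declared here because the real trait is private to the sink module.
trait SinkWriter {
    fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>;
    fn _finish(&mut self) -> PolarsResult<()>;
}

fn writer_loop(
    receiver: Receiver<Option<DataFrame>>,
    mut writer: Box<dyn SinkWriter + Send>,
) -> PolarsResult<()> {
    // A `None` message signals that the query is finished, mirroring the
    // `Sender<Option<DataChunk>>` used by `FilesSink` above.
    while let Ok(Some(df)) = receiver.recv() {
        writer._write_batch(&df)?;
    }
    // For NDJSON `_finish` is a no-op; parquet/ipc use it to write their footers.
    writer._finish()
}
```

The `morsels_per_sink * 2` backpressure in `JsonSink::new` bounds how many unwritten chunks can queue up between the pipeline threads and this IO thread.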
diff --git a/crates/polars-pipe/src/executors/sinks/mod.rs b/crates/polars-pipe/src/executors/sinks/mod.rs
index 8c9b46366da7..6adb3a291a2e 100644
--- a/crates/polars-pipe/src/executors/sinks/mod.rs
+++ b/crates/polars-pipe/src/executors/sinks/mod.rs
@@ -1,4 +1,9 @@
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 mod file_sink;
 pub(crate) mod group_by;
 mod io;
@@ -10,7 +15,12 @@ mod slice;
 mod sort;
 mod utils;
 
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 pub(crate) use file_sink::*;
 pub(crate) use joins::*;
 pub(crate) use ordered::*;
diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs
index ca68a2f94b82..a9369fa8e229 100644
--- a/crates/polars-pipe/src/pipeline/convert.rs
+++ b/crates/polars-pipe/src/pipeline/convert.rs
@@ -174,27 +174,32 @@ where
         #[allow(unused_variables)]
         SinkType::File {
             path, file_type, ..
-        } => match &file_type {
-            #[cfg(feature = "parquet")]
-            FileType::Parquet(options) => {
-                let path = path.as_ref().as_path();
-                Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?)
-                    as Box<dyn SinkTrait>
-            },
-            #[cfg(feature = "ipc")]
-            FileType::Ipc(options) => {
-                let path = path.as_ref().as_path();
-                Box::new(IpcSink::new(path, *options, input_schema.as_ref())?)
-                    as Box<dyn SinkTrait>
-            },
-            #[cfg(feature = "csv")]
-            FileType::Csv(options) => {
-                let path = path.as_ref().as_path();
-                Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?)
-                    as Box<dyn SinkTrait>
-            },
-            #[allow(unreachable_patterns)]
-            _ => unreachable!(),
+        } => {
+            let path = path.as_ref().as_path();
+            match &file_type {
+                #[cfg(feature = "parquet")]
+                FileType::Parquet(options) => {
+                    Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?)
+                        as Box<dyn SinkTrait>
+                },
+                #[cfg(feature = "ipc")]
+                FileType::Ipc(options) => {
+                    Box::new(IpcSink::new(path, *options, input_schema.as_ref())?)
+                        as Box<dyn SinkTrait>
+                },
+                #[cfg(feature = "csv")]
+                FileType::Csv(options) => {
+                    Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?)
+                        as Box<dyn SinkTrait>
+                },
+                #[cfg(feature = "json")]
+                FileType::Json(options) => {
+                    Box::new(JsonSink::new(path, *options, input_schema.as_ref())?)
+                        as Box<dyn SinkTrait>
+                },
+                #[allow(unreachable_patterns)]
+                _ => unreachable!(),
+            }
         },
         #[cfg(feature = "cloud")]
         SinkType::Cloud {
diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs
index d394327e41f4..d1cba80d7ad4 100644
--- a/crates/polars-plan/src/logical_plan/mod.rs
+++ b/crates/polars-plan/src/logical_plan/mod.rs
@@ -55,7 +55,13 @@ pub use schema::*;
 use serde::{Deserialize, Serialize};
 use strum_macros::IntoStaticStr;
 
-#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "cse"))]
+#[cfg(any(
+    feature = "ipc",
+    feature = "parquet",
+    feature = "csv",
+    feature = "cse",
+    feature = "json"
+))]
 pub use crate::logical_plan::optimizer::file_caching::{
     collect_fingerprints, find_column_union_and_fingerprints, FileCacher, FileFingerPrint,
 };
diff --git a/crates/polars-plan/src/logical_plan/optimizer/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/mod.rs
index 3f0dc062e994..755634f03643 100644
--- a/crates/polars-plan/src/logical_plan/optimizer/mod.rs
+++ b/crates/polars-plan/src/logical_plan/optimizer/mod.rs
@@ -13,7 +13,13 @@ mod collect_members;
 #[cfg(feature = "cse")]
 mod cse_expr;
 mod fast_projection;
-#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "cse"))]
+#[cfg(any(
+    feature = "ipc",
+    feature = "parquet",
+    feature = "csv",
+    feature = "cse",
+    feature = "json"
+))]
 pub(crate) mod file_caching;
 mod flatten_union;
 #[cfg(feature = "fused")]
diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs
index 4a850411672b..1ed5ad28593b 100644
--- a/crates/polars-plan/src/logical_plan/options.rs
+++ b/crates/polars-plan/src/logical_plan/options.rs
@@ -85,6 +85,14 @@ pub struct CsvWriterOptions {
     pub serialize_options: SerializeOptions,
 }
 
+#[cfg(feature = "json")]
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct JsonWriterOptions {
+    /// maintain the order the data was processed
+    pub maintain_order: bool,
+}
+
 #[derive(Clone, Debug, PartialEq)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct IpcScanOptions {
@@ -329,6 +337,8 @@ pub enum FileType {
     Ipc(IpcWriterOptions),
     #[cfg(feature = "csv")]
     Csv(CsvWriterOptions),
+    #[cfg(feature = "json")]
+    Json(JsonWriterOptions),
 }
 
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py
index 30dd204cbe20..475572b36ba6 100644
--- a/py-polars/polars/lazyframe/frame.py
+++ b/py-polars/polars/lazyframe/frame.py
@@ -2244,6 +2244,64 @@ def sink_csv(
             maintain_order=maintain_order,
         )
 
+    def sink_ndjson(
+        self,
+        path: str | Path,
+        *,
+        maintain_order: bool = True,
+        type_coercion: bool = True,
+        predicate_pushdown: bool = True,
+        projection_pushdown: bool = True,
+        simplify_expression: bool = True,
+        no_optimization: bool = False,
+        slice_pushdown: bool = True,
+    ) -> DataFrame:
+        """
+        Persists a LazyFrame at the provided path.
+
+        This allows streaming results that are larger than RAM to be written to disk.
+
+        Parameters
+        ----------
+        path
+            File path to which the file should be written.
+        maintain_order
+            Maintain the order in which data is processed.
+            Setting this to `False` will be slightly faster.
+        type_coercion
+            Do type coercion optimization.
+        predicate_pushdown
+            Do predicate pushdown optimization.
+        projection_pushdown
+            Do projection pushdown optimization.
+        simplify_expression
+            Run simplify expressions optimization.
+        no_optimization
+            Turn off (certain) optimizations.
+        slice_pushdown
+            Slice pushdown optimization.
+
+        Returns
+        -------
+        DataFrame
+
+        Examples
+        --------
+        >>> lf = pl.scan_csv("/path/to/my_larger_than_ram_file.csv")  # doctest: +SKIP
+        >>> lf.sink_ndjson("out.ndjson")  # doctest: +SKIP
+
+        """
+        lf = self._set_sink_optimizations(
+            type_coercion=type_coercion,
+            predicate_pushdown=predicate_pushdown,
+            projection_pushdown=projection_pushdown,
+            simplify_expression=simplify_expression,
+            no_optimization=no_optimization,
+            slice_pushdown=slice_pushdown,
+        )
+
+        return lf.sink_json(path=path, maintain_order=maintain_order)
+
     def _set_sink_optimizations(
         self,
         *,
diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs
index 4b2a23d216af..4572ccd3e5a8 100644
--- a/py-polars/src/lazyframe.rs
+++ b/py-polars/src/lazyframe.rs
@@ -628,6 +628,21 @@ impl PyLazyFrame {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
+    #[cfg(all(feature = "streaming", feature = "json"))]
+    #[pyo3(signature = (path, maintain_order))]
+    fn sink_json(&self, py: Python, path: PathBuf, maintain_order: bool) -> PyResult<()> {
+        let options = JsonWriterOptions { maintain_order };
+
+        // if we don't allow threads and we have udfs trying to acquire the gil from different
+        // threads we deadlock.
+        py.allow_threads(|| {
+            let ldf = self.ldf.clone();
+            ldf.sink_json(path, options).map_err(PyPolarsErr::from)
+        })?;
+        Ok(())
+    }
+
     fn fetch(&self, py: Python, n_rows: usize) -> PyResult<PyDataFrame> {
         let ldf = self.ldf.clone();
         let df = py.allow_threads(|| ldf.fetch(n_rows).map_err(PyPolarsErr::from))?;
diff --git a/py-polars/tests/unit/io/files/foods1.json b/py-polars/tests/unit/io/files/foods1.json
new file mode 100644
index 000000000000..327566874aa1
--- /dev/null
+++ b/py-polars/tests/unit/io/files/foods1.json
@@ -0,0 +1,27 @@
+[{"category":"vegetables","calories":45,"fats_g":0.5,"sugars_g":2},
+{"category":"seafood","calories":150,"fats_g":5.0,"sugars_g":0},
+{"category":"meat","calories":100,"fats_g":5.0,"sugars_g":0},
+{"category":"fruit","calories":60,"fats_g":0.0,"sugars_g":11},
+{"category":"seafood","calories":140,"fats_g":5.0,"sugars_g":1},
+{"category":"meat","calories":120,"fats_g":10.0,"sugars_g":1},
+{"category":"vegetables","calories":20,"fats_g":0.0,"sugars_g":2},
+{"category":"fruit","calories":30,"fats_g":0.0,"sugars_g":5},
+{"category":"seafood","calories":130,"fats_g":5.0,"sugars_g":0},
+{"category":"fruit","calories":50,"fats_g":4.5,"sugars_g":0},
+{"category":"meat","calories":110,"fats_g":7.0,"sugars_g":0},
+{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":2},
+{"category":"fruit","calories":30,"fats_g":0.0,"sugars_g":3},
+{"category":"vegetables","calories":22,"fats_g":0.0,"sugars_g":3},
+{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":4},
+{"category":"seafood","calories":100,"fats_g":5.0,"sugars_g":0},
+{"category":"seafood","calories":200,"fats_g":10.0,"sugars_g":0},
+{"category":"seafood","calories":200,"fats_g":7.0,"sugars_g":2},
+{"category":"fruit","calories":60,"fats_g":0.0,"sugars_g":11},
+{"category":"meat","calories":110,"fats_g":7.0,"sugars_g":0},
+{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":3},
+{"category":"seafood","calories":200,"fats_g":7.0,"sugars_g":2},
+{"category":"seafood","calories":130,"fats_g":1.5,"sugars_g":0},
+{"category":"fruit","calories":130,"fats_g":0.0,"sugars_g":25},
+{"category":"meat","calories":100,"fats_g":7.0,"sugars_g":0},
+{"category":"vegetables","calories":30,"fats_g":0.0,"sugars_g":5}, +{"category":"fruit","calories":50,"fats_g":0.0,"sugars_g":11}] diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py index 21e06522c3cf..36d6ae4c49f5 100644 --- a/py-polars/tests/unit/io/test_lazy_json.py +++ b/py-polars/tests/unit/io/test_lazy_json.py @@ -135,3 +135,19 @@ def test_anonymous_scan_explain(io_files_path: Path) -> None: q = pl.scan_ndjson(source=file) assert "Anonymous" in q.explain() assert "Anonymous" in q.show_graph(raw_output=True) # type: ignore[operator] + + +def test_sink_ndjson_should_write_same_data( + io_files_path: Path, tmp_path: Path +) -> None: + tmp_path.mkdir(exist_ok=True) + # Arrange + source_path = io_files_path / "foods1.csv" + target_path = tmp_path / "foods_test.ndjson" + expected = pl.read_csv(source_path) + lf = pl.scan_csv(source_path) + # Act + lf.sink_ndjson(target_path) + df = pl.read_ndjson(target_path) + # Assert + assert_frame_equal(df, expected)