From 6a79d0c90d9fcd7791eaecdb433d780ca3ba5df8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Fri, 25 Aug 2023 13:32:58 -0600 Subject: [PATCH 01/26] add json options --- crates/polars-plan/src/logical_plan/options.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 4a850411672b..9100b71b6d4a 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -14,6 +14,7 @@ use polars_io::RowCount; use polars_time::{DynamicGroupOptions, RollingGroupOptions}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use polars_io::json::JsonFormat; #[cfg(feature = "python")] use crate::prelude::python_udf::PythonFunction; @@ -82,7 +83,14 @@ pub struct CsvWriterOptions { pub include_header: bool, pub batch_size: usize, pub maintain_order: bool, - pub serialize_options: SerializeOptions, + pub serialize_options: SerializeOptions +} + +#[cfg(feature = "json")] +#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct JsonWriterOptions { + json_format: JsonFormat } #[derive(Clone, Debug, PartialEq)] @@ -329,6 +337,8 @@ pub enum FileType { Ipc(IpcWriterOptions), #[cfg(feature = "csv")] Csv(CsvWriterOptions), + #[cfg(feature = "json")] + Json(JsonWriterOptions), } #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] From f7ed4462c547afb657d1ad7cb9e61d87d19864d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Fri, 25 Aug 2023 17:05:14 -0600 Subject: [PATCH 02/26] Add API changes --- crates/polars-lazy/src/frame/mod.rs | 23 ++++++++++ crates/polars-lazy/src/prelude.rs | 2 + py-polars/polars/lazyframe/frame.py | 71 +++++++++++++++++++++++++++++ py-polars/polars/type_aliases.py | 2 + py-polars/src/lazyframe.rs | 24 ++++++++++ 5 files changed, 122 insertions(+) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 19cba5b90d42..a8548e7ea5c5 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -763,6 +763,29 @@ impl LazyFrame { Ok(()) } + /// Stream a query result into a json file. This is useful if the final result doesn't fit + /// into memory. This methods will return an error if the query cannot be completely done in a + /// streaming fashion. + #[cfg(feature = "json")] + pub fn sink_json(mut self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::FileSink { + input: Box::new(self.logical_plan), + payload: FileSinkOptions { + path: Arc::new(path), + file_type: FileType::Json(options), + }, + }; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: "cannot run the whole query in a streaming order; \ + use `collect().write_ndjson()` instead" + ); + let _ = physical_plan.execute(&mut state)?; + Ok(()) + } + /// Filter by some predicate expression. /// /// The expression must yield boolean values. 
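A minimal sketch of how the Rust-side `sink_json` API added above might be exercised — this is not part of the patch; it assumes the `csv`, `json`, and `streaming` features are enabled, that `JsonWriterOptions` is re-exported through the prelude as this patch does, and uses a purely illustrative CSV scan as the source:

```rust
// Sketch only: file names and the CSV source are illustrative assumptions.
use std::path::PathBuf;
use polars_lazy::prelude::*;

fn stream_to_ndjson() -> PolarsResult<()> {
    // Build a lazy query; any streamable source would do here.
    let lf = LazyCsvReader::new("data.csv").finish()?;
    // Stream the result straight to disk instead of materializing it with `collect()`.
    lf.sink_json(PathBuf::from("out.ndjson"), JsonWriterOptions::default())
}
```

With the derived defaults from the first patch the format is `JsonFormat::JsonLines`, which matches the `collect().write_ndjson()` fallback suggested in the error message.
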
diff --git a/crates/polars-lazy/src/prelude.rs b/crates/polars-lazy/src/prelude.rs index a5baeda1ea78..0c7750b4c796 100644 --- a/crates/polars-lazy/src/prelude.rs +++ b/crates/polars-lazy/src/prelude.rs @@ -11,6 +11,8 @@ pub use polars_plan::prelude::IpcWriterOptions; #[cfg(feature = "parquet")] pub use polars_plan::prelude::ParquetWriteOptions; pub(crate) use polars_plan::prelude::*; +#[cfg(feature = "json")] +pub use polars_plan::prelude::JsonWriterOptions; #[cfg(feature = "rolling_window")] pub use polars_time::{prelude::RollingOptions, Duration}; #[cfg(feature = "dynamic_group_by")] diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 30dd204cbe20..554950542024 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -95,6 +95,7 @@ ColumnNameOrSelector, CsvEncoding, CsvQuoteStyle, + JsonFormat, FillNullStrategy, FrameInitTypes, IntoExpr, @@ -2244,6 +2245,76 @@ def sink_csv( maintain_order=maintain_order, ) + def sink_json( + self, + path: str | Path, + *, + json_format: JsonFormat | None = None, + maintain_order: bool = True, + type_coercion: bool = True, + predicate_pushdown: bool = True, + projection_pushdown: bool = True, + simplify_expression: bool = True, + no_optimization: bool = False, + slice_pushdown: bool = True, + ) -> DataFrame: + """ + Persists a LazyFrame at the provided path. + + This allows streaming results that are larger than RAM to be written to disk. + + Parameters + ---------- + path + File path to which the file should be written. + json_format : {'json', 'json_lines'} + Choose "json" for single JSON array containing each DataFrame row as an object. + Choose "json_lines" for each row output on a separate line. + maintain_order + Maintain the order in which data is processed. + Setting this to `False` will be slightly faster. + type_coercion + Do type coercion optimization. + predicate_pushdown + Do predicate pushdown optimization. + projection_pushdown + Do projection pushdown optimization. + simplify_expression + Run simplify expressions optimization. + no_optimization + Turn off (certain) optimizations. + slice_pushdown + Slice pushdown optimization. + + Returns + ------- + DataFrame + + Notes + ----- + json_format parameter is currently not supported. 
+ + Examples + -------- + >>> lf = pl.scan_csv("/path/to/my_larger_than_ram_file.csv") # doctest: +SKIP + >>> lf.sink_json("out.json") # doctest: +SKIP + + """ + lf = self._set_sink_optimizations( + type_coercion=type_coercion, + predicate_pushdown=predicate_pushdown, + projection_pushdown=projection_pushdown, + simplify_expression=simplify_expression, + no_optimization=no_optimization, + slice_pushdown=slice_pushdown, + ) + + return lf.sink_json( + path=path, + json_format=json_format, + maintain_order=maintain_order + ) + def _set_sink_optimizations( self, *, diff --git a/py-polars/polars/type_aliases.py b/py-polars/polars/type_aliases.py index a1bbb246b1bc..d744a0016ec0 100644 --- a/py-polars/polars/type_aliases.py +++ b/py-polars/polars/type_aliases.py @@ -128,6 +128,8 @@ UniqueKeepStrategy: TypeAlias = Literal["first", "last", "any", "none"] UnstackDirection: TypeAlias = Literal["vertical", "horizontal"] MapElementsStrategy: TypeAlias = Literal["thread_local", "threading"] +ApplyStrategy: TypeAlias = Literal["thread_local", "threading"] +JsonFormat: TypeAlias = Literal["json", "json_lines"] # The following have a Rust enum equivalent with a different name AsofJoinStrategy: TypeAlias = Literal["backward", "forward", "nearest"] # AsofStrategy diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index 4b2a23d216af..21d52e6feb23 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -628,6 +628,30 @@ impl PyLazyFrame { Ok(()) } + #[allow(clippy::too_many_arguments)] + #[cfg(all(feature = "streaming", feature = "json"))] + #[pyo3(signature = (path, json_format, maintain_order))] + fn sink_json( + &self, + py: Python, + path: PathBuf, + json_format: Option, + maintain_order: bool + ) -> PyResult<()> { + let options = JsonWriterOptions { + json_format, + maintain_order, + }; + + // if we don't allow threads and we have udfs trying to acquire the gil from different + // threads we deadlock. 
+ py.allow_threads(|| { + let ldf = self.ldf.clone(); + ldf.sink_json(path, options).map_err(PyPolarsErr::from) + })?; + Ok(()) + } + fn fetch(&self, py: Python, n_rows: usize) -> PyResult { let ldf = self.ldf.clone(); let df = py.allow_threads(|| ldf.fetch(n_rows).map_err(PyPolarsErr::from))?; From df1001e11459a0ba241b17d54b7a6febb5c2bcfb Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Fri, 25 Aug 2023 17:30:53 -0600 Subject: [PATCH 03/26] feat: adding batched json writer --- crates/polars-io/src/json/mod.rs | 39 +++++++++++++ crates/polars-lazy/src/frame/mod.rs | 4 +- crates/polars-lazy/src/physical_plan/mod.rs | 2 +- crates/polars-lazy/src/physical_plan/state.rs | 16 +++--- crates/polars-pipe/Cargo.toml | 1 + .../src/executors/sinks/file_sink.rs | 57 +++++++++++++++++-- crates/polars-pipe/src/executors/sinks/mod.rs | 4 +- crates/polars-pipe/src/pipeline/convert.rs | 47 ++++++++------- .../polars-plan/src/logical_plan/options.rs | 4 +- py-polars/Makefile | 2 +- 10 files changed, 136 insertions(+), 40 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 97783d5d9319..9b600b6a7edc 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -73,12 +73,18 @@ use polars_core::utils::try_get_supertype; use polars_json::json::infer; use simd_json::BorrowedValue; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::prelude::*; + /// The format to use to write the DataFrame to JSON: `Json` (a JSON array) or `JsonLines` (each row output on a /// separate line). In either case, each row is serialized as a JSON object whose keys are the column names and whose /// values are the row's corresponding values. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum JsonFormat { /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in /// the DataFrame. @@ -92,6 +98,7 @@ pub enum JsonFormat { /// at a time. But the output in its entirety is not valid JSON; only the individual lines are. /// /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines. + #[default] JsonLines, } @@ -114,6 +121,14 @@ impl JsonWriter { self.json_format = format; self } + + pub fn batched(self, _schema: &Schema) -> PolarsResult> { + Ok( + BatchedWriter { + writer: self + } + ) + } } impl SerWriter for JsonWriter @@ -131,6 +146,12 @@ where fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> { df.align_chunks(); + self.write(df) + } +} + +impl JsonWriter where W: Write { + fn write(&mut self, df: &DataFrame) -> PolarsResult<()> { let fields = df.iter().map(|s| s.field().to_arrow()).collect::>(); let batches = df .iter_chunks() @@ -153,6 +174,24 @@ where } } +pub struct BatchedWriter { + writer: JsonWriter +} + +impl BatchedWriter { + /// Write a batch to the json writer. + /// + /// # Panics + /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. + pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + self.writer.write(df) + } + + /// Writes the footer of the IPC file. + pub fn finish(&mut self) -> PolarsResult<()> { + Ok(()) + } +} /// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame. 
#[must_use] pub struct JsonReader<'a, R> diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index a8548e7ea5c5..aea134ee81c8 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -7,7 +7,7 @@ mod err; pub mod pivot; use std::borrow::Cow; -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] +#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] use std::path::PathBuf; use std::sync::Arc; @@ -27,7 +27,7 @@ use polars_core::prelude::*; use polars_io::RowCount; pub use polars_plan::frame::{AllowedOptimizations, OptState}; use polars_plan::global::FETCH_ROWS; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] +#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] use polars_plan::logical_plan::collect_fingerprints; use polars_plan::logical_plan::optimize; use polars_plan::utils::expr_output_name; diff --git a/crates/polars-lazy/src/physical_plan/mod.rs b/crates/polars-lazy/src/physical_plan/mod.rs index 082efd21a110..0b9fb27420ff 100644 --- a/crates/polars-lazy/src/physical_plan/mod.rs +++ b/crates/polars-lazy/src/physical_plan/mod.rs @@ -2,7 +2,7 @@ pub mod executors; #[cfg(any(feature = "list_eval", feature = "pivot"))] pub(crate) mod exotic; pub mod expressions; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] +#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] mod file_cache; mod node_timer; pub mod planner; diff --git a/crates/polars-lazy/src/physical_plan/state.rs b/crates/polars-lazy/src/physical_plan/state.rs index c8db863a126e..34c0471b11b0 100644 --- a/crates/polars-lazy/src/physical_plan/state.rs +++ b/crates/polars-lazy/src/physical_plan/state.rs @@ -2,16 +2,16 @@ use std::borrow::Cow; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Mutex, RwLock}; -use bitflags::bitflags; +use bitflags::{bitflags, Flags}; use once_cell::sync::OnceCell; use polars_core::config::verbose; use polars_core::frame::group_by::GroupsProxy; use polars_core::prelude::*; use polars_ops::prelude::ChunkJoinOptIds; -#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))] +#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "json"))] use polars_plan::logical_plan::FileFingerPrint; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] +#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] use super::file_cache::FileCache; use crate::physical_plan::node_timer::NodeTimer; @@ -65,7 +65,7 @@ pub struct ExecutionState { // cached by a `.cache` call and kept in memory for the duration of the plan. 
df_cache: Arc>>>>, // cache file reads until all branches got there file, then we delete it - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] pub(crate) file_cache: FileCache, pub(super) schema_cache: RwLock>, /// Used by Window Expression to prevent redundant grouping @@ -110,7 +110,7 @@ impl ExecutionState { pub(super) fn split(&self) -> Self { Self { df_cache: self.df_cache.clone(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] file_cache: self.file_cache.clone(), schema_cache: Default::default(), group_tuples: Default::default(), @@ -126,7 +126,7 @@ impl ExecutionState { pub(super) fn clone(&self) -> Self { Self { df_cache: self.df_cache.clone(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] file_cache: self.file_cache.clone(), schema_cache: self.schema_cache.read().unwrap().clone().into(), group_tuples: self.group_tuples.clone(), @@ -142,7 +142,7 @@ impl ExecutionState { pub(crate) fn with_finger_prints(_finger_prints: Option) -> Self { Self::new() } - #[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))] + #[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "json"))] pub(crate) fn with_finger_prints(finger_prints: Option>) -> Self { Self { df_cache: Arc::new(Mutex::new(PlHashMap::default())), @@ -166,7 +166,7 @@ impl ExecutionState { Self { df_cache: Default::default(), schema_cache: Default::default(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] file_cache: FileCache::new(None), group_tuples: Default::default(), join_tuples: Default::default(), diff --git a/crates/polars-pipe/Cargo.toml b/crates/polars-pipe/Cargo.toml index aede1207e55f..d59ea7510d65 100644 --- a/crates/polars-pipe/Cargo.toml +++ b/crates/polars-pipe/Cargo.toml @@ -35,6 +35,7 @@ csv = ["polars-plan/csv", "polars-io/csv"] cloud = ["async", "polars-io/cloud", "polars-plan/cloud", "tokio", "futures"] parquet = ["polars-plan/parquet", "polars-io/parquet", "polars-io/async"] ipc = ["polars-plan/ipc", "polars-io/ipc"] +json = ["polars-plan/json", "polars-io/json"] async = ["polars-plan/async", "polars-io/async"] nightly = ["polars-core/nightly", "polars-utils/nightly", "hashbrown/nightly"] cross_join = ["polars-ops/cross_join"] diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 6cc723be10fe..7e5e9872ea55 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -12,12 +12,14 @@ use polars_io::parquet::ParquetWriter; use polars_io::prelude::IpcWriter; #[cfg(any(feature = "ipc", feature = "csv"))] use polars_io::SerWriter; +#[cfg(feature = "json")] +use polars_io::json::JsonWriter; use polars_plan::prelude::*; use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult}; use crate::pipeline::morsels_per_sink; -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] +#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] trait SinkWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>; fn _finish(&mut self) -> PolarsResult<()>; @@ -58,6 
+60,18 @@ impl SinkWriter for polars_io::csv::BatchedWriter { } } +#[cfg(feature = "json")] +impl SinkWriter for polars_io::json::BatchedWriter { + fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + self.write_batch(df) + } + + fn _finish(&mut self) -> PolarsResult<()> { + self.finish()?; + Ok(()) + } +} + #[cfg(feature = "parquet")] pub struct ParquetSink {} #[cfg(feature = "parquet")] @@ -215,7 +229,42 @@ impl CsvSink { } } -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] +#[cfg(feature = "json")] +pub struct JsonSink {} +#[cfg(feature = "json")] +impl JsonSink { + #[allow(clippy::new_ret_no_self)] + pub fn new( + path: &Path, + options: JsonWriterOptions, + schema: &Schema, + ) -> PolarsResult { + let file = std::fs::File::create(path)?; + let writer = JsonWriter::new(file) + .with_json_format(options.json_format) + .batched(schema)?; + + let writer = Box::new(writer) as Box; + + let morsels_per_sink = morsels_per_sink(); + let backpressure = morsels_per_sink * 2; + let (sender, receiver) = bounded(backpressure); + + let io_thread_handle = Arc::new(Some(init_writer_thread( + receiver, + writer, + false, + morsels_per_sink, + ))); + + Ok(FilesSink { + sender, + io_thread_handle, + }) + } +} + +#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] fn init_writer_thread( receiver: Receiver>, mut writer: Box, @@ -262,13 +311,13 @@ fn init_writer_thread( // Ensure the data is return in the order it was streamed #[derive(Clone)] -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] +#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] pub struct FilesSink { sender: Sender>, io_thread_handle: Arc>>, } -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] +#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] impl Sink for FilesSink { fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult { // don't add empty dataframes diff --git a/crates/polars-pipe/src/executors/sinks/mod.rs b/crates/polars-pipe/src/executors/sinks/mod.rs index 8c9b46366da7..59759470170c 100644 --- a/crates/polars-pipe/src/executors/sinks/mod.rs +++ b/crates/polars-pipe/src/executors/sinks/mod.rs @@ -1,4 +1,4 @@ -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] +#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] mod file_sink; pub(crate) mod group_by; mod io; @@ -10,7 +10,7 @@ mod slice; mod sort; mod utils; -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] +#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] pub(crate) use file_sink::*; pub(crate) use joins::*; pub(crate) use ordered::*; diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index ca68a2f94b82..a9369fa8e229 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -174,27 +174,32 @@ where #[allow(unused_variables)] SinkType::File { path, file_type, .. - } => match &file_type { - #[cfg(feature = "parquet")] - FileType::Parquet(options) => { - let path = path.as_ref().as_path(); - Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?) - as Box - }, - #[cfg(feature = "ipc")] - FileType::Ipc(options) => { - let path = path.as_ref().as_path(); - Box::new(IpcSink::new(path, *options, input_schema.as_ref())?) 
- as Box - }, - #[cfg(feature = "csv")] - FileType::Csv(options) => { - let path = path.as_ref().as_path(); - Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?) - as Box - }, - #[allow(unreachable_patterns)] - _ => unreachable!(), + } => { + let path = path.as_ref().as_path(); + match &file_type { + #[cfg(feature = "parquet")] + FileType::Parquet(options) => { + Box::new(ParquetSink::new(path, *options, input_schema.as_ref())?) + as Box + }, + #[cfg(feature = "ipc")] + FileType::Ipc(options) => { + Box::new(IpcSink::new(path, *options, input_schema.as_ref())?) + as Box + }, + #[cfg(feature = "csv")] + FileType::Csv(options) => { + Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref())?) + as Box + }, + #[cfg(feature = "json")] + FileType::Json(options) => { + Box::new(JsonSink::new(path, *options, input_schema.as_ref())?) + as Box + }, + #[allow(unreachable_patterns)] + _ => unreachable!(), + } }, #[cfg(feature = "cloud")] SinkType::Cloud { diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 9100b71b6d4a..8b0eb1f943df 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -9,12 +9,13 @@ use polars_io::csv::{CsvEncoding, NullValues}; use polars_io::ipc::IpcCompression; #[cfg(feature = "parquet")] use polars_io::parquet::ParquetCompression; +#[cfg(feature = "json")] +use polars_io::json::JsonFormat; use polars_io::RowCount; #[cfg(feature = "dynamic_group_by")] use polars_time::{DynamicGroupOptions, RollingGroupOptions}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use polars_io::json::JsonFormat; #[cfg(feature = "python")] use crate::prelude::python_udf::PythonFunction; @@ -90,6 +91,7 @@ pub struct CsvWriterOptions { #[derive(Copy, Clone, Debug, PartialEq, Eq, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct JsonWriterOptions { + /// format to use to write the DataFrame to JSON json_format: JsonFormat } diff --git a/py-polars/Makefile b/py-polars/Makefile index 16d1a4156f56..3cf0986ec682 100644 --- a/py-polars/Makefile +++ b/py-polars/Makefile @@ -19,7 +19,7 @@ requirements: .venv ## Install/refresh all project requirements @$(MAKE) -s -C .. 
$@ .PHONY: build -build: .venv ## Compile and install Polars for development +build: ## Compile and install Polars for development @unset CONDA_PREFIX && source $(VENV_BIN)/activate && maturin develop .PHONY: build-debug-opt From 7c7da80d7bafc5ce9685532c1f7f6334f6e173f5 Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Tue, 29 Aug 2023 12:00:55 -0600 Subject: [PATCH 04/26] integration and fixes --- .../src/executors/sinks/file_sink.rs | 2 +- crates/polars-plan/src/logical_plan/options.rs | 4 +++- py-polars/src/conversion.rs | 18 ++++++++++++++++++ py-polars/src/lazyframe.rs | 9 +++++---- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 7e5e9872ea55..fad8e7419c79 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -253,7 +253,7 @@ impl JsonSink { let io_thread_handle = Arc::new(Some(init_writer_thread( receiver, writer, - false, + options.maintain_order, morsels_per_sink, ))); diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 8b0eb1f943df..f02b21efd622 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -92,7 +92,9 @@ pub struct CsvWriterOptions { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct JsonWriterOptions { /// format to use to write the DataFrame to JSON - json_format: JsonFormat + pub json_format: JsonFormat, + /// maintain the order the data was processed + pub maintain_order: bool } #[derive(Clone, Debug, PartialEq)] diff --git a/py-polars/src/conversion.rs b/py-polars/src/conversion.rs index 0b6b4ab00e54..1c03f045e308 100644 --- a/py-polars/src/conversion.rs +++ b/py-polars/src/conversion.rs @@ -10,6 +10,8 @@ use polars::io::avro::AvroCompression; #[cfg(feature = "ipc")] use polars::io::ipc::IpcCompression; use polars::prelude::AnyValue; +#[cfg(feature = "json")] +use polars::prelude::JsonFormat; use polars::series::ops::NullBehavior; use polars_core::frame::row::any_values_to_dtype; use polars_core::prelude::{IndexOrder, QuantileInterpolOptions}; @@ -1316,6 +1318,22 @@ impl FromPyObject<'_> for Wrap { } } +#[cfg(feature = "json")] +impl FromPyObject<'_> for Wrap { + fn extract(ob: &PyAny) -> PyResult { + let parsed = match ob.extract::<&str>()? { + "json" => JsonFormat::Json, + "json_lines" => JsonFormat::JsonLines, + v => { + return Err(PyValueError::new_err(format!( + "json fommat must be one of: {{'json', 'json_lines'}}, got {v}", + ))) + }, + }; + Ok(Wrap(parsed)) + } +} + impl FromPyObject<'_> for Wrap { fn extract(ob: &PyAny) -> PyResult { let parsed = match ob.extract::<&str>()? 
{ diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index 21d52e6feb23..336c8cc15a39 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -12,8 +12,9 @@ use polars::lazy::frame::LazyJsonLineReader; use polars::lazy::frame::{AllowedOptimizations, LazyFrame}; use polars::lazy::prelude::col; #[cfg(feature = "csv")] -use polars::prelude::CsvEncoding; -use polars::prelude::{ClosedWindow, Field, JoinType, Schema}; +use polars::prelude::{cloud, ClosedWindow, CsvEncoding, Field, JoinType, Schema}; +#[cfg(feature = "json")] +use polars::prelude::JsonFormat; use polars::time::*; use polars_core::frame::explode::MeltArgs; use polars_core::frame::UniqueKeepStrategy; @@ -635,11 +636,11 @@ impl PyLazyFrame { &self, py: Python, path: PathBuf, - json_format: Option, + json_format: Option>, maintain_order: bool ) -> PyResult<()> { let options = JsonWriterOptions { - json_format, + json_format: json_format.map(|c| c.0).unwrap_or(JsonFormat::default()), maintain_order, }; From 8ecf92949e24be173c04a9dba86704dcce45f977 Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Tue, 29 Aug 2023 12:55:54 -0600 Subject: [PATCH 05/26] small fixes --- crates/polars-lazy/src/physical_plan/state.rs | 2 +- crates/polars-plan/src/logical_plan/options.rs | 2 +- py-polars/Makefile | 2 +- py-polars/polars/lazyframe/frame.py | 2 +- py-polars/polars/type_aliases.py | 1 - 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/state.rs b/crates/polars-lazy/src/physical_plan/state.rs index 34c0471b11b0..70282bbfd390 100644 --- a/crates/polars-lazy/src/physical_plan/state.rs +++ b/crates/polars-lazy/src/physical_plan/state.rs @@ -147,7 +147,7 @@ impl ExecutionState { Self { df_cache: Arc::new(Mutex::new(PlHashMap::default())), schema_cache: Default::default(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] file_cache: FileCache::new(finger_prints), group_tuples: Arc::new(RwLock::new(PlHashMap::default())), join_tuples: Arc::new(Mutex::new(PlHashMap::default())), diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index f02b21efd622..e7d1e1590cb4 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -84,7 +84,7 @@ pub struct CsvWriterOptions { pub include_header: bool, pub batch_size: usize, pub maintain_order: bool, - pub serialize_options: SerializeOptions + pub serialize_options: SerializeOptions, } #[cfg(feature = "json")] diff --git a/py-polars/Makefile b/py-polars/Makefile index 3cf0986ec682..16d1a4156f56 100644 --- a/py-polars/Makefile +++ b/py-polars/Makefile @@ -19,7 +19,7 @@ requirements: .venv ## Install/refresh all project requirements @$(MAKE) -s -C .. 
$@ .PHONY: build -build: ## Compile and install Polars for development +build: .venv ## Compile and install Polars for development @unset CONDA_PREFIX && source $(VENV_BIN)/activate && maturin develop .PHONY: build-debug-opt diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 554950542024..50f9dfea4096 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -95,7 +95,7 @@ ColumnNameOrSelector, CsvEncoding, CsvQuoteStyle, - JsonFormat, + JsonFormat, FillNullStrategy, FrameInitTypes, IntoExpr, diff --git a/py-polars/polars/type_aliases.py b/py-polars/polars/type_aliases.py index d744a0016ec0..ad17e296202c 100644 --- a/py-polars/polars/type_aliases.py +++ b/py-polars/polars/type_aliases.py @@ -128,7 +128,6 @@ UniqueKeepStrategy: TypeAlias = Literal["first", "last", "any", "none"] UnstackDirection: TypeAlias = Literal["vertical", "horizontal"] MapElementsStrategy: TypeAlias = Literal["thread_local", "threading"] -ApplyStrategy: TypeAlias = Literal["thread_local", "threading"] JsonFormat: TypeAlias = Literal["json", "json_lines"] # The following have a Rust enum equivalent with a different name From 2d469372085d7b49352f6cf3eb21e3db4cdac755 Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Tue, 29 Aug 2023 13:26:27 -0600 Subject: [PATCH 06/26] small fix --- crates/polars-lazy/src/frame/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index aea134ee81c8..98039ef47c03 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -596,7 +596,7 @@ impl LazyFrame { self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?; let finger_prints = if file_caching { - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] + #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] { let mut fps = Vec::with_capacity(8); collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena); From 19ca57dd7949081703240b9ab068d0a15d8218eb Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Wed, 30 Aug 2023 18:30:41 -0600 Subject: [PATCH 07/26] refactor: new json batched writers --- crates/polars-io/src/json/mod.rs | 95 +++++++++++++++---- crates/polars-lazy/Cargo.toml | 6 +- .../src/executors/sinks/file_sink.rs | 5 +- 3 files changed, 81 insertions(+), 25 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 9b600b6a7edc..b23eb68c5844 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -67,6 +67,7 @@ use std::ops::Deref; use arrow::array::StructArray; use arrow::legacy::conversion::chunk_to_struct; +use polars_json::json::write::FallibleStreamingIterator; use polars_core::error::to_compute_err; use polars_core::prelude::*; use polars_core::utils::try_get_supertype; @@ -75,6 +76,7 @@ use simd_json::BorrowedValue; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use polars_json::{json, ndjson}; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::prelude::*; @@ -116,18 +118,26 @@ pub struct JsonWriter { json_format: JsonFormat, } -impl JsonWriter { +impl JsonWriter where W: Write + 'static { pub fn with_json_format(mut self, format: JsonFormat) -> Self { self.json_format = format; self } - pub fn batched(self, _schema: &Schema) -> PolarsResult> { - Ok( - BatchedWriter { - writer: self - } - ) + pub fn batched(self, _schema: 
&Schema) -> PolarsResult>> { + match self.json_format { + JsonFormat::Json => Ok( + Box::new(JsonBatchedWriter { + writer: self.buffer, + is_first_row: true + }) + ), + JsonFormat::JsonLines => Ok( + Box::new(JsonLinesBatchedWriter { + writer: self.buffer + }) + ) + } } } @@ -146,12 +156,6 @@ where fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> { df.align_chunks(); - self.write(df) - } -} - -impl JsonWriter where W: Write { - fn write(&mut self, df: &DataFrame) -> PolarsResult<()> { let fields = df.iter().map(|s| s.field().to_arrow()).collect::>(); let batches = df .iter_chunks() @@ -174,24 +178,75 @@ impl JsonWriter where W: Write { } } -pub struct BatchedWriter { - writer: JsonWriter +pub trait BatchedWriter { + fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>; + fn finish(&mut self) -> PolarsResult<()>; +} + +struct JsonBatchedWriter{ + writer: W, + is_first_row: bool +} + +impl BatchedWriter for JsonBatchedWriter where W: Write { + /// Write a batch to the json writer. + /// + /// # Panics + /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. + fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + let fields = df.iter().map(|s| s.field().to_arrow()).collect::>(); + let batches = df + .iter_chunks() + .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); + let mut serializer = + json::write::Serializer::new(batches, vec![]); + + while let Some(block) = serializer.next()? { + if self.is_first_row { + self.writer.write_all(&[b'['])?; + self.is_first_row = false; + } else { + self.writer.write_all(&[b','])?; + } + self.writer.write_all(block)?; + } + Ok(()) + } + + /// Writes the footer of the Json file. + fn finish(&mut self) -> PolarsResult<()> { + self.writer.write_all(&[b']'])?; + Ok(()) + } +} + +struct JsonLinesBatchedWriter { + writer: W } -impl BatchedWriter { +impl BatchedWriter for JsonLinesBatchedWriter where W: Write { /// Write a batch to the json writer. /// /// # Panics /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. - pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { - self.writer.write(df) + fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + let fields = df.iter().map(|s| s.field().to_arrow()).collect::>(); + let batches = df + .iter_chunks() + .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); + let mut serializer = ndjson::write::Serializer::new(batches, vec![]); + while let Some(block) = serializer.next()? { + self.writer.write_all(block)?; + } + Ok(()) } - /// Writes the footer of the IPC file. - pub fn finish(&mut self) -> PolarsResult<()> { + /// Writes the footer of the Json file. + fn finish(&mut self) -> PolarsResult<()> { Ok(()) } } + /// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame. 
#[must_use] pub struct JsonReader<'a, R> diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index ff35e6f60baf..a1c5ab760b8f 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -46,9 +46,9 @@ async = [ ] cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio", "futures"] cloud_write = ["cloud"] -ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc"] -json = ["polars-io/json", "polars-plan/json", "polars-json"] -csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe?/csv"] +ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe/ipc"] +json = ["polars-io/json", "polars-plan/json", "polars-json", "polars-pipe/json"] +csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe/csv"] temporal = ["dtype-datetime", "dtype-date", "dtype-time", "dtype-duration", "polars-plan/temporal"] # debugging purposes fmt = ["polars-core/fmt", "polars-plan/fmt"] diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index fad8e7419c79..346fabf3e8ae 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::ops::Deref; use std::path::Path; use std::thread::JoinHandle; @@ -61,7 +62,7 @@ impl SinkWriter for polars_io::csv::BatchedWriter { } #[cfg(feature = "json")] -impl SinkWriter for polars_io::json::BatchedWriter { +impl SinkWriter for dyn polars_io::json::BatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } @@ -244,7 +245,7 @@ impl JsonSink { .with_json_format(options.json_format) .batched(schema)?; - let writer = Box::new(writer) as Box; + let writer = Box::new(writer.deref()) as Box; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; From c853e548399b97be43a387bafdb612d7fa0b3ed2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Thu, 31 Aug 2023 18:24:28 -0600 Subject: [PATCH 08/26] add py-polars unit testing --- crates/polars-io/src/json/mod.rs | 36 ++++++++++--------- .../src/executors/sinks/file_sink.rs | 31 ++++++++++------ py-polars/tests/unit/io/files/foods1.json | 27 ++++++++++++++ py-polars/tests/unit/io/test_lazy_json.py | 29 ++++++++++++++- 4 files changed, 95 insertions(+), 28 deletions(-) create mode 100644 py-polars/tests/unit/io/files/foods1.json diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index b23eb68c5844..e8d8c8bfc21c 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -124,21 +124,6 @@ impl JsonWriter where W: Write + 'static { self } - pub fn batched(self, _schema: &Schema) -> PolarsResult>> { - match self.json_format { - JsonFormat::Json => Ok( - Box::new(JsonBatchedWriter { - writer: self.buffer, - is_first_row: true - }) - ), - JsonFormat::JsonLines => Ok( - Box::new(JsonLinesBatchedWriter { - writer: self.buffer - }) - ) - } - } } impl SerWriter for JsonWriter @@ -183,11 +168,20 @@ pub trait BatchedWriter { fn finish(&mut self) -> PolarsResult<()>; } -struct JsonBatchedWriter{ +pub struct JsonBatchedWriter{ writer: W, is_first_row: bool } +impl JsonBatchedWriter where W: Write { + pub fn new(writer: W) -> Self{ + JsonBatchedWriter { + writer, + is_first_row: true + } + } +} + impl BatchedWriter for JsonBatchedWriter where W: Write { /// Write a batch to the json writer. 
/// @@ -220,10 +214,18 @@ impl BatchedWriter for JsonBatchedWriter where W: Write { } } -struct JsonLinesBatchedWriter { +pub struct JsonLinesBatchedWriter { writer: W } +impl JsonLinesBatchedWriter where W: Write { + pub fn new(writer: W) -> Self{ + JsonLinesBatchedWriter { + writer + } + } +} + impl BatchedWriter for JsonLinesBatchedWriter where W: Write { /// Write a batch to the json writer. /// diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 346fabf3e8ae..9770e21a9be1 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -1,5 +1,4 @@ use std::any::Any; -use std::ops::Deref; use std::path::Path; use std::thread::JoinHandle; @@ -14,7 +13,7 @@ use polars_io::prelude::IpcWriter; #[cfg(any(feature = "ipc", feature = "csv"))] use polars_io::SerWriter; #[cfg(feature = "json")] -use polars_io::json::JsonWriter; +use polars_io::json::{JsonFormat, JsonBatchedWriter, JsonLinesBatchedWriter, BatchedWriter}; use polars_plan::prelude::*; use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult}; @@ -62,7 +61,19 @@ impl SinkWriter for polars_io::csv::BatchedWriter { } #[cfg(feature = "json")] -impl SinkWriter for dyn polars_io::json::BatchedWriter { +impl SinkWriter for polars_io::json::JsonBatchedWriter { + fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + self.write_batch(df) + } + + fn _finish(&mut self) -> PolarsResult<()> { + self.finish()?; + Ok(()) + } +} + +#[cfg(feature = "json")] +impl SinkWriter for polars_io::json::JsonLinesBatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } @@ -238,15 +249,15 @@ impl JsonSink { pub fn new( path: &Path, options: JsonWriterOptions, - schema: &Schema, + _schema: &Schema, ) -> PolarsResult { let file = std::fs::File::create(path)?; - let writer = JsonWriter::new(file) - .with_json_format(options.json_format) - .batched(schema)?; - - let writer = Box::new(writer.deref()) as Box; - + let writer = match options.json_format { + JsonFormat::Json => + Box::new(JsonBatchedWriter::new(file)) as Box, + JsonFormat::JsonLines => + Box::new(JsonLinesBatchedWriter::new(file)) as Box + }; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; let (sender, receiver) = bounded(backpressure); diff --git a/py-polars/tests/unit/io/files/foods1.json b/py-polars/tests/unit/io/files/foods1.json new file mode 100644 index 000000000000..327566874aa1 --- /dev/null +++ b/py-polars/tests/unit/io/files/foods1.json @@ -0,0 +1,27 @@ +[{"category":"vegetables","calories":45,"fats_g":0.5,"sugars_g":2}, +{"category":"seafood","calories":150,"fats_g":5.0,"sugars_g":0}, +{"category":"meat","calories":100,"fats_g":5.0,"sugars_g":0}, +{"category":"fruit","calories":60,"fats_g":0.0,"sugars_g":11}, +{"category":"seafood","calories":140,"fats_g":5.0,"sugars_g":1}, +{"category":"meat","calories":120,"fats_g":10.0,"sugars_g":1}, +{"category":"vegetables","calories":20,"fats_g":0.0,"sugars_g":2}, +{"category":"fruit","calories":30,"fats_g":0.0,"sugars_g":5}, +{"category":"seafood","calories":130,"fats_g":5.0,"sugars_g":0}, +{"category":"fruit","calories":50,"fats_g":4.5,"sugars_g":0}, +{"category":"meat","calories":110,"fats_g":7.0,"sugars_g":0}, +{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":2}, +{"category":"fruit","calories":30,"fats_g":0.0,"sugars_g":3}, 
+{"category":"vegetables","calories":22,"fats_g":0.0,"sugars_g":3}, +{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":4}, +{"category":"seafood","calories":100,"fats_g":5.0,"sugars_g":0}, +{"category":"seafood","calories":200,"fats_g":10.0,"sugars_g":0}, +{"category":"seafood","calories":200,"fats_g":7.0,"sugars_g":2}, +{"category":"fruit","calories":60,"fats_g":0.0,"sugars_g":11}, +{"category":"meat","calories":110,"fats_g":7.0,"sugars_g":0}, +{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":3}, +{"category":"seafood","calories":200,"fats_g":7.0,"sugars_g":2}, +{"category":"seafood","calories":130,"fats_g":1.5,"sugars_g":0}, +{"category":"fruit","calories":130,"fats_g":0.0,"sugars_g":25}, +{"category":"meat","calories":100,"fats_g":7.0,"sugars_g":0}, +{"category":"vegetables","calories":30,"fats_g":0.0,"sugars_g":5}, +{"category":"fruit","calories":50,"fats_g":0.0,"sugars_g":11}] diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py index 21e06522c3cf..afed5c25a25f 100644 --- a/py-polars/tests/unit/io/test_lazy_json.py +++ b/py-polars/tests/unit/io/test_lazy_json.py @@ -129,9 +129,36 @@ def test_ndjson_list_arg(io_files_path: Path) -> None: assert df.row(-1) == ("seafood", 194, 12.0, 1) assert df.row(0) == ("vegetables", 45, 0.5, 2) - def test_anonymous_scan_explain(io_files_path: Path) -> None: file = io_files_path / "foods1.ndjson" q = pl.scan_ndjson(source=file) assert "Anonymous" in q.explain() assert "Anonymous" in q.show_graph(raw_output=True) # type: ignore[operator] + +def test_sink_json_should_write_same_data(io_files_path: Path, tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + # Arrange + source_path = io_files_path / "foods1.csv" + target_path = tmp_path / "foods_test.ndjson" + expected = pl.read_csv(source_path) + lz_df = pl.scan_csv(source_path) + # Act + lz_df.sink_json(target_path) + df = pl.read_ndjson(target_path) + # Assert + assert_frame_equal(df, expected) + + +def test_sink_json_should_support_with_options(io_files_path: Path, tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + + # Arrange + source_path = io_files_path / "foods1.ndjson" + target_path = tmp_path / "foods_test.json" + expected = pl.read_ndjson(source_path) + lz_df = pl.scan_ndjson(source_path) + # Act + lz_df.sink_json(target_path) + # df = pl.read_json(target_path) + # # Assert + # assert_frame_equal(df, expected) \ No newline at end of file From dc7ed347da3276e5e696355f581b41e63c0fc5ed Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Mon, 4 Sep 2023 12:55:23 -0600 Subject: [PATCH 09/26] refactor: removing duplicated code for sink_file --- crates/polars-lazy/src/frame/mod.rs | 110 +++++++++------------------- 1 file changed, 36 insertions(+), 74 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 98039ef47c03..ad19665d2578 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -670,22 +670,11 @@ impl LazyFrame { /// streaming fashion. 
#[cfg(feature = "parquet")] pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { - path: Arc::new(path), - file_type: FileType::Parquet(options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_parquet()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Parquet(options), + }, + "collect().write_parquet()") } /// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit @@ -699,88 +688,61 @@ impl LazyFrame { cloud_options: Option, parquet_options: ParquetWriteOptions, ) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::Cloud { - uri: Arc::new(uri), - cloud_options, - file_type: FileType::Parquet(parquet_options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + self.sink( SinkType::Cloud { + uri: Arc::new(uri), + cloud_options, + file_type: FileType::Parquet(parquet_options), + }, + "collect().write_parquet()") } /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "ipc")] - pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { - path: Arc::new(path), - file_type: FileType::Ipc(options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_ipc()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Ipc(options), + }, + "collect().write_ipc()") } /// Stream a query result into an csv file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. 
#[cfg(feature = "csv")] - pub fn sink_csv(mut self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { - path: Arc::new(path), - file_type: FileType::Csv(options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_csv()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Csv(options) + }, + "collect().write_csv()") } /// Stream a query result into a json file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "json")] - pub fn sink_json(mut self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { + pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Json(options) + }, + "collect().write_ndjson()` or `collect().write_json()") + } + + fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::FileSink { + self.logical_plan = LogicalPlan::Sink { input: Box::new(self.logical_plan), - payload: FileSinkOptions { - path: Arc::new(path), - file_type: FileType::Json(options), - }, + payload, }; let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; polars_ensure!( is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_ndjson()` instead" + ComputeError: format!("cannot run the whole query in a streaming order; \ + use `{msg_alternative}` instead", msg_alternative=msg_alternative) ); let _ = physical_plan.execute(&mut state)?; Ok(()) From 2311d3291c4f9b46ac991e65ba03bc98b2cc7388 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Mon, 4 Sep 2023 17:55:28 -0600 Subject: [PATCH 10/26] add unit testing for sink_json method --- py-polars/src/conversion.rs | 2 +- py-polars/tests/unit/io/test_lazy_json.py | 48 ++++++++++++++++++----- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/py-polars/src/conversion.rs b/py-polars/src/conversion.rs index 1c03f045e308..493b62f50ac1 100644 --- a/py-polars/src/conversion.rs +++ b/py-polars/src/conversion.rs @@ -1326,7 +1326,7 @@ impl FromPyObject<'_> for Wrap { "json_lines" => JsonFormat::JsonLines, v => { return Err(PyValueError::new_err(format!( - "json fommat must be one of: {{'json', 'json_lines'}}, got {v}", + "json format must be one of: {{'json', 'json_lines'}}, got {v}", ))) }, }; diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py index afed5c25a25f..3c43c5515036 100644 --- a/py-polars/tests/unit/io/test_lazy_json.py +++ b/py-polars/tests/unit/io/test_lazy_json.py @@ -141,24 +141,54 @@ def test_sink_json_should_write_same_data(io_files_path: Path, tmp_path: Path) - source_path = io_files_path / "foods1.csv" target_path = tmp_path / "foods_test.ndjson" expected = pl.read_csv(source_path) - lz_df = 
pl.scan_csv(source_path) + lf = pl.scan_csv(source_path) # Act - lz_df.sink_json(target_path) + lf.sink_json(target_path) df = pl.read_ndjson(target_path) # Assert assert_frame_equal(df, expected) -def test_sink_json_should_support_with_options(io_files_path: Path, tmp_path: Path) -> None: +def test_sink_json_should_write_same_data_with_json_argument(io_files_path: Path, tmp_path: Path) -> None: tmp_path.mkdir(exist_ok=True) - # Arrange - source_path = io_files_path / "foods1.ndjson" + source_path = io_files_path / "foods1.csv" target_path = tmp_path / "foods_test.json" - expected = pl.read_ndjson(source_path) - lz_df = pl.scan_ndjson(source_path) + + expected = pl.read_csv(source_path) + lf = pl.scan_csv(source_path) # Act - lz_df.sink_json(target_path) # df = pl.read_json(target_path) # # Assert - # assert_frame_equal(df, expected) \ No newline at end of file + # assert_frame_equal(df, expected) + lf.sink_json(target_path, json_format="json") + df = pl.read_json(target_path) + # Assert + assert_frame_equal(df, expected) + + +def test_sink_json_should_write_same_data_with_json_lines_argument(io_files_path: Path, tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + # Arrange + source_path = io_files_path / "foods1.csv" + target_path = tmp_path / "foods_test.ndjson" + + expected = pl.read_csv(source_path) + lf = pl.scan_csv(source_path) + # Act + lf.sink_json(target_path, json_format="json_lines") + df = pl.read_ndjson(target_path) + # Assert + assert_frame_equal(df, expected) + + +def test_sink_json_should_raise_exception_with_invalid_argument(io_files_path: Path, tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + # Arrange + source_path = io_files_path / "foods1.csv" + target_path = tmp_path / "foods_test.ndjson" + + lf = pl.scan_csv(source_path) + # Act & Assert + with pytest.raises(ValueError): + lf.sink_json(target_path, json_format="invalid_argument") \ No newline at end of file From 48b833a91b81d849015df261e5acbc00b1689743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Mon, 4 Sep 2023 18:36:02 -0600 Subject: [PATCH 11/26] style: fix code formatting --- crates/polars-io/src/json/mod.rs | 49 +++++++++------- crates/polars-lazy/src/frame/mod.rs | 21 ++++++- crates/polars-lazy/src/physical_plan/mod.rs | 7 ++- crates/polars-lazy/src/physical_plan/state.rs | 56 ++++++++++++++++--- crates/polars-lazy/src/prelude.rs | 4 +- .../src/executors/sinks/file_sink.rs | 37 ++++++++---- .../polars-plan/src/logical_plan/options.rs | 6 +- py-polars/polars/lazyframe/frame.py | 30 +++++----- py-polars/src/lazyframe.rs | 4 +- py-polars/tests/unit/io/test_lazy_json.py | 18 ++++-- 10 files changed, 160 insertions(+), 72 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index e8d8c8bfc21c..7ad3a496d95d 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -72,16 +72,14 @@ use polars_core::error::to_compute_err; use polars_core::prelude::*; use polars_core::utils::try_get_supertype; use polars_json::json::infer; -use simd_json::BorrowedValue; - #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use polars_json::{json, ndjson}; +use simd_json::BorrowedValue; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::prelude::*; - /// The format to use to write the DataFrame to JSON: `Json` (a JSON array) or `JsonLines` (each row output on a /// separate line). 
In either case, each row is serialized as a JSON object whose keys are the column names and whose /// values are the row's corresponding values. @@ -118,12 +116,14 @@ pub struct JsonWriter { json_format: JsonFormat, } -impl JsonWriter where W: Write + 'static { +impl JsonWriter +where + W: Write + 'static, +{ pub fn with_json_format(mut self, format: JsonFormat) -> Self { self.json_format = format; self } - } impl SerWriter for JsonWriter @@ -168,21 +168,27 @@ pub trait BatchedWriter { fn finish(&mut self) -> PolarsResult<()>; } -pub struct JsonBatchedWriter{ +pub struct JsonBatchedWriter { writer: W, - is_first_row: bool + is_first_row: bool, } -impl JsonBatchedWriter where W: Write { - pub fn new(writer: W) -> Self{ +impl JsonBatchedWriter +where + W: Write, +{ + pub fn new(writer: W) -> Self { JsonBatchedWriter { writer, - is_first_row: true + is_first_row: true, } } } -impl BatchedWriter for JsonBatchedWriter where W: Write { +impl BatchedWriter for JsonBatchedWriter +where + W: Write, +{ /// Write a batch to the json writer. /// /// # Panics @@ -192,8 +198,7 @@ impl BatchedWriter for JsonBatchedWriter where W: Write { let batches = df .iter_chunks() .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); - let mut serializer = - json::write::Serializer::new(batches, vec![]); + let mut serializer = json::write::Serializer::new(batches, vec![]); while let Some(block) = serializer.next()? { if self.is_first_row { @@ -215,18 +220,22 @@ impl BatchedWriter for JsonBatchedWriter where W: Write { } pub struct JsonLinesBatchedWriter { - writer: W + writer: W, } -impl JsonLinesBatchedWriter where W: Write { - pub fn new(writer: W) -> Self{ - JsonLinesBatchedWriter { - writer - } +impl JsonLinesBatchedWriter +where + W: Write, +{ + pub fn new(writer: W) -> Self { + JsonLinesBatchedWriter { writer } } } -impl BatchedWriter for JsonLinesBatchedWriter where W: Write { +impl BatchedWriter for JsonLinesBatchedWriter +where + W: Write, +{ /// Write a batch to the json writer. 
/// /// # Panics diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index ad19665d2578..12721416f341 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -7,7 +7,12 @@ mod err; pub mod pivot; use std::borrow::Cow; -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "ipc", + feature = "csv", + feature = "json" +))] use std::path::PathBuf; use std::sync::Arc; @@ -27,7 +32,12 @@ use polars_core::prelude::*; use polars_io::RowCount; pub use polars_plan::frame::{AllowedOptimizations, OptState}; use polars_plan::global::FETCH_ROWS; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" +))] use polars_plan::logical_plan::collect_fingerprints; use polars_plan::logical_plan::optimize; use polars_plan::utils::expr_output_name; @@ -596,7 +606,12 @@ impl LazyFrame { self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?; let finger_prints = if file_caching { - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] { let mut fps = Vec::with_capacity(8); collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena); diff --git a/crates/polars-lazy/src/physical_plan/mod.rs b/crates/polars-lazy/src/physical_plan/mod.rs index 0b9fb27420ff..7c7f51f6a02e 100644 --- a/crates/polars-lazy/src/physical_plan/mod.rs +++ b/crates/polars-lazy/src/physical_plan/mod.rs @@ -2,7 +2,12 @@ pub mod executors; #[cfg(any(feature = "list_eval", feature = "pivot"))] pub(crate) mod exotic; pub mod expressions; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" +))] mod file_cache; mod node_timer; pub mod planner; diff --git a/crates/polars-lazy/src/physical_plan/state.rs b/crates/polars-lazy/src/physical_plan/state.rs index 70282bbfd390..79a50b716435 100644 --- a/crates/polars-lazy/src/physical_plan/state.rs +++ b/crates/polars-lazy/src/physical_plan/state.rs @@ -8,10 +8,20 @@ use polars_core::config::verbose; use polars_core::frame::group_by::GroupsProxy; use polars_core::prelude::*; use polars_ops::prelude::ChunkJoinOptIds; -#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" +))] use polars_plan::logical_plan::FileFingerPrint; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" +))] use super::file_cache::FileCache; use crate::physical_plan::node_timer::NodeTimer; @@ -65,7 +75,12 @@ pub struct ExecutionState { // cached by a `.cache` call and kept in memory for the duration of the plan. 
df_cache: Arc>>>>, // cache file reads until all branches got there file, then we delete it - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] pub(crate) file_cache: FileCache, pub(super) schema_cache: RwLock>, /// Used by Window Expression to prevent redundant grouping @@ -110,7 +125,12 @@ impl ExecutionState { pub(super) fn split(&self) -> Self { Self { df_cache: self.df_cache.clone(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: self.file_cache.clone(), schema_cache: Default::default(), group_tuples: Default::default(), @@ -126,7 +146,12 @@ impl ExecutionState { pub(super) fn clone(&self) -> Self { Self { df_cache: self.df_cache.clone(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: self.file_cache.clone(), schema_cache: self.schema_cache.read().unwrap().clone().into(), group_tuples: self.group_tuples.clone(), @@ -142,12 +167,22 @@ impl ExecutionState { pub(crate) fn with_finger_prints(_finger_prints: Option) -> Self { Self::new() } - #[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "json"))] + #[cfg(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" + ))] pub(crate) fn with_finger_prints(finger_prints: Option>) -> Self { Self { df_cache: Arc::new(Mutex::new(PlHashMap::default())), schema_cache: Default::default(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: FileCache::new(finger_prints), group_tuples: Arc::new(RwLock::new(PlHashMap::default())), join_tuples: Arc::new(Mutex::new(PlHashMap::default())), @@ -166,7 +201,12 @@ impl ExecutionState { Self { df_cache: Default::default(), schema_cache: Default::default(), - #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + ))] file_cache: FileCache::new(None), group_tuples: Default::default(), join_tuples: Default::default(), diff --git a/crates/polars-lazy/src/prelude.rs b/crates/polars-lazy/src/prelude.rs index 0c7750b4c796..964ec7894a2d 100644 --- a/crates/polars-lazy/src/prelude.rs +++ b/crates/polars-lazy/src/prelude.rs @@ -8,11 +8,11 @@ pub use polars_plan::logical_plan::{ pub use polars_plan::prelude::CsvWriterOptions; #[cfg(feature = "ipc")] pub use polars_plan::prelude::IpcWriterOptions; +#[cfg(feature = "json")] +pub use polars_plan::prelude::JsonWriterOptions; #[cfg(feature = "parquet")] pub use polars_plan::prelude::ParquetWriteOptions; pub(crate) use polars_plan::prelude::*; -#[cfg(feature = "json")] -pub use polars_plan::prelude::JsonWriterOptions; #[cfg(feature = "rolling_window")] pub use polars_time::{prelude::RollingOptions, Duration}; #[cfg(feature = "dynamic_group_by")] diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 9770e21a9be1..441f49f1c97a 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -6,20 +6,25 
@@ use crossbeam_channel::{bounded, Receiver, Sender}; use polars_core::prelude::*; #[cfg(feature = "csv")] use polars_io::csv::CsvWriter; +#[cfg(feature = "json")] +use polars_io::json::{BatchedWriter, JsonBatchedWriter, JsonFormat, JsonLinesBatchedWriter}; #[cfg(feature = "parquet")] use polars_io::parquet::ParquetWriter; #[cfg(feature = "ipc")] use polars_io::prelude::IpcWriter; #[cfg(any(feature = "ipc", feature = "csv"))] use polars_io::SerWriter; -#[cfg(feature = "json")] -use polars_io::json::{JsonFormat, JsonBatchedWriter, JsonLinesBatchedWriter, BatchedWriter}; use polars_plan::prelude::*; use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult}; use crate::pipeline::morsels_per_sink; -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "ipc", + feature = "csv", + feature = "json" +))] trait SinkWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>; fn _finish(&mut self) -> PolarsResult<()>; @@ -61,7 +66,7 @@ impl SinkWriter for polars_io::csv::BatchedWriter { } #[cfg(feature = "json")] -impl SinkWriter for polars_io::json::JsonBatchedWriter { +impl SinkWriter for JsonBatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } @@ -73,7 +78,7 @@ impl SinkWriter for polars_io::json::JsonBatchedWriter { } #[cfg(feature = "json")] -impl SinkWriter for polars_io::json::JsonLinesBatchedWriter { +impl SinkWriter for JsonLinesBatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } @@ -253,10 +258,12 @@ impl JsonSink { ) -> PolarsResult { let file = std::fs::File::create(path)?; let writer = match options.json_format { - JsonFormat::Json => - Box::new(JsonBatchedWriter::new(file)) as Box, - JsonFormat::JsonLines => + JsonFormat::Json => { + Box::new(JsonBatchedWriter::new(file)) as Box + }, + JsonFormat::JsonLines => { Box::new(JsonLinesBatchedWriter::new(file)) as Box + }, }; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; @@ -323,13 +330,23 @@ fn init_writer_thread( // Ensure the data is return in the order it was streamed #[derive(Clone)] -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "ipc", + feature = "csv", + feature = "json" +))] pub struct FilesSink { sender: Sender>, io_thread_handle: Arc>>, } -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "ipc", + feature = "csv", + feature = "json" +))] impl Sink for FilesSink { fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult { // don't add empty dataframes diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index e7d1e1590cb4..64c40c2f243e 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -7,10 +7,10 @@ use polars_io::csv::SerializeOptions; use polars_io::csv::{CsvEncoding, NullValues}; #[cfg(feature = "ipc")] use polars_io::ipc::IpcCompression; -#[cfg(feature = "parquet")] -use polars_io::parquet::ParquetCompression; #[cfg(feature = "json")] use polars_io::json::JsonFormat; +#[cfg(feature = "parquet")] +use polars_io::parquet::ParquetCompression; use polars_io::RowCount; #[cfg(feature = "dynamic_group_by")] use polars_time::{DynamicGroupOptions, RollingGroupOptions}; @@ 
-94,7 +94,7 @@ pub struct JsonWriterOptions { /// format to use to write the DataFrame to JSON pub json_format: JsonFormat, /// maintain the order the data was processed - pub maintain_order: bool + pub maintain_order: bool, } #[derive(Clone, Debug, PartialEq)] diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 50f9dfea4096..148169808233 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -95,7 +95,6 @@ ColumnNameOrSelector, CsvEncoding, CsvQuoteStyle, - JsonFormat, FillNullStrategy, FrameInitTypes, IntoExpr, @@ -103,6 +102,7 @@ JoinStrategy, JoinValidation, Label, + JsonFormat, Orientation, ParallelStrategy, PolarsDataType, @@ -2246,17 +2246,17 @@ def sink_csv( ) def sink_json( - self, - path: str | Path, - *, - json_format: JsonFormat | None = None, - maintain_order: bool = True, - type_coercion: bool = True, - predicate_pushdown: bool = True, - projection_pushdown: bool = True, - simplify_expression: bool = True, - no_optimization: bool = False, - slice_pushdown: bool = True, + self, + path: str | Path, + *, + json_format: JsonFormat | None = None, + maintain_order: bool = True, + type_coercion: bool = True, + predicate_pushdown: bool = True, + projection_pushdown: bool = True, + simplify_expression: bool = True, + no_optimization: bool = False, + slice_pushdown: bool = True, ) -> DataFrame: """ Persists a LazyFrame at the provided path. @@ -2268,7 +2268,7 @@ def sink_json( path File path to which the file should be written. json_format : {'json', 'json_lines'} - Choose "json" for single JSON array containing each DataFrame row as an object. + Choose "json" for single JSON array containing each row as an object. Choose "json_lines" for each row output on a separate line. maintain_order Maintain the order in which data is processed. @@ -2290,10 +2290,6 @@ def sink_json( ------- DataFrame - Notes - ----- - json_format parameter is currently not supported. 
- Examples -------- >>> lf = pl.scan_csv("/path/to/my_larger_than_ram_file.csv") # doctest: +SKIP diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index 336c8cc15a39..d4ab0d208889 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -637,10 +637,10 @@ impl PyLazyFrame { py: Python, path: PathBuf, json_format: Option>, - maintain_order: bool + maintain_order: bool, ) -> PyResult<()> { let options = JsonWriterOptions { - json_format: json_format.map(|c| c.0).unwrap_or(JsonFormat::default()), + json_format: json_format.map(|c| c.0).unwrap_or_default(), maintain_order, }; diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py index 3c43c5515036..cd6f7ca325b9 100644 --- a/py-polars/tests/unit/io/test_lazy_json.py +++ b/py-polars/tests/unit/io/test_lazy_json.py @@ -149,7 +149,9 @@ def test_sink_json_should_write_same_data(io_files_path: Path, tmp_path: Path) - assert_frame_equal(df, expected) -def test_sink_json_should_write_same_data_with_json_argument(io_files_path: Path, tmp_path: Path) -> None: +def test_sink_json_should_write_same_data_with_json_argument( + io_files_path: Path, tmp_path: Path +) -> None: tmp_path.mkdir(exist_ok=True) # Arrange source_path = io_files_path / "foods1.csv" @@ -161,13 +163,15 @@ def test_sink_json_should_write_same_data_with_json_argument(io_files_path: Path # df = pl.read_json(target_path) # # Assert # assert_frame_equal(df, expected) - lf.sink_json(target_path, json_format="json") + lf.sink_json(target_path, json_format="json") df = pl.read_json(target_path) # Assert assert_frame_equal(df, expected) -def test_sink_json_should_write_same_data_with_json_lines_argument(io_files_path: Path, tmp_path: Path) -> None: +def test_sink_json_should_write_same_data_with_json_lines_argument( + io_files_path: Path, tmp_path: Path +) -> None: tmp_path.mkdir(exist_ok=True) # Arrange source_path = io_files_path / "foods1.csv" @@ -176,13 +180,15 @@ def test_sink_json_should_write_same_data_with_json_lines_argument(io_files_path expected = pl.read_csv(source_path) lf = pl.scan_csv(source_path) # Act - lf.sink_json(target_path, json_format="json_lines") + lf.sink_json(target_path, json_format="json_lines") df = pl.read_ndjson(target_path) # Assert assert_frame_equal(df, expected) -def test_sink_json_should_raise_exception_with_invalid_argument(io_files_path: Path, tmp_path: Path) -> None: +def test_sink_json_should_raise_exception_with_invalid_argument( + io_files_path: Path, tmp_path: Path +) -> None: tmp_path.mkdir(exist_ok=True) # Arrange source_path = io_files_path / "foods1.csv" @@ -191,4 +197,4 @@ def test_sink_json_should_raise_exception_with_invalid_argument(io_files_path: P lf = pl.scan_csv(source_path) # Act & Assert with pytest.raises(ValueError): - lf.sink_json(target_path, json_format="invalid_argument") \ No newline at end of file + lf.sink_json(target_path, json_format="invalid_argument") # type: ignore[arg-type] From 89b5de52c3de2b112fdf882d50b35c35c7bc2e73 Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Wed, 6 Sep 2023 18:46:29 -0600 Subject: [PATCH 12/26] fix: unit tests --- crates/polars-io/src/json/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 7ad3a496d95d..55eab616128f 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -116,10 +116,7 @@ pub struct JsonWriter { json_format: JsonFormat, } -impl JsonWriter -where - W: 
Write + 'static, -{ +impl JsonWriter { pub fn with_json_format(mut self, format: JsonFormat) -> Self { self.json_format = format; self From 7b3d9013c6de008c71eaac6f5695a7f40e0d3828 Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Fri, 8 Sep 2023 13:48:21 -0600 Subject: [PATCH 13/26] style: fix code formatting --- crates/polars-lazy/src/frame/mod.rs | 64 +++++++++++++++++------------ py-polars/polars/lazyframe/frame.py | 4 +- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 12721416f341..a2028a92c8fc 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -684,12 +684,14 @@ impl LazyFrame { /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "parquet")] - pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Parquet(options), - }, - "collect().write_parquet()") + pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Parquet(options), + }, + "collect().write_parquet()", + ) } /// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit @@ -703,12 +705,14 @@ impl LazyFrame { cloud_options: Option, parquet_options: ParquetWriteOptions, ) -> PolarsResult<()> { - self.sink( SinkType::Cloud { - uri: Arc::new(uri), - cloud_options, - file_type: FileType::Parquet(parquet_options), - }, - "collect().write_parquet()") + self.sink( + SinkType::Cloud { + uri: Arc::new(uri), + cloud_options, + file_type: FileType::Parquet(parquet_options), + }, + "collect().write_parquet()", + ) } /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit @@ -716,11 +720,13 @@ impl LazyFrame { /// streaming fashion. #[cfg(feature = "ipc")] pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Ipc(options), - }, - "collect().write_ipc()") + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Ipc(options), + }, + "collect().write_ipc()", + ) } /// Stream a query result into an csv file. This is useful if the final result doesn't fit @@ -728,11 +734,13 @@ impl LazyFrame { /// streaming fashion. #[cfg(feature = "csv")] pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Csv(options) - }, - "collect().write_csv()") + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Csv(options), + }, + "collect().write_csv()", + ) } /// Stream a query result into a json file. This is useful if the final result doesn't fit @@ -740,11 +748,13 @@ impl LazyFrame { /// streaming fashion. 
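// A sketch of calling the sink from user code as the API stands at this point in the
// series, where `JsonWriterOptions` still exposes a `json_format` field. The input file,
// output path, and the assumption that `polars` (with the `lazy`, `csv`, `json` and
// `streaming` features) plus `polars-io` are available as dependencies are illustrative,
// not part of the patch.
use std::path::PathBuf;
use polars::prelude::*;
use polars_io::json::JsonFormat;

fn sink_foods_to_json_array() -> PolarsResult<()> {
    let lf = LazyCsvReader::new("foods1.csv").finish()?;
    lf.sink_json(
        PathBuf::from("foods1.json"),
        JsonWriterOptions {
            // `Json` streams one JSON array; `JsonLines` streams newline-delimited JSON.
            json_format: JsonFormat::Json,
            maintain_order: true,
        },
    )
}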
#[cfg(feature = "json")] pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Json(options) - }, - "collect().write_ndjson()` or `collect().write_json()") + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Json(options), + }, + "collect().write_ndjson()` or `collect().write_json()", + ) } fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 148169808233..60d889adedaa 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -2306,9 +2306,7 @@ def sink_json( ) return lf.sink_json( - path=path, - json_format=json_format, - maintain_order=maintain_order + path=path, json_format=json_format, maintain_order=maintain_order ) def _set_sink_optimizations( From e1610cf0e9836d9c1508dc55c22d26e395441030 Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Fri, 8 Sep 2023 14:11:40 -0600 Subject: [PATCH 14/26] fix: rust lint --- crates/polars-lazy/src/frame/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index a2028a92c8fc..ad822fb0d980 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -700,7 +700,7 @@ impl LazyFrame { /// streaming fashion. #[cfg(all(feature = "cloud_write", feature = "parquet"))] pub fn sink_parquet_cloud( - mut self, + self, uri: String, cloud_options: Option, parquet_options: ParquetWriteOptions, From 4546a7d6fc2a50cb403305d48c2e00cca1850e4f Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Fri, 8 Sep 2023 16:07:33 -0600 Subject: [PATCH 15/26] fix: feature check --- crates/polars-lazy/src/frame/mod.rs | 14 +++++++++++++- crates/polars-lazy/src/physical_plan/file_cache.rs | 7 ++++++- crates/polars-lazy/src/physical_plan/state.rs | 7 ++++++- crates/polars-plan/src/logical_plan/mod.rs | 8 +++++++- .../polars-plan/src/logical_plan/optimizer/mod.rs | 8 +++++++- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index ad822fb0d980..7a614997d5fb 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -617,7 +617,12 @@ impl LazyFrame { collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena); Some(fps) } - #[cfg(not(any(feature = "ipc", feature = "parquet", feature = "csv")))] + #[cfg(not(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" + )))] { None } @@ -757,6 +762,13 @@ impl LazyFrame { ) } + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "cloud_write", + feature = "csv", + feature = "json", + ))] fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { self.opt_state.streaming = true; self.logical_plan = LogicalPlan::Sink { diff --git a/crates/polars-lazy/src/physical_plan/file_cache.rs b/crates/polars-lazy/src/physical_plan/file_cache.rs index 24ce3c1fb873..bf2b281fe3b2 100644 --- a/crates/polars-lazy/src/physical_plan/file_cache.rs +++ b/crates/polars-lazy/src/physical_plan/file_cache.rs @@ -1,7 +1,12 @@ use std::sync::Mutex; use polars_core::prelude::*; -#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))] +#[cfg(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = 
"json" +))] use polars_plan::logical_plan::FileFingerPrint; use crate::prelude::*; diff --git a/crates/polars-lazy/src/physical_plan/state.rs b/crates/polars-lazy/src/physical_plan/state.rs index 79a50b716435..d8e0334b41e2 100644 --- a/crates/polars-lazy/src/physical_plan/state.rs +++ b/crates/polars-lazy/src/physical_plan/state.rs @@ -163,7 +163,12 @@ impl ExecutionState { } } - #[cfg(not(any(feature = "parquet", feature = "csv", feature = "ipc")))] + #[cfg(not(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" + )))] pub(crate) fn with_finger_prints(_finger_prints: Option) -> Self { Self::new() } diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index d394327e41f4..d1cba80d7ad4 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -55,7 +55,13 @@ pub use schema::*; use serde::{Deserialize, Serialize}; use strum_macros::IntoStaticStr; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "cse"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "cse", + feature = "json" +))] pub use crate::logical_plan::optimizer::file_caching::{ collect_fingerprints, find_column_union_and_fingerprints, FileCacher, FileFingerPrint, }; diff --git a/crates/polars-plan/src/logical_plan/optimizer/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/mod.rs index 3f0dc062e994..755634f03643 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/mod.rs @@ -13,7 +13,13 @@ mod collect_members; #[cfg(feature = "cse")] mod cse_expr; mod fast_projection; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "cse"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "cse", + feature = "json" +))] pub(crate) mod file_caching; mod flatten_union; #[cfg(feature = "fused")] From ee1b225f489be283eea69d39efbdb6d7b14e12bf Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Tue, 19 Sep 2023 15:12:30 -0600 Subject: [PATCH 16/26] fix: using polars_json and formatting --- crates/polars-io/src/json/mod.rs | 6 +++--- .../polars-pipe/src/executors/sinks/file_sink.rs | 7 ++++++- crates/polars-pipe/src/executors/sinks/mod.rs | 14 ++++++++++++-- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 55eab616128f..c39b4f3677cb 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -67,11 +67,11 @@ use std::ops::Deref; use arrow::array::StructArray; use arrow::legacy::conversion::chunk_to_struct; -use polars_json::json::write::FallibleStreamingIterator; use polars_core::error::to_compute_err; use polars_core::prelude::*; use polars_core::utils::try_get_supertype; use polars_json::json::infer; +use polars_json::json::write::FallibleStreamingIterator; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use polars_json::{json, ndjson}; @@ -195,7 +195,7 @@ where let batches = df .iter_chunks() .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); - let mut serializer = json::write::Serializer::new(batches, vec![]); + let mut serializer = polars_json::json::write::Serializer::new(batches, vec![]); while let Some(block) = serializer.next()? 
{ if self.is_first_row { @@ -242,7 +242,7 @@ where let batches = df .iter_chunks() .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); - let mut serializer = ndjson::write::Serializer::new(batches, vec![]); + let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]); while let Some(block) = serializer.next()? { self.writer.write_all(block)?; } diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 441f49f1c97a..a36220f1fa5b 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -283,7 +283,12 @@ impl JsonSink { } } -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "ipc", + feature = "csv", + feature = "json" +))] fn init_writer_thread( receiver: Receiver>, mut writer: Box, diff --git a/crates/polars-pipe/src/executors/sinks/mod.rs b/crates/polars-pipe/src/executors/sinks/mod.rs index 59759470170c..6adb3a291a2e 100644 --- a/crates/polars-pipe/src/executors/sinks/mod.rs +++ b/crates/polars-pipe/src/executors/sinks/mod.rs @@ -1,4 +1,9 @@ -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "ipc", + feature = "csv", + feature = "json" +))] mod file_sink; pub(crate) mod group_by; mod io; @@ -10,7 +15,12 @@ mod slice; mod sort; mod utils; -#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "ipc", + feature = "csv", + feature = "json" +))] pub(crate) use file_sink::*; pub(crate) use joins::*; pub(crate) use ordered::*; From 6fb40cef566f46b623750eb5739ef2a51d532ed8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Sat, 23 Sep 2023 22:22:02 -0600 Subject: [PATCH 17/26] feature: add code style --- crates/polars-lazy/src/physical_plan/state.rs | 2 +- py-polars/src/lazyframe.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/polars-lazy/src/physical_plan/state.rs b/crates/polars-lazy/src/physical_plan/state.rs index d8e0334b41e2..00b923781741 100644 --- a/crates/polars-lazy/src/physical_plan/state.rs +++ b/crates/polars-lazy/src/physical_plan/state.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Mutex, RwLock}; -use bitflags::{bitflags, Flags}; +use bitflags::bitflags; use once_cell::sync::OnceCell; use polars_core::config::verbose; use polars_core::frame::group_by::GroupsProxy; diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index d4ab0d208889..5f5d48bb2440 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -11,10 +11,10 @@ use polars::lazy::frame::LazyCsvReader; use polars::lazy::frame::LazyJsonLineReader; use polars::lazy::frame::{AllowedOptimizations, LazyFrame}; use polars::lazy::prelude::col; -#[cfg(feature = "csv")] -use polars::prelude::{cloud, ClosedWindow, CsvEncoding, Field, JoinType, Schema}; #[cfg(feature = "json")] use polars::prelude::JsonFormat; +#[cfg(feature = "csv")] +use polars::prelude::{ClosedWindow, CsvEncoding, Field, JoinType, Schema}; use polars::time::*; use polars_core::frame::explode::MeltArgs; use polars_core::frame::UniqueKeepStrategy; From f22727e2ad8f245256a0d748ee85172f046ace79 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Tue, 26 Sep 2023 13:45:45 -0600 Subject: [PATCH 18/26] add json sink features --- crates/polars-io/src/json/mod.rs | 82 +++++++++---------- .../src/executors/sinks/file_sink.rs | 35 +------- 2 files changed, 40 insertions(+), 77 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index c39b4f3677cb..c7edcf4068b3 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -63,6 +63,7 @@ //! use std::convert::TryFrom; use std::io::Write; +use std::iter::Map; use std::ops::Deref; use arrow::array::StructArray; @@ -76,6 +77,7 @@ use polars_json::json::write::FallibleStreamingIterator; use serde::{Deserialize, Serialize}; use polars_json::{json, ndjson}; use simd_json::BorrowedValue; +use polars_core::frame::{ArrowChunk, RecordBatchIter}; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::prelude::*; @@ -160,29 +162,26 @@ where } } -pub trait BatchedWriter { - fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>; - fn finish(&mut self) -> PolarsResult<()>; -} - pub struct JsonBatchedWriter { writer: W, is_first_row: bool, + json_format: JsonFormat, } impl JsonBatchedWriter where W: Write, { - pub fn new(writer: W) -> Self { + pub fn new(writer: W, json_format: JsonFormat) -> Self { JsonBatchedWriter { writer, is_first_row: true, + json_format, } } } -impl BatchedWriter for JsonBatchedWriter +impl SinkWriter for JsonBatchedWriter where W: Write, { @@ -195,62 +194,57 @@ where let batches = df .iter_chunks() .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); - let mut serializer = polars_json::json::write::Serializer::new(batches, vec![]); - - while let Some(block) = serializer.next()? { - if self.is_first_row { - self.writer.write_all(&[b'['])?; - self.is_first_row = false; - } else { - self.writer.write_all(&[b','])?; + match self.json_format { + JsonFormat::Json => { + self.write_json_batch(batches)?; + }, + JsonFormat::JsonLines => { + self.write_json_lines_batch(batches)?; } - self.writer.write_all(block)?; } Ok(()) } /// Writes the footer of the Json file. fn finish(&mut self) -> PolarsResult<()> { - self.writer.write_all(&[b']'])?; + match self.json_format { + JsonFormat::Json => { + self.write_json_finish()?; + }, + _ => {} + } Ok(()) } } - -pub struct JsonLinesBatchedWriter { - writer: W, -} - -impl JsonLinesBatchedWriter -where - W: Write, -{ - pub fn new(writer: W) -> Self { - JsonLinesBatchedWriter { writer } +/// These are the methods implementation for json lines format +impl JsonBatchedWriter where W: Write { + fn write_json_lines_batch(&mut self, batches: I) -> Result<(), PolarsError> { + let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]); + while let Some(block) = serializer.next()? { + self.writer.write_all(block)?; + } + Ok(()) } } +/// These are the methods implementation for standard json format +impl JsonBatchedWriter where W: Write { + fn write_json_batch(&mut self, batches: I) -> Result<(), PolarsError> { + let mut serializer = polars_json::json::write::Serializer::new(batches, vec![]); -impl BatchedWriter for JsonLinesBatchedWriter -where - W: Write, -{ - /// Write a batch to the json writer. - /// - /// # Panics - /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. 
- fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { - let fields = df.iter().map(|s| s.field().to_arrow()).collect::>(); - let batches = df - .iter_chunks() - .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); - let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]); while let Some(block) = serializer.next()? { + if self.is_first_row { + self.writer.write_all(&[b'['])?; + self.is_first_row = false; + } else { + self.writer.write_all(&[b','])?; + } self.writer.write_all(block)?; } Ok(()) } - /// Writes the footer of the Json file. - fn finish(&mut self) -> PolarsResult<()> { + fn write_json_finish(&mut self) -> Result<(), PolarsError> { + self.writer.write_all(&[b']'])?; Ok(()) } } diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index a36220f1fa5b..0eff719cbb65 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -7,7 +7,7 @@ use polars_core::prelude::*; #[cfg(feature = "csv")] use polars_io::csv::CsvWriter; #[cfg(feature = "json")] -use polars_io::json::{BatchedWriter, JsonBatchedWriter, JsonFormat, JsonLinesBatchedWriter}; +use polars_io::json::JsonBatchedWriter; #[cfg(feature = "parquet")] use polars_io::parquet::ParquetWriter; #[cfg(feature = "ipc")] @@ -65,30 +65,6 @@ impl SinkWriter for polars_io::csv::BatchedWriter { } } -#[cfg(feature = "json")] -impl SinkWriter for JsonBatchedWriter { - fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { - self.write_batch(df) - } - - fn _finish(&mut self) -> PolarsResult<()> { - self.finish()?; - Ok(()) - } -} - -#[cfg(feature = "json")] -impl SinkWriter for JsonLinesBatchedWriter { - fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { - self.write_batch(df) - } - - fn _finish(&mut self) -> PolarsResult<()> { - self.finish()?; - Ok(()) - } -} - #[cfg(feature = "parquet")] pub struct ParquetSink {} #[cfg(feature = "parquet")] @@ -257,14 +233,7 @@ impl JsonSink { _schema: &Schema, ) -> PolarsResult { let file = std::fs::File::create(path)?; - let writer = match options.json_format { - JsonFormat::Json => { - Box::new(JsonBatchedWriter::new(file)) as Box - }, - JsonFormat::JsonLines => { - Box::new(JsonLinesBatchedWriter::new(file)) as Box - }, - }; + let writer = Box::new(JsonBatchedWriter::new(file, options.json_format)) as Box; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; let (sender, receiver) = bounded(backpressure); From 38588e341f4bc57492a3f9524eac0b7d03c0585d Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Tue, 26 Sep 2023 19:12:08 -0600 Subject: [PATCH 19/26] refactor fixes --- crates/polars-io/src/json/mod.rs | 47 ++++++++++++------- .../src/executors/sinks/file_sink.rs | 15 +++++- 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index c7edcf4068b3..520e6e5a6ab4 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -63,12 +63,12 @@ //! 
use std::convert::TryFrom; use std::io::Write; -use std::iter::Map; use std::ops::Deref; use arrow::array::StructArray; use arrow::legacy::conversion::chunk_to_struct; use polars_core::error::to_compute_err; +use polars_core::frame::RecordBatchIter; use polars_core::prelude::*; use polars_core::utils::try_get_supertype; use polars_json::json::infer; @@ -77,7 +77,6 @@ use polars_json::json::write::FallibleStreamingIterator; use serde::{Deserialize, Serialize}; use polars_json::{json, ndjson}; use simd_json::BorrowedValue; -use polars_core::frame::{ArrowChunk, RecordBatchIter}; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::prelude::*; @@ -181,7 +180,7 @@ where } } -impl SinkWriter for JsonBatchedWriter +impl JsonBatchedWriter where W: Write, { @@ -189,36 +188,43 @@ where /// /// # Panics /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. - fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { let fields = df.iter().map(|s| s.field().to_arrow()).collect::>(); - let batches = df - .iter_chunks() - .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); + let chunks = df.iter_chunks(); match self.json_format { JsonFormat::Json => { - self.write_json_batch(batches)?; + self.write_json_batch(fields, chunks)?; }, JsonFormat::JsonLines => { - self.write_json_lines_batch(batches)?; - } + self.write_json_lines_batch(fields, chunks)?; + }, } Ok(()) } /// Writes the footer of the Json file. - fn finish(&mut self) -> PolarsResult<()> { + pub fn finish(&mut self) -> PolarsResult<()> { match self.json_format { JsonFormat::Json => { self.write_json_finish()?; }, - _ => {} + _ => {}, } Ok(()) } } /// These are the methods implementation for json lines format -impl JsonBatchedWriter where W: Write { - fn write_json_lines_batch(&mut self, batches: I) -> Result<(), PolarsError> { +impl JsonBatchedWriter +where + W: Write, +{ + fn write_json_lines_batch( + &mut self, + fields: Vec, + chunks: RecordBatchIter, + ) -> Result<(), PolarsError> { + let batches = + chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]); while let Some(block) = serializer.next()? { self.writer.write_all(block)?; @@ -227,8 +233,17 @@ impl JsonBatchedWriter where W: Write { } } /// These are the methods implementation for standard json format -impl JsonBatchedWriter where W: Write { - fn write_json_batch(&mut self, batches: I) -> Result<(), PolarsError> { +impl JsonBatchedWriter +where + W: Write, +{ + fn write_json_batch( + &mut self, + fields: Vec, + chunks: RecordBatchIter, + ) -> Result<(), PolarsError> { + let batches = + chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); let mut serializer = polars_json::json::write::Serializer::new(batches, vec![]); while let Some(block) = serializer.next()? 
{ diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 0eff719cbb65..3ec616946ca9 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -65,6 +65,18 @@ impl SinkWriter for polars_io::csv::BatchedWriter { } } +#[cfg(feature = "json")] +impl SinkWriter for JsonBatchedWriter { + fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { + self.write_batch(df) + } + + fn _finish(&mut self) -> PolarsResult<()> { + self.finish()?; + Ok(()) + } +} + #[cfg(feature = "parquet")] pub struct ParquetSink {} #[cfg(feature = "parquet")] @@ -233,7 +245,8 @@ impl JsonSink { _schema: &Schema, ) -> PolarsResult { let file = std::fs::File::create(path)?; - let writer = Box::new(JsonBatchedWriter::new(file, options.json_format)) as Box; + let writer = Box::new(JsonBatchedWriter::new(file, options.json_format)) + as Box; let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; let (sender, receiver) = bounded(backpressure); From 935c58b43c1e7e4d6ff490d5a5b06fcf6fce2526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Wed, 27 Sep 2023 01:32:22 -0600 Subject: [PATCH 20/26] fix: replace match with if structure, for single_match --- crates/polars-io/src/json/mod.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 520e6e5a6ab4..9831a6cda5b2 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -204,11 +204,8 @@ where /// Writes the footer of the Json file. pub fn finish(&mut self) -> PolarsResult<()> { - match self.json_format { - JsonFormat::Json => { - self.write_json_finish()?; - }, - _ => {}, + if JsonFormat::Json == self.json_format { + self.write_json_finish()?; } Ok(()) } From da07fd86219df4be61a9e1ec0f76578ff70f5475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Wed, 27 Sep 2023 16:34:51 -0600 Subject: [PATCH 21/26] fix: update style with pre-commit --- py-polars/polars/lazyframe/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 60d889adedaa..366e3afca23e 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -101,8 +101,8 @@ IntoExprColumn, JoinStrategy, JoinValidation, - Label, JsonFormat, + Label, Orientation, ParallelStrategy, PolarsDataType, From 4c95a5f0f09849db0ba27c9077bfa15a168775b2 Mon Sep 17 00:00:00 2001 From: Abraham Alcantara Gonzalez Date: Fri, 29 Sep 2023 16:48:19 -0600 Subject: [PATCH 22/26] reverting changes in dependencies for ipc and csv --- crates/polars-lazy/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index a1c5ab760b8f..605cc437be36 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -46,9 +46,9 @@ async = [ ] cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio", "futures"] cloud_write = ["cloud"] -ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe/ipc"] +ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc"] json = ["polars-io/json", "polars-plan/json", "polars-json", "polars-pipe/json"] -csv = ["polars-io/csv", "polars-plan/csv", 
"polars-pipe/csv"] +csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe?/csv"] temporal = ["dtype-datetime", "dtype-date", "dtype-time", "dtype-duration", "polars-plan/temporal"] # debugging purposes fmt = ["polars-core/fmt", "polars-plan/fmt"] From d02b9b2895c0b70cc1bfa966f261125617ef7230 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Mon, 6 Nov 2023 12:00:46 -0600 Subject: [PATCH 23/26] fix: removing json support --- crates/polars-io/src/json/mod.rs | 87 +-------------- crates/polars-lazy/src/frame/mod.rs | 103 +++++++++++------- .../src/executors/sinks/file_sink.rs | 11 +- .../polars-plan/src/logical_plan/options.rs | 4 - py-polars/polars/lazyframe/frame.py | 11 +- py-polars/polars/type_aliases.py | 1 - py-polars/src/conversion.rs | 18 --- py-polars/src/lazyframe.rs | 20 +--- py-polars/tests/unit/io/test_lazy_json.py | 53 +-------- 9 files changed, 82 insertions(+), 226 deletions(-) diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index 9831a6cda5b2..4a359fe3c699 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -68,14 +68,10 @@ use std::ops::Deref; use arrow::array::StructArray; use arrow::legacy::conversion::chunk_to_struct; use polars_core::error::to_compute_err; -use polars_core::frame::RecordBatchIter; use polars_core::prelude::*; use polars_core::utils::try_get_supertype; use polars_json::json::infer; use polars_json::json::write::FallibleStreamingIterator; -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; -use polars_json::{json, ndjson}; use simd_json::BorrowedValue; use crate::mmap::{MmapBytesReader, ReaderBytes}; @@ -84,8 +80,6 @@ use crate::prelude::*; /// The format to use to write the DataFrame to JSON: `Json` (a JSON array) or `JsonLines` (each row output on a /// separate line). In either case, each row is serialized as a JSON object whose keys are the column names and whose /// values are the row's corresponding values. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum JsonFormat { /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in /// the DataFrame. @@ -99,7 +93,6 @@ pub enum JsonFormat { /// at a time. But the output in its entirety is not valid JSON; only the individual lines are. /// /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines. - #[default] JsonLines, } @@ -161,29 +154,17 @@ where } } -pub struct JsonBatchedWriter { +pub struct BatchedWriter { writer: W, - is_first_row: bool, - json_format: JsonFormat, } -impl JsonBatchedWriter +impl BatchedWriter where W: Write, { - pub fn new(writer: W, json_format: JsonFormat) -> Self { - JsonBatchedWriter { - writer, - is_first_row: true, - json_format, - } + pub fn new(writer: W) -> Self { + BatchedWriter { writer } } -} - -impl JsonBatchedWriter -where - W: Write, -{ /// Write a batch to the json writer. /// /// # Panics @@ -191,35 +172,6 @@ where pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { let fields = df.iter().map(|s| s.field().to_arrow()).collect::>(); let chunks = df.iter_chunks(); - match self.json_format { - JsonFormat::Json => { - self.write_json_batch(fields, chunks)?; - }, - JsonFormat::JsonLines => { - self.write_json_lines_batch(fields, chunks)?; - }, - } - Ok(()) - } - - /// Writes the footer of the Json file. 
- pub fn finish(&mut self) -> PolarsResult<()> { - if JsonFormat::Json == self.json_format { - self.write_json_finish()?; - } - Ok(()) - } -} -/// These are the methods implementation for json lines format -impl JsonBatchedWriter -where - W: Write, -{ - fn write_json_lines_batch( - &mut self, - fields: Vec, - chunks: RecordBatchIter, - ) -> Result<(), PolarsError> { let batches = chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]); @@ -229,37 +181,6 @@ where Ok(()) } } -/// These are the methods implementation for standard json format -impl JsonBatchedWriter -where - W: Write, -{ - fn write_json_batch( - &mut self, - fields: Vec, - chunks: RecordBatchIter, - ) -> Result<(), PolarsError> { - let batches = - chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); - let mut serializer = polars_json::json::write::Serializer::new(batches, vec![]); - - while let Some(block) = serializer.next()? { - if self.is_first_row { - self.writer.write_all(&[b'['])?; - self.is_first_row = false; - } else { - self.writer.write_all(&[b','])?; - } - self.writer.write_all(block)?; - } - Ok(()) - } - - fn write_json_finish(&mut self) -> Result<(), PolarsError> { - self.writer.write_all(&[b']'])?; - Ok(()) - } -} /// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame. #[must_use] diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 7a614997d5fb..f98e7180d351 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -689,14 +689,23 @@ impl LazyFrame { /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "parquet")] - pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { - self.sink( - SinkType::File { + pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::Sink { + input: Box::new(self.logical_plan), + payload: SinkType::File { path: Arc::new(path), file_type: FileType::Parquet(options), }, - "collect().write_parquet()", - ) + }; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: "cannot run the whole query in a streaming order; \ + use `collect().write_parquet()` instead" + ); + let _ = physical_plan.execute(&mut state)?; + Ok(()) } /// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit @@ -705,81 +714,93 @@ impl LazyFrame { /// streaming fashion. 
#[cfg(all(feature = "cloud_write", feature = "parquet"))] pub fn sink_parquet_cloud( - self, + mut self, uri: String, cloud_options: Option, parquet_options: ParquetWriteOptions, ) -> PolarsResult<()> { - self.sink( - SinkType::Cloud { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::Sink { + input: Box::new(self.logical_plan), + payload: SinkType::Cloud { uri: Arc::new(uri), cloud_options, file_type: FileType::Parquet(parquet_options), }, - "collect().write_parquet()", - ) + }; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: "cannot run the whole query in a streaming order" + ); + let _ = physical_plan.execute(&mut state)?; + Ok(()) } /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "ipc")] - pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { - self.sink( - SinkType::File { + pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::Sink { + input: Box::new(self.logical_plan), + payload: SinkType::File { path: Arc::new(path), file_type: FileType::Ipc(options), }, - "collect().write_ipc()", - ) + }; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: "cannot run the whole query in a streaming order; \ + use `collect().write_ipc()` instead" + ); + let _ = physical_plan.execute(&mut state)?; + Ok(()) } /// Stream a query result into an csv file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "csv")] - pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { - self.sink( - SinkType::File { + pub fn sink_csv(mut self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { + self.opt_state.streaming = true; + self.logical_plan = LogicalPlan::Sink { + input: Box::new(self.logical_plan), + payload: SinkType::File { path: Arc::new(path), file_type: FileType::Csv(options), }, - "collect().write_csv()", - ) + }; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: "cannot run the whole query in a streaming order; \ + use `collect().write_csv()` instead" + ); + let _ = physical_plan.execute(&mut state)?; + Ok(()) } /// Stream a query result into a json file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. 
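// The `polars_ensure!` guard below refuses to silently fall back to an in-memory collect
// when the plan cannot run on the streaming engine. A hedged sketch of handling that
// error on the caller side, materializing with `collect()` and the eager `JsonWriter`
// instead; the helper name and paths are illustrative, and it assumes the same feature
// flags as above plus `polars-io` as a direct dependency.
use std::path::Path;
use polars::prelude::*;
use polars_io::json::{JsonFormat, JsonWriter};
use polars_io::SerWriter;

fn sink_or_collect(lf: LazyFrame, path: &Path) -> PolarsResult<()> {
    // `sink_json` consumes the frame, so keep a clone around for the fallback path.
    let fallback = lf.clone();
    match lf.sink_json(path.to_path_buf(), JsonWriterOptions { maintain_order: true }) {
        // The whole query could not be streamed; run it eagerly and write NDJSON.
        Err(PolarsError::ComputeError(_)) => {
            let mut df = fallback.collect()?;
            let file = std::fs::File::create(path)?;
            JsonWriter::new(file)
                .with_json_format(JsonFormat::JsonLines)
                .finish(&mut df)
        },
        other => other,
    }
}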
#[cfg(feature = "json")] - pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { - self.sink( - SinkType::File { - path: Arc::new(path), - file_type: FileType::Json(options), - }, - "collect().write_ndjson()` or `collect().write_json()", - ) - } - - #[cfg(any( - feature = "ipc", - feature = "parquet", - feature = "cloud_write", - feature = "csv", - feature = "json", - ))] - fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { + pub fn sink_json(mut self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { self.opt_state.streaming = true; self.logical_plan = LogicalPlan::Sink { input: Box::new(self.logical_plan), - payload, + payload: SinkType::File { + path: Arc::new(path), + file_type: FileType::Json(options), + }, }; let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; polars_ensure!( is_streaming, - ComputeError: format!("cannot run the whole query in a streaming order; \ - use `{msg_alternative}` instead", msg_alternative=msg_alternative) + ComputeError: "cannot run the whole query in a streaming order; \ + use `collect().write_ndjson()` instead" ); let _ = physical_plan.execute(&mut state)?; Ok(()) diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs index 3ec616946ca9..feb8942a1b34 100644 --- a/crates/polars-pipe/src/executors/sinks/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs @@ -7,7 +7,7 @@ use polars_core::prelude::*; #[cfg(feature = "csv")] use polars_io::csv::CsvWriter; #[cfg(feature = "json")] -use polars_io::json::JsonBatchedWriter; +use polars_io::json::BatchedWriter; #[cfg(feature = "parquet")] use polars_io::parquet::ParquetWriter; #[cfg(feature = "ipc")] @@ -66,13 +66,12 @@ impl SinkWriter for polars_io::csv::BatchedWriter { } #[cfg(feature = "json")] -impl SinkWriter for JsonBatchedWriter { +impl SinkWriter for BatchedWriter { fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { self.write_batch(df) } fn _finish(&mut self) -> PolarsResult<()> { - self.finish()?; Ok(()) } } @@ -245,8 +244,10 @@ impl JsonSink { _schema: &Schema, ) -> PolarsResult { let file = std::fs::File::create(path)?; - let writer = Box::new(JsonBatchedWriter::new(file, options.json_format)) - as Box; + let writer = BatchedWriter::new(file); + + let writer = Box::new(writer) as Box; + let morsels_per_sink = morsels_per_sink(); let backpressure = morsels_per_sink * 2; let (sender, receiver) = bounded(backpressure); diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 64c40c2f243e..1ed5ad28593b 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -7,8 +7,6 @@ use polars_io::csv::SerializeOptions; use polars_io::csv::{CsvEncoding, NullValues}; #[cfg(feature = "ipc")] use polars_io::ipc::IpcCompression; -#[cfg(feature = "json")] -use polars_io::json::JsonFormat; #[cfg(feature = "parquet")] use polars_io::parquet::ParquetCompression; use polars_io::RowCount; @@ -91,8 +89,6 @@ pub struct CsvWriterOptions { #[derive(Copy, Clone, Debug, PartialEq, Eq, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct JsonWriterOptions { - /// format to use to write the DataFrame to JSON - pub json_format: JsonFormat, /// maintain the order the data was processed pub maintain_order: bool, } diff --git a/py-polars/polars/lazyframe/frame.py 
b/py-polars/polars/lazyframe/frame.py index 366e3afca23e..475572b36ba6 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -101,7 +101,6 @@ IntoExprColumn, JoinStrategy, JoinValidation, - JsonFormat, Label, Orientation, ParallelStrategy, @@ -2245,11 +2244,10 @@ def sink_csv( maintain_order=maintain_order, ) - def sink_json( + def sink_ndjson( self, path: str | Path, *, - json_format: JsonFormat | None = None, maintain_order: bool = True, type_coercion: bool = True, predicate_pushdown: bool = True, @@ -2267,9 +2265,6 @@ def sink_json( ---------- path File path to which the file should be written. - json_format : {'json', 'json_lines'} - Choose "json" for single JSON array containing each row as an object. - Choose "json_lines" for each row output on a separate line. maintain_order Maintain the order in which data is processed. Setting this to `False` will be slightly faster. @@ -2305,9 +2300,7 @@ def sink_json( slice_pushdown=slice_pushdown, ) - return lf.sink_json( - path=path, json_format=json_format, maintain_order=maintain_order - ) + return lf.sink_json(path=path, maintain_order=maintain_order) def _set_sink_optimizations( self, diff --git a/py-polars/polars/type_aliases.py b/py-polars/polars/type_aliases.py index ad17e296202c..a1bbb246b1bc 100644 --- a/py-polars/polars/type_aliases.py +++ b/py-polars/polars/type_aliases.py @@ -128,7 +128,6 @@ UniqueKeepStrategy: TypeAlias = Literal["first", "last", "any", "none"] UnstackDirection: TypeAlias = Literal["vertical", "horizontal"] MapElementsStrategy: TypeAlias = Literal["thread_local", "threading"] -JsonFormat: TypeAlias = Literal["json", "json_lines"] # The following have a Rust enum equivalent with a different name AsofJoinStrategy: TypeAlias = Literal["backward", "forward", "nearest"] # AsofStrategy diff --git a/py-polars/src/conversion.rs b/py-polars/src/conversion.rs index 493b62f50ac1..0b6b4ab00e54 100644 --- a/py-polars/src/conversion.rs +++ b/py-polars/src/conversion.rs @@ -10,8 +10,6 @@ use polars::io::avro::AvroCompression; #[cfg(feature = "ipc")] use polars::io::ipc::IpcCompression; use polars::prelude::AnyValue; -#[cfg(feature = "json")] -use polars::prelude::JsonFormat; use polars::series::ops::NullBehavior; use polars_core::frame::row::any_values_to_dtype; use polars_core::prelude::{IndexOrder, QuantileInterpolOptions}; @@ -1318,22 +1316,6 @@ impl FromPyObject<'_> for Wrap { } } -#[cfg(feature = "json")] -impl FromPyObject<'_> for Wrap { - fn extract(ob: &PyAny) -> PyResult { - let parsed = match ob.extract::<&str>()? { - "json" => JsonFormat::Json, - "json_lines" => JsonFormat::JsonLines, - v => { - return Err(PyValueError::new_err(format!( - "json format must be one of: {{'json', 'json_lines'}}, got {v}", - ))) - }, - }; - Ok(Wrap(parsed)) - } -} - impl FromPyObject<'_> for Wrap { fn extract(ob: &PyAny) -> PyResult { let parsed = match ob.extract::<&str>()? 
{ diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index 5f5d48bb2440..4572ccd3e5a8 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -11,10 +11,9 @@ use polars::lazy::frame::LazyCsvReader; use polars::lazy::frame::LazyJsonLineReader; use polars::lazy::frame::{AllowedOptimizations, LazyFrame}; use polars::lazy::prelude::col; -#[cfg(feature = "json")] -use polars::prelude::JsonFormat; #[cfg(feature = "csv")] -use polars::prelude::{ClosedWindow, CsvEncoding, Field, JoinType, Schema}; +use polars::prelude::CsvEncoding; +use polars::prelude::{ClosedWindow, Field, JoinType, Schema}; use polars::time::*; use polars_core::frame::explode::MeltArgs; use polars_core::frame::UniqueKeepStrategy; @@ -631,18 +630,9 @@ impl PyLazyFrame { #[allow(clippy::too_many_arguments)] #[cfg(all(feature = "streaming", feature = "json"))] - #[pyo3(signature = (path, json_format, maintain_order))] - fn sink_json( - &self, - py: Python, - path: PathBuf, - json_format: Option>, - maintain_order: bool, - ) -> PyResult<()> { - let options = JsonWriterOptions { - json_format: json_format.map(|c| c.0).unwrap_or_default(), - maintain_order, - }; + #[pyo3(signature = (path, maintain_order))] + fn sink_json(&self, py: Python, path: PathBuf, maintain_order: bool) -> PyResult<()> { + let options = JsonWriterOptions { maintain_order }; // if we don't allow threads and we have udfs trying to acquire the gil from different // threads we deadlock. diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py index cd6f7ca325b9..36d6ae4c49f5 100644 --- a/py-polars/tests/unit/io/test_lazy_json.py +++ b/py-polars/tests/unit/io/test_lazy_json.py @@ -129,72 +129,25 @@ def test_ndjson_list_arg(io_files_path: Path) -> None: assert df.row(-1) == ("seafood", 194, 12.0, 1) assert df.row(0) == ("vegetables", 45, 0.5, 2) + def test_anonymous_scan_explain(io_files_path: Path) -> None: file = io_files_path / "foods1.ndjson" q = pl.scan_ndjson(source=file) assert "Anonymous" in q.explain() assert "Anonymous" in q.show_graph(raw_output=True) # type: ignore[operator] -def test_sink_json_should_write_same_data(io_files_path: Path, tmp_path: Path) -> None: - tmp_path.mkdir(exist_ok=True) - # Arrange - source_path = io_files_path / "foods1.csv" - target_path = tmp_path / "foods_test.ndjson" - expected = pl.read_csv(source_path) - lf = pl.scan_csv(source_path) - # Act - lf.sink_json(target_path) - df = pl.read_ndjson(target_path) - # Assert - assert_frame_equal(df, expected) - - -def test_sink_json_should_write_same_data_with_json_argument( - io_files_path: Path, tmp_path: Path -) -> None: - tmp_path.mkdir(exist_ok=True) - # Arrange - source_path = io_files_path / "foods1.csv" - target_path = tmp_path / "foods_test.json" - - expected = pl.read_csv(source_path) - lf = pl.scan_csv(source_path) - # Act - # df = pl.read_json(target_path) - # # Assert - # assert_frame_equal(df, expected) - lf.sink_json(target_path, json_format="json") - df = pl.read_json(target_path) - # Assert - assert_frame_equal(df, expected) - -def test_sink_json_should_write_same_data_with_json_lines_argument( +def test_sink_ndjson_should_write_same_data( io_files_path: Path, tmp_path: Path ) -> None: tmp_path.mkdir(exist_ok=True) # Arrange source_path = io_files_path / "foods1.csv" target_path = tmp_path / "foods_test.ndjson" - expected = pl.read_csv(source_path) lf = pl.scan_csv(source_path) # Act - lf.sink_json(target_path, json_format="json_lines") + lf.sink_ndjson(target_path) df = 
pl.read_ndjson(target_path) # Assert assert_frame_equal(df, expected) - - -def test_sink_json_should_raise_exception_with_invalid_argument( - io_files_path: Path, tmp_path: Path -) -> None: - tmp_path.mkdir(exist_ok=True) - # Arrange - source_path = io_files_path / "foods1.csv" - target_path = tmp_path / "foods_test.ndjson" - - lf = pl.scan_csv(source_path) - # Act & Assert - with pytest.raises(ValueError): - lf.sink_json(target_path, json_format="invalid_argument") # type: ignore[arg-type] From 622899fca5d5eab758eeb9d26dc1437374e242e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Mon, 6 Nov 2023 12:30:26 -0600 Subject: [PATCH 24/26] feat: refactoring sink method, removing deplicated code --- crates/polars-lazy/src/frame/mod.rs | 110 +++++++++------------------- 1 file changed, 36 insertions(+), 74 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index f98e7180d351..76d557719bed 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -689,23 +689,12 @@ impl LazyFrame { /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "parquet")] - pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { - path: Arc::new(path), - file_type: FileType::Parquet(options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_parquet()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Parquet(options), + }, + "collect().write_parquet()") } /// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit @@ -719,88 +708,61 @@ impl LazyFrame { cloud_options: Option, parquet_options: ParquetWriteOptions, ) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::Cloud { - uri: Arc::new(uri), - cloud_options, - file_type: FileType::Parquet(parquet_options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + self.sink( SinkType::Cloud { + uri: Arc::new(uri), + cloud_options, + file_type: FileType::Parquet(parquet_options), + }, + "collect().write_parquet()") } /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. 
#[cfg(feature = "ipc")] - pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { - path: Arc::new(path), - file_type: FileType::Ipc(options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_ipc()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Ipc(options), + }, + "collect().write_ipc()") } /// Stream a query result into an csv file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. #[cfg(feature = "csv")] - pub fn sink_csv(mut self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { - self.opt_state.streaming = true; - self.logical_plan = LogicalPlan::Sink { - input: Box::new(self.logical_plan), - payload: SinkType::File { - path: Arc::new(path), - file_type: FileType::Csv(options), - }, - }; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; - polars_ensure!( - is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_csv()` instead" - ); - let _ = physical_plan.execute(&mut state)?; - Ok(()) + pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Csv(options) + }, + "collect().write_csv()") } /// Stream a query result into a json file. This is useful if the final result doesn't fit /// into memory. This methods will return an error if the query cannot be completely done in a /// streaming fashion. 
#[cfg(feature = "json")] - pub fn sink_json(mut self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { + pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { + self.sink( SinkType::File { + path: Arc::new(path), + file_type: FileType::Json(options) + }, + "collect().write_ndjson()` or `collect().write_json()") + } + + fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { self.opt_state.streaming = true; self.logical_plan = LogicalPlan::Sink { input: Box::new(self.logical_plan), - payload: SinkType::File { - path: Arc::new(path), - file_type: FileType::Json(options), - }, + payload, }; let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; polars_ensure!( is_streaming, - ComputeError: "cannot run the whole query in a streaming order; \ - use `collect().write_ndjson()` instead" + ComputeError: format!("cannot run the whole query in a streaming order; \ + use `{msg_alternative}` instead", msg_alternative=msg_alternative) ); let _ = physical_plan.execute(&mut state)?; Ok(()) From 3fc17860a506dca4dfe0f95fcd522bfeefd9db26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Mon, 6 Nov 2023 12:41:50 -0600 Subject: [PATCH 25/26] fix: applying style format to mod.rs --- crates/polars-lazy/src/frame/mod.rs | 64 +++++++++++++++++------------ 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 76d557719bed..da7f3be75000 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -690,11 +690,13 @@ impl LazyFrame { /// streaming fashion. #[cfg(feature = "parquet")] pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Parquet(options), - }, - "collect().write_parquet()") + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Parquet(options), + }, + "collect().write_parquet()", + ) } /// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit @@ -703,17 +705,19 @@ impl LazyFrame { /// streaming fashion. #[cfg(all(feature = "cloud_write", feature = "parquet"))] pub fn sink_parquet_cloud( - mut self, + self, uri: String, cloud_options: Option, parquet_options: ParquetWriteOptions, ) -> PolarsResult<()> { - self.sink( SinkType::Cloud { - uri: Arc::new(uri), - cloud_options, - file_type: FileType::Parquet(parquet_options), - }, - "collect().write_parquet()") + self.sink( + SinkType::Cloud { + uri: Arc::new(uri), + cloud_options, + file_type: FileType::Parquet(parquet_options), + }, + "collect().write_parquet()", + ) } /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit @@ -721,11 +725,13 @@ impl LazyFrame { /// streaming fashion. #[cfg(feature = "ipc")] pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Ipc(options), - }, - "collect().write_ipc()") + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Ipc(options), + }, + "collect().write_ipc()", + ) } /// Stream a query result into an csv file. This is useful if the final result doesn't fit @@ -733,11 +739,13 @@ impl LazyFrame { /// streaming fashion. 
#[cfg(feature = "csv")] pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Csv(options) - }, - "collect().write_csv()") + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Csv(options), + }, + "collect().write_csv()", + ) } /// Stream a query result into a json file. This is useful if the final result doesn't fit @@ -745,11 +753,13 @@ impl LazyFrame { /// streaming fashion. #[cfg(feature = "json")] pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { - self.sink( SinkType::File { - path: Arc::new(path), - file_type: FileType::Json(options) - }, - "collect().write_ndjson()` or `collect().write_json()") + self.sink( + SinkType::File { + path: Arc::new(path), + file_type: FileType::Json(options), + }, + "collect().write_ndjson()` or `collect().write_json()", + ) } fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { From 47745b47273ee3fcc5f1704844d71f35feb4589b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Casta=C3=B1eda?= <73617305+fernandocast@users.noreply.github.com> Date: Fri, 10 Nov 2023 12:43:04 -0600 Subject: [PATCH 26/26] add feature label --- crates/polars-lazy/src/frame/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index da7f3be75000..7a614997d5fb 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -762,6 +762,13 @@ impl LazyFrame { ) } + #[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "cloud_write", + feature = "csv", + feature = "json", + ))] fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> { self.opt_state.streaming = true; self.logical_plan = LogicalPlan::Sink {