diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs
index c4bc6ee08e957..8314192a09a11 100644
--- a/crates/polars-io/src/json/mod.rs
+++ b/crates/polars-io/src/json/mod.rs
@@ -75,15 +75,13 @@
 use polars_core::error::to_compute_err;
 use polars_core::prelude::*;
 use polars_core::utils::try_get_supertype;
 use polars_json::json::infer;
-use simd_json::BorrowedValue;
-
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
+use simd_json::BorrowedValue;
 
 use crate::mmap::{MmapBytesReader, ReaderBytes};
 use crate::prelude::*;
-
 /// The format to use to write the DataFrame to JSON: `Json` (a JSON array) or `JsonLines` (each row output on a
 /// separate line). In either case, each row is serialized as a JSON object whose keys are the column names and whose
 /// values are the row's corresponding values.
@@ -120,12 +118,14 @@ pub struct JsonWriter<W: Write> {
     json_format: JsonFormat,
 }
 
-impl<W> JsonWriter<W> where W: Write + 'static {
+impl<W> JsonWriter<W>
+where
+    W: Write + 'static,
+{
     pub fn with_json_format(mut self, format: JsonFormat) -> Self {
         self.json_format = format;
         self
     }
-
 }
 
 impl<W> SerWriter<W> for JsonWriter<W>
@@ -169,21 +169,27 @@ pub trait BatchedWriter<W: Write> {
     fn finish(&mut self) -> PolarsResult<()>;
 }
 
-pub struct JsonBatchedWriter<W: Write>{
+pub struct JsonBatchedWriter<W: Write> {
     writer: W,
-    is_first_row: bool
+    is_first_row: bool,
 }
 
-impl<W> JsonBatchedWriter<W> where W: Write {
-    pub fn new(writer: W) -> Self{
+impl<W> JsonBatchedWriter<W>
+where
+    W: Write,
+{
+    pub fn new(writer: W) -> Self {
         JsonBatchedWriter {
             writer,
-            is_first_row: true
+            is_first_row: true,
         }
     }
 }
 
-impl<W> BatchedWriter<W> for JsonBatchedWriter<W> where W: Write {
+impl<W> BatchedWriter<W> for JsonBatchedWriter<W>
+where
+    W: Write,
+{
     /// Write a batch to the json writer.
     ///
     /// # Panics
@@ -193,8 +199,7 @@ impl<W> BatchedWriter<W> for JsonBatchedWriter<W> where W: Write {
         let batches = df
             .iter_chunks()
             .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
-        let mut serializer =
-            json::write::Serializer::new(batches, vec![]);
+        let mut serializer = json::write::Serializer::new(batches, vec![]);
 
         while let Some(block) = serializer.next()? {
             if self.is_first_row {
@@ -216,18 +221,22 @@ impl<W> BatchedWriter<W> for JsonBatchedWriter<W> where W: Write {
 }
 
 pub struct JsonLinesBatchedWriter<W: Write> {
-    writer: W
+    writer: W,
 }
 
-impl<W> JsonLinesBatchedWriter<W> where W: Write {
-    pub fn new(writer: W) -> Self{
-        JsonLinesBatchedWriter {
-            writer
-        }
+impl<W> JsonLinesBatchedWriter<W>
+where
+    W: Write,
+{
+    pub fn new(writer: W) -> Self {
+        JsonLinesBatchedWriter { writer }
     }
 }
 
-impl<W> BatchedWriter<W> for JsonLinesBatchedWriter<W> where W: Write {
+impl<W> BatchedWriter<W> for JsonLinesBatchedWriter<W>
+where
+    W: Write,
+{
     /// Write a batch to the json writer.
    ///
    /// # Panics
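For orientation, here is a rough usage sketch of the two batched writers introduced above, driven directly rather than by the streaming engine. It is illustrative only: it assumes the `json` feature of `polars-io`, the helper name `write_batches` is invented, and the caller may pass any `std::io::Write` target rather than the `File` the pipeline sink uses.

```rust
use std::io::Write;

use polars_core::prelude::*;
use polars_io::json::{BatchedWriter, JsonBatchedWriter, JsonFormat, JsonLinesBatchedWriter};

/// Stream a sequence of DataFrames through one of the batched JSON writers.
fn write_batches<W: Write + 'static>(
    wtr: W,
    format: JsonFormat,
    frames: &[DataFrame],
) -> PolarsResult<()> {
    // Both writers implement the object-safe `BatchedWriter` trait, so the format
    // can be chosen at runtime behind a boxed trait object.
    let mut writer: Box<dyn BatchedWriter<W>> = match format {
        JsonFormat::Json => Box::new(JsonBatchedWriter::new(wtr)),
        JsonFormat::JsonLines => Box::new(JsonLinesBatchedWriter::new(wtr)),
    };
    for df in frames {
        // Each batch is serialized as it arrives instead of buffering the whole result.
        writer.write_batch(df)?;
    }
    // `finish` gives the array writer a chance to close the surrounding `[...]`.
    writer.finish()
}
```

With the feature enabled, something like `write_batches(std::fs::File::create("out.ndjson")?, JsonFormat::JsonLines, &frames)` would stream NDJSON to disk batch by batch.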
diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs
index f598aae9d6f9c..cb66be2a2dd42 100644
--- a/crates/polars-lazy/src/frame/mod.rs
+++ b/crates/polars-lazy/src/frame/mod.rs
@@ -17,7 +17,12 @@
 mod file_list_reader;
 pub mod pivot;
 
 use std::borrow::Cow;
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -38,7 +43,12 @@ use polars_core::prelude::*;
 use polars_io::RowCount;
 pub use polars_plan::frame::{AllowedOptimizations, OptState};
 use polars_plan::global::FETCH_ROWS;
-#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
+#[cfg(any(
+    feature = "ipc",
+    feature = "parquet",
+    feature = "csv",
+    feature = "json"
+))]
 use polars_plan::logical_plan::collect_fingerprints;
 use polars_plan::logical_plan::optimize;
 use polars_plan::utils::expr_to_leaf_column_names;
@@ -589,7 +599,12 @@ impl LazyFrame {
         self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
 
         let finger_prints = if file_caching {
-            #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
+            #[cfg(any(
+                feature = "ipc",
+                feature = "parquet",
+                feature = "csv",
+                feature = "json"
+            ))]
             {
                 let mut fps = Vec::with_capacity(8);
                 collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena);
@@ -663,7 +678,11 @@ impl LazyFrame {
     /// streaming fashion.
     #[cfg(feature = "parquet")]
     pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
-        self.sink_to_file(path, FileType::Parquet(options), "collect().write_parquet()")
+        self.sink_to_file(
+            path,
+            FileType::Parquet(options),
+            "collect().write_parquet()",
+        )
     }
 
     /// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
@@ -687,12 +706,19 @@ impl LazyFrame {
     /// streaming fashion.
     #[cfg(feature = "json")]
     pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> {
-        self.sink_to_file(path,
-            FileType::Json(options),
-            "collect().write_ndjson()` or `collect().write_json()")
+        self.sink_to_file(
+            path,
+            FileType::Json(options),
+            "collect().write_ndjson()` or `collect().write_json()",
+        )
     }
 
-    fn sink_to_file(mut self, path: PathBuf, file_type: FileType, msg_alternative: &str) -> Result<(), PolarsError> {
+    fn sink_to_file(
+        mut self,
+        path: PathBuf,
+        file_type: FileType,
+        msg_alternative: &str,
+    ) -> Result<(), PolarsError> {
         self.opt_state.streaming = true;
         self.logical_plan = LogicalPlan::FileSink {
             input: Box::new(self.logical_plan),
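A hedged sketch of how the `sink_json` method added above is meant to be called from user code. It assumes a `polars` build with the `lazy`, `csv`, `json`, and `streaming` features, import paths are indicative, and the CSV and output paths are placeholders; whether a given plan can actually run in a streaming fashion is exactly the caveat the `msg_alternative` hint exists for.

```rust
use std::path::PathBuf;

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Lazily scan a (hypothetical) CSV file; nothing is read yet.
    let lf = LazyCsvReader::new("data.csv").finish()?;

    // Evaluate the query on the streaming engine and write NDJSON straight to disk,
    // instead of collecting the whole result into memory first.
    lf.sink_json(
        PathBuf::from("data.ndjson"),
        JsonWriterOptions {
            json_format: JsonFormat::JsonLines,
            maintain_order: true,
        },
    )?;
    Ok(())
}
```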
#[cfg(feature = "json")] pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> { - self.sink_to_file(path, - FileType::Json(options), - "collect().write_ndjson()` or `collect().write_json()") + self.sink_to_file( + path, + FileType::Json(options), + "collect().write_ndjson()` or `collect().write_json()", + ) } - fn sink_to_file(mut self, path: PathBuf, file_type: FileType, msg_alternative: &str) -> Result<(), PolarsError> { + fn sink_to_file( + mut self, + path: PathBuf, + file_type: FileType, + msg_alternative: &str, + ) -> Result<(), PolarsError> { self.opt_state.streaming = true; self.logical_plan = LogicalPlan::FileSink { input: Box::new(self.logical_plan), diff --git a/crates/polars-lazy/src/physical_plan/mod.rs b/crates/polars-lazy/src/physical_plan/mod.rs index 0b9fb27420ffe..7c7f51f6a02e5 100644 --- a/crates/polars-lazy/src/physical_plan/mod.rs +++ b/crates/polars-lazy/src/physical_plan/mod.rs @@ -2,7 +2,12 @@ pub mod executors; #[cfg(any(feature = "list_eval", feature = "pivot"))] pub(crate) mod exotic; pub mod expressions; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" +))] mod file_cache; mod node_timer; pub mod planner; diff --git a/crates/polars-lazy/src/physical_plan/state.rs b/crates/polars-lazy/src/physical_plan/state.rs index 7267ea5270e19..f39b091703979 100644 --- a/crates/polars-lazy/src/physical_plan/state.rs +++ b/crates/polars-lazy/src/physical_plan/state.rs @@ -8,10 +8,20 @@ use polars_core::config::verbose; use polars_core::frame::group_by::GroupsProxy; use polars_core::frame::hash_join::ChunkJoinOptIds; use polars_core::prelude::*; -#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "json"))] +#[cfg(any( + feature = "parquet", + feature = "csv", + feature = "ipc", + feature = "json" +))] use polars_plan::logical_plan::FileFingerPrint; -#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))] +#[cfg(any( + feature = "ipc", + feature = "parquet", + feature = "csv", + feature = "json" +))] use super::file_cache::FileCache; use crate::physical_plan::node_timer::NodeTimer; @@ -65,7 +75,12 @@ pub struct ExecutionState { // cached by a `.cache` call and kept in memory for the duration of the plan. 
diff --git a/crates/polars-lazy/src/prelude.rs b/crates/polars-lazy/src/prelude.rs
index 73949b9255abf..7253af4fd28cf 100644
--- a/crates/polars-lazy/src/prelude.rs
+++ b/crates/polars-lazy/src/prelude.rs
@@ -6,11 +6,11 @@ pub use polars_plan::logical_plan::{
 pub use polars_plan::prelude::CsvWriterOptions;
 #[cfg(feature = "ipc")]
 pub use polars_plan::prelude::IpcWriterOptions;
+#[cfg(feature = "json")]
+pub use polars_plan::prelude::JsonWriterOptions;
 #[cfg(feature = "parquet")]
 pub use polars_plan::prelude::ParquetWriteOptions;
 pub(crate) use polars_plan::prelude::*;
-#[cfg(feature = "json")]
-pub use polars_plan::prelude::JsonWriterOptions;
 #[cfg(feature = "rolling_window")]
 pub use polars_time::{prelude::RollingOptions, Duration};
 #[cfg(feature = "dynamic_group_by")]
diff --git a/crates/polars-pipe/src/executors/sinks/file_sink.rs b/crates/polars-pipe/src/executors/sinks/file_sink.rs
index 8fede903e107f..dd2254b1d4378 100644
--- a/crates/polars-pipe/src/executors/sinks/file_sink.rs
+++ b/crates/polars-pipe/src/executors/sinks/file_sink.rs
@@ -5,20 +5,25 @@ use std::thread::JoinHandle;
 use crossbeam_channel::{bounded, Receiver, Sender};
 use polars_core::prelude::*;
 use polars_io::csv::CsvWriter;
+#[cfg(feature = "json")]
+use polars_io::json::{BatchedWriter, JsonBatchedWriter, JsonFormat, JsonLinesBatchedWriter};
 #[cfg(feature = "parquet")]
 use polars_io::parquet::ParquetWriter;
 #[cfg(feature = "ipc")]
 use polars_io::prelude::IpcWriter;
 #[cfg(feature = "ipc")]
 use polars_io::SerWriter;
-#[cfg(feature = "json")]
-use polars_io::json::{JsonFormat, JsonBatchedWriter, JsonLinesBatchedWriter, BatchedWriter};
 use polars_plan::prelude::*;
 
 use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
 use crate::pipeline::morsels_per_sink;
 
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 trait SinkWriter {
     fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()>;
     fn _finish(&mut self) -> PolarsResult<()>;
@@ -60,7 +65,7 @@ impl SinkWriter for polars_io::csv::BatchedWriter<std::fs::File> {
 }
 
 #[cfg(feature = "json")]
-impl SinkWriter for polars_io::json::JsonBatchedWriter<std::fs::File> {
+impl SinkWriter for JsonBatchedWriter<std::fs::File> {
     fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
         self.write_batch(df)
     }
@@ -72,7 +77,7 @@ impl SinkWriter for polars_io::json::JsonBatchedWriter<std::fs::File> {
 }
 
 #[cfg(feature = "json")]
-impl SinkWriter for polars_io::json::JsonLinesBatchedWriter<std::fs::File> {
+impl SinkWriter for JsonLinesBatchedWriter<std::fs::File> {
     fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
         self.write_batch(df)
     }
@@ -208,10 +213,12 @@ impl JsonSink {
     ) -> PolarsResult<Self> {
         let file = std::fs::File::create(path)?;
         let writer = match options.json_format {
-            JsonFormat::Json =>
-                Box::new(JsonBatchedWriter::new(file)) as Box<dyn SinkWriter>,
-            JsonFormat::JsonLines =>
+            JsonFormat::Json => {
+                Box::new(JsonBatchedWriter::new(file)) as Box<dyn SinkWriter>
+            },
+            JsonFormat::JsonLines => {
                 Box::new(JsonLinesBatchedWriter::new(file)) as Box<dyn SinkWriter>
+            },
         };
         let morsels_per_sink = morsels_per_sink();
         let backpressure = morsels_per_sink * 2;
@@ -278,13 +285,23 @@ fn init_writer_thread(
 
 // Ensure the data is return in the order it was streamed
 #[derive(Clone)]
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 pub struct FilesSink {
     sender: Sender<Option<DataChunk>>,
     io_thread_handle: Arc<Option<JoinHandle<()>>>,
 }
 
-#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))]
+#[cfg(any(
+    feature = "parquet",
+    feature = "ipc",
+    feature = "csv",
+    feature = "json"
+))]
 impl Sink for FilesSink {
     fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
         // don't add empty dataframes
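The point of the `SinkWriter` shim above is that the writer thread only ever holds one object type, whichever concrete batched writer the sink chose. Those polars-pipe types are private, so the sketch below is a self-contained stand-in for the same dispatch shape: the names `AnyWriter`, `ArrayWriter`, `LinesWriter`, and `writer_for` are invented, and error handling is reduced to `std::io::Result`.

```rust
use std::io::Write;

// Object-safe facade in the role of `SinkWriter`: push a batch, then finalize.
trait AnyWriter {
    fn write_batch(&mut self, rows: &[String]) -> std::io::Result<()>;
    fn finish(&mut self) -> std::io::Result<()>;
}

// Counterpart of the JSON-array writer: tracks whether the opening `[` was written.
struct ArrayWriter<W: Write> {
    out: W,
    first: bool,
}

// Counterpart of the NDJSON writer: one row per line, nothing to finalize.
struct LinesWriter<W: Write> {
    out: W,
}

impl<W: Write> AnyWriter for ArrayWriter<W> {
    fn write_batch(&mut self, rows: &[String]) -> std::io::Result<()> {
        for row in rows {
            // Emit `[` before the first row, `,` before every later one.
            let sep = if self.first { "[" } else { "," };
            self.first = false;
            write!(self.out, "{sep}{row}")?;
        }
        Ok(())
    }
    fn finish(&mut self) -> std::io::Result<()> {
        if self.first {
            // No batch was ever written; emit an empty array.
            self.out.write_all(b"[]")
        } else {
            self.out.write_all(b"]")
        }
    }
}

impl<W: Write> AnyWriter for LinesWriter<W> {
    fn write_batch(&mut self, rows: &[String]) -> std::io::Result<()> {
        for row in rows {
            writeln!(self.out, "{row}")?;
        }
        Ok(())
    }
    fn finish(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

// Mirrors the `match options.json_format { .. }` in `JsonSink::new`: choose once,
// box it, and hand the writer thread a single `Box<dyn AnyWriter>` to drive.
fn writer_for(array_format: bool, out: impl Write + 'static) -> Box<dyn AnyWriter> {
    if array_format {
        Box::new(ArrayWriter { out, first: true })
    } else {
        Box::new(LinesWriter { out })
    }
}
```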
diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs
index 3c94cfab489cf..aef135c034913 100644
--- a/crates/polars-pipe/src/pipeline/convert.rs
+++ b/crates/polars-pipe/src/pipeline/convert.rs
@@ -148,8 +148,7 @@ where
         },
         #[cfg(feature = "json")]
         FileType::Json(options) => {
-            Box::new(JsonSink::new(path, *options, input_schema.as_ref())?)
-                as Box<dyn Sink>
+            Box::new(JsonSink::new(path, *options, input_schema.as_ref())?) as Box<dyn Sink>
         },
         FileType::Memory => {
             Box::new(OrderedSink::new(input_schema.into_owned())) as Box<dyn Sink>
diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs
index a514ea3346c3c..bbec95c9a0af8 100644
--- a/crates/polars-plan/src/logical_plan/options.rs
+++ b/crates/polars-plan/src/logical_plan/options.rs
@@ -7,10 +7,10 @@ use polars_io::csv::SerializeOptions;
 use polars_io::csv::{CsvEncoding, NullValues};
 #[cfg(feature = "ipc")]
 use polars_io::ipc::IpcCompression;
-#[cfg(feature = "parquet")]
-use polars_io::parquet::ParquetCompression;
 #[cfg(feature = "json")]
 use polars_io::json::JsonFormat;
+#[cfg(feature = "parquet")]
+use polars_io::parquet::ParquetCompression;
 use polars_io::RowCount;
 #[cfg(feature = "dynamic_group_by")]
 use polars_time::{DynamicGroupOptions, RollingGroupOptions};
@@ -94,7 +94,7 @@ pub struct JsonWriterOptions {
     /// format to use to write the DataFrame to JSON
     pub json_format: JsonFormat,
     /// maintain the order the data was processed
-    pub maintain_order: bool
+    pub maintain_order: bool,
 }
 
 #[derive(Clone, Debug, PartialEq)]
@@ -331,7 +331,7 @@ pub enum FileType {
     Csv(CsvWriterOptions),
     #[cfg(feature = "json")]
     Json(JsonWriterOptions),
-    Memory
+    Memory,
 }
 
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
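For completeness, a small sketch of how the options struct defined above is intended to be filled in and carried by the plan as a file-sink payload. Import paths are indicative only (they assume the `json` feature and the usual `polars-plan` prelude re-exports); `JsonWriterOptions`, `JsonFormat`, and `FileType` are the types from this diff, the function name is made up.

```rust
use polars_io::json::JsonFormat;
use polars_plan::prelude::{FileType, JsonWriterOptions};

fn ndjson_sink_payload() -> FileType {
    // `maintain_order: true` asks the sink to keep chunks in the order they were produced.
    let options = JsonWriterOptions {
        json_format: JsonFormat::JsonLines,
        maintain_order: true,
    };
    // This is the value that `LazyFrame::sink_to_file` stores in `LogicalPlan::FileSink`.
    FileType::Json(options)
}
```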
diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py
index b9b496afccd1b..cea08e43f4aa8 100644
--- a/py-polars/polars/lazyframe/frame.py
+++ b/py-polars/polars/lazyframe/frame.py
@@ -87,12 +87,12 @@
         ColumnNameOrSelector,
         CsvEncoding,
         CsvQuoteStyle,
-        JsonFormat,
         FillNullStrategy,
         FrameInitTypes,
         IntoExpr,
         JoinStrategy,
         JoinValidation,
+        JsonFormat,
         Orientation,
         ParallelStrategy,
         PolarsDataType,
@@ -2127,17 +2127,17 @@ def sink_csv(
     )
 
     def sink_json(
-            self,
-            path: str | Path,
-            *,
-            json_format: JsonFormat | None = None,
-            maintain_order: bool = True,
-            type_coercion: bool = True,
-            predicate_pushdown: bool = True,
-            projection_pushdown: bool = True,
-            simplify_expression: bool = True,
-            no_optimization: bool = False,
-            slice_pushdown: bool = True,
+        self,
+        path: str | Path,
+        *,
+        json_format: JsonFormat | None = None,
+        maintain_order: bool = True,
+        type_coercion: bool = True,
+        predicate_pushdown: bool = True,
+        projection_pushdown: bool = True,
+        simplify_expression: bool = True,
+        no_optimization: bool = False,
+        slice_pushdown: bool = True,
     ) -> DataFrame:
         """
         Persists a LazyFrame at the provided path.
@@ -2149,7 +2149,7 @@ def sink_json(
         path
             File path to which the file should be written.
         json_format : {'json', 'json_lines'}
-            Choose "json" for single JSON array containing each DataFrame row as an object.
+            Choose "json" for a single JSON array containing each row as an object.
             Choose "json_lines" for each row output on a separate line.
         maintain_order
             Maintain the order in which data is processed.
@@ -2171,10 +2171,6 @@ def sink_json(
         -------
         DataFrame
 
-        Notes
-        -----
-        json_format parameter is currently not supported.
-
         Examples
         --------
         >>> lf = pl.scan_csv("/path/to/my_larger_than_ram_file.csv")  # doctest: +SKIP
@@ -2197,9 +2193,7 @@ def sink_json(
             streaming=True,
         )
         return lf.sink_json(
-            path=path,
-            json_format=json_format,
-            maintain_order=maintain_order
+            path=path, json_format=json_format, maintain_order=maintain_order
         )
 
     @deprecate_renamed_parameter(
diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs
index eccda56aa5368..3392b20f1df92 100644
--- a/py-polars/src/lazyframe.rs
+++ b/py-polars/src/lazyframe.rs
@@ -11,9 +11,9 @@ use polars::lazy::frame::LazyCsvReader;
 use polars::lazy::frame::LazyJsonLineReader;
 use polars::lazy::frame::{AllowedOptimizations, LazyFrame};
 use polars::lazy::prelude::col;
-use polars::prelude::{ClosedWindow, CsvEncoding, Field, JoinType, Schema};
 #[cfg(feature = "json")]
 use polars::prelude::JsonFormat;
+use polars::prelude::{ClosedWindow, CsvEncoding, Field, JoinType, Schema};
 use polars::time::*;
 use polars_core::cloud;
 use polars_core::frame::explode::MeltArgs;
@@ -596,10 +596,10 @@
         py: Python,
         path: PathBuf,
         json_format: Option<Wrap<JsonFormat>>,
-        maintain_order: bool
+        maintain_order: bool,
     ) -> PyResult<()> {
         let options = JsonWriterOptions {
-            json_format: json_format.map(|c| c.0).unwrap_or(JsonFormat::default()),
+            json_format: json_format.map(|c| c.0).unwrap_or_default(),
             maintain_order,
         };
 
diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py
index 37a77daf3c996..e2195cf768153 100644
--- a/py-polars/tests/unit/io/test_lazy_json.py
+++ b/py-polars/tests/unit/io/test_lazy_json.py
@@ -116,7 +116,9 @@ def test_sink_json_should_write_same_data(io_files_path: Path, tmp_path: Path) -
     assert_frame_equal(df, expected)
 
 
-def test_sink_json_should_write_same_data_with_json_argument(io_files_path: Path, tmp_path: Path) -> None:
+def test_sink_json_should_write_same_data_with_json_argument(
+    io_files_path: Path, tmp_path: Path
+) -> None:
     tmp_path.mkdir(exist_ok=True)
     # Arrange
     source_path = io_files_path / "foods1.csv"
@@ -125,13 +127,15 @@ def test_sink_json_should_write_same_data_with_json_argument(io_files_path: Path
     expected = pl.read_csv(source_path)
     lf = pl.scan_csv(source_path)
     # Act
-        lf.sink_json(target_path, json_format="json")
+    lf.sink_json(target_path, json_format="json")
     df = pl.read_json(target_path)
     # Assert
     assert_frame_equal(df, expected)
 
 
-def test_sink_json_should_write_same_data_with_json_lines_argument(io_files_path: Path, tmp_path: Path) -> None:
+def test_sink_json_should_write_same_data_with_json_lines_argument(
+    io_files_path: Path, tmp_path: Path
+) -> None:
     tmp_path.mkdir(exist_ok=True)
     # Arrange
     source_path = io_files_path / "foods1.csv"
@@ -140,13 +144,15 @@ def test_sink_json_should_write_same_data_with_json_lines_argument(io_files_path
     expected = pl.read_csv(source_path)
     lf = pl.scan_csv(source_path)
     # Act
-        lf.sink_json(target_path, json_format="json_lines")
+    lf.sink_json(target_path, json_format="json_lines")
     df = pl.read_ndjson(target_path)
-# Assert
+    # Assert
     assert_frame_equal(df, expected)
 
 
-def test_sink_json_should_raise_exception_with_invalid_argument(io_files_path: Path, tmp_path: Path) -> None:
+def test_sink_json_should_raise_exception_with_invalid_argument(
+    io_files_path: Path, tmp_path: Path
+) -> None:
     tmp_path.mkdir(exist_ok=True)
     # Arrange
     source_path = io_files_path / "foods1.csv"
@@ -155,5 +161,4 @@ def test_sink_json_should_raise_exception_with_invalid_argument(io_files_path: P
     lf = pl.scan_csv(source_path)
     # Act & Assert
     with pytest.raises(ValueError):
-        lf.sink_json(target_path, json_format="invalid_argument")
-
+        lf.sink_json(target_path, json_format="invalid_argument")  # type: ignore[arg-type]