feat: Implement LazyFrame.sink_ndjson #10786

Merged: 26 commits, Nov 20, 2023

Commits (26)
6a79d0c
add json options
fernandocast Aug 25, 2023
f7ed446
Add API changes
fernandocast Aug 25, 2023
df1001e
feat: adding batched json writer
abealcantara Aug 25, 2023
7c7da80
integration and fixes
abealcantara Aug 29, 2023
8ecf929
small fixes
abealcantara Aug 29, 2023
2d46937
small fix
abealcantara Aug 29, 2023
19ca57d
refactor: new json batched writers
abealcantara Aug 31, 2023
c853e54
add py-polars unit testing
fernandocast Sep 1, 2023
dc7ed34
refactor: removing duplicated code for sink_file
abealcantara Sep 4, 2023
2311d32
add unit testing for sink_json method
fernandocast Sep 4, 2023
48b833a
style: fix code formatting
fernandocast Sep 5, 2023
89b5de5
fix: unit tests
abealcantara Sep 7, 2023
7b3d901
style: fix code formatting
abealcantara Sep 8, 2023
e1610cf
fix: rust lint
abealcantara Sep 8, 2023
4546a7d
fix: feature check
abealcantara Sep 8, 2023
ee1b225
fix: using polars_json and formatting
abealcantara Sep 19, 2023
6fb40ce
feature: add code style
fernandocast Sep 24, 2023
f22727e
add json sink features
fernandocast Sep 26, 2023
38588e3
refactor fixes
abealcantara Sep 27, 2023
935c58b
fix: replace match with if structure, for single_match
fernandocast Sep 27, 2023
da07fd8
fix: update style with pre-commit
fernandocast Sep 27, 2023
4c95a5f
reverting changes in dependencies for ipc and csv
abealcantara Sep 29, 2023
d02b9b2
fix: removing json support
fernandocast Nov 6, 2023
622899f
feat: refactoring sink method, removing duplicated code
fernandocast Nov 6, 2023
3fc1786
fix: applying style format to mod.rs
fernandocast Nov 6, 2023
47745b4
add feature label
fernandocast Nov 10, 2023
29 changes: 29 additions & 0 deletions crates/polars-io/src/json/mod.rs

Review comment (collaborator):
Since we have the JsonFormat enum, I think it may be easier to use if the batched writers were combined & accepted a JsonFormat as an input.


Reply (contributor, author):
Fixed it, could you check again?
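
For illustration, a rough sketch of what the suggested consolidation might look like: a single batched writer that takes a JsonFormat at construction and dispatches per batch. This is not the code that was merged. It is written as if it lived inside crates/polars-io/src/json/mod.rs, reusing that module's existing imports (Write, DataFrame, PolarsResult, ArrayRef, chunk_to_struct, FallibleStreamingIterator) and the crate's JsonFormat enum; the type name CombinedBatchedWriter and the handling of the Json variant are illustrative assumptions.

```rust
// Sketch only: assumes the surrounding module's imports are in scope.
pub struct CombinedBatchedWriter<W: Write> {
    writer: W,
    format: JsonFormat,
}

impl<W: Write> CombinedBatchedWriter<W> {
    pub fn new(writer: W, format: JsonFormat) -> Self {
        CombinedBatchedWriter { writer, format }
    }

    /// Write a batch in the format chosen at construction time.
    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
        let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>();
        let batches = df
            .iter_chunks()
            .map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
        match self.format {
            JsonFormat::JsonLines => {
                // Same NDJSON serializer used by the writer added in this PR.
                let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
                while let Some(block) = serializer.next()? {
                    self.writer.write_all(block)?;
                }
            },
            JsonFormat::Json => {
                // A JSON-array serializer would go here; the merged PR ships only the
                // NDJSON path (see the "removing json support" commit above), so this
                // arm is left as a placeholder.
            },
        }
        Ok(())
    }
}
```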

@@ -71,6 +71,7 @@ use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_core::utils::try_get_supertype;
use polars_json::json::infer;
use polars_json::json::write::FallibleStreamingIterator;
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
@@ -153,6 +154,34 @@
}
}

pub struct BatchedWriter<W: Write> {
writer: W,
}

impl<W> BatchedWriter<W>
where
W: Write,
{
pub fn new(writer: W) -> Self {
BatchedWriter { writer }
}
/// Write a batch to the json writer.
///
/// # Panics
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>();
let chunks = df.iter_chunks();
let batches =
chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
while let Some(block) = serializer.next()? {
self.writer.write_all(block)?;
}
Ok(())
}
}

/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
#[must_use]
pub struct JsonReader<'a, R>
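
Aside: a minimal usage sketch of the BatchedWriter added above, writing one DataFrame as a single NDJSON batch. It assumes a dependency on polars-io with the json feature enabled (the module path is taken from the file above) and on polars-core; the output path and data are illustrative.

```rust
use std::fs::File;
use std::io::BufWriter;

use polars_core::prelude::*;
use polars_io::json::BatchedWriter;

fn main() -> PolarsResult<()> {
    // Illustrative data; any DataFrame with aligned chunks works.
    let df = DataFrame::new(vec![
        Series::new("x", &[1i64, 2, 3]),
        Series::new("y", &["a", "b", "c"]),
    ])?;

    // In a streaming sink each incoming morsel would be written as it arrives;
    // here the whole DataFrame is written as one batch.
    let file = File::create("out.ndjson")?;
    let mut writer = BatchedWriter::new(BufWriter::new(file));
    writer.write_batch(&df)?;
    Ok(())
}
```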
2 changes: 1 addition & 1 deletion crates/polars-lazy/Cargo.toml
@@ -47,7 +47,7 @@ async = [
cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio", "futures"]
cloud_write = ["cloud"]
ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc"]
json = ["polars-io/json", "polars-plan/json", "polars-json"]
json = ["polars-io/json", "polars-plan/json", "polars-json", "polars-pipe/json"]
csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe?/csv"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time", "dtype-duration", "polars-plan/temporal"]
# debugging purposes
126 changes: 74 additions & 52 deletions crates/polars-lazy/src/frame/mod.rs
@@ -7,7 +7,12 @@ mod err;
pub mod pivot;

use std::borrow::Cow;
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
#[cfg(any(
feature = "parquet",
feature = "ipc",
feature = "csv",
feature = "json"
))]
use std::path::PathBuf;
use std::sync::Arc;

@@ -27,7 +32,12 @@ use polars_core::prelude::*;
use polars_io::RowCount;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
use polars_plan::logical_plan::collect_fingerprints;
use polars_plan::logical_plan::optimize;
use polars_plan::utils::expr_output_name;
@@ -596,13 +606,23 @@ impl LazyFrame {
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;

let finger_prints = if file_caching {
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
{
let mut fps = Vec::with_capacity(8);
collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena);
Some(fps)
}
#[cfg(not(any(feature = "ipc", feature = "parquet", feature = "csv")))]
#[cfg(not(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
)))]
{
None
}
@@ -669,23 +689,14 @@ impl LazyFrame {
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "parquet")]
pub fn sink_parquet(mut self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::File {
pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path),
file_type: FileType::Parquet(options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_parquet()` instead"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
"collect().write_parquet()",
)
}

/// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit
@@ -694,70 +705,81 @@
/// streaming fashion.
#[cfg(all(feature = "cloud_write", feature = "parquet"))]
pub fn sink_parquet_cloud(
mut self,
self,
uri: String,
cloud_options: Option<polars_io::cloud::CloudOptions>,
parquet_options: ParquetWriteOptions,
) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::Cloud {
self.sink(
SinkType::Cloud {
uri: Arc::new(uri),
cloud_options,
file_type: FileType::Parquet(parquet_options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
"collect().write_parquet()",
)
}

/// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "ipc")]
pub fn sink_ipc(mut self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::File {
pub fn sink_ipc(self, path: PathBuf, options: IpcWriterOptions) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path),
file_type: FileType::Ipc(options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_ipc()` instead"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
"collect().write_ipc()",
)
}

/// Stream a query result into a csv file. This is useful if the final result doesn't fit
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "csv")]
pub fn sink_csv(mut self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload: SinkType::File {
pub fn sink_csv(self, path: PathBuf, options: CsvWriterOptions) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path),
file_type: FileType::Csv(options),
},
"collect().write_csv()",
)
}

/// Stream a query result into a json file. This is useful if the final result doesn't fit
/// into memory. This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "json")]
pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path),
file_type: FileType::Json(options),
},
"collect().write_ndjson()` or `collect().write_json()",
)
}

#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "cloud_write",
feature = "csv",
feature = "json",
))]
fn sink(mut self, payload: SinkType, msg_alternative: &str) -> Result<(), PolarsError> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::Sink {
input: Box::new(self.logical_plan),
payload,
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_csv()` instead"
ComputeError: format!("cannot run the whole query in a streaming order; \
use `{msg_alternative}` instead", msg_alternative=msg_alternative)
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
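
For reference, a minimal end-to-end sketch of the new sink_json entry point (the Python counterpart is LazyFrame.sink_ndjson). It assumes polars-lazy with the json feature enabled, that JsonWriterOptions is re-exported through the prelude like the other writer options and implements Default, and that the source is streamable; the query and output path are illustrative.

```rust
use std::path::PathBuf;

use polars_core::prelude::*;
use polars_lazy::prelude::*;

fn main() -> PolarsResult<()> {
    // Illustrative input; in practice this would typically be a scan_*-based
    // query so the whole pipeline can run in streaming mode.
    let df = DataFrame::new(vec![
        Series::new("x", &[1i64, 2, 3]),
        Series::new("y", &["a", "b", "c"]),
    ])?;

    let lf = df.lazy().filter(col("x").gt(lit(1)));

    // Enables streaming internally and errors with
    // "use `collect().write_ndjson()` or `collect().write_json()` instead"
    // if the plan cannot run fully streaming.
    lf.sink_json(PathBuf::from("out.ndjson"), JsonWriterOptions::default())?;
    Ok(())
}
```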
7 changes: 6 additions & 1 deletion crates/polars-lazy/src/physical_plan/file_cache.rs
@@ -1,7 +1,12 @@
use std::sync::Mutex;

use polars_core::prelude::*;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
use polars_plan::logical_plan::FileFingerPrint;

use crate::prelude::*;
7 changes: 6 additions & 1 deletion crates/polars-lazy/src/physical_plan/mod.rs
@@ -2,7 +2,12 @@ pub mod executors;
#[cfg(any(feature = "list_eval", feature = "pivot"))]
pub(crate) mod exotic;
pub mod expressions;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
mod file_cache;
mod node_timer;
pub mod planner;
63 changes: 54 additions & 9 deletions crates/polars-lazy/src/physical_plan/state.rs
@@ -8,10 +8,20 @@ use polars_core::config::verbose;
use polars_core::frame::group_by::GroupsProxy;
use polars_core::prelude::*;
use polars_ops::prelude::ChunkJoinOptIds;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
use polars_plan::logical_plan::FileFingerPrint;

#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
use super::file_cache::FileCache;
use crate::physical_plan::node_timer::NodeTimer;

@@ -65,7 +75,12 @@ pub struct ExecutionState {
// cached by a `.cache` call and kept in memory for the duration of the plan.
df_cache: Arc<Mutex<PlHashMap<usize, Arc<OnceCell<DataFrame>>>>>,
// cache file reads until all branches got there file, then we delete it
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
pub(crate) file_cache: FileCache,
pub(super) schema_cache: RwLock<Option<SchemaRef>>,
/// Used by Window Expression to prevent redundant grouping
@@ -110,7 +125,12 @@ impl ExecutionState {
pub(super) fn split(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: Default::default(),
group_tuples: Default::default(),
@@ -126,7 +146,12 @@
pub(super) fn clone(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: self.schema_cache.read().unwrap().clone().into(),
group_tuples: self.group_tuples.clone(),
@@ -138,16 +163,31 @@
}
}

#[cfg(not(any(feature = "parquet", feature = "csv", feature = "ipc")))]
#[cfg(not(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
)))]
pub(crate) fn with_finger_prints(_finger_prints: Option<usize>) -> Self {
Self::new()
}
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
pub(crate) fn with_finger_prints(finger_prints: Option<Vec<FileFingerPrint>>) -> Self {
Self {
df_cache: Arc::new(Mutex::new(PlHashMap::default())),
schema_cache: Default::default(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: FileCache::new(finger_prints),
group_tuples: Arc::new(RwLock::new(PlHashMap::default())),
join_tuples: Arc::new(Mutex::new(PlHashMap::default())),
@@ -166,7 +206,12 @@
Self {
df_cache: Default::default(),
schema_cache: Default::default(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: FileCache::new(None),
group_tuples: Default::default(),
join_tuples: Default::default(),