Skip to content

Commit

Permalink
style: fix code formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
fernandocast committed Sep 6, 2023
1 parent c574b9e commit 878ef5d
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 86 deletions.
49 changes: 29 additions & 20 deletions crates/polars-io/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_core::utils::try_get_supertype;
use polars_json::json::infer;
use simd_json::BorrowedValue;

#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::*;


/// The format to use to write the DataFrame to JSON: `Json` (a JSON array) or `JsonLines` (each row output on a
/// separate line). In either case, each row is serialized as a JSON object whose keys are the column names and whose
/// values are the row's corresponding values.
Expand Down Expand Up @@ -120,12 +118,14 @@ pub struct JsonWriter<W: Write> {
json_format: JsonFormat,
}

impl<W: Write> JsonWriter<W> where W: Write + 'static {
impl<W: Write> JsonWriter<W>
where
W: Write + 'static,
{
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
self.json_format = format;
self
}

}

impl<W> SerWriter<W> for JsonWriter<W>
Expand Down Expand Up @@ -169,21 +169,27 @@ pub trait BatchedWriter<W: Write> {
fn finish(&mut self) -> PolarsResult<()>;
}

pub struct JsonBatchedWriter<W: Write>{
pub struct JsonBatchedWriter<W: Write> {
writer: W,
is_first_row: bool
is_first_row: bool,
}

impl <W> JsonBatchedWriter<W> where W: Write {
pub fn new(writer: W) -> Self{
impl<W> JsonBatchedWriter<W>
where
W: Write,
{
pub fn new(writer: W) -> Self {
JsonBatchedWriter {
writer,
is_first_row: true
is_first_row: true,
}
}
}

impl <W> BatchedWriter<W> for JsonBatchedWriter<W> where W: Write {
impl<W> BatchedWriter<W> for JsonBatchedWriter<W>
where
W: Write,
{
/// Write a batch to the json writer.
///
/// # Panics
Expand All @@ -193,8 +199,7 @@ impl <W> BatchedWriter<W> for JsonBatchedWriter<W> where W: Write {
let batches = df
.iter_chunks()
.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
let mut serializer =
json::write::Serializer::new(batches, vec![]);
let mut serializer = json::write::Serializer::new(batches, vec![]);

while let Some(block) = serializer.next()? {
if self.is_first_row {
Expand All @@ -216,18 +221,22 @@ impl <W> BatchedWriter<W> for JsonBatchedWriter<W> where W: Write {
}

pub struct JsonLinesBatchedWriter<W: Write> {
writer: W
writer: W,
}

impl <W> JsonLinesBatchedWriter<W> where W: Write {
pub fn new(writer: W) -> Self{
JsonLinesBatchedWriter {
writer
}
impl<W> JsonLinesBatchedWriter<W>
where
W: Write,
{
pub fn new(writer: W) -> Self {
JsonLinesBatchedWriter { writer }
}
}

impl <W> BatchedWriter<W> for JsonLinesBatchedWriter<W> where W: Write {
impl<W> BatchedWriter<W> for JsonLinesBatchedWriter<W>
where
W: Write,
{
/// Write a batch to the json writer.
///
/// # Panics
Expand Down
42 changes: 34 additions & 8 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ mod file_list_reader;
pub mod pivot;

use std::borrow::Cow;
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "parquet",
feature = "ipc",
feature = "csv",
feature = "json"
))]
use std::path::PathBuf;
use std::sync::Arc;

Expand All @@ -38,7 +43,12 @@ use polars_core::prelude::*;
use polars_io::RowCount;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
use polars_plan::logical_plan::collect_fingerprints;
use polars_plan::logical_plan::optimize;
use polars_plan::utils::expr_to_leaf_column_names;
Expand Down Expand Up @@ -589,7 +599,12 @@ impl LazyFrame {
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;

let finger_prints = if file_caching {
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
{
let mut fps = Vec::with_capacity(8);
collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena);
Expand Down Expand Up @@ -663,7 +678,11 @@ impl LazyFrame {
/// streaming fashion.
#[cfg(feature = "parquet")]
pub fn sink_parquet(self, path: PathBuf, options: ParquetWriteOptions) -> PolarsResult<()> {
self.sink_to_file(path, FileType::Parquet(options), "collect().write_parquet()")
self.sink_to_file(
path,
FileType::Parquet(options),
"collect().write_parquet()",
)
}

/// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
Expand All @@ -687,12 +706,19 @@ impl LazyFrame {
/// streaming fashion.
#[cfg(feature = "json")]
pub fn sink_json(self, path: PathBuf, options: JsonWriterOptions) -> PolarsResult<()> {
self.sink_to_file(path,
FileType::Json(options),
"collect().write_ndjson()` or `collect().write_json()")
self.sink_to_file(
path,
FileType::Json(options),
"collect().write_ndjson()` or `collect().write_json()",
)
}

fn sink_to_file(mut self, path: PathBuf, file_type: FileType, msg_alternative: &str) -> Result<(), PolarsError> {
fn sink_to_file(
mut self,
path: PathBuf,
file_type: FileType,
msg_alternative: &str,
) -> Result<(), PolarsError> {
self.opt_state.streaming = true;
self.logical_plan = LogicalPlan::FileSink {
input: Box::new(self.logical_plan),
Expand Down
7 changes: 6 additions & 1 deletion crates/polars-lazy/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ pub mod executors;
#[cfg(any(feature = "list_eval", feature = "pivot"))]
pub(crate) mod exotic;
pub mod expressions;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
mod file_cache;
mod node_timer;
pub mod planner;
Expand Down
56 changes: 48 additions & 8 deletions crates/polars-lazy/src/physical_plan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,20 @@ use polars_core::config::verbose;
use polars_core::frame::group_by::GroupsProxy;
use polars_core::frame::hash_join::ChunkJoinOptIds;
use polars_core::prelude::*;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "json"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
use polars_plan::logical_plan::FileFingerPrint;

#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
use super::file_cache::FileCache;
use crate::physical_plan::node_timer::NodeTimer;

Expand Down Expand Up @@ -65,7 +75,12 @@ pub struct ExecutionState {
// cached by a `.cache` call and kept in memory for the duration of the plan.
df_cache: Arc<Mutex<PlHashMap<usize, Arc<OnceCell<DataFrame>>>>>,
// cache file reads until all branches got there file, then we delete it
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
pub(crate) file_cache: FileCache,
pub(super) schema_cache: RwLock<Option<SchemaRef>>,
/// Used by Window Expression to prevent redundant grouping
Expand Down Expand Up @@ -110,7 +125,12 @@ impl ExecutionState {
pub(super) fn split(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: Default::default(),
group_tuples: Default::default(),
Expand All @@ -126,7 +146,12 @@ impl ExecutionState {
pub(super) fn clone(&self) -> Self {
Self {
df_cache: self.df_cache.clone(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: self.file_cache.clone(),
schema_cache: self.schema_cache.read().unwrap().clone().into(),
group_tuples: self.group_tuples.clone(),
Expand All @@ -142,12 +167,22 @@ impl ExecutionState {
pub(crate) fn with_finger_prints(_finger_prints: Option<usize>) -> Self {
Self::new()
}
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "json"))]
#[cfg(any(
feature = "parquet",
feature = "csv",
feature = "ipc",
feature = "json"
))]
pub(crate) fn with_finger_prints(finger_prints: Option<Vec<FileFingerPrint>>) -> Self {
Self {
df_cache: Arc::new(Mutex::new(PlHashMap::default())),
schema_cache: Default::default(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: FileCache::new(finger_prints),
group_tuples: Arc::new(Mutex::new(PlHashMap::default())),
join_tuples: Arc::new(Mutex::new(PlHashMap::default())),
Expand All @@ -166,7 +201,12 @@ impl ExecutionState {
Self {
df_cache: Default::default(),
schema_cache: Default::default(),
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "json"))]
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "csv",
feature = "json"
))]
file_cache: FileCache::new(None),
group_tuples: Default::default(),
join_tuples: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ pub use polars_plan::logical_plan::{
pub use polars_plan::prelude::CsvWriterOptions;
#[cfg(feature = "ipc")]
pub use polars_plan::prelude::IpcWriterOptions;
#[cfg(feature = "json")]
pub use polars_plan::prelude::JsonWriterOptions;
#[cfg(feature = "parquet")]
pub use polars_plan::prelude::ParquetWriteOptions;
pub(crate) use polars_plan::prelude::*;
#[cfg(feature = "json")]
pub use polars_plan::prelude::JsonWriterOptions;
#[cfg(feature = "rolling_window")]
pub use polars_time::{prelude::RollingOptions, Duration};
#[cfg(feature = "dynamic_group_by")]
Expand Down
Loading

0 comments on commit 878ef5d

Please sign in to comment.