Skip to content

Commit

Permalink
perf: Fix pathological small chunk parquet writing (#16433)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored May 23, 2024
1 parent 462bc8b commit 717277e
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl TryFrom<(RecordBatch, &[ArrowField])> for DataFrame {
}

impl DataFrame {
pub fn split_chunks(mut self) -> impl Iterator<Item = DataFrame> {
pub fn split_chunks(&mut self) -> impl Iterator<Item = DataFrame> + '_ {
self.align_chunks();

(0..self.n_chunks()).map(move |i| unsafe {
Expand Down
51 changes: 48 additions & 3 deletions crates/polars-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,56 @@ pub(crate) fn chunk_df_for_writing(
// ensures all chunks are aligned.
df.align_chunks();

// Accumulate many small chunks to the row group size.
// See: #16403
if !df.get_columns().is_empty()
&& df.get_columns()[0]
.chunk_lengths()
.take(5)
.all(|len| len < row_group_size)
{
fn finish(scratch: &mut Vec<DataFrame>, new_chunks: &mut Vec<DataFrame>) {
let mut new = accumulate_dataframes_vertical_unchecked(scratch.drain(..));
new.as_single_chunk_par();
new_chunks.push(new);
}

let mut new_chunks = Vec::with_capacity(df.n_chunks()); // upper limit;
let mut scratch = vec![];
let mut remaining = row_group_size;

for df in df.split_chunks() {
remaining = remaining.saturating_sub(df.height());
scratch.push(df);

if remaining == 0 {
remaining = row_group_size;
finish(&mut scratch, &mut new_chunks);
}
}
if !scratch.is_empty() {
finish(&mut scratch, &mut new_chunks);
}
return Ok(Cow::Owned(accumulate_dataframes_vertical_unchecked(
new_chunks,
)));
}

let n_splits = df.height() / row_group_size;
let result = if n_splits > 0 {
Cow::Owned(accumulate_dataframes_vertical_unchecked(split_df_as_ref(
df, n_splits, false,
)))
let mut splits = split_df_as_ref(df, n_splits, false);

for df in splits.iter_mut() {
// If the chunks are small enough, writing many small chunks
// leads to slow writing performance, so in that case we
// merge them.
let n_chunks = df.n_chunks();
if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
df.as_single_chunk_par();
}
}

Cow::Owned(accumulate_dataframes_vertical_unchecked(splits))
} else {
Cow::Borrowed(df)
};
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl FilterExec {

fn execute_impl(
&mut self,
df: DataFrame,
mut df: DataFrame,
state: &mut ExecutionState,
) -> PolarsResult<DataFrame> {
let n_partitions = POOL.current_num_threads();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl CsvExec {
for i in 0..self.paths.len() {
let path = &self.paths[i];

let df = options_base
let mut df = options_base
.clone()
.with_row_index(self.file_options.row_index.clone().map(|mut ri| {
ri.offset += n_rows_read as IdxSize;
Expand Down
2 changes: 1 addition & 1 deletion py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def test_parquet_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> Non
assert df.n_chunks("all") == [4, 4]

file_path = tmp_path / "stats.parquet"
df.write_parquet(file_path, statistics=True, use_pyarrow=False)
df.write_parquet(file_path, statistics=True, use_pyarrow=False, row_group_size=50)

for pred in [
pl.col("idx") < 50,
Expand Down
5 changes: 1 addition & 4 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,11 @@ def small_parquet_path(io_files_path: Path) -> Path:
def test_to_from_buffer(
df: pl.DataFrame, compression: ParquetCompression, use_pyarrow: bool
) -> None:
print(df)
df = df[["list_str"]]
print(df)
buf = io.BytesIO()
df.write_parquet(buf, compression=compression, use_pyarrow=use_pyarrow)
buf.seek(0)
read_df = pl.read_parquet(buf, use_pyarrow=use_pyarrow)
print(read_df)
assert_frame_equal(df, read_df, categorical_as_str=True)


Expand All @@ -113,7 +110,7 @@ def test_read_parquet_respects_rechunk_16416(
df = pl.DataFrame({"a": [1]})
df = pl.concat([df, df, df])
buf = io.BytesIO()
df.write_parquet(buf)
df.write_parquet(buf, row_group_size=1)
buf.seek(0)

rechunk, expected_chunks = rechunk_and_expected_chunks
Expand Down

0 comments on commit 717277e

Please sign in to comment.