
Commit 8d29d3c
fix(rust,python): ensure projections containing only hive columns are projected (#11803)
nameexhaustion authored Oct 17, 2023
1 parent d85c452 commit 8d29d3c
Showing 3 changed files with 53 additions and 23 deletions.
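
For context: the failure mode fixed here (issue 11796, referenced in the test below) is a projection that selects only hive partition columns. A minimal Python sketch, assuming a hive-partitioned dataset under dataset/ with a partition column named category (path and names are illustrative):

import polars as pl

q = pl.scan_parquet("dataset/**/*.parquet", hive_partitioning=True)

# Before this fix, the pushdown path read zero parquet columns, so hive
# partition values were broadcast against an empty frame and the result
# could come back with the wrong height.
expect = q.collect().select("category")  # full read, then project
actual = q.select("category").collect()  # projection pushdown path
assert actual.frame_equal(expect)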
@@ -499,15 +499,13 @@ where
     D: NestedDecoder<'a>,
 {
     // front[a1, a2, a3, ...]back
-    if items.len() > 1 {
-        return MaybeNext::Some(Ok(items.pop_front().unwrap()));
+    if *remaining == 0 && items.is_empty() {
+        return MaybeNext::None;
     }
-    if *remaining == 0 {
-        return match items.pop_front() {
-            Some(decoded) => MaybeNext::Some(Ok(decoded)),
-            None => MaybeNext::None,
-        };
+    if !items.is_empty() && items.front().unwrap().0.len() > chunk_size.unwrap_or(usize::MAX) {
+        return MaybeNext::Some(Ok(items.pop_front().unwrap()));
     }

     match iter.next() {
         Err(e) => MaybeNext::Some(Err(e.into())),
         Ok(None) => {
@@ -541,7 +539,8 @@ where
                 Err(e) => return MaybeNext::Some(Err(e)),
             };

-            if (items.len() == 1)
+            // if possible, return the value immediately.
+            if !items.is_empty()
                 && items.front().unwrap().0.len() > chunk_size.unwrap_or(usize::MAX)
             {
                 MaybeNext::Some(Ok(items.pop_front().unwrap()))
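
For intuition, the reworked early returns above read: stop only when the row limit is exhausted and the chunk queue is drained, and emit the front chunk as soon as it outgrows chunk_size. A rough Python sketch of that control flow (hypothetical names; the real queue holds nested-state/array pairs):

from collections import deque

def maybe_next(items: deque, remaining: int, chunk_size: int | None):
    max_len = chunk_size if chunk_size is not None else float("inf")
    if remaining == 0 and not items:
        return None                # MaybeNext::None: limit hit, queue drained
    if items and len(items[0]) > max_len:
        return items.popleft()     # MaybeNext::Some: front chunk is full enough
    return "more"                  # MaybeNext::More: decode another page first

assert maybe_next(deque(), 0, 1024) is None
assert maybe_next(deque([[1, 2, 3]]), 5, 2) == [1, 2, 3]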
crates/polars-io/src/parquet/read_impl.rs: 34 additions & 15 deletions
@@ -96,10 +96,15 @@ pub(super) fn array_iter_to_series(
 }

 /// Materializes hive partitions.
-fn materialize_hive_partitions(df: &mut DataFrame, hive_partition_columns: Option<&[Series]>) {
+/// We have a special num_rows arg, as df can be empty when a projection contains
+/// only hive partition columns.
+/// Safety: num_rows equals the height of the df when the df height is non-zero.
+fn materialize_hive_partitions(
+    df: &mut DataFrame,
+    hive_partition_columns: Option<&[Series]>,
+    num_rows: usize,
+) {
     if let Some(hive_columns) = hive_partition_columns {
-        let num_rows = df.height();
-
         for s in hive_columns {
             unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
         }
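
The broadcast height now comes from the caller rather than df.height(), which is 0 when the projection contained no parquet columns. In Python terms, the helper effectively does the following for each hive column (illustrative values; the Rust code uses new_from_index(0, num_rows) to repeat the value at index 0):

import polars as pl

num_rows = 3  # projection height of the row group, even when df itself is empty
hive_frame = pl.DataFrame({"category": ["beverages"] * num_rows})
assert hive_frame.height == num_rows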
@@ -191,6 +196,7 @@ fn rg_to_dfs_optionally_par_over_columns(
         assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
     }

+    let projection_height = (*remaining_rows).min(md.num_rows());
     let chunk_size = md.num_rows();
     let columns = if let ParallelStrategy::Columns = parallel {
         POOL.install(|| {
@@ -200,7 +206,7 @@
                     column_idx_to_series(
                         *column_i,
                         md,
-                        *remaining_rows,
+                        projection_height,
                         schema,
                         store,
                         chunk_size,
@@ -212,20 +218,26 @@
         projection
             .iter()
             .map(|column_i| {
-                column_idx_to_series(*column_i, md, *remaining_rows, schema, store, chunk_size)
+                column_idx_to_series(
+                    *column_i,
+                    md,
+                    projection_height,
+                    schema,
+                    store,
+                    chunk_size,
+                )
             })
             .collect::<PolarsResult<Vec<_>>>()?
     };

-    *remaining_rows =
-        remaining_rows.saturating_sub(file_metadata.row_groups[rg_idx].num_rows());
+    *remaining_rows -= projection_height;

     let mut df = DataFrame::new_no_checks(columns);
     if let Some(rc) = &row_count {
         df.with_row_count_mut(&rc.name, Some(*previous_row_count + rc.offset));
     }
-    materialize_hive_partitions(&mut df, hive_partition_columns);
-
+    materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
     apply_predicate(&mut df, predicate, true)?;

     *previous_row_count += current_row_count;
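
A side effect of the clamp: projection_height = min(remaining_rows, num_rows) never exceeds remaining_rows, so the plain -= safely replaces the earlier saturating_sub. A worked example with illustrative numbers:

remaining_rows = 10  # e.g. from a row limit
for rg_num_rows in (6, 6, 6):  # three row groups of 6 rows each
    projection_height = min(remaining_rows, rg_num_rows)
    remaining_rows -= projection_height  # 6, then 4, then 0 rows are read
# remaining_rows ends at 0 with no underflow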
@@ -265,17 +277,17 @@ fn rg_to_dfs_par_over_rg(
             let row_count_start = *previous_row_count;
             let num_rows = rg_md.num_rows();
             *previous_row_count += num_rows as IdxSize;
-            let local_limit = *remaining_rows;
-            *remaining_rows = remaining_rows.saturating_sub(num_rows);
+            let projection_height = (*remaining_rows).min(num_rows);
+            *remaining_rows -= projection_height;

-            (rg_idx, rg_md, local_limit, row_count_start)
+            (rg_idx, rg_md, projection_height, row_count_start)
         })
         .collect::<Vec<_>>();

     let dfs = row_groups
         .into_par_iter()
-        .map(|(rg_idx, md, local_limit, row_count_start)| {
-            if local_limit == 0
+        .map(|(rg_idx, md, projection_height, row_count_start)| {
+            if projection_height == 0
                 || use_statistics
                     && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
             {
@@ -291,7 +303,14 @@
             let columns = projection
                 .iter()
                 .map(|column_i| {
-                    column_idx_to_series(*column_i, md, local_limit, schema, store, chunk_size)
+                    column_idx_to_series(
+                        *column_i,
+                        md,
+                        projection_height,
+                        schema,
+                        store,
+                        chunk_size,
+                    )
                 })
                 .collect::<PolarsResult<Vec<_>>>()?;

@@ -300,8 +319,8 @@
             if let Some(rc) = &row_count {
                 df.with_row_count_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset));
             }
-            materialize_hive_partitions(&mut df, hive_partition_columns);
-
+            materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
             apply_predicate(&mut df, predicate, false)?;

             Ok(Some(df))
py-polars/tests/unit/io/test_hive.py: 12 additions & 0 deletions
@@ -98,3 +98,15 @@ def test_hive_partitioned_projection_pushdown(
     columns = ["sugars_g", "category"]
     for streaming in [True, False]:
         assert q.select(columns).collect(streaming=streaming).columns == columns
+
+    # test that hive partition columns are projected with the correct height when
+    # the projection contains only hive partition columns (11796)
+    for parallel in ("row_groups", "columns"):
+        q = pl.scan_parquet(
+            root / "**/*.parquet", hive_partitioning=True, parallel=parallel  # type: ignore[arg-type]
+        )
+
+        expect = q.collect().select("category")
+        actual = q.select("category").collect()
+
+        assert expect.frame_equal(actual)
