perf: Only load Parquet statistics for live columns #20617

Draft · wants to merge 1 commit into base: main
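In short: the predicate's live columns are collected once into a `live_schema` that is threaded through `read_this_row_group` and `collect_statistics`, so row-group statistics are deserialized only for the columns the predicate actually references. `BatchStats` becomes a borrowing type (`&Schema` plus `Cow<[ColumnStats]>`), which lets hive partitions hand out statistics without cloning. A minimal standalone sketch of the idea, using simplified stand-in types rather than the real Polars APIs:

```rust
// Simplified stand-ins (not the Polars types): only the statistics of columns
// referenced by the predicate ("live" columns) are ever looked up.
use std::collections::{BTreeMap, HashSet};

#[derive(Debug)]
struct ColumnStats {
    name: String,
    min: Option<i64>,
    max: Option<i64>,
}

struct RowGroupMetadata {
    // Per-column (min, max) statistics as stored in the file footer.
    stats: BTreeMap<String, (i64, i64)>,
}

/// Keep only the columns of `schema` that the predicate references.
fn live_schema(schema: &[String], predicate_columns: &[&str]) -> Vec<String> {
    let live: HashSet<&str> = predicate_columns.iter().copied().collect();
    schema
        .iter()
        .filter(|c| live.contains(c.as_str()))
        .cloned()
        .collect()
}

/// Deserialize statistics for the live columns only, instead of the full schema.
fn collect_statistics(md: &RowGroupMetadata, live: &[String]) -> Vec<ColumnStats> {
    live.iter()
        .map(|name| {
            let (min, max) = match md.stats.get(name) {
                Some(&(lo, hi)) => (Some(lo), Some(hi)),
                None => (None, None),
            };
            ColumnStats { name: name.clone(), min, max }
        })
        .collect()
}

fn main() {
    let schema: Vec<String> = ["a", "b", "c", "d"].iter().map(|s| s.to_string()).collect();
    let md = RowGroupMetadata {
        stats: schema
            .iter()
            .enumerate()
            .map(|(i, c)| (c.clone(), (i as i64, i as i64 + 10)))
            .collect(),
    };
    // The predicate only touches "b": statistics for "a", "c" and "d" are never read.
    let live = live_schema(&schema, &["b"]);
    println!("{:?}", collect_statistics(&md, &live));
}
```

The pruning decision itself stays identical; only the amount of statistics work per row group shrinks with the number of live columns.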
15 changes: 13 additions & 2 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -242,15 +242,26 @@ impl FetchRowGroupsFromObjectStore {
.collect()
});

let live_schema = predicate.as_ref().map_or_else(Schema::default, |p| {
let mut live_columns = PlIndexSet::new();
p.collect_live_columns(&mut live_columns);
live_columns
.iter()
.map(|c| Field::from(schema.get(c).unwrap()))
.collect()
});

let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

let mut row_groups = if let Some(pred) = predicate.as_deref() {
row_group_range
.filter_map(|i| {
let rg = &row_groups[i];

let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
let should_be_read = matches!(
read_this_row_group(Some(pred), rg, &schema, &live_schema),
Ok(true)
);

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
32 changes: 20 additions & 12 deletions crates/polars-io/src/parquet/read/predicates.rs
@@ -1,3 +1,5 @@
use std::borrow::Cow;

use polars_core::config;
use polars_core::prelude::*;
use polars_parquet::read::statistics::{deserialize, Statistics};
@@ -17,30 +19,35 @@ impl ColumnStats {
}

/// Collect the statistics in a row-group
pub(crate) fn collect_statistics(
pub(crate) fn collect_statistics<'a>(
md: &RowGroupMetadata,
schema: &ArrowSchema,
) -> PolarsResult<Option<BatchStats>> {
live_schema: &'a Schema,
) -> PolarsResult<Option<BatchStats<'a>>> {
// TODO! fix this performance. This is a full sequential scan.
let stats = schema
.iter_values()
let stats = live_schema
.iter_fields()
.map(|field| {
if field.dtype().is_nested() {
return Ok(ColumnStats::new(field.clone(), None, None, None));
}

let arrow_field = schema.get(&field.name).unwrap();
let iter = md.columns_under_root_iter(&field.name).unwrap();

Ok(if iter.len() == 0 {
ColumnStats::new(field.into(), None, None, None)
} else {
ColumnStats::from_arrow_stats(deserialize(field, iter)?, field)
})
Ok(ColumnStats::from_arrow_stats(
deserialize(arrow_field, iter)?,
arrow_field,
))
})
.collect::<PolarsResult<Vec<_>>>()?;
.collect::<PolarsResult<Cow<_>>>()?;

if stats.is_empty() {
return Ok(None);
}

Ok(Some(BatchStats::new(
Arc::new(Schema::from_arrow_schema(schema)),
live_schema,
stats,
Some(md.num_rows()),
)))
@@ -50,6 +57,7 @@ pub fn read_this_row_group(
predicate: Option<&dyn PhysicalIoExpr>,
md: &RowGroupMetadata,
schema: &ArrowSchema,
live_schema: &Schema,
) -> PolarsResult<bool> {
if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
return Ok(true);
@@ -59,7 +67,7 @@

if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(md, schema)? {
if let Some(stats) = collect_statistics(md, schema, live_schema)? {
let pred_result = pred.should_read(&stats);

// a parquet file may not have statistics of all columns
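One detail worth calling out in this hunk: the statistics are now collected into `PolarsResult<Cow<_>>` rather than `PolarsResult<Vec<_>>`, matching the new borrowed/owned `stats` field on `BatchStats`. A tiny standalone illustration of that collect pattern (plain `Result`/`Cow`, not the Polars aliases):

```rust
use std::borrow::Cow;

#[derive(Debug, Clone)]
struct ColumnStats(&'static str);

fn main() -> Result<(), String> {
    // An iterator of Result<T, E> can be collected straight into
    // Result<Cow<'_, [T]>, E>; on success the Cow ends up as Cow::Owned.
    let stats: Cow<'_, [ColumnStats]> = ["a", "b", "c"]
        .iter()
        .map(|&name| Ok::<_, String>(ColumnStats(name)))
        .collect::<Result<Cow<_>, _>>()?;

    assert!(matches!(&stats, Cow::Owned(_)));
    println!("{stats:?}");
    Ok(())
}
```

The row-group path thus produces owned stats here, while the hive-partition path (see `hive.rs` below) can hand out the same `BatchStats` type borrowing stats it already stores.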
38 changes: 28 additions & 10 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -166,19 +166,26 @@ fn rg_to_dfs(

use ParallelStrategy as S;

let live_schema = predicate.map_or_else(Schema::default, |p| {
let mut live_columns = PlIndexSet::new();
p.collect_live_columns(&mut live_columns);
live_columns
.iter()
.map(|c| Field::from(schema.get(c).unwrap()))
.collect()
});

if parallel == S::Prefiltered {
if let Some(predicate) = predicate {
let mut live_columns = PlIndexSet::new();
predicate.collect_live_columns(&mut live_columns);
if !live_columns.is_empty() {
if !live_schema.is_empty() {
return rg_to_dfs_prefiltered(
store,
previous_row_count,
row_group_start,
row_group_end,
file_metadata,
schema,
live_columns,
live_schema,
predicate,
row_index,
projection,
@@ -198,6 +205,7 @@ fn rg_to_dfs(
slice,
file_metadata,
schema,
live_schema,
predicate,
row_index,
parallel,
@@ -213,6 +221,7 @@ fn rg_to_dfs(
slice,
file_metadata,
schema,
live_schema,
predicate,
row_index,
projection,
@@ -242,7 +251,7 @@ fn rg_to_dfs_prefiltered(
row_group_end: usize,
file_metadata: &FileMetadata,
schema: &ArrowSchemaRef,
live_columns: PlIndexSet<PlSmallStr>,
live_schema: Schema,
predicate: &dyn PhysicalIoExpr,
row_index: Option<RowIndex>,
projection: &[usize],
@@ -270,7 +279,7 @@
};

// Get the number of live columns
let num_live_columns = live_columns.len();
let num_live_columns = live_schema.len();
let num_dead_columns =
projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns;

@@ -286,7 +295,7 @@
for &i in projection.iter() {
let name = schema.get_at_index(i).unwrap().0.as_str();

if live_columns.contains(name) {
if live_schema.contains(name) {
live_idx_to_col_idx.push(i);
} else {
dead_idx_to_col_idx.push(i);
@@ -306,7 +315,7 @@
let md = &file_metadata.row_groups[rg_idx];

if use_statistics {
match read_this_row_group(Some(predicate), md, schema) {
match read_this_row_group(Some(predicate), md, schema, &live_schema) {
Ok(false) => return Ok(None),
Ok(true) => {},
Err(e) => return Err(e),
@@ -540,6 +549,7 @@ fn rg_to_dfs_optionally_par_over_columns(
slice: (usize, usize),
file_metadata: &FileMetadata,
schema: &ArrowSchemaRef,
live_schema: Schema,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
parallel: ParallelStrategy,
@@ -562,7 +572,12 @@
let current_row_count = md.num_rows() as IdxSize;

if use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
&& !read_this_row_group(
predicate,
&file_metadata.row_groups[rg_idx],
schema,
&live_schema,
)?
{
*previous_row_count += rg_slice.1 as IdxSize;
continue;
@@ -675,6 +690,7 @@ fn rg_to_dfs_par_over_rg(
slice: (usize, usize),
file_metadata: &FileMetadata,
schema: &ArrowSchemaRef,
live_schema: Schema,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
projection: &[usize],
@@ -730,7 +746,9 @@
row_groups
.into_par_iter()
.map(|(md, slice, row_count_start)| {
if slice.1 == 0 || use_statistics && !read_this_row_group(predicate, md, schema)? {
if slice.1 == 0
|| use_statistics && !read_this_row_group(predicate, md, schema, &live_schema)?
{
return Ok(None);
}
// test we don't read the parquet file if this env var is set
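For context, the decision these call sites feed is plain min/max pruning, and only the statistics of the live columns are needed to make it. A small standalone sketch of that decision for a predicate like `x > 5` (hypothetical simplified types, not `read_this_row_group`'s real signature):

```rust
#[derive(Debug)]
struct ColumnStats {
    min: i64,
    max: i64,
}

/// Can this row group possibly contain rows satisfying `x > threshold`?
fn read_this_row_group(x_stats: Option<&ColumnStats>, threshold: i64) -> bool {
    match x_stats {
        // No statistics for the column: be conservative and read the group.
        None => true,
        // Skip the group only when even the maximum cannot satisfy the predicate.
        Some(s) => s.max > threshold,
    }
}

fn main() {
    let groups = [
        Some(ColumnStats { min: 0, max: 4 }), // pruned
        Some(ColumnStats { min: 3, max: 9 }), // read
        None,                                 // no stats: read
    ];
    for (i, g) in groups.iter().enumerate() {
        println!("row group {i}: read = {}", read_this_row_group(g.as_ref(), 5));
    }
}
```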
30 changes: 10 additions & 20 deletions crates/polars-io/src/predicates.rs
@@ -1,3 +1,5 @@
use std::borrow::Cow;

use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
@@ -205,30 +207,18 @@ fn use_min_max(dtype: &DataType) -> bool {
}

/// A collection of column stats with a known schema.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct BatchStats {
schema: SchemaRef,
stats: Vec<ColumnStats>,
pub struct BatchStats<'a> {
schema: &'a Schema,
stats: Cow<'a, [ColumnStats]>,
// This might not be available, as when pruning hive partitions.
num_rows: Option<usize>,
}

impl Default for BatchStats {
fn default() -> Self {
Self {
schema: Arc::new(Schema::default()),
stats: Vec::new(),
num_rows: None,
}
}
}

impl BatchStats {
impl<'a> BatchStats<'a> {
/// Constructs a new [`BatchStats`].
///
/// The `stats` should match the order of the `schema`.
pub fn new(schema: SchemaRef, stats: Vec<ColumnStats>, num_rows: Option<usize>) -> Self {
pub fn new(schema: &'a Schema, stats: Cow<'a, [ColumnStats]>, num_rows: Option<usize>) -> Self {
Self {
schema,
stats,
@@ -237,8 +227,8 @@ impl BatchStats {
}

/// Returns the [`Schema`] of the batch.
pub fn schema(&self) -> &SchemaRef {
&self.schema
pub fn schema<'b: 'a>(&'b self) -> &'a Schema {
self.schema
}

/// Returns the [`ColumnStats`] of all columns in the batch, if known.
@@ -260,7 +250,7 @@ impl BatchStats {
self.num_rows
}

pub fn with_schema(&mut self, schema: SchemaRef) {
pub fn with_schema(&mut self, schema: &'a Schema) {
self.schema = schema;
}

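The lifetime added to `BatchStats` is what ties the rest of the PR together: the row-group path builds owned stats, while `HivePartitions::get_statistics` (below) can now return a `BatchStats` that borrows what the partition already stores, instead of keeping an owned `BatchStats` around. A minimal sketch of that shape, with simplified stand-in types rather than the real ones:

```rust
use std::borrow::Cow;

#[derive(Debug, Clone)]
struct ColumnStats {
    name: String,
}

#[derive(Debug)]
struct Schema {
    fields: Vec<String>,
}

// Borrowing container: no schema Arc clone and no Vec<ColumnStats> clone per call.
#[derive(Debug)]
struct BatchStats<'a> {
    schema: &'a Schema,
    stats: Cow<'a, [ColumnStats]>,
    num_rows: Option<usize>,
}

struct HivePartition {
    schema: Schema,
    stats: Vec<ColumnStats>,
}

impl HivePartition {
    /// Hand out statistics by borrowing what the partition already owns.
    fn get_statistics(&self) -> BatchStats<'_> {
        BatchStats {
            schema: &self.schema,
            stats: Cow::Borrowed(&self.stats),
            num_rows: None,
        }
    }
}

fn main() {
    let part = HivePartition {
        schema: Schema { fields: vec!["year".into(), "month".into()] },
        stats: vec![
            ColumnStats { name: "year".into() },
            ColumnStats { name: "month".into() },
        ],
    };
    println!("{:?}", part.get_statistics());
}
```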
6 changes: 5 additions & 1 deletion crates/polars-mem-engine/src/executors/hive_scan.rs
@@ -410,7 +410,11 @@ impl MultiScanExec {

if let Some(stats_evaluator) = stats_evaluator {
let allow_predicate_skip = !stats_evaluator
.should_read(&BatchStats::default())
.should_read(&BatchStats::new(
&Schema::default(),
Cow::Borrowed(&[]),
None,
))
.unwrap_or(true);
if allow_predicate_skip && verbose {
eprintln!(
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -281,8 +281,8 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
};

if let Some(ref hive_parts) = hive_parts {
let hive_schema = hive_parts[0].schema();
file_info.update_schema_with_hive_schema(hive_schema.clone());
let hive_schema = hive_parts[0].schema.clone();
file_info.update_schema_with_hive_schema(hive_schema);
} else if let Some(hive_schema) = file_options.hive_options.schema.clone() {
// We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with an empty file
// list during path expansion. In this case we still want to return an empty DataFrame with this
33 changes: 19 additions & 14 deletions crates/polars-plan/src/plans/hive.rs
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::path::{Path, PathBuf};

use polars_core::prelude::*;
@@ -9,20 +10,19 @@ use serde::{Deserialize, Serialize};
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct HivePartitions {
/// Single value Series that can be used to run the predicate against.
/// They are to be broadcasted if the predicates don't filter them out.
stats: BatchStats,
pub schema: SchemaRef,
pub stats: Vec<ColumnStats>,
}

impl HivePartitions {
pub fn get_projection_schema_and_indices(
&self,
names: &PlHashSet<PlSmallStr>,
) -> (SchemaRef, Vec<usize>) {
let mut out_schema = Schema::with_capacity(self.stats.schema().len());
let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());
let mut out_schema = Schema::with_capacity(self.schema.len());
let mut out_indices = Vec::with_capacity(self.stats.len());

for (i, cs) in self.stats.column_stats().iter().enumerate() {
for (i, cs) in self.stats.iter().enumerate() {
let name = cs.field_name();
if names.contains(name.as_str()) {
out_indices.push(i);
@@ -36,16 +36,19 @@ impl HivePartitions {
}

pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) {
self.stats.with_schema(new_schema);
self.stats.take_indices(column_indices);
self.schema = new_schema;
self.stats = column_indices
.iter()
.map(|&i| self.stats[i].clone())
.collect();
}

pub fn get_statistics(&self) -> &BatchStats {
&self.stats
pub fn get_statistics(&self) -> BatchStats {
BatchStats::new(self.schema.as_ref(), Cow::Borrowed(&self.stats), None)
}

pub(crate) fn schema(&self) -> &SchemaRef {
self.get_statistics().schema()
pub(crate) fn schema(&self) -> &Schema {
self.schema.as_ref()
}

pub fn materialize_partition_columns(&self) -> Vec<Series> {
@@ -224,8 +227,10 @@ pub fn hive_partitions_from_paths(
)
}

let stats = BatchStats::new(hive_schema.clone(), column_stats, None);
hive_partitions.push(HivePartitions { stats });
hive_partitions.push(HivePartitions {
schema: hive_schema.clone(),
stats: column_stats,
});
}

Ok(Some(Arc::from(hive_partitions)))
@@ -413,7 +413,7 @@ impl PredicatePushDown<'_> {
let path = &paths[i];
let hive_parts = &hive_parts[i];

if stats_evaluator.should_read(hive_parts.get_statistics())? {
if stats_evaluator.should_read(&hive_parts.get_statistics())? {
new_paths.push(path.clone());
new_hive_parts.push(hive_parts.clone());
}