From 3999a4bcc5b1316882a7d438894c33b6071c0870 Mon Sep 17 00:00:00 2001
From: coastalwhite
Date: Wed, 8 Jan 2025 11:37:20 +0100
Subject: [PATCH] perf: Only load statistics for live columns

---
 .../polars-io/src/parquet/read/async_impl.rs  | 15 +++++++-
 .../polars-io/src/parquet/read/predicates.rs  | 32 ++++++++++------
 .../polars-io/src/parquet/read/read_impl.rs   | 38 ++++++++++++++-----
 crates/polars-io/src/predicates.rs            | 30 +++++----------
 .../src/executors/hive_scan.rs                |  6 ++-
 .../src/plans/conversion/dsl_to_ir.rs         |  4 +-
 crates/polars-plan/src/plans/hive.rs          | 33 +++++++++-------
 .../plans/optimizer/predicate_pushdown/mod.rs |  2 +-
 .../src/nodes/io_sources/parquet/init.rs      | 13 ++++++-
 .../parquet/row_group_data_fetch.rs           |  3 ++
 10 files changed, 113 insertions(+), 63 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs
index bf35337a313d..c5a57dfc477b 100644
--- a/crates/polars-io/src/parquet/read/async_impl.rs
+++ b/crates/polars-io/src/parquet/read/async_impl.rs
@@ -242,6 +242,15 @@ impl FetchRowGroupsFromObjectStore {
                 .collect()
         });
 
+        let live_schema = predicate.as_ref().map_or_else(Schema::default, |p| {
+            let mut live_columns = PlIndexSet::new();
+            p.collect_live_columns(&mut live_columns);
+            live_columns
+                .iter()
+                .map(|c| Field::from(schema.get(c).unwrap()))
+                .collect()
+        });
+
         let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();
 
         let mut row_groups = if let Some(pred) = predicate.as_deref() {
@@ -249,8 +258,10 @@
                 .filter_map(|i| {
                     let rg = &row_groups[i];
 
-                    let should_be_read =
-                        matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
+                    let should_be_read = matches!(
+                        read_this_row_group(Some(pred), rg, &schema, &live_schema),
+                        Ok(true)
+                    );
 
                     // Already add the row groups that will be skipped to the prefetched data.
                     if !should_be_read {
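
The construction above is the pattern this whole patch repeats: the predicate reports the columns it actually uses via collect_live_columns, and only those fields are copied out of the full file schema into a live schema; row-group statistics are then deserialized for that subset alone. A minimal self-contained sketch of the idea, with a plain map standing in for polars' Schema (toy types, not the real API):

    use std::collections::BTreeMap;

    // Stand-in for polars' Schema: column name -> dtype.
    type Schema = BTreeMap<String, String>;

    // Keep only the fields the predicate touches; statistics for all
    // other columns are never loaded.
    fn live_schema(full: &Schema, live_columns: &[&str]) -> Schema {
        live_columns
            .iter()
            .filter_map(|name| full.get_key_value(*name))
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }

    fn main() {
        let full: Schema = [("a", "i64"), ("b", "str"), ("c", "f64")]
            .into_iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();

        // A predicate like `col("a") > 1` only needs statistics for "a".
        assert_eq!(live_schema(&full, &["a"]).len(), 1);
    }
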
diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs
index a3269341c1a3..93b2cf623302 100644
--- a/crates/polars-io/src/parquet/read/predicates.rs
+++ b/crates/polars-io/src/parquet/read/predicates.rs
@@ -1,3 +1,5 @@
+use std::borrow::Cow;
+
 use polars_core::config;
 use polars_core::prelude::*;
 use polars_parquet::read::statistics::{deserialize, Statistics};
@@ -17,30 +19,35 @@ impl ColumnStats {
 }
 
 /// Collect the statistics in a row-group
-pub(crate) fn collect_statistics(
+pub(crate) fn collect_statistics<'a>(
     md: &RowGroupMetadata,
     schema: &ArrowSchema,
-) -> PolarsResult<Option<BatchStats>> {
+    live_schema: &'a Schema,
+) -> PolarsResult<Option<BatchStats<'a>>> {
     // TODO! fix this performance. This is a full sequential scan.
-    let stats = schema
-        .iter_values()
+    let stats = live_schema
+        .iter_fields()
         .map(|field| {
+            if field.dtype().is_nested() {
+                return Ok(ColumnStats::new(field.clone(), None, None, None));
+            }
+
+            let arrow_field = schema.get(&field.name).unwrap();
             let iter = md.columns_under_root_iter(&field.name).unwrap();
 
-            Ok(if iter.len() == 0 {
-                ColumnStats::new(field.into(), None, None, None)
-            } else {
-                ColumnStats::from_arrow_stats(deserialize(field, iter)?, field)
-            })
+            Ok(ColumnStats::from_arrow_stats(
+                deserialize(arrow_field, iter)?,
+                arrow_field,
+            ))
         })
-        .collect::<PolarsResult<Vec<_>>>()?;
+        .collect::<PolarsResult<Cow<'_, [ColumnStats]>>>()?;
 
     if stats.is_empty() {
         return Ok(None);
     }
 
     Ok(Some(BatchStats::new(
-        Arc::new(Schema::from_arrow_schema(schema)),
+        live_schema,
         stats,
         Some(md.num_rows()),
    )))
@@ -50,6 +57,7 @@ pub fn read_this_row_group(
     predicate: Option<&dyn PhysicalIoExpr>,
     md: &RowGroupMetadata,
     schema: &ArrowSchema,
+    live_schema: &Schema,
 ) -> PolarsResult<bool> {
     if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
         return Ok(true);
@@ -59,7 +67,7 @@ pub fn read_this_row_group(
 
     if let Some(pred) = predicate {
         if let Some(pred) = pred.as_stats_evaluator() {
-            if let Some(stats) = collect_statistics(md, schema)? {
+            if let Some(stats) = collect_statistics(md, schema, live_schema)? {
                 let pred_result = pred.should_read(&stats);
 
                 // a parquet file may not have statistics of all columns
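
read_this_row_group is the gate the live schema feeds: a stats evaluator inspects the per-column min/max from the row-group metadata and decides whether any row could match. The decision logic, reduced to a toy model (the names and the i64-only statistics are simplifications, not polars' actual evaluator):

    // Per-column statistics as found in parquet row-group metadata.
    struct ColumnStats {
        min: Option<i64>,
        max: Option<i64>,
    }

    // Can `col == value` match anywhere in this row group? Missing
    // statistics force a conservative read.
    fn may_contain(stats: &ColumnStats, value: i64) -> bool {
        match (stats.min, stats.max) {
            (Some(min), Some(max)) => min <= value && value <= max,
            _ => true,
        }
    }

    fn main() {
        let stats = ColumnStats { min: Some(10), max: Some(20) };
        assert!(!may_contain(&stats, 5)); // provably no matches: skip the row group
        assert!(may_contain(&stats, 15)); // possible match: read it
    }
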
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 79c7883f2f38..6255ace99671 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -166,11 +166,18 @@ fn rg_to_dfs(
 
     use ParallelStrategy as S;
 
+    let live_schema = predicate.map_or_else(Schema::default, |p| {
+        let mut live_columns = PlIndexSet::new();
+        p.collect_live_columns(&mut live_columns);
+        live_columns
+            .iter()
+            .map(|c| Field::from(schema.get(c).unwrap()))
+            .collect()
+    });
+
     if parallel == S::Prefiltered {
         if let Some(predicate) = predicate {
-            let mut live_columns = PlIndexSet::new();
-            predicate.collect_live_columns(&mut live_columns);
-            if !live_columns.is_empty() {
+            if !live_schema.is_empty() {
                 return rg_to_dfs_prefiltered(
                     store,
                     previous_row_count,
@@ -178,7 +185,7 @@
                     row_group_end,
                     file_metadata,
                     schema,
-                    live_columns,
+                    live_schema,
                     predicate,
                     row_index,
                     projection,
@@ -198,6 +205,7 @@
                 slice,
                 file_metadata,
                 schema,
+                live_schema,
                 predicate,
                 row_index,
                 parallel,
@@ -213,6 +221,7 @@
             slice,
             file_metadata,
             schema,
+            live_schema,
             predicate,
             row_index,
             projection,
@@ -242,7 +251,7 @@ fn rg_to_dfs_prefiltered(
     row_group_end: usize,
     file_metadata: &FileMetadata,
     schema: &ArrowSchemaRef,
-    live_columns: PlIndexSet<PlSmallStr>,
+    live_schema: Schema,
     predicate: &dyn PhysicalIoExpr,
     row_index: Option<RowIndex>,
     projection: &[usize],
@@ -270,7 +279,7 @@ fn rg_to_dfs_prefiltered(
     };
 
     // Get the number of live columns
-    let num_live_columns = live_columns.len();
+    let num_live_columns = live_schema.len();
     let num_dead_columns =
         projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns;
 
@@ -286,7 +295,7 @@ fn rg_to_dfs_prefiltered(
     for &i in projection.iter() {
         let name = schema.get_at_index(i).unwrap().0.as_str();
 
-        if live_columns.contains(name) {
+        if live_schema.contains(name) {
             live_idx_to_col_idx.push(i);
         } else {
             dead_idx_to_col_idx.push(i);
@@ -306,7 +315,7 @@
             let md = &file_metadata.row_groups[rg_idx];
 
             if use_statistics {
-                match read_this_row_group(Some(predicate), md, schema) {
+                match read_this_row_group(Some(predicate), md, schema, &live_schema) {
                     Ok(false) => return Ok(None),
                     Ok(true) => {},
                     Err(e) => return Err(e),
@@ -540,6 +549,7 @@ fn rg_to_dfs_optionally_par_over_columns(
     slice: (usize, usize),
     file_metadata: &FileMetadata,
     schema: &ArrowSchemaRef,
+    live_schema: Schema,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
     parallel: ParallelStrategy,
@@ -562,7 +572,12 @@ fn rg_to_dfs_optionally_par_over_columns(
         let current_row_count = md.num_rows() as IdxSize;
 
         if use_statistics
-            && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
+            && !read_this_row_group(
+                predicate,
+                &file_metadata.row_groups[rg_idx],
+                schema,
+                &live_schema,
+            )?
         {
             *previous_row_count += rg_slice.1 as IdxSize;
             continue;
@@ -675,6 +690,7 @@ fn rg_to_dfs_par_over_rg(
     slice: (usize, usize),
     file_metadata: &FileMetadata,
     schema: &ArrowSchemaRef,
+    live_schema: Schema,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
     projection: &[usize],
@@ -730,7 +746,9 @@ fn rg_to_dfs_par_over_rg(
     row_groups
         .into_par_iter()
         .map(|(md, slice, row_count_start)| {
-            if slice.1 == 0 || use_statistics && !read_this_row_group(predicate, md, schema)? {
+            if slice.1 == 0
+                || use_statistics && !read_this_row_group(predicate, md, schema, &live_schema)?
+            {
                 return Ok(None);
            }
             // test we don't read the parquet file if this env var is set
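
In the prefiltered strategy the projection is partitioned into live columns (needed to evaluate the predicate) and dead columns (decoded only for the rows that survive the filter). The split itself is simple; roughly, with plain slices standing in for the schema types:

    // Toy version of the live/dead split in rg_to_dfs_prefiltered.
    fn split_live_dead(
        projection: &[usize],
        names: &[&str],
        live: &[&str],
    ) -> (Vec<usize>, Vec<usize>) {
        let mut live_idx = Vec::new();
        let mut dead_idx = Vec::new();
        for &i in projection {
            if live.contains(&names[i]) {
                live_idx.push(i);
            } else {
                dead_idx.push(i);
            }
        }
        (live_idx, dead_idx)
    }

    fn main() {
        let names = ["a", "b", "c"];
        // Predicate touches only "b": it is decoded first, "a" and "c"
        // are decoded only for surviving rows.
        let (live, dead) = split_live_dead(&[0, 1, 2], &names, &["b"]);
        assert_eq!((live, dead), (vec![1], vec![0, 2]));
    }
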
diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs
index c455ae0966c8..184523a1b5a6 100644
--- a/crates/polars-io/src/predicates.rs
+++ b/crates/polars-io/src/predicates.rs
@@ -1,3 +1,5 @@
+use std::borrow::Cow;
+
 use polars_core::prelude::*;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
@@ -205,30 +207,18 @@ fn use_min_max(dtype: &DataType) -> bool {
 }
 
 /// A collection of column stats with a known schema.
-#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-#[derive(Debug, Clone)]
-pub struct BatchStats {
-    schema: SchemaRef,
-    stats: Vec<ColumnStats>,
+pub struct BatchStats<'a> {
+    schema: &'a Schema,
+    stats: Cow<'a, [ColumnStats]>,
     // This might not be available, as when pruning hive partitions.
     num_rows: Option<usize>,
 }
 
-impl Default for BatchStats {
-    fn default() -> Self {
-        Self {
-            schema: Arc::new(Schema::default()),
-            stats: Vec::new(),
-            num_rows: None,
-        }
-    }
-}
-
-impl BatchStats {
+impl<'a> BatchStats<'a> {
     /// Constructs a new [`BatchStats`].
     ///
     /// The `stats` should match the order of the `schema`.
-    pub fn new(schema: SchemaRef, stats: Vec<ColumnStats>, num_rows: Option<usize>) -> Self {
+    pub fn new(schema: &'a Schema, stats: Cow<'a, [ColumnStats]>, num_rows: Option<usize>) -> Self {
         Self {
             schema,
             stats,
@@ -237,8 +227,8 @@ impl BatchStats {
     }
 
     /// Returns the [`Schema`] of the batch.
-    pub fn schema(&self) -> &SchemaRef {
-        &self.schema
+    pub fn schema<'b: 'a>(&'b self) -> &'a Schema {
+        self.schema
     }
 
     /// Returns the [`ColumnStats`] of all columns in the batch, if known.
@@ -260,7 +250,7 @@ impl BatchStats {
         self.num_rows
     }
 
-    pub fn with_schema(&mut self, schema: SchemaRef) {
+    pub fn with_schema(&mut self, schema: &'a Schema) {
         self.schema = schema;
     }
 
diff --git a/crates/polars-mem-engine/src/executors/hive_scan.rs b/crates/polars-mem-engine/src/executors/hive_scan.rs
index 538ab10bbb9e..043d792e68a9 100644
--- a/crates/polars-mem-engine/src/executors/hive_scan.rs
+++ b/crates/polars-mem-engine/src/executors/hive_scan.rs
@@ -410,7 +410,11 @@ impl MultiScanExec {
 
         if let Some(stats_evaluator) = stats_evaluator {
             let allow_predicate_skip = !stats_evaluator
-                .should_read(&BatchStats::default())
+                .should_read(&BatchStats::new(
+                    &Schema::default(),
+                    Cow::Borrowed(&[]),
+                    None,
+                ))
                 .unwrap_or(true);
             if allow_predicate_skip && verbose {
                 eprintln!(
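
BatchStats now borrows its schema and holds its column statistics in a Cow, so the same type can borrow long-lived hive-partition statistics or own freshly deserialized row-group statistics, in both cases without cloning. The shape of that design, with a dummy ColumnStats (not the real type):

    use std::borrow::Cow;

    #[derive(Clone)]
    struct ColumnStats(u32);

    // Stand-in for the reworked BatchStats: the stats slice can be
    // borrowed (hive partitions, pruned many times) or owned (freshly
    // deserialized parquet row-group statistics).
    struct BatchStats<'a> {
        stats: Cow<'a, [ColumnStats]>,
    }

    fn main() {
        // Borrowed: pruning hive partitions no longer clones their statistics.
        let hive_stats = vec![ColumnStats(1), ColumnStats(2)];
        let borrowed = BatchStats { stats: Cow::Borrowed(hive_stats.as_slice()) };

        // Owned: per-row-group statistics are moved in, no extra copy either.
        let owned = BatchStats { stats: Cow::Owned(vec![ColumnStats(3)]) };

        assert_eq!(borrowed.stats.len() + owned.stats.len(), 3);
    }
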
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index c6a9b2e32275..a9daec29b951 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -281,8 +281,8 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
                 };
 
                 if let Some(ref hive_parts) = hive_parts {
-                    let hive_schema = hive_parts[0].schema();
-                    file_info.update_schema_with_hive_schema(hive_schema.clone());
+                    let hive_schema = hive_parts[0].schema.clone();
+                    file_info.update_schema_with_hive_schema(hive_schema);
                 } else if let Some(hive_schema) = file_options.hive_options.schema.clone() {
                     // We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with an empty file
                     // list during path expansion. In this case we still want to return an empty DataFrame with this
diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index 597bde5eb62f..8855ac1a262a 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::path::{Path, PathBuf};
 
 use polars_core::prelude::*;
@@ -9,9 +10,8 @@ use serde::{Deserialize, Serialize};
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 #[derive(Debug, Clone)]
 pub struct HivePartitions {
-    /// Single value Series that can be used to run the predicate against.
-    /// They are to be broadcasted if the predicates don't filter them out.
-    stats: BatchStats,
+    pub schema: SchemaRef,
+    pub stats: Vec<ColumnStats>,
 }
 
 impl HivePartitions {
@@ -19,10 +19,10 @@ impl HivePartitions {
         &self,
         names: &PlHashSet<PlSmallStr>,
     ) -> (SchemaRef, Vec<usize>) {
-        let mut out_schema = Schema::with_capacity(self.stats.schema().len());
-        let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());
+        let mut out_schema = Schema::with_capacity(self.schema.len());
+        let mut out_indices = Vec::with_capacity(self.stats.len());
 
-        for (i, cs) in self.stats.column_stats().iter().enumerate() {
+        for (i, cs) in self.stats.iter().enumerate() {
             let name = cs.field_name();
             if names.contains(name.as_str()) {
                 out_indices.push(i);
@@ -36,16 +36,19 @@ impl HivePartitions {
     }
 
     pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) {
-        self.stats.with_schema(new_schema);
-        self.stats.take_indices(column_indices);
+        self.schema = new_schema;
+        self.stats = column_indices
+            .iter()
+            .map(|&i| self.stats[i].clone())
+            .collect();
     }
 
-    pub fn get_statistics(&self) -> &BatchStats {
-        &self.stats
+    pub fn get_statistics(&self) -> BatchStats {
+        BatchStats::new(self.schema.as_ref(), Cow::Borrowed(&self.stats), None)
     }
 
-    pub(crate) fn schema(&self) -> &SchemaRef {
-        self.get_statistics().schema()
+    pub(crate) fn schema(&self) -> &Schema {
+        self.schema.as_ref()
     }
 
     pub fn materialize_partition_columns(&self) -> Vec<Series> {
@@ -224,8 +227,10 @@ pub fn hive_partitions_from_paths(
             )
         }
 
-        let stats = BatchStats::new(hive_schema.clone(), column_stats, None);
-        hive_partitions.push(HivePartitions { stats });
+        hive_partitions.push(HivePartitions {
+            schema: hive_schema.clone(),
+            stats: column_stats,
+        });
     }
 
     Ok(Some(Arc::from(hive_partitions)))
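
With the schema and stats stored inline, apply_projection no longer delegates to BatchStats::take_indices; it gathers the selected stats entries itself, in projection order. The core operation, written generically (a toy equivalent, not the polars code):

    // Projection applied to per-column statistics: keep entries by index,
    // in projection order (mirrors HivePartitions::apply_projection).
    fn apply_projection<T: Clone>(stats: &[T], column_indices: &[usize]) -> Vec<T> {
        column_indices.iter().map(|&i| stats[i].clone()).collect()
    }

    fn main() {
        let stats = ["a", "b", "c"];
        assert_eq!(apply_projection(&stats, &[2, 0]), vec!["c", "a"]);
    }

get_statistics, in turn, builds a borrowed BatchStats view on demand, which is cheap because nothing is cloned; that is why the predicate-pushdown caller below now takes a reference to a temporary instead of a stored value.
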
diff --git a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
index 40824c24ee06..0362f42c3612 100644
--- a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
+++ b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
@@ -413,7 +413,7 @@ impl PredicatePushDown<'_> {
                             let path = &paths[i];
                             let hive_parts = &hive_parts[i];
 
-                            if stats_evaluator.should_read(hive_parts.get_statistics())? {
+                            if stats_evaluator.should_read(&hive_parts.get_statistics())? {
                                 new_paths.push(path.clone());
                                 new_hive_parts.push(hive_parts.clone());
                             }
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
index d2aa481d5006..ab4ca8293b86 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
@@ -2,7 +2,8 @@ use std::future::Future;
 use std::sync::Arc;
 
 use polars_core::frame::DataFrame;
-use polars_core::prelude::PlIndexSet;
+use polars_core::prelude::{Field, InitHashMaps, PlIndexSet};
+use polars_core::schema::Schema;
 use polars_error::PolarsResult;
 use polars_io::prelude::ParallelStrategy;
 use polars_io::prelude::_internal::PrefilterMaskSetting;
@@ -88,11 +89,21 @@ impl ParquetSourceNode {
         let predicate = self.physical_predicate.clone();
         let memory_prefetch_func = self.memory_prefetch_func;
 
+        let live_schema = predicate.as_ref().map_or_else(Schema::default, |p| {
+            let mut live_columns = PlIndexSet::new();
+            p.collect_live_columns(&mut live_columns);
+            live_columns
+                .into_iter()
+                .map(|c| Field::from(reader_schema.get(&c).unwrap()))
+                .collect()
+        });
+
         let mut row_group_data_fetcher = RowGroupDataFetcher {
             metadata_rx,
             use_statistics,
             verbose,
             reader_schema,
+            live_schema,
             projection,
             predicate,
             slice_range: None, // Initialized later
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs
index d7d004a33b7e..5a4ac73e80fb 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use polars_core::prelude::{ArrowSchema, PlHashMap};
+use polars_core::schema::Schema;
 use polars_core::series::IsSorted;
 use polars_core::utils::operation_exceeded_idxsize_msg;
 use polars_error::{polars_err, PolarsResult};
@@ -40,6 +41,7 @@ pub(super) struct RowGroupDataFetcher {
     pub(super) use_statistics: bool,
     pub(super) verbose: bool,
     pub(super) reader_schema: Arc<ArrowSchema>,
+    pub(super) live_schema: Schema,
     pub(super) projection: Option<Arc<[PlSmallStr]>>,
     pub(super) predicate: Option<Arc<dyn PhysicalIoExpr>>,
     pub(super) slice_range: Option<std::ops::Range<usize>>,
@@ -92,6 +94,7 @@ impl RowGroupDataFetcher {
                 self.predicate.as_deref(),
                 &row_group_metadata,
                 self.reader_schema.as_ref(),
+                &self.live_schema,
             ) {
                 Ok(v) => v,
                 Err(e) => return Some(Err(e)),
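
One behavioral detail shared by all of these call sites: when there is no predicate the live schema is empty, collect_statistics then produces no column stats and returns None, and read_this_row_group falls through to true, so every row group is read. Reduced to its control flow (a simplification, not the real signatures):

    // With an empty live schema there are no statistics to evaluate, so
    // the row group must be read; only a predicate that provably excludes
    // all rows lets us skip it.
    fn read_this_row_group(num_live_columns: usize, stats_prove_no_match: bool) -> bool {
        if num_live_columns == 0 {
            return true;
        }
        !stats_prove_no_match
    }

    fn main() {
        assert!(read_this_row_group(0, false)); // no predicate columns: always read
        assert!(!read_this_row_group(1, true)); // statistics exclude all rows: skip
    }
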