From cdcd7377aba119192ae49bc0c0c743c7d59ca344 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 26 Sep 2023 07:45:46 +0200 Subject: [PATCH] feat: support 'hive partitioning' aware readers (#11284) --- Cargo.toml | 1 + crates/polars-core/src/frame/mod.rs | 2 +- crates/polars-core/src/schema.rs | 6 + crates/polars-io/src/csv/read_impl/mod.rs | 18 +-- crates/polars-io/src/csv/utils.rs | 18 +-- crates/polars-io/src/parquet/predicates.rs | 120 ++------------ crates/polars-io/src/parquet/read.rs | 18 +++ crates/polars-io/src/parquet/read_impl.rs | 21 +++ crates/polars-io/src/predicates.rs | 148 +++++++++++++++++- crates/polars-io/src/utils.rs | 15 ++ crates/polars-lazy/src/frame/mod.rs | 22 ++- .../physical_plan/executors/scan/parquet.rs | 22 ++- .../src/physical_plan/expressions/apply.rs | 8 +- .../src/physical_plan/expressions/binary.rs | 38 +++-- .../src/physical_plan/expressions/mod.rs | 2 +- .../src/physical_plan/planner/lp.rs | 2 +- crates/polars-lazy/src/scan/mod.rs | 13 +- crates/polars-lazy/src/scan/parquet.rs | 3 + .../src/executors/sources/parquet.rs | 21 ++- crates/polars-pipe/src/pipeline/convert.rs | 2 +- crates/polars-plan/Cargo.toml | 2 + .../polars-plan/src/logical_plan/builder.rs | 32 ++-- crates/polars-plan/src/logical_plan/hive.rs | 97 ++++++++++++ crates/polars-plan/src/logical_plan/mod.rs | 1 + .../src/logical_plan/optimizer/mod.rs | 5 +- .../optimizer/predicate_pushdown/mod.rs | 40 ++++- .../polars-plan/src/logical_plan/options.rs | 1 + crates/polars-plan/src/logical_plan/schema.rs | 21 +++ py-polars/Cargo.lock | 1 + py-polars/polars/io/parquet/functions.py | 5 + py-polars/polars/lazyframe/frame.py | 2 + py-polars/src/lazyframe.rs | 4 +- py-polars/tests/unit/io/test_hive.py | 49 ++++++ py-polars/tests/unit/io/test_parquet.py | 1 + 34 files changed, 557 insertions(+), 204 deletions(-) create mode 100644 crates/polars-plan/src/logical_plan/hive.rs create mode 100644 py-polars/tests/unit/io/test_hive.py diff --git a/Cargo.toml b/Cargo.toml index e8efc2a231f8..b505c08dc258 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ streaming-iterator = "0.1.9" itoa = "1.0.6" ryu = "1.0.13" lexical-core = "0.8.5" +percent-encoding = "2.3" xxhash-rust = { version = "0.8.6", features = ["xxh3"] } polars-core = { version = "0.33.2", path = "crates/polars-core", default-features = false } polars-arrow = { version = "0.33.2", path = "crates/polars-arrow", default-features = false } diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 0d82724a5e48..524a9be4a823 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -501,7 +501,7 @@ impl DataFrame { /// # Ok::<(), PolarsError>(()) /// ``` pub fn schema(&self) -> Schema { - self.iter().map(|s| s.field().into_owned()).collect() + self.columns.as_slice().into() } /// Get a reference to the [`DataFrame`] columns. 
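Reader's note: the rest of this patch threads hive-partition statistics through the parquet readers and the predicate-pushdown optimizer. As a minimal sketch of the user-facing behavior — adapted from the test added at the end of this patch (py-polars/tests/unit/io/test_hive.py), with an illustrative dataset path and the same partition column names the test uses — the new `hive_partitioning` flag works roughly like this:

    import pyarrow.parquet as pq
    import polars as pl

    df = pl.DataFrame({"category": ["vegetables", "fruit"], "fats_g": [0.5, 0.1]})

    # Write a hive-partitioned layout, e.g. data/category=vegetables/fats_g=0.5/<file>.parquet.
    pq.write_to_dataset(df.to_arrow(), root_path="data", partition_cols=["category", "fats_g"])

    # With hive_partitioning=True (added by this patch), the key=value directory parts are
    # parsed into partition columns and their single-value statistics; predicates on those
    # columns can then skip whole files before any row groups are read.
    lazy = pl.scan_parquet("data/**/*.parquet", hive_partitioning=True)
    out = lazy.filter(pl.col("category") == "vegetables").collect()

The partition columns are appended to the schema of each scanned file, which is why the test below re-selects columns before comparing frames.
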
diff --git a/crates/polars-core/src/schema.rs b/crates/polars-core/src/schema.rs index 29625de7e11a..350cd10944a1 100644 --- a/crates/polars-core/src/schema.rs +++ b/crates/polars-core/src/schema.rs @@ -34,6 +34,12 @@ impl Debug for Schema { } } +impl From<&[Series]> for Schema { + fn from(value: &[Series]) -> Self { + value.iter().map(|s| s.field().into_owned()).collect() + } +} + impl FromIterator for Schema where F: Into, diff --git a/crates/polars-io/src/csv/read_impl/mod.rs b/crates/polars-io/src/csv/read_impl/mod.rs index c544694f973c..b7334619276b 100644 --- a/crates/polars-io/src/csv/read_impl/mod.rs +++ b/crates/polars-io/src/csv/read_impl/mod.rs @@ -557,23 +557,7 @@ impl<'a> CoreReader<'a> { // An empty file with a schema should return an empty DataFrame with that schema if bytes.is_empty() { - // TODO! add DataFrame::new_from_schema - let buffers = init_buffers( - &projection, - 0, - &self.schema, - &self.init_string_size_stats(&str_columns, 0), - self.quote_char, - self.encoding, - self.ignore_errors, - )?; - let df = DataFrame::new_no_checks( - buffers - .into_iter() - .map(|buf| buf.into_series()) - .collect::>()?, - ); - return Ok(df); + return Ok(DataFrame::from(self.schema.as_ref())); } // all the buffers returned from the threads diff --git a/crates/polars-io/src/csv/utils.rs b/crates/polars-io/src/csv/utils.rs index c0a0c5b09624..e9aef318873a 100644 --- a/crates/polars-io/src/csv/utils.rs +++ b/crates/polars-io/src/csv/utils.rs @@ -3,22 +3,21 @@ use std::borrow::Cow; use std::io::Read; use std::mem::MaybeUninit; -use once_cell::sync::Lazy; use polars_core::datatypes::PlHashSet; use polars_core::prelude::*; #[cfg(feature = "polars-time")] use polars_time::chunkedarray::utf8::infer as date_infer; #[cfg(feature = "polars-time")] use polars_time::prelude::utf8::Pattern; -use regex::{Regex, RegexBuilder}; #[cfg(any(feature = "decompress", feature = "decompress-fast"))] use crate::csv::parser::next_line_position_naive; use crate::csv::parser::{next_line_position, skip_bom, skip_line_ending, SplitLines}; use crate::csv::splitfields::SplitFields; use crate::csv::CsvEncoding; -use crate::mmap::*; +use crate::mmap::ReaderBytes; use crate::prelude::NullValues; +use crate::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE}; pub(crate) fn get_file_chunks( bytes: &[u8], @@ -58,19 +57,6 @@ pub(crate) fn get_file_chunks( offsets } -static FLOAT_RE: Lazy = Lazy::new(|| { - Regex::new(r"^\s*[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$").unwrap() -}); - -static INTEGER_RE: Lazy = Lazy::new(|| Regex::new(r"^\s*-?(\d+)$").unwrap()); - -static BOOLEAN_RE: Lazy = Lazy::new(|| { - RegexBuilder::new(r"^\s*(true)$|^(false)$") - .case_insensitive(true) - .build() - .unwrap() -}); - /// Infer the data type of a record fn infer_field_schema(string: &str, try_parse_dates: bool) -> DataType { // when quoting is enabled in the reader, these quotes aren't escaped, we default to diff --git a/crates/polars-io/src/parquet/predicates.rs b/crates/polars-io/src/parquet/predicates.rs index 1dfc6b231b5a..02262660e384 100644 --- a/crates/polars-io/src/parquet/predicates.rs +++ b/crates/polars-io/src/parquet/predicates.rs @@ -1,118 +1,18 @@ -use arrow::compute::concatenate::concatenate; use arrow::io::parquet::read::statistics::{deserialize, Statistics}; use arrow::io::parquet::read::RowGroupMetaData; use polars_core::prelude::*; -use crate::predicates::PhysicalIoExpr; +use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr}; use crate::ArrowResult; -/// The statistics for a column in a 
Parquet file -/// they typically hold -/// - max value -/// - min value -/// - null_count -#[cfg_attr(debug_assertions, derive(Debug))] -pub struct ColumnStats(Statistics, Field); - impl ColumnStats { - pub fn dtype(&self) -> DataType { - self.1.data_type().clone() - } - - pub fn null_count(&self) -> Option { - match self.1.data_type() { - #[cfg(feature = "dtype-struct")] - DataType::Struct(_) => None, - _ => { - // the array holds the null count for every row group - // so we sum them to get them of the whole file. - let s = Series::try_from(("", self.0.null_count.clone())).unwrap(); - - // if all null, there are no statistics. - if s.null_count() != s.len() { - s.sum() - } else { - None - } - }, - } - } - - pub fn to_min_max(&self) -> Option { - let max_val = &*self.0.max_value; - let min_val = &*self.0.min_value; - - let dtype = DataType::from(min_val.data_type()); - - if Self::use_min_max(dtype) { - let arr = concatenate(&[min_val, max_val]).unwrap(); - let s = Series::try_from(("", arr)).unwrap(); - if s.null_count() > 0 { - None - } else { - Some(s) - } - } else { - None - } - } - - pub fn to_min(&self) -> Option { - let min_val = self.0.min_value.clone(); - let dtype = DataType::from(min_val.data_type()); - - if !Self::use_min_max(dtype) || min_val.len() != 1 { - return None; - } - - let s = Series::try_from(("", min_val)).unwrap(); - if s.null_count() > 0 { - None - } else { - Some(s) - } - } - - pub fn to_max(&self) -> Option { - let max_val = self.0.max_value.clone(); - let dtype = DataType::from(max_val.data_type()); - - if !Self::use_min_max(dtype) || max_val.len() != 1 { - return None; - } - - let s = Series::try_from(("", max_val)).unwrap(); - if s.null_count() > 0 { - None - } else { - Some(s) - } - } - - #[cfg(feature = "dtype-binary")] - fn use_min_max(dtype: DataType) -> bool { - dtype.is_numeric() || matches!(dtype, DataType::Utf8) || matches!(dtype, DataType::Binary) - } - - #[cfg(not(feature = "dtype-binary"))] - fn use_min_max(dtype: DataType) -> bool { - dtype.is_numeric() || matches!(dtype, DataType::Utf8) - } -} - -/// A collection of column stats with a known schema. 
-pub struct BatchStats { - schema: Schema, - stats: Vec, -} - -impl BatchStats { - pub fn get_stats(&self, column: &str) -> polars_core::error::PolarsResult<&ColumnStats> { - self.schema.try_index_of(column).map(|i| &self.stats[i]) - } - - pub fn schema(&self) -> &Schema { - &self.schema + fn from_arrow_stats(stats: Statistics, field: &ArrowField) -> Self { + Self::new( + field.into(), + Some(Series::try_from(("", stats.null_count)).unwrap()), + Some(Series::try_from(("", stats.min_value)).unwrap()), + Some(Series::try_from(("", stats.max_value)).unwrap()), + ) } } @@ -133,13 +33,13 @@ pub(crate) fn collect_statistics( Some(rg) => deserialize(fld, &md[rg..rg + 1])?, }; schema.with_column((&fld.name).into(), (&fld.data_type).into()); - stats.push(ColumnStats(st, fld.into())); + stats.push(ColumnStats::from_arrow_stats(st, fld)); } Ok(if stats.is_empty() { None } else { - Some(BatchStats { schema, stats }) + Some(BatchStats::new(schema, stats)) }) } diff --git a/crates/polars-io/src/parquet/read.rs b/crates/polars-io/src/parquet/read.rs index 496d61a6a118..cdc8b5e57ca9 100644 --- a/crates/polars-io/src/parquet/read.rs +++ b/crates/polars-io/src/parquet/read.rs @@ -52,6 +52,7 @@ pub struct ParquetReader { row_count: Option, low_memory: bool, metadata: Option, + hive_partition_columns: Option>, use_statistics: bool, } @@ -78,6 +79,7 @@ impl ParquetReader { self.parallel, self.row_count, self.use_statistics, + self.hive_partition_columns.as_deref(), ) .map(|mut df| { if rechunk { @@ -145,6 +147,11 @@ impl ParquetReader { Ok(metadata.num_rows) } + pub fn with_hive_partition_columns(mut self, columns: Option>) -> Self { + self.hive_partition_columns = columns; + self + } + fn get_metadata(&mut self) -> PolarsResult<&FileMetaData> { if self.metadata.is_none() { self.metadata = Some(read::read_metadata(&mut self.reader)?); @@ -166,6 +173,7 @@ impl ParquetReader { self.row_count, chunk_size, self.use_statistics, + self.hive_partition_columns, ) } } @@ -184,6 +192,7 @@ impl SerReader for ParquetReader { low_memory: false, metadata: None, use_statistics: true, + hive_partition_columns: None, } } @@ -210,6 +219,7 @@ impl SerReader for ParquetReader { self.parallel, self.row_count, self.use_statistics, + self.hive_partition_columns.as_deref(), ) .map(|mut df| { if self.rechunk { @@ -230,6 +240,7 @@ pub struct ParquetAsyncReader { projection: Option>, row_count: Option, use_statistics: bool, + hive_partition_columns: Option>, } #[cfg(feature = "cloud")] @@ -245,6 +256,7 @@ impl ParquetAsyncReader { projection: None, row_count: None, use_statistics: true, + hive_partition_columns: None, }) } @@ -294,6 +306,11 @@ impl ParquetAsyncReader { self } + pub fn with_hive_partition_columns(mut self, columns: Option>) -> Self { + self.hive_partition_columns = columns; + self + } + #[tokio::main(flavor = "current_thread")] pub async fn batched(mut self, chunk_size: usize) -> PolarsResult { let metadata = self.reader.get_metadata().await?.to_owned(); @@ -311,6 +328,7 @@ impl ParquetAsyncReader { self.row_count, chunk_size, self.use_statistics, + self.hive_partition_columns, ) } diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs index a5612751557f..922db0f459fc 100644 --- a/crates/polars-io/src/parquet/read_impl.rs +++ b/crates/polars-io/src/parquet/read_impl.rs @@ -84,6 +84,14 @@ pub(super) fn array_iter_to_series( } } +fn materialize_hive_partitions(df: &mut DataFrame, hive_partition_columns: Option<&[Series]>) { + if let Some(hive_columns) = hive_partition_columns { + 
for s in hive_columns { + unsafe { df.with_column_unchecked(s.new_from_index(0, df.height())) }; + } + } +} + #[allow(clippy::too_many_arguments)] // might parallelize over columns fn rg_to_dfs( @@ -99,6 +107,7 @@ fn rg_to_dfs( parallel: ParallelStrategy, projection: &[usize], use_statistics: bool, + hive_partition_columns: Option<&[Series]>, ) -> PolarsResult> { let mut dfs = Vec::with_capacity(row_group_end - row_group_start); @@ -148,6 +157,7 @@ fn rg_to_dfs( if let Some(rc) = &row_count { df.with_row_count_mut(&rc.name, Some(*previous_row_count + rc.offset)); } + materialize_hive_partitions(&mut df, hive_partition_columns); apply_predicate(&mut df, predicate.as_deref(), true)?; @@ -175,6 +185,7 @@ fn rg_to_dfs_par( row_count: Option, projection: &[usize], use_statistics: bool, + hive_partition_columns: Option<&[Series]>, ) -> PolarsResult> { // compute the limits per row group and the row count offsets let row_groups = file_metadata @@ -222,6 +233,7 @@ fn rg_to_dfs_par( if let Some(rc) = &row_count { df.with_row_count_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset)); } + materialize_hive_partitions(&mut df, hive_partition_columns); apply_predicate(&mut df, predicate.as_deref(), false)?; @@ -242,6 +254,7 @@ pub fn read_parquet( mut parallel: ParallelStrategy, row_count: Option, use_statistics: bool, + hive_partition_columns: Option<&[Series]>, ) -> PolarsResult { let file_metadata = metadata .map(Ok) @@ -297,6 +310,7 @@ pub fn read_parquet( parallel, &projection, use_statistics, + hive_partition_columns, )?, ParallelStrategy::RowGroups => rg_to_dfs_par( &store, @@ -310,6 +324,7 @@ pub fn read_parquet( row_count, &projection, use_statistics, + hive_partition_columns, )?, // auto should already be replaced by Columns or RowGroups ParallelStrategy::Auto => unimplemented!(), @@ -374,9 +389,11 @@ pub struct BatchedParquetReader { parallel: ParallelStrategy, chunk_size: usize, use_statistics: bool, + hive_partition_columns: Option>, } impl BatchedParquetReader { + #[allow(clippy::too_many_arguments)] pub fn new( row_group_fetcher: Box, metadata: FileMetaData, @@ -385,6 +402,7 @@ impl BatchedParquetReader { row_count: Option, chunk_size: usize, use_statistics: bool, + hive_partition_columns: Option>, ) -> PolarsResult { let schema = read::schema::infer_schema(&metadata)?; let n_row_groups = metadata.row_groups.len(); @@ -412,6 +430,7 @@ impl BatchedParquetReader { parallel, chunk_size, use_statistics, + hive_partition_columns, }) } @@ -438,6 +457,7 @@ impl BatchedParquetReader { ParallelStrategy::Columns, &self.projection, self.use_statistics, + self.hive_partition_columns.as_deref(), )?; self.row_group_offset += n; dfs @@ -455,6 +475,7 @@ impl BatchedParquetReader { self.row_count.clone(), &self.projection, self.use_statistics, + self.hive_partition_columns.as_deref(), )?; self.row_group_offset += n; dfs diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index ddf384e5cae3..022077d24d75 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -1,4 +1,6 @@ use polars_core::prelude::*; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; pub trait PhysicalIoExpr: Send + Sync { /// Take a [`DataFrame`] and produces a boolean [`Series`] that serves @@ -8,15 +10,13 @@ pub trait PhysicalIoExpr: Send + Sync { /// Can take &dyn Statistics and determine of a file should be /// read -> `true` /// or not -> `false` - #[cfg(feature = "parquet")] fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { None } 
} -#[cfg(feature = "parquet")] pub trait StatsEvaluator { - fn should_read(&self, stats: &crate::parquet::predicates::BatchStats) -> PolarsResult; + fn should_read(&self, stats: &BatchStats) -> PolarsResult; } #[cfg(feature = "parquet")] @@ -47,3 +47,145 @@ pub(crate) fn apply_predicate( } Ok(()) } + +/// The statistics for a column in a Parquet file +/// or Hive partition. +/// they typically hold +/// - max value +/// - min value +/// - null_count +#[derive(Debug)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ColumnStats { + field: Field, + // The array may hold the null count for every row group, + // or for a single row group. + null_count: Option, + min_value: Option, + max_value: Option, +} + +impl ColumnStats { + pub fn new( + field: Field, + null_count: Option, + min_value: Option, + max_value: Option, + ) -> Self { + Self { + field, + null_count, + min_value, + max_value, + } + } + + pub fn from_column_literal(s: Series) -> Self { + debug_assert_eq!(s.len(), 1); + Self { + field: s.field().into_owned(), + null_count: None, + min_value: Some(s.clone()), + max_value: Some(s), + } + } + + pub fn dtype(&self) -> &DataType { + self.field.data_type() + } + + pub fn null_count(&self) -> Option { + match self.field.data_type() { + #[cfg(feature = "dtype-struct")] + DataType::Struct(_) => None, + _ => { + let s = self.null_count.as_ref()?; + // if all null, there are no statistics. + if s.null_count() != s.len() { + s.sum() + } else { + None + } + }, + } + } + + pub fn to_min_max(&self) -> Option { + let max_val = self.max_value.as_ref()?; + let min_val = self.min_value.as_ref()?; + + let dtype = min_val.dtype(); + + if Self::use_min_max(dtype) { + let mut min_max_values = min_val.clone(); + min_max_values.append(max_val).unwrap(); + if min_max_values.null_count() > 0 { + None + } else { + Some(min_max_values) + } + } else { + None + } + } + + pub fn to_min(&self) -> Option<&Series> { + let min_val = self.min_value.as_ref()?; + let dtype = min_val.dtype(); + + if !Self::use_min_max(dtype) || min_val.len() != 1 { + return None; + } + + if min_val.null_count() > 0 { + None + } else { + Some(min_val) + } + } + + pub fn to_max(&self) -> Option<&Series> { + let max_val = self.max_value.as_ref()?; + let dtype = max_val.dtype(); + + if !Self::use_min_max(dtype) || max_val.len() != 1 { + return None; + } + + if max_val.null_count() > 0 { + None + } else { + Some(max_val) + } + } + + fn use_min_max(dtype: &DataType) -> bool { + dtype.is_numeric() || matches!(dtype, DataType::Utf8 | DataType::Binary) + } +} + +/// A collection of column stats with a known schema. 
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Debug)] +pub struct BatchStats { + schema: Schema, + stats: Vec, +} + +impl BatchStats { + pub fn new(schema: Schema, stats: Vec) -> Self { + Self { schema, stats } + } + + pub fn get_stats(&self, column: &str) -> polars_core::error::PolarsResult<&ColumnStats> { + self.schema.try_index_of(column).map(|i| &self.stats[i]) + } + + pub fn schema(&self) -> &Schema { + &self.schema + } + + pub fn column_stats(&self) -> &[ColumnStats] { + self.stats.as_ref() + } +} diff --git a/crates/polars-io/src/utils.rs b/crates/polars-io/src/utils.rs index c26b7092ad7d..b46bad765e17 100644 --- a/crates/polars-io/src/utils.rs +++ b/crates/polars-io/src/utils.rs @@ -1,8 +1,10 @@ use std::io::Read; use std::path::{Path, PathBuf}; +use once_cell::sync::Lazy; use polars_core::frame::DataFrame; use polars_core::prelude::*; +use regex::{Regex, RegexBuilder}; use crate::mmap::{MmapBytesReader, ReaderBytes}; #[cfg(any( @@ -151,6 +153,19 @@ pub(crate) fn overwrite_schema( Ok(()) } +pub static FLOAT_RE: Lazy = Lazy::new(|| { + Regex::new(r"^\s*[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$").unwrap() +}); + +pub static INTEGER_RE: Lazy = Lazy::new(|| Regex::new(r"^\s*-?(\d+)$").unwrap()); + +pub static BOOLEAN_RE: Lazy = Lazy::new(|| { + RegexBuilder::new(r"^\s*(true)$|^(false)$") + .case_insensitive(true) + .build() + .unwrap() +}); + #[cfg(test)] mod tests { use std::path::PathBuf; diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index e9a00849e343..de459eb9d0eb 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -35,7 +35,7 @@ use smartstring::alias::String as SmartString; use crate::fallible; use crate::physical_plan::executors::Executor; -use crate::physical_plan::planner::create_physical_plan; +use crate::physical_plan::planner::{create_physical_expr, create_physical_plan}; use crate::physical_plan::state::ExecutionState; #[cfg(feature = "streaming")] use crate::physical_plan::streaming::insert_streaming_nodes; @@ -550,7 +550,25 @@ impl LazyFrame { ); opt_state.comm_subplan_elim = false; } - let lp_top = optimize(self.logical_plan, opt_state, lp_arena, expr_arena, scratch)?; + let lp_top = optimize( + self.logical_plan, + opt_state, + lp_arena, + expr_arena, + scratch, + Some(&|node, expr_arena| { + let phys_expr = create_physical_expr( + node, + Context::Default, + expr_arena, + None, + &mut Default::default(), + ) + .ok()?; + let io_expr = phys_expr_to_io_expr(phys_expr); + Some(io_expr) + }), + )?; if streaming { #[cfg(feature = "streaming")] diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index 4584c49ec035..0a5ef72867d7 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -7,7 +7,7 @@ use super::*; pub struct ParquetExec { path: PathBuf, - schema: SchemaRef, + file_info: FileInfo, predicate: Option>, options: ParquetOptions, #[allow(dead_code)] @@ -18,7 +18,7 @@ pub struct ParquetExec { impl ParquetExec { pub(crate) fn new( path: PathBuf, - schema: SchemaRef, + file_info: FileInfo, predicate: Option>, options: ParquetOptions, cloud_options: Option, @@ -26,7 +26,7 @@ impl ParquetExec { ) -> Self { ParquetExec { path, - schema, + file_info, predicate, options, cloud_options, @@ -39,7 +39,7 @@ impl ParquetExec { &self.path, &self.predicate, &mut 
self.file_options.with_columns, - &mut self.schema, + &mut self.file_info.schema, self.file_options.n_rows, self.file_options.row_count.is_some(), ); @@ -52,6 +52,12 @@ impl ParquetExec { .set_rechunk(self.file_options.rechunk) .set_low_memory(self.options.low_memory) .use_statistics(self.options.use_statistics) + .with_hive_partition_columns( + self.file_info + .hive_parts + .as_ref() + .map(|hive| hive.materialize_partition_columns()), + ) ._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref())) } else if is_cloud_url(self.path.as_path()) { #[cfg(feature = "cloud")] @@ -62,7 +68,13 @@ impl ParquetExec { )? .with_n_rows(n_rows) .with_row_count(mem::take(&mut self.file_options.row_count)) - .use_statistics(self.options.use_statistics); + .use_statistics(self.options.use_statistics) + .with_hive_partition_columns( + self.file_info + .hive_parts + .as_ref() + .map(|hive| hive.materialize_partition_columns()), + ); reader.finish(predicate) } diff --git a/crates/polars-lazy/src/physical_plan/expressions/apply.rs b/crates/polars-lazy/src/physical_plan/expressions/apply.rs index aa6597ddcc7b..ef9b129ad9b1 100644 --- a/crates/polars-lazy/src/physical_plan/expressions/apply.rs +++ b/crates/polars-lazy/src/physical_plan/expressions/apply.rs @@ -5,9 +5,7 @@ use polars_core::frame::group_by::GroupsProxy; use polars_core::prelude::*; use polars_core::POOL; #[cfg(feature = "parquet")] -use polars_io::parquet::predicates::BatchStats; -#[cfg(feature = "parquet")] -use polars_io::predicates::StatsEvaluator; +use polars_io::predicates::{BatchStats, StatsEvaluator}; #[cfg(feature = "parquet")] use polars_plan::dsl::FunctionExpr; use rayon::prelude::*; @@ -467,8 +465,8 @@ impl ApplyExpr { let min = st.to_min()?; let max = st.to_max()?; - let all_smaller = || Some(ChunkCompare::lt(input, &min).ok()?.all()); - let all_bigger = || Some(ChunkCompare::gt(input, &max).ok()?.all()); + let all_smaller = || Some(ChunkCompare::lt(input, min).ok()?.all()); + let all_bigger = || Some(ChunkCompare::gt(input, max).ok()?.all()); Some(!all_smaller()? && !all_bigger()?) }; diff --git a/crates/polars-lazy/src/physical_plan/expressions/binary.rs b/crates/polars-lazy/src/physical_plan/expressions/binary.rs index 5664d032ba3d..502562f9056e 100644 --- a/crates/polars-lazy/src/physical_plan/expressions/binary.rs +++ b/crates/polars-lazy/src/physical_plan/expressions/binary.rs @@ -247,8 +247,7 @@ impl PhysicalExpr for BinaryExpr { #[cfg(feature = "parquet")] mod stats { - use polars_io::parquet::predicates::BatchStats; - use polars_io::predicates::StatsEvaluator; + use polars_io::predicates::{BatchStats, StatsEvaluator}; use super::*; @@ -267,10 +266,28 @@ mod stats { true } + fn apply_operator_stats_neq(min_max: &Series, literal: &Series) -> bool { + if min_max.len() < 2 || min_max.null_count() > 0 { + return true; + } + use ChunkCompare as C; + + // First check proofs all values are the same (e.g. min/max is the same) + // Second check proofs all values are equal, so we can skip as we search + // for non-equal values. + if min_max.get(0).unwrap() == min_max.get(1).unwrap() + && C::equal(literal, min_max).map(|s| s.all()).unwrap_or(false) + { + return false; + } + true + } + fn apply_operator_stats_rhs_lit(min_max: &Series, literal: &Series, op: Operator) -> bool { use ChunkCompare as C; match op { Operator::Eq => apply_operator_stats_eq(min_max, literal), + Operator::NotEq => apply_operator_stats_neq(min_max, literal), // col > lit // e.g. 
// [min, max] > 0 @@ -306,6 +323,7 @@ mod stats { use ChunkCompare as C; match op { Operator::Eq => apply_operator_stats_eq(min_max, literal), + Operator::NotEq => apply_operator_stats_eq(min_max, literal), Operator::Gt => { // Literal is bigger than max value, selection needs all rows. C::gt(literal, min_max).map(|ca| ca.any()).unwrap_or(false) @@ -337,19 +355,21 @@ mod stats { use Expr::*; use Operator::*; if !self.expr.into_iter().all(|e| match e { - BinaryExpr { op, .. } => !matches!( - op, - Multiply | Divide | TrueDivide | FloorDivide | Modulus | NotEq - ), + BinaryExpr { op, .. } => { + !matches!(op, Multiply | Divide | TrueDivide | FloorDivide | Modulus) + }, Column(_) | Literal(_) | Alias(_, _) => true, _ => false, }) { return Ok(true); } - let schema = stats.schema(); - let fld_l = self.left.to_field(schema)?; - let fld_r = self.right.to_field(schema)?; + let Some(fld_l) = self.left.to_field(schema).ok() else { + return Ok(true); + }; + let Some(fld_r) = self.right.to_field(schema).ok() else { + return Ok(true); + }; #[cfg(debug_assertions)] { diff --git a/crates/polars-lazy/src/physical_plan/expressions/mod.rs b/crates/polars-lazy/src/physical_plan/expressions/mod.rs index 3c9b455d897a..998fa73ec8e5 100644 --- a/crates/polars-lazy/src/physical_plan/expressions/mod.rs +++ b/crates/polars-lazy/src/physical_plan/expressions/mod.rs @@ -623,7 +623,7 @@ impl PhysicalIoExpr for PhysicalIoHelper { } } -pub(super) fn phys_expr_to_io_expr(expr: Arc) -> Arc { +pub(crate) fn phys_expr_to_io_expr(expr: Arc) -> Arc { let has_window_function = if let Some(expr) = expr.as_expression() { expr.into_iter() .any(|expr| matches!(expr, Expr::Window { .. })) diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index b576ed117feb..6c1d27b0ee97 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -233,7 +233,7 @@ pub fn create_physical_plan( cloud_options, } => Ok(Box::new(executors::ParquetExec::new( path, - file_info.schema, + file_info, predicate, options, cloud_options, diff --git a/crates/polars-lazy/src/scan/mod.rs b/crates/polars-lazy/src/scan/mod.rs index b797acec06d9..ed8705a01b8a 100644 --- a/crates/polars-lazy/src/scan/mod.rs +++ b/crates/polars-lazy/src/scan/mod.rs @@ -1,11 +1,12 @@ -pub mod anonymous_scan; +pub(super) mod anonymous_scan; #[cfg(feature = "csv")] -pub mod csv; -pub(crate) mod file_list_reader; +pub(super) mod csv; +pub(super) mod file_list_reader; #[cfg(feature = "ipc")] -pub mod ipc; +pub(super) mod ipc; #[cfg(feature = "json")] -pub mod ndjson; +pub(super) mod ndjson; #[cfg(feature = "parquet")] -pub mod parquet; +pub(super) mod parquet; + use file_list_reader::*; diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs index 0a1b7c1b51b0..028e8cd06b1a 100644 --- a/crates/polars-lazy/src/scan/parquet.rs +++ b/crates/polars-lazy/src/scan/parquet.rs @@ -17,6 +17,7 @@ pub struct ScanArgsParquet { pub low_memory: bool, pub cloud_options: Option, pub use_statistics: bool, + pub hive_partitioning: bool, } impl Default for ScanArgsParquet { @@ -30,6 +31,7 @@ impl Default for ScanArgsParquet { low_memory: false, cloud_options: None, use_statistics: true, + hive_partitioning: false, } } } @@ -60,6 +62,7 @@ impl LazyFileListReader for LazyParquetReader { self.args.low_memory, self.args.cloud_options, self.args.use_statistics, + self.args.hive_partitioning, )? 
.build() .into(); diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index dca282f18a62..d4708f41247c 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -8,6 +8,7 @@ use polars_io::parquet::{BatchedParquetReader, ParquetReader}; #[cfg(feature = "async")] use polars_io::prelude::ParquetAsyncReader; use polars_io::{is_cloud_url, SerReader}; +use polars_plan::logical_plan::FileInfo; use polars_plan::prelude::{FileScanOptions, ParquetOptions}; use polars_utils::IdxSize; @@ -23,7 +24,7 @@ pub struct ParquetSource { file_options: Option, #[allow(dead_code)] cloud_options: Option, - schema: Option, + file_info: FileInfo, verbose: bool, } @@ -35,7 +36,7 @@ impl ParquetSource { let path = self.path.take().unwrap(); let options = self.options.take().unwrap(); let file_options = self.file_options.take().unwrap(); - let schema = self.schema.take().unwrap(); + let schema = self.file_info.schema.clone(); let projection: Option> = file_options.with_columns.map(|with_columns| { with_columns .iter() @@ -65,6 +66,12 @@ impl ParquetSource { .with_row_count(file_options.row_count) .with_projection(projection) .use_statistics(options.use_statistics) + .with_hive_partition_columns( + self.file_info + .hive_parts + .as_ref() + .map(|hive| hive.materialize_partition_columns()), + ) .batched(chunk_size)? } } else { @@ -75,6 +82,12 @@ impl ParquetSource { .with_row_count(file_options.row_count) .with_projection(projection) .use_statistics(options.use_statistics) + .with_hive_partition_columns( + self.file_info + .hive_parts + .as_ref() + .map(|hive| hive.materialize_partition_columns()), + ) .batched(chunk_size)? }; self.batched_reader = Some(batched_reader); @@ -87,7 +100,7 @@ impl ParquetSource { options: ParquetOptions, cloud_options: Option, file_options: FileScanOptions, - schema: SchemaRef, + file_info: FileInfo, verbose: bool, ) -> PolarsResult { let n_threads = POOL.current_num_threads(); @@ -100,7 +113,7 @@ impl ParquetSource { file_options: Some(file_options), path: Some(path), cloud_options, - schema: Some(schema), + file_info, verbose, }) } diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index cf4bbdb5d435..7e0009fc2d60 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -106,7 +106,7 @@ where parquet_options, cloud_options, file_options, - file_info.schema, + file_info, verbose, )?; Ok(Box::new(src) as Box) diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index e1b9ea70b48f..6140a57bfd83 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -28,6 +28,7 @@ chrono-tz = { workspace = true, optional = true } ciborium = { workspace = true, optional = true } futures = { workspace = true, optional = true } once_cell = { workspace = true } +percent-encoding = { workspace = true } pyo3 = { workspace = true, optional = true } rayon = { workspace = true } regex = { workspace = true, optional = true } @@ -140,6 +141,7 @@ cutqcut = ["polars-ops/cutqcut"] rle = ["polars-ops/rle"] extract_groups = ["regex", "dtype-struct", "polars-ops/extract_groups"] ffi_plugin = ["libloading", "polars-ffi"] +hive_partitions = [] bigidx = ["polars-core/bigidx"] diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index cba20688c312..6a261f247b97 100644 --- 
a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -96,10 +96,7 @@ impl LogicalPlanBuilder { None => function.schema(infer_schema_length)?, }); - let file_info = FileInfo { - schema: schema.clone(), - row_estimation: (n_rows, n_rows.unwrap_or(usize::MAX)), - }; + let file_info = FileInfo::new(schema.clone(), (n_rows, n_rows.unwrap_or(usize::MAX))); Ok(LogicalPlan::AnonymousScan { function, file_info, @@ -129,17 +126,18 @@ impl LogicalPlanBuilder { low_memory: bool, cloud_options: Option, use_statistics: bool, + hive_partitioning: bool, ) -> PolarsResult { use polars_io::{is_cloud_url, SerReader as _}; let path = path.into(); let (mut schema, num_rows) = if is_cloud_url(&path) { - #[cfg(not(feature = "async"))] + #[cfg(not(feature = "cloud"))] panic!( "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled." ); - #[cfg(feature = "async")] + #[cfg(feature = "cloud")] { let uri = path.to_string_lossy(); ParquetAsyncReader::file_info(&uri, cloud_options.as_ref())? @@ -154,10 +152,9 @@ impl LogicalPlanBuilder { let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); } - let file_info = FileInfo { - schema: Arc::new(schema), - row_estimation: (Some(num_rows), num_rows), - }; + let mut file_info = FileInfo::new(Arc::new(schema), (Some(num_rows), num_rows)); + + file_info.set_hive_partitions(path.as_path()); let options = FileScanOptions { with_columns: None, @@ -166,6 +163,7 @@ impl LogicalPlanBuilder { rechunk, row_count, file_counter: Default::default(), + hive_partitioning, }; Ok(LogicalPlan::Scan { path, @@ -206,10 +204,7 @@ impl LogicalPlanBuilder { let schema = Arc::new(schema); let num_rows = reader._num_rows()?; - let file_info = FileInfo { - schema, - row_estimation: (None, num_rows), - }; + let file_info = FileInfo::new(schema, (None, num_rows)); let file_options = FileScanOptions { with_columns: None, @@ -218,6 +213,8 @@ impl LogicalPlanBuilder { rechunk, row_count, file_counter: Default::default(), + // TODO! add + hive_partitioning: false, }; Ok(LogicalPlan::Scan { path, @@ -316,10 +313,7 @@ impl LogicalPlanBuilder { let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize; skip_rows += skip_rows_after_header; - let file_info = FileInfo { - schema, - row_estimation: (None, estimated_n_rows), - }; + let file_info = FileInfo::new(schema, (None, estimated_n_rows)); let options = FileScanOptions { with_columns: None, @@ -328,6 +322,8 @@ impl LogicalPlanBuilder { rechunk, row_count, file_counter: Default::default(), + // TODO! add + hive_partitioning: false, }; Ok(LogicalPlan::Scan { path, diff --git a/crates/polars-plan/src/logical_plan/hive.rs b/crates/polars-plan/src/logical_plan/hive.rs new file mode 100644 index 000000000000..28e438f9d857 --- /dev/null +++ b/crates/polars-plan/src/logical_plan/hive.rs @@ -0,0 +1,97 @@ +use std::path::Path; + +use percent_encoding::percent_decode_str; +use polars_core::prelude::*; +use polars_io::predicates::{BatchStats, ColumnStats}; +use polars_io::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE}; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Debug)] +pub struct HivePartitions { + /// Single value Series that can be used to run the predicate against. + /// They are to be broadcasted if the predicates don't filter them out. 
+ stats: BatchStats, +} + +#[cfg(target_os = "windows")] +fn separator(url: &Path) -> char { + if polars_io::is_cloud_url(url) { + '/' + } else { + '\\' + } +} + +#[cfg(not(target_os = "windows"))] +fn separator(_url: &Path) -> char { + '/' +} + +impl HivePartitions { + pub fn get_statistics(&self) -> &BatchStats { + &self.stats + } + + /// Parse a url and optionally return HivePartitions + pub(crate) fn parse_url(url: &Path) -> Option { + let sep = separator(url); + + let partitions = url + .display() + .to_string() + .split(sep) + .filter_map(|part| { + let mut it = part.split('='); + let name = it.next()?; + let value = it.next()?; + + // Having multiple '=' doesn't seem like valid hive partition, + // continue as url. + if it.next().is_some() { + return None; + } + + let s = if INTEGER_RE.is_match(value) { + let value = value.parse::().ok()?; + Series::new(name, &[value]) + } else if BOOLEAN_RE.is_match(value) { + let value = value.parse::().ok()?; + Series::new(name, &[value]) + } else if FLOAT_RE.is_match(value) { + let value = value.parse::().ok()?; + Series::new(name, &[value]) + } else if value == "__HIVE_DEFAULT_PARTITION__" { + Series::full_null(name, 1, &DataType::Null) + } else { + Series::new(name, &[percent_decode_str(value).decode_utf8().ok()?]) + }; + Some(s) + }) + .collect::>(); + + if partitions.is_empty() { + None + } else { + let schema: Schema = partitions.as_slice().into(); + let stats = BatchStats::new( + schema, + partitions + .into_iter() + .map(ColumnStats::from_column_literal) + .collect(), + ); + + Some(HivePartitions { stats }) + } + } + + pub fn materialize_partition_columns(&self) -> Vec { + self.get_statistics() + .column_stats() + .iter() + .map(|cs| cs.to_min().unwrap().clone()) + .collect() + } +} diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index 81814647d931..25e873a9a790 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -24,6 +24,7 @@ pub(crate) mod debug; mod file_scan; mod format; mod functions; +pub(super) mod hive; pub(crate) mod iterator; mod lit; pub(crate) mod optimizer; diff --git a/crates/polars-plan/src/logical_plan/optimizer/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/mod.rs index 2f462239445b..3f0dc062e994 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/mod.rs @@ -31,6 +31,7 @@ use drop_nulls::ReplaceDropNulls; use fast_projection::FastProjectionAndCollapse; #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))] use file_caching::{find_column_union_and_fingerprints, FileCacher}; +use polars_io::predicates::PhysicalIoExpr; pub use predicate_pushdown::PredicatePushDown; pub use projection_pushdown::ProjectionPushDown; pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule}; @@ -42,6 +43,7 @@ use self::flatten_union::FlattenUnionRule; pub use crate::frame::{AllowedOptimizations, OptState}; #[cfg(feature = "cse")] use crate::logical_plan::optimizer::cse_expr::CommonSubExprOptimizer; +use crate::logical_plan::optimizer::predicate_pushdown::HiveEval; #[cfg(feature = "cse")] use crate::logical_plan::visitor::*; use crate::prelude::optimizer::collect_members::MemberCollector; @@ -63,6 +65,7 @@ pub fn optimize( lp_arena: &mut Arena, expr_arena: &mut Arena, scratch: &mut Vec, + hive_partition_eval: HiveEval<'_>, ) -> PolarsResult { // get toggle values let predicate_pushdown = opt_state.predicate_pushdown; @@ -134,7 +137,7 @@ pub fn 
optimize( } if predicate_pushdown { - let predicate_pushdown_opt = PredicatePushDown::default(); + let predicate_pushdown_opt = PredicatePushDown::new(hive_partition_eval); let alp = lp_arena.take(lp_top); let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?; lp_arena.replace(lp_top, alp); diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index 044a09e56412..b2a7702fe0b4 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -3,6 +3,7 @@ mod keys; mod rename; mod utils; +use polars_core::config::verbose; use polars_core::datatypes::PlHashMap; use polars_core::prelude::*; use utils::*; @@ -14,10 +15,21 @@ use crate::prelude::optimizer::predicate_pushdown::join::process_join; use crate::prelude::optimizer::predicate_pushdown::rename::process_rename; use crate::utils::{check_input_node, has_aexpr}; -#[derive(Default)] -pub struct PredicatePushDown {} +pub type HiveEval<'a> = Option<&'a dyn Fn(Node, &Arena) -> Option>>; + +pub struct PredicatePushDown<'a> { + hive_partition_eval: HiveEval<'a>, + verbose: bool, +} + +impl<'a> PredicatePushDown<'a> { + pub fn new(hive_partition_eval: HiveEval<'a>) -> Self { + Self { + hive_partition_eval, + verbose: verbose(), + } + } -impl PredicatePushDown { fn optional_apply_predicate( &self, lp: ALogicalPlan, @@ -221,6 +233,28 @@ impl PredicatePushDown { let local_predicates = partition_by_full_context(&mut acc_predicates, expr_arena); let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena); + if let (Some(hive_part_stats), Some(predicate)) = (file_info.hive_parts.as_deref(), predicate) { + if let Some(io_expr) = self.hive_partition_eval.unwrap()(predicate, expr_arena) { + if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { + if !stats_evaluator.should_read(hive_part_stats.get_statistics())? 
{ + if self.verbose { + eprintln!("hive partitioning: skipped: {}", path.display()) + } + let schema = output_schema.as_ref().unwrap_or(&file_info.schema); + let df = DataFrame::from(schema.as_ref()); + + return Ok(DataFrameScan { + df: Arc::new(df), + schema: schema.clone(), + output_schema: None, + projection: None, + selection: None + }) + } + } + } + } + let lp = match (predicate, &scan_type) { #[cfg(feature = "csv")] (Some(predicate), FileScan::Csv {..}) => { diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 83be36cdba39..512b1ffa6821 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -101,6 +101,7 @@ pub struct FileScanOptions { pub row_count: Option, pub rechunk: bool, pub file_counter: FileCount, + pub hive_partitioning: bool, } #[derive(Clone, Debug, Copy, Default, Eq, PartialEq)] diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index 41aefebd2c1a..79e77bc584fd 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use std::path::Path; use polars_core::prelude::*; use polars_utils::format_smartstring; @@ -48,6 +49,26 @@ pub struct FileInfo { // - known size // - estimated size pub row_estimation: (Option, usize), + pub hive_parts: Option>, +} + +impl FileInfo { + pub fn new(schema: SchemaRef, row_estimation: (Option, usize)) -> Self { + Self { + schema, + row_estimation, + hive_parts: None, + } + } + + pub fn set_hive_partitions(&mut self, url: &Path) { + self.hive_parts = hive::HivePartitions::parse_url(url).map(|hive_parts| { + let schema = Arc::make_mut(&mut self.schema); + schema.merge(hive_parts.get_statistics().schema().clone()); + + Arc::new(hive_parts) + }); + } } #[cfg(feature = "streaming")] diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 3529403034fa..60440d4ff4f5 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -1826,6 +1826,7 @@ dependencies = [ "libloading", "nano-arrow", "once_cell", + "percent-encoding", "polars-arrow", "polars-core", "polars-ffi", diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index e4558ba0fb2f..1cf3ff4506c6 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -178,6 +178,7 @@ def scan_parquet( storage_options: dict[str, Any] | None = None, low_memory: bool = False, use_statistics: bool = True, + hive_partitioning: bool = True, ) -> LazyFrame: """ Lazily read from a parquet file or multiple files via glob patterns. @@ -228,6 +229,9 @@ def scan_parquet( use_statistics Use statistics in the parquet to determine if pages can be skipped from reading. + hive_partitioning + Infer statistics and schema from hive partitioned URL and use them + to prune reads. 
Examples -------- @@ -258,4 +262,5 @@ def scan_parquet( storage_options=storage_options, low_memory=low_memory, use_statistics=use_statistics, + hive_partitioning=hive_partitioning, ) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index e4e2398cdbba..0bdf3665d064 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -395,6 +395,7 @@ def _scan_parquet( storage_options: dict[str, object] | None = None, low_memory: bool = False, use_statistics: bool = True, + hive_partitioning: bool = True, ) -> Self: """ Lazily read from a parquet file or multiple files via glob patterns. @@ -429,6 +430,7 @@ def _scan_parquet( low_memory, cloud_options=storage_options, use_statistics=use_statistics, + hive_partitioning=hive_partitioning, ) return self diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index 1626facab42f..39710fc18b44 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -235,7 +235,7 @@ impl PyLazyFrame { #[staticmethod] #[allow(clippy::too_many_arguments)] #[pyo3(signature = (path, n_rows, cache, parallel, rechunk, row_count, - low_memory, cloud_options, use_statistics) + low_memory, cloud_options, use_statistics, hive_partitioning) )] fn new_from_parquet( path: String, @@ -247,6 +247,7 @@ impl PyLazyFrame { low_memory: bool, cloud_options: Option>, use_statistics: bool, + hive_partitioning: bool, ) -> PyResult { let cloud_options = cloud_options .map(|kv| parse_cloud_options(&path, kv)) @@ -261,6 +262,7 @@ impl PyLazyFrame { low_memory, cloud_options, use_statistics, + hive_partitioning, }; let lf = LazyFrame::scan_parquet(path, args).map_err(PyPolarsErr::from)?; Ok(lf.into()) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py new file mode 100644 index 000000000000..68875c5fc3d8 --- /dev/null +++ b/py-polars/tests/unit/io/test_hive.py @@ -0,0 +1,49 @@ +import warnings +from pathlib import Path +from typing import Any + +import pyarrow.parquet as pq +import pytest + +import polars as pl +from polars.testing import assert_frame_equal + + +@pytest.mark.write_disk() +def test_hive_partitioned_predicate_pushdown( + io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any +) -> None: + monkeypatch.setenv("POLARS_VERBOSE", "1") + df = pl.read_ipc(io_files_path / "*.ipc") + + root = tmp_path / "partitioned_data" + + # Ignore the pyarrow legacy warning until we can write properly with new settings. + warnings.filterwarnings("ignore") + pq.write_to_dataset( + df.to_arrow(), + root_path=root, + partition_cols=["category", "fats_g"], + use_legacy_dataset=True, + ) + + q = pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True) + + # Partitioning changes the order + sort_by = ["fats_g", "category", "calories", "sugars_g"] + + # The hive partitioned columns are appended, + # so we must ensure we assert in the proper order. 
+ df = df.select(["calories", "sugars_g", "category", "fats_g"]) + for pred in [ + pl.col("category") == "vegetables", + pl.col("category") != "vegetables", + pl.col("fats_g") > 0.5, + (pl.col("fats_g") == 0.5) & (pl.col("category") == "vegetables"), + ]: + assert_frame_equal( + q.filter(pred).sort(sort_by).collect(), + df.filter(pred).sort(sort_by), + ) + err = capfd.readouterr().err + assert "hive partitioning" in err diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index f3de8823b6a4..0ca782b8bc87 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -30,6 +30,7 @@ ] +@pytest.mark.write_disk() def test_write_parquet_using_pyarrow_9753(tmpdir: Path) -> None: df = pl.DataFrame({"a": [1, 2, 3]}) df.write_parquet(