feat: support 'hive partitioning' aware readers (pola-rs#11284)

ritchie46 authored and Romano Vacca committed Oct 1, 2023
1 parent 939da96 commit cdcd737
Showing 34 changed files with 557 additions and 204 deletions.
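For context, "hive partitioning" is the directory convention where partition keys are written into the file path as key=value segments. An illustrative layout (the names and values below are hypothetical, not taken from this commit):

data/date=2023-09-01/region=eu/part-0.parquet
data/date=2023-09-01/region=us/part-0.parquet
data/date=2023-09-02/region=eu/part-0.parquet

A hive-partitioning aware reader derives constant-valued columns (here date and region) from each file's path and appends them to the rows read from that file. That is what the hive_partition_columns: Option<Vec<Series>> plumbing added to the parquet readers below enables.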
1 change: 1 addition & 0 deletions Cargo.toml
@@ -58,6 +58,7 @@ streaming-iterator = "0.1.9"
itoa = "1.0.6"
ryu = "1.0.13"
lexical-core = "0.8.5"
percent-encoding = "2.3"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
polars-core = { version = "0.33.2", path = "crates/polars-core", default-features = false }
polars-arrow = { version = "0.33.2", path = "crates/polars-arrow", default-features = false }
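Partition values embedded in hive-style paths are commonly percent-encoded (spaces, slashes and other reserved characters), which is presumably why the percent-encoding crate is added above. A minimal sketch of decoding one key=value path segment; the helper name is hypothetical and not part of this commit:

use percent_encoding::percent_decode_str;

// Hypothetical helper (illustration only): split a path segment such as
// "region=north%20america" into a (column name, decoded value) pair.
fn parse_partition_segment(segment: &str) -> Option<(String, String)> {
    let (key, raw_value) = segment.split_once('=')?;
    // Percent-decode the value, falling back lossily if it is not valid UTF-8.
    let value = percent_decode_str(raw_value).decode_utf8_lossy().into_owned();
    Some((key.to_string(), value))
}

fn main() {
    assert_eq!(
        parse_partition_segment("region=north%20america"),
        Some(("region".to_string(), "north america".to_string()))
    );
}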
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/mod.rs
@@ -501,7 +501,7 @@ impl DataFrame {
/// # Ok::<(), PolarsError>(())
/// ```
pub fn schema(&self) -> Schema {
self.iter().map(|s| s.field().into_owned()).collect()
self.columns.as_slice().into()
}

/// Get a reference to the [`DataFrame`] columns.
6 changes: 6 additions & 0 deletions crates/polars-core/src/schema.rs
@@ -34,6 +34,12 @@ impl Debug for Schema {
}
}

impl From<&[Series]> for Schema {
fn from(value: &[Series]) -> Self {
value.iter().map(|s| s.field().into_owned()).collect()
}
}

impl<F> FromIterator<F> for Schema
where
F: Into<Field>,
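A short usage sketch of the new From<&[Series]> for Schema conversion, which DataFrame::schema() above now forwards to via self.columns.as_slice().into(). The column names and dtypes are illustrative:

use polars_core::prelude::*;

fn main() -> PolarsResult<()> {
    let columns = vec![
        Series::new("id", &[1i64, 2, 3]),
        Series::new("name", &["a", "b", "c"]),
    ];

    // Build a Schema directly from a slice of Series via the new impl.
    let schema: Schema = columns.as_slice().into();
    assert_eq!(schema.get("id"), Some(&DataType::Int64));

    // DataFrame::schema() now takes the same path internally.
    let df = DataFrame::new(columns)?;
    assert_eq!(df.schema().get("name"), Some(&DataType::Utf8));
    Ok(())
}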
18 changes: 1 addition & 17 deletions crates/polars-io/src/csv/read_impl/mod.rs
@@ -557,23 +557,7 @@ impl<'a> CoreReader<'a> {

// An empty file with a schema should return an empty DataFrame with that schema
if bytes.is_empty() {
// TODO! add DataFrame::new_from_schema
let buffers = init_buffers(
&projection,
0,
&self.schema,
&self.init_string_size_stats(&str_columns, 0),
self.quote_char,
self.encoding,
self.ignore_errors,
)?;
let df = DataFrame::new_no_checks(
buffers
.into_iter()
.map(|buf| buf.into_series())
.collect::<PolarsResult<_>>()?,
);
return Ok(df);
return Ok(DataFrame::from(self.schema.as_ref()));
}

// all the buffers returned from the threads
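The replacement line relies on a From<&Schema> for DataFrame conversion that is presumably added elsewhere in this commit (or already available in polars-core); it is not shown in this excerpt. A rough sketch of what such a conversion does, under that assumption: one zero-length column per field, preserving name and dtype.

use polars_core::prelude::*;

// Illustrative only; the real conversion lives in polars-core.
fn empty_df_from_schema(schema: &Schema) -> DataFrame {
    let columns: Vec<Series> = schema
        .iter_fields()
        .map(|field| Series::new_empty(field.name(), field.data_type()))
        .collect();
    DataFrame::new_no_checks(columns)
}

fn main() {
    let mut schema = Schema::new();
    schema.with_column("id".into(), DataType::Int64);
    let df = empty_df_from_schema(&schema);
    assert_eq!(df.shape(), (0, 1));
}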
18 changes: 2 additions & 16 deletions crates/polars-io/src/csv/utils.rs
@@ -3,22 +3,21 @@ use std::borrow::Cow;
use std::io::Read;
use std::mem::MaybeUninit;

use once_cell::sync::Lazy;
use polars_core::datatypes::PlHashSet;
use polars_core::prelude::*;
#[cfg(feature = "polars-time")]
use polars_time::chunkedarray::utf8::infer as date_infer;
#[cfg(feature = "polars-time")]
use polars_time::prelude::utf8::Pattern;
use regex::{Regex, RegexBuilder};

#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
use crate::csv::parser::next_line_position_naive;
use crate::csv::parser::{next_line_position, skip_bom, skip_line_ending, SplitLines};
use crate::csv::splitfields::SplitFields;
use crate::csv::CsvEncoding;
use crate::mmap::*;
use crate::mmap::ReaderBytes;
use crate::prelude::NullValues;
use crate::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE};

pub(crate) fn get_file_chunks(
bytes: &[u8],
@@ -58,19 +57,6 @@ pub(crate) fn get_file_chunks(
offsets
}

static FLOAT_RE: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^\s*[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$").unwrap()
});

static INTEGER_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^\s*-?(\d+)$").unwrap());

static BOOLEAN_RE: Lazy<Regex> = Lazy::new(|| {
RegexBuilder::new(r"^\s*(true)$|^(false)$")
.case_insensitive(true)
.build()
.unwrap()
});

/// Infer the data type of a record
fn infer_field_schema(string: &str, try_parse_dates: bool) -> DataType {
// when quoting is enabled in the reader, these quotes aren't escaped, we default to
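The FLOAT_RE / INTEGER_RE / BOOLEAN_RE statics move from this module to crate::utils, presumably so they can be shared with the new hive-partition value parsing, which also has to infer dtypes from raw strings. A minimal sketch of the kind of inference they support, reusing the same lazily-built regexes; the infer_kind helper is illustrative and not the actual infer_field_schema:

use once_cell::sync::Lazy;
use regex::{Regex, RegexBuilder};

static INTEGER_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^\s*-?(\d+)$").unwrap());
static FLOAT_RE: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"^\s*[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$").unwrap()
});
static BOOLEAN_RE: Lazy<Regex> = Lazy::new(|| {
    RegexBuilder::new(r"^\s*(true)$|^(false)$")
        .case_insensitive(true)
        .build()
        .unwrap()
});

// Illustrative type inference over a raw string value, in the spirit of
// infer_field_schema: booleans, then integers, then floats, else text.
fn infer_kind(s: &str) -> &'static str {
    if BOOLEAN_RE.is_match(s) {
        "Boolean"
    } else if INTEGER_RE.is_match(s) {
        "Int64"
    } else if FLOAT_RE.is_match(s) {
        "Float64"
    } else {
        "Utf8"
    }
}

fn main() {
    assert_eq!(infer_kind("true"), "Boolean");
    assert_eq!(infer_kind("42"), "Int64");
    assert_eq!(infer_kind("3.14"), "Float64");
    assert_eq!(infer_kind("2023-09-01"), "Utf8");
}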
120 changes: 10 additions & 110 deletions crates/polars-io/src/parquet/predicates.rs
@@ -1,118 +1,18 @@
use arrow::compute::concatenate::concatenate;
use arrow::io::parquet::read::statistics::{deserialize, Statistics};
use arrow::io::parquet::read::RowGroupMetaData;
use polars_core::prelude::*;

use crate::predicates::PhysicalIoExpr;
use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};
use crate::ArrowResult;

/// The statistics for a column in a Parquet file
/// they typically hold
/// - max value
/// - min value
/// - null_count
#[cfg_attr(debug_assertions, derive(Debug))]
pub struct ColumnStats(Statistics, Field);

impl ColumnStats {
pub fn dtype(&self) -> DataType {
self.1.data_type().clone()
}

pub fn null_count(&self) -> Option<usize> {
match self.1.data_type() {
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => None,
_ => {
// the array holds the null count for every row group
// so we sum them to get them of the whole file.
let s = Series::try_from(("", self.0.null_count.clone())).unwrap();

// if all null, there are no statistics.
if s.null_count() != s.len() {
s.sum()
} else {
None
}
},
}
}

pub fn to_min_max(&self) -> Option<Series> {
let max_val = &*self.0.max_value;
let min_val = &*self.0.min_value;

let dtype = DataType::from(min_val.data_type());

if Self::use_min_max(dtype) {
let arr = concatenate(&[min_val, max_val]).unwrap();
let s = Series::try_from(("", arr)).unwrap();
if s.null_count() > 0 {
None
} else {
Some(s)
}
} else {
None
}
}

pub fn to_min(&self) -> Option<Series> {
let min_val = self.0.min_value.clone();
let dtype = DataType::from(min_val.data_type());

if !Self::use_min_max(dtype) || min_val.len() != 1 {
return None;
}

let s = Series::try_from(("", min_val)).unwrap();
if s.null_count() > 0 {
None
} else {
Some(s)
}
}

pub fn to_max(&self) -> Option<Series> {
let max_val = self.0.max_value.clone();
let dtype = DataType::from(max_val.data_type());

if !Self::use_min_max(dtype) || max_val.len() != 1 {
return None;
}

let s = Series::try_from(("", max_val)).unwrap();
if s.null_count() > 0 {
None
} else {
Some(s)
}
}

#[cfg(feature = "dtype-binary")]
fn use_min_max(dtype: DataType) -> bool {
dtype.is_numeric() || matches!(dtype, DataType::Utf8) || matches!(dtype, DataType::Binary)
}

#[cfg(not(feature = "dtype-binary"))]
fn use_min_max(dtype: DataType) -> bool {
dtype.is_numeric() || matches!(dtype, DataType::Utf8)
}
}

/// A collection of column stats with a known schema.
pub struct BatchStats {
schema: Schema,
stats: Vec<ColumnStats>,
}

impl BatchStats {
pub fn get_stats(&self, column: &str) -> polars_core::error::PolarsResult<&ColumnStats> {
self.schema.try_index_of(column).map(|i| &self.stats[i])
}

pub fn schema(&self) -> &Schema {
&self.schema
fn from_arrow_stats(stats: Statistics, field: &ArrowField) -> Self {
Self::new(
field.into(),
Some(Series::try_from(("", stats.null_count)).unwrap()),
Some(Series::try_from(("", stats.min_value)).unwrap()),
Some(Series::try_from(("", stats.max_value)).unwrap()),
)
}
}

@@ -133,13 +33,13 @@ pub(crate) fn collect_statistics(
Some(rg) => deserialize(fld, &md[rg..rg + 1])?,
};
schema.with_column((&fld.name).into(), (&fld.data_type).into());
stats.push(ColumnStats(st, fld.into()));
stats.push(ColumnStats::from_arrow_stats(st, fld));
}

Ok(if stats.is_empty() {
None
} else {
Some(BatchStats { schema, stats })
Some(BatchStats::new(schema, stats))
})
}

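ColumnStats changes from a thin wrapper around arrow's Statistics into a constructor-based type, ColumnStats::new(field, null_count, min, max), with from_arrow_stats covering the parquet case; this presumably lets the hive code build statistics for partition columns (where min and max are simply the constant path value) without any parquet metadata. A hedged sketch of why min/max statistics matter, i.e. predicate-based row-group skipping; the helper below is illustrative, not this crate's API:

// Illustrative min/max pruning for an equality predicate `col == value`.
fn can_skip_row_group(min: i64, max: i64, predicate_value: i64) -> bool {
    // If the value falls outside the row group's [min, max] range,
    // no row in that group can match, so it can be skipped entirely.
    predicate_value < min || predicate_value > max
}

fn main() {
    // Row group whose column values lie in [10, 20]:
    assert!(can_skip_row_group(10, 20, 42));  // 42 cannot occur, skip
    assert!(!can_skip_row_group(10, 20, 15)); // 15 might occur, read
}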
18 changes: 18 additions & 0 deletions crates/polars-io/src/parquet/read.rs
@@ -52,6 +52,7 @@ pub struct ParquetReader<R: Read + Seek> {
row_count: Option<RowCount>,
low_memory: bool,
metadata: Option<FileMetaData>,
hive_partition_columns: Option<Vec<Series>>,
use_statistics: bool,
}

@@ -78,6 +79,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self.parallel,
self.row_count,
self.use_statistics,
self.hive_partition_columns.as_deref(),
)
.map(|mut df| {
if rechunk {
@@ -145,6 +147,11 @@ impl<R: MmapBytesReader> ParquetReader<R> {
Ok(metadata.num_rows)
}

pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
self.hive_partition_columns = columns;
self
}

fn get_metadata(&mut self) -> PolarsResult<&FileMetaData> {
if self.metadata.is_none() {
self.metadata = Some(read::read_metadata(&mut self.reader)?);
@@ -166,6 +173,7 @@ impl<R: MmapBytesReader + 'static> ParquetReader<R> {
self.row_count,
chunk_size,
self.use_statistics,
self.hive_partition_columns,
)
}
}
@@ -184,6 +192,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
low_memory: false,
metadata: None,
use_statistics: true,
hive_partition_columns: None,
}
}

@@ -210,6 +219,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
self.parallel,
self.row_count,
self.use_statistics,
self.hive_partition_columns.as_deref(),
)
.map(|mut df| {
if self.rechunk {
@@ -230,6 +240,7 @@ pub struct ParquetAsyncReader {
projection: Option<Vec<usize>>,
row_count: Option<RowCount>,
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
}

#[cfg(feature = "cloud")]
@@ -245,6 +256,7 @@ impl ParquetAsyncReader {
projection: None,
row_count: None,
use_statistics: true,
hive_partition_columns: None,
})
}

@@ -294,6 +306,11 @@ impl ParquetAsyncReader {
self
}

pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
self.hive_partition_columns = columns;
self
}

#[tokio::main(flavor = "current_thread")]
pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.reader.get_metadata().await?.to_owned();
@@ -311,6 +328,7 @@ impl ParquetAsyncReader {
self.row_count,
chunk_size,
self.use_statistics,
self.hive_partition_columns,
)
}

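A short sketch of wiring the new with_hive_partition_columns builder method. The path, column names, and values are illustrative, and the partition columns are assumed to have already been derived from the file path (for example with a helper like the hypothetical parse_partition_segment shown earlier); whether the reader expects length-1 constant Series or pre-expanded columns is not visible in this excerpt.

use std::fs::File;

use polars_core::prelude::*;
use polars_io::parquet::ParquetReader;
use polars_io::SerReader;

fn main() -> PolarsResult<()> {
    // Constant-valued columns derived from a path such as
    // data/date=2023-09-01/region=eu/part-0.parquet (illustrative values,
    // assumed to be broadcast by the reader to the file's row count).
    let hive_columns = vec![
        Series::new("date", &["2023-09-01"]),
        Series::new("region", &["eu"]),
    ];

    let file = File::open("data/date=2023-09-01/region=eu/part-0.parquet")?;
    let df = ParquetReader::new(file)
        .with_hive_partition_columns(Some(hive_columns))
        .finish()?;
    println!("{df}");
    Ok(())
}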