From b51d194a04ace33aa383da332a2a8f873c93403b Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 22 Oct 2023 17:20:04 +0200 Subject: [PATCH] perf: support multiple files in a single scan parquet node. (#11922) --- .github/workflows/test-python.yml | 4 + crates/polars-core/src/config.rs | 7 +- crates/polars-error/src/lib.rs | 2 +- crates/polars-io/src/cloud/glob.rs | 14 +- crates/polars-io/src/cloud/mod.rs | 2 - .../polars-io/src/cloud/object_store_setup.rs | 15 +- crates/polars-io/src/cloud/options.rs | 35 ++- crates/polars-io/src/parquet/read.rs | 34 +-- crates/polars-io/src/parquet/read_impl.rs | 16 +- crates/polars-io/src/utils.rs | 24 ++ crates/polars-lazy/Cargo.toml | 3 +- .../src/physical_plan/executors/scan/ipc.rs | 5 +- .../src/physical_plan/executors/scan/mod.rs | 8 +- .../physical_plan/executors/scan/parquet.rs | 269 ++++++++++++++---- .../src/physical_plan/planner/lp.rs | 26 +- crates/polars-lazy/src/scan/csv.rs | 8 +- .../polars-lazy/src/scan/file_list_reader.rs | 28 +- crates/polars-lazy/src/scan/ipc.rs | 8 +- crates/polars-lazy/src/scan/ndjson.rs | 8 +- crates/polars-lazy/src/scan/parquet.rs | 42 +-- .../src/executors/sources/parquet.rs | 44 ++- crates/polars-pipe/src/pipeline/convert.rs | 3 +- .../polars-plan/src/logical_plan/builder.rs | 27 +- .../polars-plan/src/logical_plan/file_scan.rs | 10 + .../optimizer/predicate_pushdown/mod.rs | 30 +- .../polars-plan/src/logical_plan/options.rs | 4 +- crates/polars-plan/src/logical_plan/schema.rs | 30 +- py-polars/Cargo.lock | 1 + py-polars/src/lazyframe.rs | 8 +- py-polars/tests/unit/io/test_parquet.py | 4 +- 30 files changed, 491 insertions(+), 228 deletions(-) diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index 25c65475cc6f..77244bc411dc 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -72,6 +72,10 @@ jobs: if: github.ref_name != 'main' run: pytest --cov -n auto --dist loadgroup -m "not benchmark and not docs" + - name: Run async reader tests + if: github.ref_name != 'main' + run: POLARS_FORCE_ASYNC=1 pytest -m "not benchmark and not docs" tests/unit/io/ + - name: Run doctests if: github.ref_name != 'main' run: | diff --git a/crates/polars-core/src/config.rs b/crates/polars-core/src/config.rs index 72ddf90fa639..572db05a677e 100644 --- a/crates/polars-core/src/config.rs +++ b/crates/polars-core/src/config.rs @@ -22,12 +22,9 @@ pub(crate) const DECIMAL_ACTIVE: &str = "POLARS_ACTIVATE_DECIMAL"; #[cfg(feature = "dtype-decimal")] pub(crate) fn decimal_is_active() -> bool { - match std::env::var(DECIMAL_ACTIVE) { - Ok(val) => val == "1", - _ => false, - } + std::env::var(DECIMAL_ACTIVE).as_deref().unwrap_or("") == "1" } pub fn verbose() -> bool { - std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("0") == "1" + std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("") == "1" } diff --git a/crates/polars-error/src/lib.rs b/crates/polars-error/src/lib.rs index a6dd3760b825..be7a72debe32 100644 --- a/crates/polars-error/src/lib.rs +++ b/crates/polars-error/src/lib.rs @@ -18,7 +18,7 @@ where T: Into<Cow<'static, str>>, { fn from(msg: T) -> Self { - if env::var("POLARS_PANIC_ON_ERR").is_ok() { + if env::var("POLARS_PANIC_ON_ERR").as_deref().unwrap_or("") == "1" { panic!("{}", msg.into()) } else { ErrString(msg.into()) diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs index 16d4cefa32f8..c24b31e6634b 100644 --- a/crates/polars-io/src/cloud/glob.rs +++ b/crates/polars-io/src/cloud/glob.rs @@ -86,18 +86,16 @@ pub struct CloudLocation { }
impl CloudLocation { - /// Parse a CloudLocation from an url. - pub fn new(url: &str) -> PolarsResult<CloudLocation> { - let parsed = Url::parse(url).map_err(to_compute_err)?; + pub fn from_url(parsed: &Url) -> PolarsResult<CloudLocation> { let is_local = parsed.scheme() == "file"; let (bucket, key) = if is_local { - ("".into(), url[7..].into()) + ("".into(), parsed.path()) } else { let key = parsed.path(); let bucket = parsed .host() .ok_or_else( - || polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", url), + || polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed), )? .to_string(); (bucket, key) @@ -117,6 +115,12 @@ impl CloudLocation { expansion, }) } + + /// Parse a CloudLocation from a URL. + pub fn new(url: &str) -> PolarsResult<CloudLocation> { + let parsed = Url::parse(url).map_err(to_compute_err)?; + Self::from_url(&parsed) + } } /// Return a full url from a key relative to the given location. diff --git a/crates/polars-io/src/cloud/mod.rs b/crates/polars-io/src/cloud/mod.rs index 6118a4bb9a76..4c46260de21f 100644 --- a/crates/polars-io/src/cloud/mod.rs +++ b/crates/polars-io/src/cloud/mod.rs @@ -3,8 +3,6 @@ #[cfg(feature = "cloud")] use std::borrow::Cow; #[cfg(feature = "cloud")] -use std::str::FromStr; -#[cfg(feature = "cloud")] use std::sync::Arc; #[cfg(feature = "cloud")] diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs index 5826c48f8bd1..6ed5c00e618a 100644 --- a/crates/polars-io/src/cloud/object_store_setup.rs +++ b/crates/polars-io/src/cloud/object_store_setup.rs @@ -1,5 +1,6 @@ use once_cell::sync::Lazy; pub use options::*; +use polars_error::to_compute_err; use tokio::sync::RwLock; use super::*; @@ -25,7 +26,8 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult { /// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult { - let cloud_location = CloudLocation::new(url)?; + let parsed = parse_url(url).map_err(to_compute_err)?; + let cloud_location = CloudLocation::from_url(&parsed)?; let options = options.cloned(); let key = (url.to_string(), options); @@ -39,17 +41,14 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu } } - let cloud_type = CloudType::from_str(url)?; let options = key .1 .as_ref() .map(Cow::Borrowed) .unwrap_or_else(|| Cow::Owned(Default::default())); + + let cloud_type = CloudType::from_url(&parsed)?; let store = match cloud_type { - CloudType::File => { - let local = LocalFileSystem::new(); - Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>) - }, CloudType::Aws => { #[cfg(feature = "aws")] { @@ -79,6 +78,10 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu #[cfg(not(feature = "azure"))] return err_missing_feature("azure", &cloud_location.scheme); }, + CloudType::File => { + let local = LocalFileSystem::new(); + Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>) + }, }?; let mut cache = OBJECT_STORE_CACHE.write().await; *cache = Some((key, store.clone())); diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs index 78aedaf8e90f..972f59c4ae4d 100644 --- a/crates/polars-io/src/cloud/options.rs +++ b/crates/polars-io/src/cloud/options.rs @@ -83,12 +83,9 @@ pub enum CloudType { Gcp, } -impl FromStr for CloudType { - type Err = PolarsError; - +impl CloudType { #[cfg(feature = "cloud")] - fn from_str(url: &str) -> Result<Self, Self::Err> { - let parsed = Url::parse(url).map_err(to_compute_err)?; + pub(crate) fn from_url(parsed: &Url) -> PolarsResult<Self> { Ok(match parsed.scheme() { "s3" | "s3a" => Self::Aws, "az" | "azure" | "adl" | "abfs" | "abfss" => Self::Azure, @@ -97,6 +94,34 @@ impl FromStr for CloudType { _ => polars_bail!(ComputeError: "unknown url scheme"), }) } +} + +#[cfg(feature = "cloud")] +pub(crate) fn parse_url(url: &str) -> std::result::Result<Url, url::ParseError> { + match Url::parse(url) { + Err(err) => match err { + url::ParseError::RelativeUrlWithoutBase => { + let parsed = Url::parse(&format!( + "file://{}", + std::env::current_dir().unwrap().to_string_lossy() + )) + .unwrap(); + parsed.join(url) + }, + err => Err(err), + }, + parsed => parsed, + } +} + +impl FromStr for CloudType { + type Err = PolarsError; + + #[cfg(feature = "cloud")] + fn from_str(url: &str) -> Result<Self, Self::Err> { + let parsed = parse_url(url).map_err(to_compute_err)?; + Self::from_url(&parsed) + } #[cfg(not(feature = "cloud"))] fn from_str(_s: &str) -> Result<Self, Self::Err> { diff --git a/crates/polars-io/src/parquet/read.rs b/crates/polars-io/src/parquet/read.rs index 4df47cf9e658..c09bc65a39cc 100644 --- a/crates/polars-io/src/parquet/read.rs +++ b/crates/polars-io/src/parquet/read.rs @@ -4,8 +4,7 @@ use std::sync::Arc; use arrow::io::parquet::read; use arrow::io::parquet::write::FileMetaData; use polars_core::prelude::*; -#[cfg(feature = "cloud")] -use polars_core::utils::concat_df; +use polars_core::utils::accumulate_dataframes_vertical_unchecked; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -17,10 +16,8 @@ use crate::mmap::MmapBytesReader; use crate::parquet::async_impl::FetchRowGroupsFromObjectStore; #[cfg(feature = "cloud")] use crate::parquet::async_impl::ParquetObjectStore; -use crate::parquet::read_impl::read_parquet; pub use crate::parquet::read_impl::BatchedParquetReader; -#[cfg(feature = "cloud")] -use crate::predicates::apply_predicate; +use
crate::parquet::read_impl::{materialize_hive_partitions, read_parquet}; use crate::predicates::PhysicalIoExpr; use crate::prelude::*; use crate::RowCount; @@ -334,12 +331,15 @@ impl ParquetAsyncReader { pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> { let metadata = self.reader.get_metadata().await?.clone(); - let schema = self.schema().await?; + let schema = match self.schema { + Some(schema) => schema, + None => self.schema().await?, + }; // row group fetched deals with projection let row_group_fetcher = FetchRowGroupsFromObjectStore::new( self.reader, &metadata, - self.schema.unwrap(), + schema.clone(), self.projection.as_deref(), self.predicate.clone(), )? @@ -364,25 +364,25 @@ impl ParquetAsyncReader { pub async fn finish(mut self) -> PolarsResult<DataFrame> { let rechunk = self.rechunk; + let metadata = self.get_metadata().await?.clone(); let schema = self.schema().await?; + let hive_partition_columns = self.hive_partition_columns.clone(); - let predicate = self.predicate.clone(); // batched reader deals with slice pushdown let reader = self.batched(usize::MAX).await?; - let mut iter = reader.iter(16); + let n_batches = metadata.row_groups.len(); + let mut iter = reader.iter(n_batches); - let mut chunks = Vec::with_capacity(16); + let mut chunks = Vec::with_capacity(n_batches); while let Some(result) = iter.next_().await { - let out = result.and_then(|mut df| { - apply_predicate(&mut df, predicate.as_deref(), true)?; - Ok(df) - })?; - chunks.push(out) + chunks.push(result?) } if chunks.is_empty() { - return Ok(DataFrame::from(schema.as_ref())); + let mut df = DataFrame::from(schema.as_ref()); + materialize_hive_partitions(&mut df, hive_partition_columns.as_deref(), 0); + return Ok(df); } - let mut df = concat_df(&chunks)?; + let mut df = accumulate_dataframes_vertical_unchecked(chunks); if rechunk { df.as_single_chunk_par(); diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs index 51ec4669f25c..958a166ca4c1 100644 --- a/crates/polars-io/src/parquet/read_impl.rs +++ b/crates/polars-io/src/parquet/read_impl.rs @@ -99,7 +99,7 @@ pub(super) fn array_iter_to_series( /// We have a special num_rows arg, as df can be empty when a projection contains /// only hive partition columns. /// Safety: num_rows equals the height of the df when the df height is non-zero. -fn materialize_hive_partitions( +pub(crate) fn materialize_hive_partitions( df: &mut DataFrame, hive_partition_columns: Option<&[Series]>, num_rows: usize, @@ -342,6 +342,13 @@ pub fn read_parquet( use_statistics: bool, hive_partition_columns: Option<&[Series]>, ) -> PolarsResult<DataFrame> { + // Fast path.
+ if limit == 0 && hive_partition_columns.is_none() { + let mut df = DataFrame::from(schema.as_ref()); + materialize_hive_partitions(&mut df, hive_partition_columns, 0); + return Ok(df); + } + let file_metadata = metadata .map(Ok) .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?; @@ -409,12 +416,7 @@ pub fn read_parquet( Cow::Borrowed(schema) }; let mut df = DataFrame::from(schema.as_ref().as_ref()); - if let Some(parts) = hive_partition_columns { - for s in parts { - // SAFETY: length is equal - unsafe { df.with_column_unchecked(s.clear()) }; - } - } + materialize_hive_partitions(&mut df, hive_partition_columns, 0); Ok(df) } else { accumulate_dataframes_vertical(dfs) diff --git a/crates/polars-io/src/utils.rs b/crates/polars-io/src/utils.rs index 2f14628a1fd6..292c7becf157 100644 --- a/crates/polars-io/src/utils.rs +++ b/crates/polars-io/src/utils.rs @@ -159,6 +159,30 @@ pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) { } } +/// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read +/// concurrently/in parallel. +/// +/// This takes an iterator over the number of rows per file. +pub fn get_sequential_row_statistics<I>( + iter: I, + mut total_rows_to_read: usize, +) -> Vec<(usize, usize)> +where + I: Iterator<Item = usize>, +{ + let mut cumulative_read = 0; + iter.map(|rows_this_file| { + let remaining_rows_to_read = total_rows_to_read; + total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file); + + let current_cumulative_read = cumulative_read; + cumulative_read += rows_this_file; + + (remaining_rows_to_read, current_cumulative_read) + }) + .collect() +} + #[cfg(feature = "json")] pub(crate) fn overwrite_schema( schema: &mut Schema, diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index 554e8eeda37d..ef220fd8a7af 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -10,6 +10,7 @@ description = "Lazy query engine for the Polars DataFrame library" [dependencies] arrow = { workspace = true } +futures = { workspace = true, optional = true } polars-core = { workspace = true, features = ["lazy", "zip_with", "random"] } polars-io = { workspace = true, features = ["lazy"] } polars-json = { workspace = true, optional = true } @@ -43,7 +44,7 @@ async = [ "polars-io/cloud", "polars-pipe?/async", ] -cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio"] +cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio", "futures"] cloud_write = ["cloud"] ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc"] json = ["polars-io/json", "polars-plan/json", "polars-json"] diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 5256252d3a5d..9a0cc49c16a6 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -12,17 +12,16 @@ pub struct IpcExec { impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> { - let (file, projection, n_rows, predicate) = prepare_scan_args( + let (file, projection, predicate) = prepare_scan_args( &self.path, &self.predicate, &mut self.file_options.with_columns, &mut self.schema, - self.file_options.n_rows, self.file_options.row_count.is_some(), None, ); IpcReader::new(file.unwrap()) - .with_n_rows(n_rows) + .with_n_rows(self.file_options.n_rows) .with_row_count(std::mem::take(&mut self.file_options.row_count)) .set_rechunk(self.file_options.rechunk)
.with_projection(projection) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs index 2c6d8ac24d88..7b8aeb13698f 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs @@ -30,8 +30,6 @@ use crate::prelude::*; #[cfg(any(feature = "ipc", feature = "parquet"))] type Projection = Option<Vec<usize>>; #[cfg(any(feature = "ipc", feature = "parquet"))] -type StopNRows = Option<usize>; -#[cfg(any(feature = "ipc", feature = "parquet"))] type Predicate = Option<Arc<dyn PhysicalIoExpr>>; #[cfg(any(feature = "ipc", feature = "parquet"))] @@ -40,10 +38,9 @@ fn prepare_scan_args( predicate: &Option<Arc<dyn PhysicalExpr>>, with_columns: &mut Option<Arc<Vec<String>>>, schema: &mut SchemaRef, - n_rows: Option<usize>, has_row_count: bool, hive_partitions: Option<&[Series]>, -) -> (Option<std::fs::File>, Projection, StopNRows, Predicate) { +) -> (Option<std::fs::File>, Projection, Predicate) { let file = std::fs::File::open(path).ok(); let with_columns = mem::take(with_columns); @@ -56,10 +53,9 @@ fn prepare_scan_args( has_row_count, ); - let n_rows = _set_n_rows_for_scan(n_rows); let predicate = predicate.clone().map(phys_expr_to_io_expr); - (file, projection, n_rows, predicate) + (file, projection, predicate) } /// Producer of an in memory DataFrame diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index 9f99c8580870..a4a26b94eb67 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -1,13 +1,14 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use polars_core::utils::accumulate_dataframes_vertical; use polars_core::utils::arrow::io::parquet::read::FileMetaData; use polars_io::cloud::CloudOptions; -use polars_io::is_cloud_url; +use polars_io::{is_cloud_url, RowCount}; use super::*; pub struct ParquetExec { - path: PathBuf, + paths: Arc<[PathBuf]>, file_info: FileInfo, predicate: Option<Arc<dyn PhysicalExpr>>, options: ParquetOptions, @@ -19,7 +20,7 @@ pub struct ParquetExec { impl ParquetExec { pub(crate) fn new( - path: PathBuf, + paths: Arc<[PathBuf]>, file_info: FileInfo, predicate: Option<Arc<dyn PhysicalExpr>>, options: ParquetOptions, @@ -28,7 +29,7 @@ impl ParquetExec { metadata: Option<Arc<FileMetaData>>, ) -> Self { ParquetExec { - path, + paths, file_info, predicate, options, @@ -38,69 +39,225 @@ impl ParquetExec { } } - fn read(&mut self) -> PolarsResult<DataFrame> { - let hive_partitions = self - .file_info - .hive_parts - .as_ref() - .map(|hive| hive.materialize_partition_columns()); - - let (file, projection, n_rows, predicate) = prepare_scan_args( - &self.path, - &self.predicate, - &mut self.file_options.with_columns, - &mut self.file_info.schema.clone(), - self.file_options.n_rows, - self.file_options.row_count.is_some(), - hive_partitions.as_deref(), - ); - - if let Some(file) = file { - ParquetReader::new(file) - .with_schema(Some(self.file_info.reader_schema.clone())) - .with_n_rows(n_rows) - .read_parallel(self.options.parallel) - .with_row_count(mem::take(&mut self.file_options.row_count)) - .set_rechunk(self.file_options.rechunk) - .set_low_memory(self.options.low_memory) - .use_statistics(self.options.use_statistics) - .with_hive_partition_columns(hive_partitions) - ._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref())) - } else if is_cloud_url(self.path.as_path()) { - #[cfg(feature = "cloud")] - { - polars_io::pl_async::get_runtime().block_on_potential_spawn(async { - let reader =
ParquetAsyncReader::from_uri( - &self.path.to_string_lossy(), - self.cloud_options.as_ref(), - Some(self.file_info.reader_schema.clone()), - self.metadata.clone(), + fn read_par(&mut self) -> PolarsResult<Vec<DataFrame>> { + let parallel = match self.options.parallel { + ParallelStrategy::Auto if self.paths.len() > POOL.current_num_threads() => { + ParallelStrategy::RowGroups + }, + identity => identity, + }; + + // First initialize the readers, predicates and metadata. + // This will be used to determine the slices. That way we can actually read all the + // files in parallel even when we add row counts or slices. + let readers_and_metadata = self + .paths + .iter() + .map(|path| { + let mut file_info = self.file_info.clone(); + file_info.update_hive_partitions(path); + + let hive_partitions = file_info + .hive_parts + .as_ref() + .map(|hive| hive.materialize_partition_columns()); + + let (file, projection, predicate) = prepare_scan_args( + path, + &self.predicate, + &mut self.file_options.with_columns.clone(), + &mut self.file_info.schema.clone(), + self.file_options.row_count.is_some(), + hive_partitions.as_deref(), + ); + + let mut reader = if let Some(file) = file { + PolarsResult::Ok( + ParquetReader::new(file) + .with_schema(Some(self.file_info.reader_schema.clone())) + .read_parallel(parallel) + .set_low_memory(self.options.low_memory) + .use_statistics(self.options.use_statistics) + .set_rechunk(false) + .with_hive_partition_columns(hive_partitions), ) - .await? - .with_n_rows(n_rows) - .with_row_count(mem::take(&mut self.file_options.row_count)) - .with_projection(projection) - .use_statistics(self.options.use_statistics) - .with_predicate(predicate) - .with_hive_partition_columns(hive_partitions); - - reader.finish().await - }) - } + } else { + polars_bail!(ComputeError: "could not read {}", path.display()) + }?; + + reader + .num_rows() + .map(|num_rows| (reader, num_rows, predicate, projection)) + }) + .collect::<PolarsResult<Vec<_>>>()?; + + let n_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX); + let iter = readers_and_metadata + .iter() + .map(|(_, num_rows, _, _)| *num_rows); + + let rows_statistics = get_sequential_row_statistics(iter, n_rows_to_read); + let base_row_count = &self.file_options.row_count; + + POOL.install(|| { + readers_and_metadata + .into_par_iter() + .zip(rows_statistics.par_iter()) + .map( + |( + (reader, num_rows_this_file, predicate, projection), + (remaining_rows_to_read, cumulative_read), + )| { + let remaining_rows_to_read = *remaining_rows_to_read; + let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read + { + None + } else { + Some(remaining_rows_to_read) + }; + let row_count = base_row_count.as_ref().map(|rc| RowCount { + name: rc.name.clone(), + offset: rc.offset + *cumulative_read as IdxSize, + }); + + reader + .with_n_rows(remaining_rows_to_read) + .with_row_count(row_count) + ._finish_with_scan_ops( + predicate.clone(), + projection.as_ref().map(|v| v.as_ref()), + ) + }, + ) + .collect() + }) + } + + #[cfg(feature = "cloud")] + async fn read_async(&mut self) -> PolarsResult<Vec<DataFrame>> { + let first_schema = &self.file_info.reader_schema; + let first_metadata = &self.metadata; + let cloud_options = self.cloud_options.as_ref(); + // First initialize the readers and get the metadata concurrently.
+ let iter = self.paths.iter().enumerate().map(|(i, path)| async move { + // use the cached one as this saves a cloud call + let metadata = if i == 0 { first_metadata.clone() } else { None }; + let mut reader = ParquetAsyncReader::from_uri( + &path.to_string_lossy(), + cloud_options, + // Schema must be the same for all files. The hive partitions are included in this schema. + Some(first_schema.clone()), + metadata, + ) + .await?; + let num_rows = reader.num_rows().await?; + PolarsResult::Ok((num_rows, reader)) + }); + let readers_and_metadata = futures::future::try_join_all(iter).await?; + + // Then compute `n_rows` to be taken per file up front, so we can actually read concurrently + // after this. + let n_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX); + let iter = readers_and_metadata + .iter() + .map(|(num_rows, _)| num_rows) + .copied(); + + let rows_statistics = get_sequential_row_statistics(iter, n_rows_to_read); + + // Now read the actual data. + let base_row_count = &self.file_options.row_count; + let file_info = &self.file_info; + let file_options = &self.file_options; + let paths = &self.paths; + let use_statistics = self.options.use_statistics; + let predicate = &self.predicate; + + let iter = readers_and_metadata + .into_iter() + .zip(rows_statistics.iter()) + .zip(paths.as_ref().iter()) + .map( + |( + ((num_rows_this_file, reader), (remaining_rows_to_read, cumulative_read)), + path, + )| async move { + let mut file_info = file_info.clone(); + let remaining_rows_to_read = *remaining_rows_to_read; + let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read { + None + } else { + Some(remaining_rows_to_read) + }; + let row_count = base_row_count.as_ref().map(|rc| RowCount { + name: rc.name.clone(), + offset: rc.offset + *cumulative_read as IdxSize, + }); + + file_info.update_hive_partitions(path); + + let hive_partitions = file_info + .hive_parts + .as_ref() + .map(|hive| hive.materialize_partition_columns()); + + let (_, projection, predicate) = prepare_scan_args( + Path::new(""), + predicate, + &mut file_options.with_columns.clone(), + &mut file_info.schema.clone(), + row_count.is_some(), + hive_partitions.as_deref(), + ); + + reader + .with_n_rows(remaining_rows_to_read) + .with_row_count(row_count) + .with_projection(projection) + .use_statistics(use_statistics) + .with_predicate(predicate) + .set_rechunk(false) + .with_hive_partition_columns(hive_partitions) + .finish() + .await + .map(Some) + }, + ); + + let dfs = futures::future::try_join_all(iter).await?; + Ok(dfs.into_iter().flatten().collect()) + } + + fn read(&mut self) -> PolarsResult<DataFrame> { + let is_cloud = is_cloud_url(self.paths[0].as_path()); + let force_async = std::env::var("POLARS_FORCE_ASYNC").as_deref().unwrap_or("") == "1"; + + let out = if is_cloud || force_async { #[cfg(not(feature = "cloud"))] { panic!("activate cloud feature") } + + #[cfg(feature = "cloud")] + { + polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())? + } } else { - polars_bail!(ComputeError: "could not read {}", self.path.display()) + self.read_par()?
+ }; + + let mut out = accumulate_dataframes_vertical(out)?; + if self.file_options.rechunk { + out.as_single_chunk_par(); } + Ok(out) } } impl Executor for ParquetExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> { let finger_print = FileFingerPrint { - paths: Arc::new([self.path.clone()]), + paths: self.paths.clone(), predicate: self .predicate .as_ref() @@ -109,7 +266,7 @@ impl Executor for ParquetExec { }; let profile_name = if state.has_node_timer() { - let mut ids = vec![self.path.to_string_lossy().into()]; + let mut ids = vec![self.paths[0].to_string_lossy().into()]; if self.predicate.is_some() { ids.push("predicate".into()) } diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 1e2683d4bdd4..f0128817a70d 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -1,5 +1,6 @@ use polars_core::prelude::*; use polars_core::POOL; +use polars_plan::global::_set_n_rows_for_scan; use super::super::executors::{self, Executor}; use super::*; @@ -198,8 +199,9 @@ pub fn create_physical_plan( output_schema, scan_type, predicate, - file_options, + mut file_options, } => { + file_options.n_rows = _set_n_rows_for_scan(file_options.n_rows); let mut state = ExpressionConversionState::default(); let predicate = predicate .map(|pred| { @@ -245,19 +247,15 @@ pub fn create_physical_plan( options, cloud_options, metadata, - } => { - assert_eq!(paths.len(), 1); - let path = paths[0].clone(); - Ok(Box::new(executors::ParquetExec::new( - path, - file_info, - predicate, - options, - cloud_options, - file_options, - metadata, - ))) - }, + } => Ok(Box::new(executors::ParquetExec::new( + paths, + file_info, + predicate, + options, + cloud_options, + file_options, + metadata, + ))), FileScan::Anonymous { function, .. } => { Ok(Box::new(executors::AnonymousScanExec { function, diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 639b508dd7b2..2543dcbea92b 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -13,7 +13,7 @@ use crate::prelude::*; #[cfg(feature = "csv")] pub struct LazyCsvReader<'a> { path: PathBuf, - paths: Vec<PathBuf>, + paths: Arc<[PathBuf]>, separator: u8, has_header: bool, ignore_errors: bool, @@ -40,14 +40,14 @@ pub struct LazyCsvReader<'a> { #[cfg(feature = "csv")] impl<'a> LazyCsvReader<'a> { - pub fn new_paths(paths: Vec<PathBuf>) -> Self { + pub fn new_paths(paths: Arc<[PathBuf]>) -> Self { Self::new("").with_paths(paths) } pub fn new(path: impl AsRef<Path>) -> Self { LazyCsvReader { path: path.as_ref().to_owned(), - paths: vec![], + paths: Arc::new([]), separator: b',', has_header: true, ignore_errors: false, @@ -317,7 +317,7 @@ impl LazyFileListReader for LazyCsvReader<'_> { self } - fn with_paths(mut self, paths: Vec<PathBuf>) -> Self { + fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { self.paths = paths; self } diff --git a/crates/polars-lazy/src/scan/file_list_reader.rs b/crates/polars-lazy/src/scan/file_list_reader.rs index 1ba125312508..18a1a62a29d3 100644 --- a/crates/polars-lazy/src/scan/file_list_reader.rs +++ b/crates/polars-lazy/src/scan/file_list_reader.rs @@ -34,14 +34,12 @@ fn polars_glob(pattern: &str, cloud_options: Option<&CloudOptions>) -> PolarsRes /// Use [LazyFileListReader::finish] to get the final [LazyFrame]. pub trait LazyFileListReader: Clone { /// Get the final [LazyFrame].
- fn finish(mut self) -> PolarsResult<LazyFrame> { + fn finish(self) -> PolarsResult<LazyFrame> { if let Some(paths) = self.iter_paths()? { let lfs = paths - .enumerate() - .map(|(i, r)| { + .map(|r| { let path = r?; - let lf = self - .clone() + self.clone() .with_path(path.clone()) .with_rechunk(false) .finish_no_glob() .map_err(|e| { polars_err!( ComputeError: "error while reading {}: {}", path.display(), e ) - }); - - if i == 0 { - let lf = lf?; - self.set_known_schema(lf.schema()?); - Ok(lf) - } else { - lf - } + }) }) .collect::<PolarsResult<Vec<_>>>()?; @@ -108,7 +98,7 @@ pub trait LazyFileListReader: Clone { /// Set paths of the scanned files. /// Doesn't glob patterns. #[must_use] - fn with_paths(self, paths: Vec<PathBuf>) -> Self; + fn with_paths(self, paths: Arc<[PathBuf]>) -> Self; /// Rechunk the memory to contiguous chunks when parsing is done. fn rechunk(&self) -> bool; @@ -129,14 +119,6 @@ pub trait LazyFileListReader: Clone { None } - /// Set a schema on first glob pattern, so that others don't have to fetch metadata - /// from cloud - fn known_schema(&self) -> Option<SchemaRef> { - None - } - - fn set_known_schema(&mut self, _known_schema: SchemaRef) {} - /// Get list of files referenced by this reader. /// /// Returns [None] if path is not a glob pattern. diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index e369b990f4df..db75efada68a 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -30,7 +30,7 @@ impl Default for ScanArgsIpc { struct LazyIpcReader { args: ScanArgsIpc, path: PathBuf, - paths: Vec<PathBuf>, + paths: Arc<[PathBuf]>, } impl LazyIpcReader { @@ -38,7 +38,7 @@ impl LazyIpcReader { Self { args, path, - paths: vec![], + paths: Arc::new([]), } } } @@ -84,7 +84,7 @@ impl LazyFileListReader for LazyIpcReader { self } - fn with_paths(mut self, paths: Vec<PathBuf>) -> Self { + fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { self.paths = paths; self } @@ -113,7 +113,7 @@ impl LazyFrame { LazyIpcReader::new(path.as_ref().to_owned(), args).finish() } - pub fn scan_ipc_files(paths: Vec<PathBuf>, args: ScanArgsIpc) -> PolarsResult<LazyFrame> { + pub fn scan_ipc_files(paths: Arc<[PathBuf]>, args: ScanArgsIpc) -> PolarsResult<LazyFrame> { LazyIpcReader::new(PathBuf::new(), args) .with_paths(paths) .finish() diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index bd21e00ddf6a..c008e54c7088 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -9,7 +9,7 @@ use crate::prelude::{LazyFrame, ScanArgsAnonymous}; #[derive(Clone)] pub struct LazyJsonLineReader { pub(crate) path: PathBuf, - paths: Vec<PathBuf>, + paths: Arc<[PathBuf]>, pub(crate) batch_size: Option<usize>, pub(crate) low_memory: bool, pub(crate) rechunk: bool, @@ -20,14 +20,14 @@ pub struct LazyJsonLineReader { } impl LazyJsonLineReader { - pub fn new_paths(paths: Vec<PathBuf>) -> Self { + pub fn new_paths(paths: Arc<[PathBuf]>) -> Self { Self::new(PathBuf::new()).with_paths(paths) } pub fn new(path: impl AsRef<Path>) -> Self { LazyJsonLineReader { path: path.as_ref().to_path_buf(), - paths: vec![], + paths: Arc::new([]), batch_size: None, low_memory: false, rechunk: true, @@ -107,7 +107,7 @@ impl LazyFileListReader for LazyJsonLineReader { self } - fn with_paths(mut self, paths: Vec<PathBuf>) -> Self { + fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { self.paths = paths; self } diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs index d4f42c014163..253d83928194 ---
a/crates/polars-lazy/src/scan/parquet.rs +++ b/crates/polars-lazy/src/scan/parquet.rs @@ -40,8 +40,7 @@ impl Default for ScanArgsParquet { struct LazyParquetReader { args: ScanArgsParquet, path: PathBuf, - paths: Vec<PathBuf>, - known_schema: Option<SchemaRef>, + paths: Arc<[PathBuf]>, } impl LazyParquetReader { @@ -49,19 +48,33 @@ impl LazyParquetReader { Self { args, path, - paths: vec![], - known_schema: None, + paths: Arc::new([]), } } } impl LazyFileListReader for LazyParquetReader { - fn finish_no_glob(mut self) -> PolarsResult<LazyFrame> { - let known_schema = self.known_schema(); + /// Get the final [LazyFrame]. + fn finish(mut self) -> PolarsResult<LazyFrame> { + if let Some(paths) = self.iter_paths()? { + let paths = paths + .into_iter() + .collect::<PolarsResult<Arc<[PathBuf]>>>()?; + self.paths = paths; + } + self.finish_no_glob() + } + + fn finish_no_glob(self) -> PolarsResult<LazyFrame> { let row_count = self.args.row_count; - let path = self.path; + + let paths = if self.paths.is_empty() { + Arc::new([self.path]) as Arc<[PathBuf]> + } else { + self.paths + }; let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet( - path, + paths, self.args.n_rows, self.args.cache, self.args.parallel, @@ -71,7 +84,6 @@ impl LazyFileListReader for LazyParquetReader { self.args.cloud_options, self.args.use_statistics, self.args.hive_partitioning, - known_schema, )? .build() .into(); @@ -80,7 +92,6 @@ impl LazyFileListReader for LazyParquetReader { if let Some(row_count) = row_count { lf = lf.with_row_count(&row_count.name, Some(row_count.offset)) } - self.known_schema = Some(lf.schema()?); lf.opt_state.file_caching = true; Ok(lf) @@ -99,7 +110,7 @@ impl LazyFileListReader for LazyParquetReader { self } - fn with_paths(mut self, paths: Vec<PathBuf>) -> Self { + fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self { self.paths = paths; self } @@ -121,13 +132,6 @@ impl LazyFileListReader for LazyParquetReader { self.args.n_rows } - fn known_schema(&self) -> Option<SchemaRef> { - self.known_schema.clone() - } - fn set_known_schema(&mut self, known_schema: SchemaRef) { - self.known_schema = Some(known_schema); - } - fn row_count(&self) -> Option<&RowCount> { self.args.row_count.as_ref() } @@ -140,7 +144,7 @@ impl LazyFrame { } /// Create a LazyFrame directly from a parquet scan.
- pub fn scan_parquet_files(paths: Vec<PathBuf>, args: ScanArgsParquet) -> PolarsResult<LazyFrame> { + pub fn scan_parquet_files(paths: Arc<[PathBuf]>, args: ScanArgsParquet) -> PolarsResult<LazyFrame> { LazyParquetReader::new(PathBuf::new(), args) .with_paths(paths) .finish() diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index c487d34e8d89..4af1f86f4334 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -23,9 +23,10 @@ pub struct ParquetSource { batched_reader: Option<BatchedParquetReader>, n_threads: usize, chunk_index: IdxSize, - path: Option<PathBuf>, - options: Option<ParquetOptions>, - file_options: Option<FileScanOptions>, + paths: std::slice::Iter<'static, PathBuf>, + _paths_lifetime: Arc<[PathBuf]>, + options: ParquetOptions, + file_options: FileScanOptions, #[allow(dead_code)] cloud_options: Option<CloudOptions>, metadata: Option<Arc<FileMetaData>>, @@ -38,9 +39,11 @@ impl ParquetSource { // otherwise all files would be opened during construction of the pipeline // leading to Too many Open files error fn init_reader(&mut self) -> PolarsResult<()> { - let path = self.path.take().unwrap(); - let options = self.options.take().unwrap(); - let file_options = self.file_options.take().unwrap(); + let Some(path) = self.paths.next() else { + return Ok(()); + }; + let options = self.options; + let file_options = self.file_options.clone(); let schema = self.file_info.schema.clone(); let hive_partitions = self @@ -66,7 +69,7 @@ impl ParquetSource { eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows") } - let batched_reader = if is_cloud_url(&path) { + let batched_reader = if is_cloud_url(path) { #[cfg(not(feature = "async"))] { panic!( @@ -121,7 +124,7 @@ impl ParquetSource { #[allow(unused_variables)] pub(crate) fn new( - path: PathBuf, + paths: Arc<[PathBuf]>, options: ParquetOptions, cloud_options: Option<CloudOptions>, metadata: Option<Arc<FileMetaData>>, @@ -131,13 +134,21 @@ impl ParquetSource { ) -> PolarsResult<Self> { let n_threads = POOL.current_num_threads(); + // extend lifetime as it will be bound to parquet source + let iter = unsafe { + std::mem::transmute::<std::slice::Iter<'_, PathBuf>, std::slice::Iter<'static, PathBuf>>( + paths.iter(), + ) + }; + Ok(ParquetSource { batched_reader: None, n_threads, chunk_index: 0, - options: Some(options), - file_options: Some(file_options), - path: Some(path), + options, + file_options, + paths: iter, + _paths_lifetime: paths, cloud_options, metadata, file_info, @@ -150,6 +161,11 @@ impl Source for ParquetSource { fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> { if self.batched_reader.is_none() { self.init_reader()?; + + // If there was no new reader, we depleted all of them and are finished.
+ if self.batched_reader.is_none() { + return Ok(SourceResult::Finished); + } } let batches = get_runtime().block_on( self.batched_reader .as_mut() .unwrap() .next_batches(self.n_threads), )?; Ok(match batches { - None => SourceResult::Finished, + None => { + // reset the reader + self.batched_reader = None; + return self.get_batches(_context); + }, Some(batches) => SourceResult::GotMoreData( batches .into_iter() diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 04fb4c287e62..f2b51cbd2e16 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -103,9 +103,8 @@ where cloud_options, metadata, } => { - assert_eq!(paths.len(), 1); let src = sources::ParquetSource::new( - paths[0].clone(), + paths, parquet_options, cloud_options, metadata, diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index b736c6cc8a98..20fd2dc7e6e2 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -136,8 +136,8 @@ impl LogicalPlanBuilder { #[cfg(any(feature = "parquet", feature = "parquet_async"))] #[allow(clippy::too_many_arguments)] - pub fn scan_parquet<P: Into<PathBuf>>( - path: P, + pub fn scan_parquet<P: Into<Arc<[PathBuf]>>>( + paths: P, n_rows: Option<usize>, cache: bool, parallel: polars_io::parquet::ParallelStrategy, @@ -147,22 +147,23 @@ impl LogicalPlanBuilder { cloud_options: Option<CloudOptions>, use_statistics: bool, hive_partitioning: bool, - // used to prevent multiple cloud calls - known_schema: Option<SchemaRef>, ) -> PolarsResult<Self> { use polars_io::{is_cloud_url, SerReader as _}; - let path = path.into(); - let (schema, num_rows, metadata) = if is_cloud_url(&path) { + let paths = paths.into(); + polars_ensure!(paths.len() >= 1, ComputeError: "expected at least 1 path"); + + // Use first path to get schema. + let path = &paths[0]; + + let (schema, num_rows, metadata) = if is_cloud_url(path) { #[cfg(not(feature = "cloud"))] panic!( "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled." ); #[cfg(feature = "cloud")] - if let Some(known_schema) = known_schema { - (known_schema, None, None) - } else { + { let uri = path.to_string_lossy(); get_runtime().block_on(async { let mut reader = ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), None, None) .await?; ( prepare_schema(reader.schema().await?, row_count.as_ref()), reader.num_rows().await?, Some(reader.get_metadata().await?.clone()), ) })? } } else { - let file = polars_utils::open_file(&path)?; + let file = polars_utils::open_file(path)?; let mut reader = ParquetReader::new(file); ( prepare_schema(reader.schema()?, row_count.as_ref()), @@ -187,8 +188,10 @@ let mut file_info = FileInfo::new(schema, (num_rows, num_rows.unwrap_or(0))); + // We set the hive partitions of the first path to determine the schema. + // On iteration the partition values will be re-set per file.
if hive_partitioning { - file_info.set_hive_partitions(path.as_path()); + file_info.init_hive_partitions(path.as_path()); } let options = FileScanOptions { @@ -201,7 +204,7 @@ impl LogicalPlanBuilder { hive_partitioning, }; Ok(LogicalPlan::Scan { - paths: Arc::new([path]), + paths, file_info, file_options: options, predicate: None, diff --git a/crates/polars-plan/src/logical_plan/file_scan.rs b/crates/polars-plan/src/logical_plan/file_scan.rs index 15b0ba13c0bc..0a3c66831927 100644 --- a/crates/polars-plan/src/logical_plan/file_scan.rs +++ b/crates/polars-plan/src/logical_plan/file_scan.rs @@ -50,6 +50,16 @@ impl PartialEq for FileScan { } impl FileScan { + pub(crate) fn remove_metadata(&mut self) { + match self { + #[cfg(feature = "parquet")] + Self::Parquet { metadata, .. } => { + *metadata = None; + }, + _ => {}, + } + } + pub(crate) fn skip_rows(&self) -> usize { #[allow(unreachable_patterns)] match self { diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index 615d3f6dcc8a..2ffe12623908 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -225,24 +225,38 @@ impl<'a> PredicatePushDown<'a> { Ok(lp) } Scan { - paths, - file_info, + mut paths, + mut file_info, predicate, - scan_type, + mut scan_type, file_options: options, output_schema } => { let local_predicates = partition_by_full_context(&mut acc_predicates, expr_arena); let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena); - // TODO! this still assumes a single file. Fix hive partitioning for multiple files - if let (Some(hive_part_stats), Some(predicate)) = (file_info.hive_parts.as_deref(), predicate) { + if let (true, Some(predicate)) = (file_info.hive_parts.is_some(), predicate) { if let Some(io_expr) = self.hive_partition_eval.unwrap()(predicate, expr_arena) { if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { - if !stats_evaluator.should_read(hive_part_stats.get_statistics())? { + let mut new_paths = Vec::with_capacity(paths.len()); + + + for path in paths.as_ref().iter() { + file_info.update_hive_partitions(path); + let hive_part_stats = file_info.hive_parts.as_deref().ok_or_else(|| polars_err!(ComputeError: "cannot combine hive partitioned directories with non-hive partitioned ones"))?; + + if stats_evaluator.should_read(hive_part_stats.get_statistics())? 
{ + new_paths.push(path.clone()); + } + } + + if paths.len() != new_paths.len() { if self.verbose { - eprintln!("hive partitioning: skipped: {}", paths[0].display()) + eprintln!("hive partitioning: skipped {} files, first file: {}", paths.len() - new_paths.len(), paths[0].display()) } + scan_type.remove_metadata(); + } + if paths.is_empty() { let schema = output_schema.as_ref().unwrap_or(&file_info.schema); let df = DataFrame::from(schema.as_ref()); @@ -253,6 +267,8 @@ impl<'a> PredicatePushDown<'a> { projection: None, selection: None }) + } else { + paths = Arc::from(new_paths) } } } diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 795107b49cf0..6dc3edbcd16b 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -40,7 +40,7 @@ pub struct CsvParserOptions { } #[cfg(feature = "parquet")] -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Copy)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ParquetOptions { pub parallel: polars_io::parquet::ParallelStrategy, @@ -90,7 +90,7 @@ pub struct IpcScanOptions { pub memmap: bool, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] /// Generic options for all file types pub struct FileScanOptions { diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index 223ec2df7fd6..ada1bd69c3d7 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -41,15 +41,15 @@ impl LogicalPlan { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct FileInfo { pub schema: SchemaRef, - // Stores the schema used for the reader, as the main schema can contain - // extra hive columns. + /// Stores the schema used for the reader, as the main schema can contain + /// extra hive columns. pub reader_schema: SchemaRef, - // - known size - // - estimated size + /// - known size + /// - estimated size pub row_estimation: (Option<usize>, usize), pub hive_parts: Option<Arc<hive::HivePartitions>>, } @@ -64,7 +64,8 @@ impl FileInfo { } } - pub fn set_hive_partitions(&mut self, url: &Path) { + /// Updates the statistics and merges the hive partitions schema with the file one. + pub fn init_hive_partitions(&mut self, url: &Path) { self.hive_parts = hive::HivePartitions::parse_url(url).map(|hive_parts| { let schema = Arc::make_mut(&mut self.schema); schema.merge(hive_parts.get_statistics().schema().clone()); @@ -72,6 +73,23 @@ impl FileInfo { Arc::new(hive_parts) }); } + + /// Updates the statistics, but not the schema.
+ pub fn update_hive_partitions(&mut self, url: &Path) { + let new = hive::HivePartitions::parse_url(url); + + match (&mut self.hive_parts, new) { + (Some(current), Some(new)) => match Arc::get_mut(current) { + Some(current) => { + *current = new; + }, + _ => { + *current = Arc::new(new); + }, + }, + (_, new) => self.hive_parts = new.map(Arc::new), + } + } } #[cfg(feature = "streaming")] diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 8e2c6af72fd0..93e85079eddb 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -1750,6 +1750,7 @@ version = "0.33.2" dependencies = [ "ahash", "bitflags 2.4.0", + "futures", "glob", "once_cell", "polars-arrow", diff --git a/py-polars/src/lazyframe.rs b/py-polars/src/lazyframe.rs index 64815d0f551c..4833c4bcc6e8 100644 --- a/py-polars/src/lazyframe.rs +++ b/py-polars/src/lazyframe.rs @@ -128,7 +128,7 @@ impl PyLazyFrame { let r = if let Some(path) = &path { LazyJsonLineReader::new(path) } else { - LazyJsonLineReader::new_paths(paths) + LazyJsonLineReader::new_paths(paths.into()) }; let lf = r @@ -197,7 +197,7 @@ impl PyLazyFrame { let r = if let Some(path) = path.as_ref() { LazyCsvReader::new(path) } else { - LazyCsvReader::new_paths(paths) + LazyCsvReader::new_paths(paths.into()) }; let mut r = r @@ -306,7 +306,7 @@ impl PyLazyFrame { let lf = if path.is_some() { LazyFrame::scan_parquet(first_path, args) } else { - LazyFrame::scan_parquet_files(paths, args) + LazyFrame::scan_parquet_files(Arc::from(paths), args) } .map_err(PyPolarsErr::from)?; Ok(lf.into()) @@ -336,7 +336,7 @@ impl PyLazyFrame { let lf = if let Some(path) = &path { LazyFrame::scan_ipc(path, args) } else { - LazyFrame::scan_ipc_files(paths, args) + LazyFrame::scan_ipc_files(paths.into(), args) } .map_err(PyPolarsErr::from)?; Ok(lf.into()) diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index abc9b6603622..b5b2fbdece6b 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -406,7 +406,9 @@ def test_fetch_union(tmp_path: Path) -> None: expected = pl.DataFrame({"a": [0], "b": [1]}) assert_frame_equal(result_one, expected) - expected = pl.DataFrame({"a": [0, 3], "b": [1, 4]}) + # Fetching either one row per file or one per dataset would be acceptable, as we make no guarantee here; + # currently we fetch one per dataset. + expected = pl.DataFrame({"a": [0], "b": [1]}) assert_frame_equal(result_glob, expected)
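The scheduling trick that makes the multi-file read parallel is `get_sequential_row_statistics` (added in crates/polars-io/src/utils.rs above): the per-file slice limit and row-count offset are derived up front, so no file has to wait for its predecessor before it knows where to start. Below is a minimal standalone sketch of that logic with hypothetical file sizes; it mirrors the helper rather than importing polars-io, and the numbers are illustrative only.

/// Mirror of `get_sequential_row_statistics`: for every file, compute
/// (rows still allowed to be read when this file starts, rows read before it).
fn sequential_row_statistics<I: Iterator<Item = usize>>(
    iter: I,
    mut total_rows_to_read: usize,
) -> Vec<(usize, usize)> {
    let mut cumulative_read = 0;
    iter.map(|rows_this_file| {
        let remaining = total_rows_to_read;
        total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file);
        let offset = cumulative_read;
        cumulative_read += rows_this_file;
        (remaining, offset)
    })
    .collect()
}

fn main() {
    // Hypothetical scan: three parquet files of 10, 20 and 30 rows, global n_rows = 25.
    let stats = sequential_row_statistics([10, 20, 30].into_iter(), 25);
    // File 0 may still read 25 rows and starts at global offset 0,
    // file 1 may read the remaining 15 and starts at offset 10,
    // file 2 has nothing left to read and starts at offset 30.
    assert_eq!(stats, vec![(25, 0), (15, 10), (0, 30)]);
}

This is also why `read_par` and `read_async` turn the statistic into `None` when `num_rows_this_file < remaining_rows_to_read`: `None` means "no slice", so a file that will be fully consumed is read without any limit bookkeeping, while the second tuple field feeds straight into the per-file `RowCount` offset (`rc.offset + cumulative_read`).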
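The `parse_url` fallback added in crates/polars-io/src/cloud/options.rs is what lets bare relative paths flow through the same URL-based object-store machinery as cloud URIs. Here is a sketch of the same pattern written directly against the url crate; the trailing slash on the base URL is an assumption of this sketch (without it, `Url::join` resolves against the parent of the working directory's last segment rather than appending to it).

use url::Url;

// Parse `input` as a URL, falling back to a file:// URL rooted at the
// current working directory when it is a bare relative path.
fn parse_url(input: &str) -> Result<Url, url::ParseError> {
    match Url::parse(input) {
        Err(url::ParseError::RelativeUrlWithoutBase) => {
            let base = Url::parse(&format!(
                "file://{}/",
                std::env::current_dir().unwrap().to_string_lossy()
            ))?;
            base.join(input)
        },
        other => other,
    }
}

fn main() {
    // Absolute URLs pass through untouched, so cloud paths keep working.
    assert_eq!(parse_url("s3://bucket/key").unwrap().scheme(), "s3");
    // A bare relative path becomes file:///<cwd>/data/foo.parquet.
    let local = parse_url("data/foo.parquet").unwrap();
    assert_eq!(local.scheme(), "file");
}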