From 76107406830e17ae2b6cdd8cd91d4b6139abcf65 Mon Sep 17 00:00:00 2001
From: ritchie
Date: Sun, 22 Oct 2023 18:22:39 +0200
Subject: [PATCH] feat: improve error handling in scan_parquet and deal with
 file limits

---
 .../src/physical_plan/executors/scan/ipc.rs  |   2 +-
 .../src/physical_plan/executors/scan/mod.rs  |   4 +-
 .../physical_plan/executors/scan/parquet.rs  | 170 ++++++++++--------
 3 files changed, 95 insertions(+), 81 deletions(-)

diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
index 9a0cc49c16a6..cfe5709b8ca4 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -20,7 +20,7 @@ impl IpcExec {
             self.file_options.row_count.is_some(),
             None,
         );
-        IpcReader::new(file.unwrap())
+        IpcReader::new(file?)
            .with_n_rows(self.file_options.n_rows)
            .with_row_count(std::mem::take(&mut self.file_options.row_count))
            .set_rechunk(self.file_options.rechunk)
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
index 7b8aeb13698f..07f3e66f5ce9 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -40,8 +40,8 @@ fn prepare_scan_args(
     schema: &mut SchemaRef,
     has_row_count: bool,
     hive_partitions: Option<&[Series]>,
-) -> (Option<File>, Projection, Predicate) {
-    let file = std::fs::File::open(path).ok();
+) -> (std::io::Result<File>, Projection, Predicate) {
+    let file = std::fs::File::open(path);
 
     let with_columns = mem::take(with_columns);
     let schema = mem::take(schema);
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index a4a26b94eb67..e8ec682eaff3 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -47,90 +47,104 @@ impl ParquetExec {
             identity => identity,
         };
 
-        // First initialize the readers, predicates and metadata.
-        // This will be used to determine the slices. That way we can actually read all the
-        // files in parallel even when we add row counts or slices.
-        let readers_and_metadata = self
-            .paths
-            .iter()
-            .map(|path| {
-                let mut file_info = self.file_info.clone();
-                file_info.update_hive_partitions(path);
-
-                let hive_partitions = file_info
-                    .hive_parts
-                    .as_ref()
-                    .map(|hive| hive.materialize_partition_columns());
-
-                let (file, projection, predicate) = prepare_scan_args(
-                    path,
-                    &self.predicate,
-                    &mut self.file_options.with_columns.clone(),
-                    &mut self.file_info.schema.clone(),
-                    self.file_options.row_count.is_some(),
-                    hive_partitions.as_deref(),
-                );
-
-                let mut reader = if let Some(file) = file {
-                    PolarsResult::Ok(
-                        ParquetReader::new(file)
-                            .with_schema(Some(self.file_info.reader_schema.clone()))
-                            .read_parallel(parallel)
-                            .set_low_memory(self.options.low_memory)
-                            .use_statistics(self.options.use_statistics)
-                            .set_rechunk(false)
-                            .with_hive_partition_columns(hive_partitions),
-                    )
-                } else {
-                    polars_bail!(ComputeError: "could not read {}", path.display())
-                }?;
-
-                reader
-                    .num_rows()
-                    .map(|num_rows| (reader, num_rows, predicate, projection))
-            })
-            .collect::<PolarsResult<Vec<_>>>()?;
-
-        let n_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
-        let iter = readers_and_metadata
-            .iter()
-            .map(|(_, num_rows, _, _)| *num_rows);
-
-        let rows_statistics = get_sequential_row_statistics(iter, n_rows_to_read);
-        let base_row_count = &self.file_options.row_count;
-
-        POOL.install(|| {
-            readers_and_metadata
-                .into_par_iter()
-                .zip(rows_statistics.par_iter())
-                .map(
-                    |(
-                        (reader, num_rows_this_file, predicate, projection),
-                        (remaining_rows_to_read, cumulative_read),
-                    )| {
-                        let remaining_rows_to_read = *remaining_rows_to_read;
-                        let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read
-                        {
-                            None
-                        } else {
-                            Some(remaining_rows_to_read)
-                        };
-                        let row_count = base_row_count.as_ref().map(|rc| RowCount {
-                            name: rc.name.clone(),
-                            offset: rc.offset + *cumulative_read as IdxSize,
-                        });
-
-                        reader
-                            .with_n_rows(remaining_rows_to_read)
-                            .with_row_count(row_count)
-                            ._finish_with_scan_ops(
-                                predicate.clone(),
-                                projection.as_ref().map(|v| v.as_ref()),
-                            )
-                    },
-                )
-                .collect()
-        })
+        let mut result = vec![];
+
+        let mut remaining_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
+        let mut base_row_count = self.file_options.row_count.take();
+
+        // 'Only' 256 files at a time to prevent open file limits.
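+        // `chunks(256)` hands out the paths in batches of at most 256; each batch is
+        // read to completion before the next begins, so no more than 256 files are
+        // ever open concurrently.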
+        for paths in self.paths.chunks(256) {
+            // First initialize the readers, predicates and metadata.
+            // This will be used to determine the slices. That way we can actually read all the
+            // files in parallel even when we add row counts or slices.
+            let readers_and_metadata = paths
+                .iter()
+                .map(|path| {
+                    let mut file_info = self.file_info.clone();
+                    file_info.update_hive_partitions(path);
+
+                    let hive_partitions = file_info
+                        .hive_parts
+                        .as_ref()
+                        .map(|hive| hive.materialize_partition_columns());
+
+                    let (file, projection, predicate) = prepare_scan_args(
+                        path,
+                        &self.predicate,
+                        &mut self.file_options.with_columns.clone(),
+                        &mut self.file_info.schema.clone(),
+                        self.file_options.row_count.is_some(),
+                        hive_partitions.as_deref(),
+                    );
+
+                    let file = file?;
+                    let mut reader = ParquetReader::new(file)
+                        .with_schema(Some(self.file_info.reader_schema.clone()))
+                        .read_parallel(parallel)
+                        .set_low_memory(self.options.low_memory)
+                        .use_statistics(self.options.use_statistics)
+                        .set_rechunk(false)
+                        .with_hive_partition_columns(hive_partitions);
+
+                    reader
+                        .num_rows()
+                        .map(|num_rows| (reader, num_rows, predicate, projection))
+                })
+                .collect::<PolarsResult<Vec<_>>>()?;
+
+            let iter = readers_and_metadata
+                .iter()
+                .map(|(_, num_rows, _, _)| *num_rows);
+
+            let rows_statistics = get_sequential_row_statistics(iter, remaining_rows_to_read);
+
+            let out = POOL.install(|| {
+                readers_and_metadata
+                    .into_par_iter()
+                    .zip(rows_statistics.par_iter())
+                    .map(
+                        |(
+                            (reader, num_rows_this_file, predicate, projection),
+                            (remaining_rows_to_read, cumulative_read),
+                        )| {
+                            let remaining_rows_to_read = *remaining_rows_to_read;
+                            let remaining_rows_to_read =
+                                if num_rows_this_file < remaining_rows_to_read {
+                                    None
+                                } else {
+                                    Some(remaining_rows_to_read)
+                                };
+                            let row_count = base_row_count.as_ref().map(|rc| RowCount {
+                                name: rc.name.clone(),
+                                offset: rc.offset + *cumulative_read as IdxSize,
+                            });
+
+                            reader
+                                .with_n_rows(remaining_rows_to_read)
+                                .with_row_count(row_count)
+                                ._finish_with_scan_ops(
+                                    predicate.clone(),
+                                    projection.as_ref().map(|v| v.as_ref()),
+                                )
+                        },
+                    )
+                    .collect::<PolarsResult<Vec<_>>>()
+            })?;
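+            // Account for what this batch read: shrink the remaining row budget and
+            // advance the row-count offset so the next batch continues numbering
+            // where this one stopped.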
) - .collect() - }) + todo!() } #[cfg(feature = "cloud")]