From 15ec1062b9af27fba2d24f2ee8e2290cda0dfce5 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Tue, 24 Oct 2023 14:40:27 +0200
Subject: [PATCH] fix: ensure streaming parquet datasets deal with limits
 (#11977)

---
 crates/polars-io/src/parquet/read_impl.rs |  8 ++++++
 .../src/executors/sources/parquet.rs      | 27 ++++++++++++++++---
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index 3477ba200670..eecaa59549fa 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -552,6 +552,14 @@ impl BatchedParquetReader {
         })
     }
 
+    pub fn limit_reached(&self) -> bool {
+        self.limit == 0
+    }
+
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
     pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
         if self.limit == 0 && self.has_returned {
             return Ok(None);
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index 4af1f86f4334..9ea8b5fc931f 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -2,7 +2,7 @@ use std::ops::Deref;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use polars_core::error::PolarsResult;
+use polars_core::error::*;
 use polars_core::utils::arrow::io::parquet::read::FileMetaData;
 use polars_core::POOL;
 use polars_io::cloud::CloudOptions;
@@ -22,6 +22,7 @@ use crate::pipeline::determine_chunk_size;
 pub struct ParquetSource {
     batched_reader: Option<BatchedParquetReader>,
     n_threads: usize,
+    processed_paths: usize,
     chunk_index: IdxSize,
     paths: std::slice::Iter<'static, PathBuf>,
     _paths_lifetime: Arc<[PathBuf]>,
@@ -69,6 +70,12 @@ impl ParquetSource {
             eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows")
         }
 
+        let reader_schema = if self.processed_paths == 0 {
+            Some(self.file_info.reader_schema.clone())
+        } else {
+            None
+        };
+
         let batched_reader = if is_cloud_url(path) {
             #[cfg(not(feature = "async"))]
             {
@@ -83,7 +90,7 @@ impl ParquetSource {
                 ParquetAsyncReader::from_uri(
                     &uri,
                     self.cloud_options.as_ref(),
-                    Some(self.file_info.reader_schema.clone()),
+                    reader_schema,
                     self.metadata.clone(),
                 )
                 .await?
@@ -105,7 +112,7 @@ impl ParquetSource {
             let file = std::fs::File::open(path).unwrap();
 
             ParquetReader::new(file)
-                .with_schema(Some(self.file_info.reader_schema.clone()))
+                .with_schema(reader_schema)
                 .with_n_rows(file_options.n_rows)
                 .with_row_count(file_options.row_count)
                 .with_projection(projection)
@@ -118,7 +125,11 @@ impl ParquetSource {
                 )
                 .batched(chunk_size)?
         };
+        if self.processed_paths >= 1 {
+            polars_ensure!(batched_reader.schema().as_ref() == self.file_info.reader_schema.as_ref(), ComputeError: "schema of all files in a single scan_parquet must be equal");
+        }
         self.batched_reader = Some(batched_reader);
+        self.processed_paths += 1;
         Ok(())
     }
 
@@ -145,6 +156,7 @@ impl ParquetSource {
             batched_reader: None,
             n_threads,
             chunk_index: 0,
+            processed_paths: 0,
             options,
             file_options,
             paths: iter,
@@ -175,14 +187,23 @@ impl Source for ParquetSource {
         )?;
         Ok(match batches {
             None => {
+                if self.batched_reader.as_ref().unwrap().limit_reached() {
+                    return Ok(SourceResult::Finished);
+                }
                 // reset the reader
                 self.batched_reader = None;
+                self.metadata = None;
                 return self.get_batches(_context);
             },
             Some(batches) => SourceResult::GotMoreData(
                 batches
                     .into_iter()
                     .map(|data| {
+                        // Keep the row limit updated so the next reader will have a correct limit.
+                        if let Some(n_rows) = &mut self.file_options.n_rows {
+                            *n_rows = n_rows.saturating_sub(data.height())
+                        }
+
                        let chunk_index = self.chunk_index;
                        self.chunk_index += 1;
                        DataChunk { chunk_index, data }
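
A note for readers skimming the diff: the fix has two parts. The remaining row
budget (`file_options.n_rows`) is now decremented per emitted chunk with
`saturating_sub`, and `limit_reached()` is checked before the next file's
reader is constructed, so a multi-file `scan_parquet` with a limit stops early
instead of scanning every file. The snippet below is only an illustrative,
standalone sketch of that pattern; it is not Polars code, and the file and
batch sizes in it are made up.

    // Sketch of the limit-propagation pattern applied by this patch.
    // Not Polars code; the per-file batch sizes below are hypothetical.
    fn main() {
        // Row counts of the batches each "file" would yield.
        let files: Vec<Vec<usize>> = vec![vec![400, 400], vec![400, 400], vec![400]];
        // Remaining row budget, analogous to `file_options.n_rows`.
        let mut remaining: Option<usize> = Some(1_000);

        'files: for (i, batches) in files.iter().enumerate() {
            // Analogous to `limit_reached()`: do not open another reader
            // once the budget is spent.
            if remaining == Some(0) {
                break;
            }
            for &rows in batches {
                // A real reader would truncate the batch; here we only cap the count.
                let emitted = remaining.map_or(rows, |n| rows.min(n));
                println!("file {i}: emitted {emitted} rows");
                if let Some(n) = &mut remaining {
                    // Analogous to the patch: keep the limit updated for the next reader.
                    *n = n.saturating_sub(emitted);
                    if *n == 0 {
                        break 'files;
                    }
                }
            }
        }
    }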