From 66adbde45ab1851317144dde20c1bdd19da3178f Mon Sep 17 00:00:00 2001
From: ritchie
Date: Sun, 22 Oct 2023 12:20:32 +0200
Subject: [PATCH] fix test

---
 .../src/physical_plan/executors/scan/ipc.rs        |  5 ++---
 .../src/physical_plan/executors/scan/mod.rs        |  8 ++------
 .../src/physical_plan/executors/scan/parquet.rs    | 13 ++++++-------
 crates/polars-lazy/src/physical_plan/planner/lp.rs |  4 +++-
 py-polars/tests/unit/io/test_parquet.py            |  4 +++-
 5 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
index 5256252d3a5d..9a0cc49c16a6 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -12,17 +12,16 @@ pub struct IpcExec {
 
 impl IpcExec {
     fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
-        let (file, projection, n_rows, predicate) = prepare_scan_args(
+        let (file, projection, predicate) = prepare_scan_args(
             &self.path,
             &self.predicate,
             &mut self.file_options.with_columns,
             &mut self.schema,
-            self.file_options.n_rows,
             self.file_options.row_count.is_some(),
             None,
         );
         IpcReader::new(file.unwrap())
-            .with_n_rows(n_rows)
+            .with_n_rows(self.file_options.n_rows)
             .with_row_count(std::mem::take(&mut self.file_options.row_count))
             .set_rechunk(self.file_options.rechunk)
             .with_projection(projection)
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
index 2c6d8ac24d88..7b8aeb13698f 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -30,8 +30,6 @@ use crate::prelude::*;
 #[cfg(any(feature = "ipc", feature = "parquet"))]
 type Projection = Option<Vec<usize>>;
 #[cfg(any(feature = "ipc", feature = "parquet"))]
-type StopNRows = Option<usize>;
-#[cfg(any(feature = "ipc", feature = "parquet"))]
 type Predicate = Option<Arc<dyn PhysicalIoExpr>>;
 
 #[cfg(any(feature = "ipc", feature = "parquet"))]
@@ -40,10 +38,9 @@ fn prepare_scan_args(
     predicate: &Option<Arc<dyn PhysicalExpr>>,
     with_columns: &mut Option<Arc<Vec<String>>>,
     schema: &mut SchemaRef,
-    n_rows: Option<usize>,
     has_row_count: bool,
     hive_partitions: Option<&[Series]>,
-) -> (Option<std::fs::File>, Projection, StopNRows, Predicate) {
+) -> (Option<std::fs::File>, Projection, Predicate) {
     let file = std::fs::File::open(path).ok();
     let with_columns = mem::take(with_columns);
 
@@ -56,10 +53,9 @@ fn prepare_scan_args(
         has_row_count,
     );
 
-    let n_rows = _set_n_rows_for_scan(n_rows);
     let predicate = predicate.clone().map(phys_expr_to_io_expr);
 
-    (file, projection, n_rows, predicate)
+    (file, projection, predicate)
 }
 
 /// Producer of an in memory DataFrame
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index a67f1f9c31b2..4b14c9534ea1 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -46,6 +46,7 @@ impl ParquetExec {
             },
             identity => identity,
         };
+        let n_rows = self.file_options.n_rows;
 
         POOL.install(|| {
             self.paths
@@ -59,12 +60,11 @@ impl ParquetExec {
                         .as_ref()
                         .map(|hive| hive.materialize_partition_columns());
 
-                    let (file, projection, _, predicate) = prepare_scan_args(
+                    let (file, projection, predicate) = prepare_scan_args(
                         path,
                         &self.predicate,
                         &mut self.file_options.with_columns.clone(),
                         &mut self.file_info.schema.clone(),
-                        None,
                         self.file_options.row_count.is_some(),
                         hive_partitions.as_deref(),
                     );
@@ -73,6 +73,7 @@ impl ParquetExec {
                         ParquetReader::new(file)
                             .with_schema(Some(self.file_info.reader_schema.clone()))
                             .read_parallel(parallel)
+                            .with_n_rows(n_rows)
                             .set_low_memory(self.options.low_memory)
                             .use_statistics(self.options.use_statistics)
                             .with_hive_partition_columns(hive_partitions)
@@ -109,12 +110,11 @@ impl ParquetExec {
             .as_ref()
             .map(|hive| hive.materialize_partition_columns());
 
-        let (file, projection, n_rows, predicate) = prepare_scan_args(
+        let (file, projection, predicate) = prepare_scan_args(
             path,
             &self.predicate,
             &mut self.file_options.with_columns.clone(),
             &mut self.file_info.schema.clone(),
-            n_rows_total,
             self.file_options.row_count.is_some(),
             hive_partitions.as_deref(),
         );
@@ -122,7 +122,7 @@ impl ParquetExec {
         let df = if let Some(file) = file {
             ParquetReader::new(file)
                 .with_schema(Some(self.file_info.reader_schema.clone()))
-                .with_n_rows(n_rows)
+                .with_n_rows(n_rows_total)
                 .read_parallel(self.options.parallel)
                 .with_row_count(row_count)
                 .set_low_memory(self.options.low_memory)
@@ -217,12 +217,11 @@ impl ParquetExec {
             .as_ref()
             .map(|hive| hive.materialize_partition_columns());
 
-        let (_, projection, remaining_rows_to_read, predicate) = prepare_scan_args(
+        let (_, projection, predicate) = prepare_scan_args(
             Path::new(""),
             predicate,
             &mut file_options.with_columns.clone(),
             &mut file_info.schema.clone(),
-            remaining_rows_to_read,
             row_count.is_some(),
             hive_partitions.as_deref(),
         );
diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs
index aa6a92c9d928..f0128817a70d 100644
--- a/crates/polars-lazy/src/physical_plan/planner/lp.rs
+++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -1,5 +1,6 @@
 use polars_core::prelude::*;
 use polars_core::POOL;
+use polars_plan::global::_set_n_rows_for_scan;
 
 use super::super::executors::{self, Executor};
 use super::*;
@@ -198,8 +199,9 @@ pub fn create_physical_plan(
             output_schema,
             scan_type,
             predicate,
-            file_options,
+            mut file_options,
         } => {
+            file_options.n_rows = _set_n_rows_for_scan(file_options.n_rows);
             let mut state = ExpressionConversionState::default();
             let predicate = predicate
                 .map(|pred| {
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index abc9b6603622..b5b2fbdece6b 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -406,7 +406,9 @@ def test_fetch_union(tmp_path: Path) -> None:
     expected = pl.DataFrame({"a": [0], "b": [1]})
     assert_frame_equal(result_one, expected)
 
-    expected = pl.DataFrame({"a": [0, 3], "b": [1, 4]})
+    # Fetching either 1 row per file or 1 row per dataset would be ok, as we
+    # don't guarantee anything; currently we fetch 1 row per dataset.
+    expected = pl.DataFrame({"a": [0], "b": [1]})
     assert_frame_equal(result_glob, expected)
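
-- 
Note (below the `-- ` mail-signature marker, so `git am` ignores it; not part
of the commit): this patch moves application of the global fetch-row limit
(`_set_n_rows_for_scan`, which picks up the value set by `.fetch(n)`) out of
`prepare_scan_args`, where it ran once per file, into `create_physical_plan`,
where it runs once per scan node. A multi-file scan therefore yields n rows
per dataset rather than n rows per file, which is what the updated test
encodes. A minimal sketch of the observable behavior, assuming two parquet
files whose first rows are {a: 0, b: 1} and {a: 3, b: 4} as in
`test_fetch_union` (the glob path below is illustrative):

    import polars as pl

    # .fetch(1) sets the global row limit once for the whole plan
    result_glob = pl.scan_parquet("tmp_path/*.parquet").fetch(1)

    # before this patch: 1 row per file    -> {"a": [0, 3], "b": [1, 4]}
    # after this patch:  1 row per dataset -> {"a": [0], "b": [1]}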