Commit

fix test
ritchie46 committed Oct 22, 2023
1 parent 33fca0d commit 66adbde
Showing 5 changed files with 16 additions and 18 deletions.
5 changes: 2 additions & 3 deletions crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -12,17 +12,16 @@ pub struct IpcExec {
 
 impl IpcExec {
     fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
-        let (file, projection, n_rows, predicate) = prepare_scan_args(
+        let (file, projection, predicate) = prepare_scan_args(
             &self.path,
             &self.predicate,
             &mut self.file_options.with_columns,
             &mut self.schema,
-            self.file_options.n_rows,
             self.file_options.row_count.is_some(),
             None,
         );
         IpcReader::new(file.unwrap())
-            .with_n_rows(n_rows)
+            .with_n_rows(self.file_options.n_rows)
             .with_row_count(std::mem::take(&mut self.file_options.row_count))
             .set_rechunk(self.file_options.rechunk)
             .with_projection(projection)
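The IPC executor now threads file_options.n_rows straight into the reader instead of taking a resolved value back from prepare_scan_args. A minimal sketch of that builder-style limit, with stand-in types (FrameReader and Frame are illustrative, not polars types):

    // Stand-in types; not the polars internals.
    struct Frame(Vec<i64>);

    struct FrameReader {
        rows: Vec<i64>,
        n_rows: Option<usize>,
    }

    impl FrameReader {
        fn new(rows: Vec<i64>) -> Self {
            Self { rows, n_rows: None }
        }

        // Mirrors the shape of `IpcReader::with_n_rows`: `None` means "read everything".
        fn with_n_rows(mut self, n_rows: Option<usize>) -> Self {
            self.n_rows = n_rows;
            self
        }

        fn finish(self) -> Frame {
            let limit = self.n_rows.unwrap_or(self.rows.len());
            Frame(self.rows.into_iter().take(limit).collect())
        }
    }

    fn main() {
        // The executor now passes `file_options.n_rows` through unchanged.
        let df = FrameReader::new(vec![1, 2, 3, 4]).with_n_rows(Some(2)).finish();
        assert_eq!(df.0, vec![1, 2]);
    }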
8 changes: 2 additions & 6 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -30,8 +30,6 @@ use crate::prelude::*;
 #[cfg(any(feature = "ipc", feature = "parquet"))]
 type Projection = Option<Vec<usize>>;
-#[cfg(any(feature = "ipc", feature = "parquet"))]
-type StopNRows = Option<usize>;
 #[cfg(any(feature = "ipc", feature = "parquet"))]
 type Predicate = Option<Arc<dyn PhysicalIoExpr>>;
 
 #[cfg(any(feature = "ipc", feature = "parquet"))]
@@ -40,10 +38,9 @@ fn prepare_scan_args(
     predicate: &Option<Arc<dyn PhysicalExpr>>,
     with_columns: &mut Option<Arc<Vec<String>>>,
     schema: &mut SchemaRef,
-    n_rows: Option<usize>,
     has_row_count: bool,
     hive_partitions: Option<&[Series]>,
-) -> (Option<std::fs::File>, Projection, StopNRows, Predicate) {
+) -> (Option<std::fs::File>, Projection, Predicate) {
     let file = std::fs::File::open(path).ok();
 
     let with_columns = mem::take(with_columns);
@@ -56,10 +53,9 @@
         has_row_count,
     );
 
-    let n_rows = _set_n_rows_for_scan(n_rows);
     let predicate = predicate.clone().map(phys_expr_to_io_expr);
 
-    (file, projection, n_rows, predicate)
+    (file, projection, predicate)
 }
 
 /// Producer of an in memory DataFrame
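With the row limit (and the StopNRows alias) gone, prepare_scan_args is left with opening the file, building the projection, and lowering the predicate. A rough sketch of the projection step under assumed types (columns_to_projection and the BTreeMap schema are illustrative, not the polars internals):

    use std::collections::BTreeMap;

    // Map requested column names to their schema indices; unknown columns
    // are skipped here for brevity (polars would report an error instead).
    fn columns_to_projection(
        with_columns: Option<Vec<String>>,
        schema: &BTreeMap<String, usize>,
    ) -> Option<Vec<usize>> {
        with_columns.map(|cols| {
            cols.iter()
                .filter_map(|name| schema.get(name).copied())
                .collect()
        })
    }

    fn main() {
        let schema: BTreeMap<String, usize> =
            [("a".to_string(), 0), ("b".to_string(), 1)].into();
        assert_eq!(
            columns_to_projection(Some(vec!["b".into()]), &schema),
            Some(vec![1])
        );
    }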
13 changes: 6 additions & 7 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -46,6 +46,7 @@ impl ParquetExec {
             },
             identity => identity,
         };
+        let n_rows = self.file_options.n_rows;
 
         POOL.install(|| {
             self.paths
@@ -59,12 +60,11 @@ impl ParquetExec {
                     .as_ref()
                     .map(|hive| hive.materialize_partition_columns());
 
-                let (file, projection, _, predicate) = prepare_scan_args(
+                let (file, projection, predicate) = prepare_scan_args(
                     path,
                     &self.predicate,
                     &mut self.file_options.with_columns.clone(),
                     &mut self.file_info.schema.clone(),
-                    None,
                     self.file_options.row_count.is_some(),
                     hive_partitions.as_deref(),
                 );
@@ -73,6 +73,7 @@ impl ParquetExec {
                 ParquetReader::new(file)
                     .with_schema(Some(self.file_info.reader_schema.clone()))
                     .read_parallel(parallel)
+                    .with_n_rows(n_rows)
                     .set_low_memory(self.options.low_memory)
                     .use_statistics(self.options.use_statistics)
                     .with_hive_partition_columns(hive_partitions)
@@ -109,20 +110,19 @@ impl ParquetExec {
                 .as_ref()
                 .map(|hive| hive.materialize_partition_columns());
 
-            let (file, projection, n_rows, predicate) = prepare_scan_args(
+            let (file, projection, predicate) = prepare_scan_args(
                 path,
                 &self.predicate,
                 &mut self.file_options.with_columns.clone(),
                 &mut self.file_info.schema.clone(),
-                n_rows_total,
                 self.file_options.row_count.is_some(),
                 hive_partitions.as_deref(),
            );
 
             let df = if let Some(file) = file {
                 ParquetReader::new(file)
                     .with_schema(Some(self.file_info.reader_schema.clone()))
-                    .with_n_rows(n_rows)
+                    .with_n_rows(n_rows_total)
                     .read_parallel(self.options.parallel)
                     .with_row_count(row_count)
                     .set_low_memory(self.options.low_memory)
@@ -217,12 +217,11 @@ impl ParquetExec {
                 .as_ref()
                 .map(|hive| hive.materialize_partition_columns());
 
-            let (_, projection, remaining_rows_to_read, predicate) = prepare_scan_args(
+            let (_, projection, predicate) = prepare_scan_args(
                 Path::new(""),
                 predicate,
                 &mut file_options.with_columns.clone(),
                 &mut file_info.schema.clone(),
-                remaining_rows_to_read,
                 row_count.is_some(),
                 hive_partitions.as_deref(),
             );
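In the parallel branch, the limit is hoisted into a local n_rows before POOL.install, so every per-file task reads the same value without borrowing self.file_options inside the closure. A sketch of that hoist, with plain std::thread standing in for the rayon pool:

    use std::thread;

    // `n_rows` is read into a parameter before the workers are spawned; each
    // worker gets its own copy of the same limit (Option<usize> is Copy).
    fn read_files(paths: Vec<String>, n_rows: Option<usize>) -> Vec<String> {
        let handles: Vec<_> = paths
            .into_iter()
            .map(|p| thread::spawn(move || format!("{p}: limit {n_rows:?}")))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    }

    fn main() {
        let out = read_files(vec!["f1".into(), "f2".into()], Some(5));
        assert_eq!(out, vec!["f1: limit Some(5)", "f2: limit Some(5)"]);
    }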
4 changes: 3 additions & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -1,5 +1,6 @@
 use polars_core::prelude::*;
 use polars_core::POOL;
+use polars_plan::global::_set_n_rows_for_scan;
 
 use super::super::executors::{self, Executor};
 use super::*;
@@ -198,8 +199,9 @@ pub fn create_physical_plan(
             output_schema,
             scan_type,
             predicate,
-            file_options,
+            mut file_options,
         } => {
+            file_options.n_rows = _set_n_rows_for_scan(file_options.n_rows);
             let mut state = ExpressionConversionState::default();
             let predicate = predicate
                 .map(|pred| {
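This is the substantive fix: the global fetch override is applied once, when the physical plan is built, rather than on every call to prepare_scan_args. _set_n_rows_for_scan is the real polars helper; the implementation below is a guessed stand-in to show the mechanics, not the actual code:

    use std::sync::atomic::{AtomicI64, Ordering};

    // Process-wide fetch limit, e.g. set by `fetch(n)`; -1 means "no fetch active".
    static FETCH_ROWS: AtomicI64 = AtomicI64::new(-1);

    // If a fetch is active, its limit replaces the scan's own `n_rows`.
    fn set_n_rows_for_scan(n_rows: Option<usize>) -> Option<usize> {
        match FETCH_ROWS.load(Ordering::Relaxed) {
            n if n >= 0 => Some(n as usize),
            _ => n_rows,
        }
    }

    struct FileOptions {
        n_rows: Option<usize>,
    }

    fn create_physical_plan(mut file_options: FileOptions) -> FileOptions {
        // The commit moves this call here, out of the per-file scan helper,
        // so one budget applies to the whole scan node.
        file_options.n_rows = set_n_rows_for_scan(file_options.n_rows);
        file_options
    }

    fn main() {
        FETCH_ROWS.store(1, Ordering::Relaxed); // simulate `fetch(1)`
        let opts = create_physical_plan(FileOptions { n_rows: None });
        assert_eq!(opts.n_rows, Some(1));
    }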
4 changes: 3 additions & 1 deletion py-polars/tests/unit/io/test_parquet.py
@@ -406,7 +406,9 @@ def test_fetch_union(tmp_path: Path) -> None:
     expected = pl.DataFrame({"a": [0], "b": [1]})
     assert_frame_equal(result_one, expected)
 
-    expected = pl.DataFrame({"a": [0, 3], "b": [1, 4]})
+    # Fetching 1 row per file or 1 row per dataset would both be acceptable,
+    # as we don't guarantee anything; currently we fetch one per dataset.
+    expected = pl.DataFrame({"a": [0], "b": [1]})
     assert_frame_equal(result_glob, expected)
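In miniature, the two semantics the updated test comment contrasts (the concrete row values are assumptions read off the old and new expected frames):

    // Per-file fetch would take n rows from every file in the glob;
    // per-dataset fetch, the current behavior, takes n rows from the union.
    fn fetch_per_file(files: &[Vec<i64>], n: usize) -> Vec<i64> {
        files.iter().flat_map(|f| f.iter().copied().take(n)).collect()
    }

    fn fetch_per_dataset(files: &[Vec<i64>], n: usize) -> Vec<i64> {
        files.iter().flatten().copied().take(n).collect()
    }

    fn main() {
        let files = vec![vec![0, 1, 2], vec![3, 4, 5]];
        assert_eq!(fetch_per_file(&files, 1), vec![0, 3]); // the old expectation
        assert_eq!(fetch_per_dataset(&files, 1), vec![0]); // the new expectation
    }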
