From b86eed3c1e46332750018d700a99e8a322fa2c96 Mon Sep 17 00:00:00 2001
From: nameexhaustion
Date: Mon, 23 Sep 2024 14:50:49 +0800
Subject: [PATCH] fix: Raise when parquet file has extra columns and no `select()` was done (#18843)

---
 crates/polars-io/src/parquet/read/reader.rs   | 18 +++++++++++--
 .../src/executors/scan/parquet.rs             |  4 +--
 .../src/executors/sources/parquet.rs          |  4 +--
 .../nodes/parquet_source/metadata_fetch.rs    | 27 ++++++++++++++-----
 .../nodes/parquet_source/metadata_utils.rs    |  7 ++---
 py-polars/tests/unit/io/test_lazy_parquet.py  |  9 ++++---
 6 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs
index af3f95046ae8..5a8397dc48c8 100644
--- a/crates/polars-io/src/parquet/read/reader.rs
+++ b/crates/polars-io/src/parquet/read/reader.rs
@@ -83,7 +83,7 @@ impl ParquetReader {
 
     /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
     /// dtype, and sets the projection indices.
-    pub fn with_projected_arrow_schema(
+    pub fn with_arrow_schema_projection(
         mut self,
         first_schema: &ArrowSchema,
         projected_arrow_schema: Option<&ArrowSchema>,
@@ -96,6 +96,13 @@ impl ParquetReader {
                 projected_arrow_schema,
             )?;
         } else {
+            if schema.len() > first_schema.len() {
+                polars_bail!(
+                    SchemaMismatch:
+                    "parquet file contained extra columns and no selection was given"
+                )
+            }
+
             self.projection =
                 projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
         }
@@ -292,7 +299,7 @@
         })
     }
 
-    pub async fn with_projected_arrow_schema(
+    pub async fn with_arrow_schema_projection(
         mut self,
         first_schema: &ArrowSchema,
         projected_arrow_schema: Option<&ArrowSchema>,
@@ -305,6 +312,13 @@ impl ParquetAsyncReader {
                 projected_arrow_schema,
             )?;
         } else {
+            if schema.len() > first_schema.len() {
+                polars_bail!(
+                    SchemaMismatch:
+                    "parquet file contained extra columns and no selection was given"
+                )
+            }
+
             self.projection =
                 projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
         }
diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs
index bac328d347aa..e15f8ee8be00 100644
--- a/crates/polars-mem-engine/src/executors/scan/parquet.rs
+++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -216,7 +216,7 @@ impl ParquetExec {
                     .with_slice(Some(slice))
                     .with_row_index(row_index)
                     .with_predicate(predicate.clone())
-                    .with_projected_arrow_schema(
+                    .with_arrow_schema_projection(
                         first_schema.as_ref(),
                         projected_arrow_schema.as_deref(),
                     )?
@@ -421,7 +421,7 @@ impl ParquetExec {
                 let df = reader
                     .with_slice(Some(slice))
                     .with_row_index(row_index)
-                    .with_projected_arrow_schema(
+                    .with_arrow_schema_projection(
                         first_schema.as_ref(),
                         projected_arrow_schema.as_deref(),
                     )
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index b4099c98e500..faed9d4b667e 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -131,7 +131,7 @@ impl ParquetSource {
         }
 
         let mut reader = reader
-            .with_projected_arrow_schema(
+            .with_arrow_schema_projection(
                 &self.first_schema,
                 self.projected_arrow_schema.as_deref(),
             )?
@@ -196,7 +196,7 @@ impl ParquetSource {
                 ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
                     .await?
                    .with_row_index(file_options.row_index)
-                    .with_projected_arrow_schema(
+                    .with_arrow_schema_projection(
                         &self.first_schema,
                         self.projected_arrow_schema.as_deref(),
                     )
diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
index 8bddbaa24c7b..f65a4436a75f 100644
--- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -1,14 +1,14 @@
 use std::sync::Arc;
 
 use futures::StreamExt;
-use polars_error::PolarsResult;
+use polars_error::{polars_bail, PolarsResult};
 use polars_io::prelude::FileMetadata;
 use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_utils::mmap::MemSlice;
 use polars_utils::pl_str::PlSmallStr;
 
-use super::metadata_utils::{ensure_metadata_has_projected_fields, read_parquet_metadata_bytes};
+use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
 use super::ParquetSourceNode;
 use crate::async_executor;
 use crate::async_primitives::connector::connector;
@@ -107,6 +107,15 @@ impl ParquetSourceNode {
         };
 
         let first_metadata = self.first_metadata.clone();
+        let reader_schema_len = self
+            .file_info
+            .reader_schema
+            .as_ref()
+            .unwrap()
+            .as_ref()
+            .unwrap_left()
+            .len();
+        let has_projection = self.file_options.with_columns.is_some();
 
         let process_metadata_bytes = {
             move |handle: task_handles_ext::AbortOnDropHandle<
@@ -127,10 +136,16 @@ impl ParquetSourceNode {
                     )?,
                 };
 
-                ensure_metadata_has_projected_fields(
-                    &metadata,
-                    projected_arrow_schema.as_ref(),
-                )?;
+                let schema = polars_parquet::arrow::read::infer_schema(&metadata)?;
+
+                if !has_projection && schema.len() > reader_schema_len {
+                    polars_bail!(
+                        SchemaMismatch:
+                        "parquet file contained extra columns and no selection was given"
+                    )
+                }
+
+                ensure_schema_has_projected_fields(&schema, projected_arrow_schema.as_ref())?;
 
                 PolarsResult::Ok((path_index, byte_source, metadata))
             });
diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
index aa99742fb83e..3e4d03a3a270 100644
--- a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
@@ -1,6 +1,5 @@
 use polars_core::prelude::{ArrowSchema, DataType};
 use polars_error::{polars_bail, PolarsResult};
-use polars_io::prelude::FileMetadata;
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_utils::mmap::MemSlice;
 
@@ -122,12 +121,10 @@ pub(super) async fn read_parquet_metadata_bytes(
 
 /// Ensures that a parquet file has all the necessary columns for a projection with the correct
 /// dtype. There are no ordering requirements and extra columns are permitted.
-pub(super) fn ensure_metadata_has_projected_fields(
-    metadata: &FileMetadata,
+pub(super) fn ensure_schema_has_projected_fields(
+    schema: &ArrowSchema,
     projected_fields: &ArrowSchema,
 ) -> PolarsResult<()> {
-    let schema = polars_parquet::arrow::read::infer_schema(metadata)?;
-
     for field in projected_fields.iter_values() {
         // Note: We convert to Polars-native dtypes for timezone normalization.
        let expected_dtype = DataType::from_arrow(&field.dtype, true);
diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py
index b31e4399c116..111289408dea 100644
--- a/py-polars/tests/unit/io/test_lazy_parquet.py
+++ b/py-polars/tests/unit/io/test_lazy_parquet.py
@@ -597,10 +597,11 @@ def test_parquet_unaligned_schema_read(tmp_path: Path, streaming: bool) -> None:
         pl.DataFrame({"a": [1, 2], "b": [10, 11]}),
     )
 
-    assert_frame_equal(
-        lf.collect(streaming=streaming),
-        pl.DataFrame({"a": [1, 2, 3], "b": [10, 11, 12]}),
-    )
+    with pytest.raises(
+        pl.exceptions.SchemaError,
+        match="parquet file contained extra columns and no selection was given",
+    ):
+        lf.collect(streaming=streaming)
 
 
 @pytest.mark.write_disk
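
Usage note (not part of the patch): a minimal Python sketch of the behaviour this change enforces, assuming the first file passed to scan_parquet defines the expected schema. The temporary directory, file names, and data below are illustrative only.

import tempfile
from pathlib import Path

import polars as pl

# Illustrative scratch directory; any writable path works.
tmp = Path(tempfile.mkdtemp())

# The first file defines the schema ({"a"}); the second has an extra column "b".
pl.DataFrame({"a": [1, 2]}).write_parquet(tmp / "0.parquet")
pl.DataFrame({"a": [3, 4], "b": [5, 6]}).write_parquet(tmp / "1.parquet")

lf = pl.scan_parquet([tmp / "0.parquet", tmp / "1.parquet"])

# Without a column selection, the extra column now raises instead of being
# silently dropped.
try:
    lf.collect()
except pl.exceptions.SchemaError as exc:
    print(exc)  # parquet file contained extra columns and no selection was given

# Projecting a subset of the first schema still works; extra columns in later
# files are permitted when a selection is given.
print(lf.select("a").collect())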