diff --git a/crates/polars-core/src/chunked_array/cast.rs b/crates/polars-core/src/chunked_array/cast.rs
index 976dd568ddc4..b003278aac14 100644
--- a/crates/polars-core/src/chunked_array/cast.rs
+++ b/crates/polars-core/src/chunked_array/cast.rs
@@ -136,9 +136,6 @@ where
                     polars_bail!(OutOfBounds: "index {} is bigger than the number of categories {}",m,categories.len());
                 }
             }
-            // SAFETY:
-            // we are guarded by the type system
-            let ca = unsafe { &*(self as *const ChunkedArray<T> as *const UInt32Chunked) };
             // SAFETY: indices are in bound
             unsafe {
                 Ok(CategoricalChunked::from_cats_and_rev_map_unchecked(
diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs
index 2bd67de5a925..aa2f90b6f692 100644
--- a/crates/polars-lazy/src/scan/csv.rs
+++ b/crates/polars-lazy/src/scan/csv.rs
@@ -10,7 +10,7 @@ use crate::prelude::*;
 
 #[derive(Clone)]
 #[cfg(feature = "csv")]
-pub struct LazyCsvReader<'a> {
+pub struct LazyCsvReader {
     path: PathBuf,
     paths: Arc<[PathBuf]>,
     separator: u8,
@@ -20,7 +20,7 @@ pub struct LazyCsvReader<'a> {
     n_rows: Option<usize>,
     cache: bool,
     schema: Option<SchemaRef>,
-    schema_overwrite: Option<&'a Schema>,
+    schema_overwrite: Option<SchemaRef>,
     low_memory: bool,
     comment_prefix: Option<CommentPrefix>,
     quote_char: Option<u8>,
@@ -39,7 +39,7 @@ pub struct LazyCsvReader<'a> {
 }
 
 #[cfg(feature = "csv")]
-impl<'a> LazyCsvReader<'a> {
+impl LazyCsvReader {
     pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
         Self::new("").with_paths(paths)
     }
@@ -129,7 +129,7 @@ impl<'a> LazyCsvReader<'a> {
     /// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
     /// of the total schema.
     #[must_use]
-    pub fn with_dtype_overwrite(mut self, schema: Option<&'a Schema>) -> Self {
+    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
         self.schema_overwrite = schema;
         self
     }
@@ -270,7 +270,7 @@ impl<'a> LazyCsvReader<'a> {
             let mut schema = f(schema)?;
 
             // the dtypes set may be for the new names, so update again
-            if let Some(overwrite_schema) = self.schema_overwrite {
+            if let Some(overwrite_schema) = &self.schema_overwrite {
                 for (name, dtype) in overwrite_schema.iter() {
                     schema.with_column(name.clone(), dtype.clone());
                 }
@@ -280,7 +280,7 @@ impl<'a> LazyCsvReader<'a> {
     }
 }
 
-impl LazyFileListReader for LazyCsvReader<'_> {
+impl LazyFileListReader for LazyCsvReader {
     fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
         let mut lf: LazyFrame = DslBuilder::scan_csv(
             self.path,
diff --git a/crates/polars-lazy/src/tests/io.rs b/crates/polars-lazy/src/tests/io.rs
index 29e48fae16c5..19095e44536f 100644
--- a/crates/polars-lazy/src/tests/io.rs
+++ b/crates/polars-lazy/src/tests/io.rs
@@ -672,10 +672,10 @@ fn scan_small_dtypes() -> PolarsResult<()> {
     for dt in small_dt {
         let df = LazyCsvReader::new(FOODS_CSV)
            .has_header(true)
-            .with_dtype_overwrite(Some(&Schema::from_iter([Field::new(
+            .with_dtype_overwrite(Some(Arc::new(Schema::from_iter([Field::new(
                "sugars_g",
                dt.clone(),
-            )])))
+            )]))))
            .finish()?
            .select(&[col("sugars_g")])
            .collect()?;
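For Rust callers, `with_dtype_overwrite` now takes an owned `SchemaRef` (`Arc<Schema>`) instead of a borrowed `&Schema`, so `LazyCsvReader` no longer carries a lifetime. A minimal sketch of the updated call site (file path and field name are illustrative, assuming the `lazy` and `csv` features of the `polars` facade crate):

```rust
use std::sync::Arc;

use polars::prelude::*;

fn scan_with_overwrite() -> PolarsResult<DataFrame> {
    // The overwrite schema is now passed by value as an Arc<Schema>,
    // so the reader does not borrow from the caller anymore.
    let overwrite = Schema::from_iter([Field::new("sugars_g", DataType::Int16)]);
    LazyCsvReader::new("foods.csv")
        .has_header(true)
        .with_dtype_overwrite(Some(Arc::new(overwrite)))
        .finish()?
        .collect()
}
```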
diff --git a/crates/polars-plan/src/dot.rs b/crates/polars-plan/src/dot.rs
index e8be059bf905..174331019342 100644
--- a/crates/polars-plan/src/dot.rs
+++ b/crates/polars-plan/src/dot.rs
@@ -199,7 +199,7 @@ impl DslPlan {
                     "PYTHON",
                     &[],
                     options.with_columns.as_ref().map(|s| s.as_slice()),
-                    options.schema.len(),
+                    Some(options.schema.len()),
                     &options.predicate,
                     branch,
                     id,
@@ -342,7 +342,7 @@ impl DslPlan {
                     name,
                     paths.as_ref(),
                     options.with_columns.as_ref().map(|cols| cols.as_slice()),
-                    file_info.schema.len(),
+                    file_info.as_ref().map(|fi| fi.schema.len()),
                     predicate,
                     branch,
                     id,
@@ -418,7 +418,7 @@ impl DslPlan {
         name: &str,
         path: &[PathBuf],
         with_columns: Option<&[String]>,
-        total_columns: usize,
+        total_columns: Option<usize>,
         predicate: &Option<Expr>,
         branch: usize,
         id: usize,
@@ -440,6 +440,9 @@ impl DslPlan {
         };
         let pred = fmt_predicate(predicate.as_ref());
+        let total_columns = total_columns
+            .map(|v| format!("{v}"))
+            .unwrap_or_else(|| "?".to_string());
         let fmt = format!(
             "{name} SCAN {};\nπ {}/{};\nσ {}",
             path_fmt, n_columns_fmt, total_columns, pred,
diff --git a/crates/polars-plan/src/logical_plan/builder_dsl.rs b/crates/polars-plan/src/logical_plan/builder_dsl.rs
index 712a7dfab8fb..a39d5ce2c581 100644
--- a/crates/polars-plan/src/logical_plan/builder_dsl.rs
+++ b/crates/polars-plan/src/logical_plan/builder_dsl.rs
@@ -1,15 +1,6 @@
-#[cfg(feature = "csv")]
-use std::io::{Read, Seek};
-
 use polars_core::prelude::*;
 #[cfg(feature = "parquet")]
 use polars_io::cloud::CloudOptions;
-#[cfg(all(feature = "parquet", feature = "async"))]
-use polars_io::parquet::ParquetAsyncReader;
-#[cfg(feature = "parquet")]
-use polars_io::parquet::ParquetReader;
-#[cfg(all(feature = "cloud", feature = "parquet"))]
-use polars_io::pl_async::get_runtime;
 use polars_io::HiveOptions;
 #[cfg(any(
     feature = "parquet",
@@ -19,13 +10,7 @@ use polars_io::HiveOptions;
 ))]
 use polars_io::RowIndex;
 #[cfg(feature = "csv")]
-use polars_io::{
-    csv::utils::{infer_file_schema, is_compressed},
-    csv::CommentPrefix,
-    csv::CsvEncoding,
-    csv::NullValues,
-    utils::get_reader_bytes,
-};
+use polars_io::{csv::CommentPrefix, csv::CsvEncoding, csv::NullValues};
 
 use crate::constants::UNLIMITED_CACHE;
 use crate::logical_plan::expr_expansion::rewrite_projections;
@@ -50,14 +35,6 @@ impl From<DslPlan> for DslBuilder {
     }
 }
 
-#[cfg(any(feature = "parquet", feature = "parquet_async",))]
-fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> SchemaRef {
-    if let Some(rc) = row_index {
-        let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
-    }
-    Arc::new(schema)
-}
-
 impl DslBuilder {
     pub fn anonymous_scan(
         function: Arc<dyn AnonymousScan>,
@@ -89,7 +66,7 @@ impl DslBuilder {
         Ok(DslPlan::Scan {
             paths: Arc::new([]),
-            file_info,
+            file_info: Some(file_info),
             predicate: None,
             file_options,
             scan_type: FileScan::Anonymous {
@@ -117,57 +94,7 @@ impl DslBuilder {
         use_statistics: bool,
         hive_options: HiveOptions,
     ) -> PolarsResult<Self> {
-        use polars_io::{is_cloud_url, SerReader as _};
-
         let paths = paths.into();
-        polars_ensure!(paths.len() >= 1, ComputeError: "expected at least 1 path");
-
-        // Use first path to get schema.
-        let path = &paths[0];
-
-        let (schema, reader_schema, num_rows, metadata) = if is_cloud_url(path) {
-            #[cfg(not(feature = "cloud"))]
-            panic!(
-                "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
-            );
-
-            #[cfg(feature = "cloud")]
-            {
-                let uri = path.to_string_lossy();
-                get_runtime().block_on(async {
-                    let mut reader =
-                        ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), None, None)
-                            .await?;
-                    let reader_schema = reader.schema().await?;
-                    let num_rows = reader.num_rows().await?;
-                    let metadata = reader.get_metadata().await?.clone();
-
-                    let schema = prepare_schema((&reader_schema).into(), row_index.as_ref());
-                    PolarsResult::Ok((schema, reader_schema, Some(num_rows), Some(metadata)))
-                })?
-            }
-        } else {
-            let file = polars_utils::open_file(path)?;
-            let mut reader = ParquetReader::new(file);
-            let reader_schema = reader.schema()?;
-            let schema = prepare_schema((&reader_schema).into(), row_index.as_ref());
-            (
-                schema,
-                reader_schema,
-                Some(reader.num_rows()?),
-                Some(reader.get_metadata()?.clone()),
-            )
-        };
-
-        let mut file_info = FileInfo::new(
-            schema,
-            Some(reader_schema),
-            (num_rows, num_rows.unwrap_or(0)),
-        );
-
-        if hive_options.enabled {
-            file_info.init_hive_partitions(path.as_path(), hive_options.schema.clone())?
-        }
 
         let options = FileScanOptions {
             with_columns: None,
             cache,
@@ -180,7 +107,7 @@ impl DslBuilder {
         };
         Ok(DslPlan::Scan {
             paths,
-            file_info,
+            file_info: None,
             file_options: options,
             predicate: None,
             scan_type: FileScan::Parquet {
@@ -190,7 +117,7 @@ impl DslBuilder {
                     use_statistics,
                 },
                 cloud_options,
-                metadata,
+                metadata: None,
             },
         }
         .into())
@@ -206,44 +133,11 @@ impl DslBuilder {
         rechunk: bool,
         cloud_options: Option<CloudOptions>,
     ) -> PolarsResult<Self> {
-        use polars_io::is_cloud_url;
-
         let paths = paths.into();
 
-        // Use first path to get schema.
-        let path = paths
-            .first()
-            .ok_or_else(|| polars_err!(ComputeError: "expected at least 1 path"))?;
-
-        let metadata = if is_cloud_url(path) {
-            #[cfg(not(feature = "cloud"))]
-            panic!(
-                "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
-            );
-
-            #[cfg(feature = "cloud")]
-            {
-                let uri = path.to_string_lossy();
-                get_runtime().block_on(async {
-                    polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref())
-                        .await?
-                        .metadata()
-                        .await
-                })?
-            }
-        } else {
-            arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(
-                polars_utils::open_file(path)?,
-            ))?
-        };
-
         Ok(DslPlan::Scan {
             paths,
-            file_info: FileInfo::new(
-                prepare_schema(metadata.schema.as_ref().into(), row_index.as_ref()),
-                Some(Arc::clone(&metadata.schema)),
-                (None, 0),
-            ),
+            file_info: None,
             file_options: FileScanOptions {
                 with_columns: None,
                 cache,
@@ -261,7 +155,7 @@ impl DslBuilder {
             scan_type: FileScan::Ipc {
                 options,
                 cloud_options,
-                metadata: Some(metadata),
+                metadata: None,
             },
         }
         .into())
@@ -274,11 +168,11 @@ impl DslBuilder {
         separator: u8,
         has_header: bool,
         ignore_errors: bool,
-        mut skip_rows: usize,
+        skip_rows: usize,
         n_rows: Option<usize>,
         cache: bool,
-        mut schema: Option<Arc<Schema>>,
-        schema_overwrite: Option<&Schema>,
+        schema: Option<SchemaRef>,
+        schema_overwrite: Option<SchemaRef>,
         low_memory: bool,
         comment_prefix: Option<CommentPrefix>,
         quote_char: Option<u8>,
@@ -292,70 +186,12 @@ impl DslBuilder {
         try_parse_dates: bool,
         raise_if_empty: bool,
         truncate_ragged_lines: bool,
-        mut n_threads: Option<usize>,
+        n_threads: Option<usize>,
     ) -> PolarsResult<Self> {
         let path = path.into();
-        let mut file = polars_utils::open_file(&path)?;
         let paths = Arc::new([path]);
 
-        let mut magic_nr = [0u8; 4];
-        let res_len = file.read(&mut magic_nr)?;
-        if res_len < 2 {
-            if raise_if_empty {
-                polars_bail!(NoData: "empty CSV")
-            }
-        } else {
-            polars_ensure!(
-                !is_compressed(&magic_nr),
-                ComputeError: "cannot scan compressed csv; use `read_csv` for compressed data",
-            );
-        }
-
-        file.rewind()?;
-        let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");
-
-        // TODO! delay inferring schema until absolutely necessary
-        // this needs a way to estimated bytes/rows.
-        let (mut inferred_schema, rows_read, bytes_read) = infer_file_schema(
-            &reader_bytes,
-            separator,
-            infer_schema_length,
-            has_header,
-            schema_overwrite,
-            &mut skip_rows,
-            skip_rows_after_header,
-            comment_prefix.as_ref(),
-            quote_char,
-            eol_char,
-            null_values.as_ref(),
-            try_parse_dates,
-            raise_if_empty,
-            &mut n_threads,
-        )?;
-
-        if let Some(rc) = &row_index {
-            match schema {
-                None => {
-                    let _ = inferred_schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
-                },
-                Some(inner) => {
-                    schema = Some(Arc::new(
-                        inner
-                            .new_inserting_at_index(0, rc.name.as_str().into(), IDX_DTYPE)
-                            .unwrap(),
-                    ));
-                },
-            }
-        }
-
-        let schema = schema.unwrap_or_else(|| Arc::new(inferred_schema));
-        let n_bytes = reader_bytes.len();
-        let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize;
-
-        skip_rows += skip_rows_after_header;
-        let file_info = FileInfo::new(schema, None, (None, estimated_n_rows));
-
         let options = FileScanOptions {
             with_columns: None,
             cache,
@@ -371,7 +207,7 @@ impl DslBuilder {
         };
         Ok(DslPlan::Scan {
             paths,
-            file_info,
+            file_info: None,
             file_options: options,
             predicate: None,
             scan_type: FileScan::Csv {
@@ -390,6 +226,10 @@ impl DslBuilder {
                     raise_if_empty,
                     truncate_ragged_lines,
                     n_threads,
+                    schema,
+                    schema_overwrite,
+                    skip_rows_after_header,
+                    infer_schema_length,
                 },
             },
         }
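With schema inference removed from `DslBuilder::scan_csv`/`scan_parquet`/`scan_ipc`, building a lazy scan no longer touches the file system; `file_info` stays `None` until the DSL plan is lowered to IR. A rough sketch of what that means for callers (the path is illustrative and the exact error text may differ):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Constructing the plan performs no IO, so this succeeds even though
    // the file does not exist.
    let lf = LazyCsvReader::new("does_not_exist.csv").finish()?;

    // File access and schema inference now happen when the plan is
    // materialized, so the missing-file error only surfaces here.
    match lf.collect() {
        Ok(df) => println!("{df}"),
        Err(e) => eprintln!("error raised at collect time: {e}"),
    }
    Ok(())
}
```

This is the same behavioral shift the updated Python tests below encode: `pl.scan_csv(...)` and `pl.scan_parquet(...)` only raise once `.collect()` is called.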
diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs
index f7d1dc270cc9..40cbd8eb239e 100644
--- a/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs
+++ b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs
@@ -35,12 +35,50 @@ pub fn to_alp(
     let owned = Arc::unwrap_or_clone;
     let v = match lp {
         DslPlan::Scan {
-            mut file_info,
+            file_info,
             paths,
             predicate,
-            scan_type,
+            mut scan_type,
             file_options,
         } => {
+            let mut file_info = if let Some(file_info) = file_info {
+                file_info
+            } else {
+                match &mut scan_type {
+                    #[cfg(feature = "parquet")]
+                    FileScan::Parquet {
+                        cloud_options,
+                        metadata,
+                        ..
+                    } => {
+                        let (file_info, md) = scans::parquet_file_info(
+                            &paths,
+                            &file_options,
+                            cloud_options.as_ref(),
+                        )?;
+                        *metadata = md;
+                        file_info
+                    },
+                    #[cfg(feature = "ipc")]
+                    FileScan::Ipc {
+                        cloud_options,
+                        metadata,
+                        ..
+                    } => {
+                        let (file_info, md) =
+                            scans::ipc_file_info(&paths, &file_options, cloud_options.as_ref())?;
+                        *metadata = Some(md);
+                        file_info
+                    },
+                    #[cfg(feature = "csv")]
+                    FileScan::Csv { options, .. } => {
+                        scans::csv_file_info(&paths, &file_options, options)?
+                    },
+                    // FileInfo should be set.
+                    FileScan::Anonymous { .. } => unreachable!(),
+                }
+            };
+
             if let Some(row_index) = &file_options.row_index {
                 let schema = Arc::make_mut(&mut file_info.schema);
                 *schema = schema
diff --git a/crates/polars-plan/src/logical_plan/conversion/mod.rs b/crates/polars-plan/src/logical_plan/conversion/mod.rs
index 7da56e2a9c8a..282b1da41337 100644
--- a/crates/polars-plan/src/logical_plan/conversion/mod.rs
+++ b/crates/polars-plan/src/logical_plan/conversion/mod.rs
@@ -1,6 +1,8 @@
 mod dsl_plan_to_ir_plan;
 mod expr_to_expr_ir;
 mod ir_to_dsl;
+#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
+mod scans;
 
 use std::borrow::Cow;
 
@@ -43,7 +45,7 @@ impl IR {
                 file_options: options,
             } => DslPlan::Scan {
                 paths,
-                file_info,
+                file_info: Some(file_info),
                 predicate: predicate.map(|e| e.to_expr(expr_arena)),
                 scan_type,
                 file_options: options,
diff --git a/crates/polars-plan/src/logical_plan/conversion/scans.rs b/crates/polars-plan/src/logical_plan/conversion/scans.rs
new file mode 100644
index 000000000000..ed48b8f1fef6
--- /dev/null
+++ b/crates/polars-plan/src/logical_plan/conversion/scans.rs
@@ -0,0 +1,182 @@
+use std::io::Read;
+use std::path::PathBuf;
+
+#[cfg(feature = "cloud")]
+use polars_io::pl_async::get_runtime;
+use polars_io::prelude::*;
+use polars_io::{is_cloud_url, RowIndex};
+
+use super::*;
+
+fn get_path(paths: &[PathBuf]) -> PolarsResult<&PathBuf> {
+    // Use first path to get schema.
+    paths
+        .first()
+        .ok_or_else(|| polars_err!(ComputeError: "expected at least 1 path"))
+}
+
+#[cfg(any(feature = "parquet", feature = "parquet_async",))]
+fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> SchemaRef {
+    if let Some(rc) = row_index {
+        let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
+    }
+    Arc::new(schema)
+}
+
+#[cfg(feature = "parquet")]
+pub(super) fn parquet_file_info(
+    paths: &[PathBuf],
+    file_options: &FileScanOptions,
+    cloud_options: Option<&polars_io::cloud::CloudOptions>,
+) -> PolarsResult<(FileInfo, Option<FileMetaData>)> {
+    let path = get_path(paths)?;
+
+    let (schema, reader_schema, num_rows, metadata) = if is_cloud_url(path) {
+        #[cfg(not(feature = "cloud"))]
+        panic!("One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled.");
+
+        #[cfg(feature = "cloud")]
+        {
+            let uri = path.to_string_lossy();
+            get_runtime().block_on(async {
+                let mut reader =
+                    ParquetAsyncReader::from_uri(&uri, cloud_options, None, None).await?;
+                let reader_schema = reader.schema().await?;
+                let num_rows = reader.num_rows().await?;
+                let metadata = reader.get_metadata().await?.clone();
+
+                let schema =
+                    prepare_schema((&reader_schema).into(), file_options.row_index.as_ref());
+                PolarsResult::Ok((schema, reader_schema, Some(num_rows), Some(metadata)))
+            })?
+        }
+    } else {
+        let file = polars_utils::open_file(path)?;
+        let mut reader = ParquetReader::new(file);
+        let reader_schema = reader.schema()?;
+        let schema = prepare_schema((&reader_schema).into(), file_options.row_index.as_ref());
+        (
+            schema,
+            reader_schema,
+            Some(reader.num_rows()?),
+            Some(reader.get_metadata()?.clone()),
+        )
+    };
+
+    let mut file_info = FileInfo::new(
+        schema,
+        Some(reader_schema),
+        (num_rows, num_rows.unwrap_or(0)),
+    );
+
+    if file_options.hive_options.enabled {
+        file_info.init_hive_partitions(path.as_path(), file_options.hive_options.schema.clone())?
+    }
+
+    Ok((file_info, metadata))
+}
+
+// TODO! return metadata arced
+#[cfg(feature = "ipc")]
+pub(super) fn ipc_file_info(
+    paths: &[PathBuf],
+    file_options: &FileScanOptions,
+    cloud_options: Option<&polars_io::cloud::CloudOptions>,
+) -> PolarsResult<(FileInfo, arrow::io::ipc::read::FileMetadata)> {
+    let path = get_path(paths)?;
+
+    let metadata = if is_cloud_url(path) {
+        #[cfg(not(feature = "cloud"))]
+        panic!("One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled.");
+
+        #[cfg(feature = "cloud")]
+        {
+            let uri = path.to_string_lossy();
+            get_runtime().block_on(async {
+                polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options)
+                    .await?
+                    .metadata()
+                    .await
+            })?
+        }
+    } else {
+        arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(
+            polars_utils::open_file(path)?,
+        ))?
+    };
+    let file_info = FileInfo::new(
+        prepare_schema(
+            metadata.schema.as_ref().into(),
+            file_options.row_index.as_ref(),
+        ),
+        Some(Arc::clone(&metadata.schema)),
+        (None, 0),
+    );
+
+    Ok((file_info, metadata))
+}
+
+#[cfg(feature = "csv")]
+pub(super) fn csv_file_info(
+    paths: &[PathBuf],
+    file_options: &FileScanOptions,
+    csv_options: &mut CsvParserOptions,
+) -> PolarsResult<FileInfo> {
+    use std::io::Seek;
+
+    use polars_io::csv::utils::{infer_file_schema, is_compressed};
+    use polars_io::utils::get_reader_bytes;
+
+    let path = get_path(paths)?;
+    let mut file = polars_utils::open_file(path)?;
+
+    let mut magic_nr = [0u8; 4];
+    let res_len = file.read(&mut magic_nr)?;
+    if res_len < 2 {
+        if csv_options.raise_if_empty {
+            polars_bail!(NoData: "empty CSV")
+        }
+    } else {
+        polars_ensure!(
+            !is_compressed(&magic_nr),
+            ComputeError: "cannot scan compressed csv; use `read_csv` for compressed data",
+        );
+    }
+
+    file.rewind()?;
+    let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");
+
+    // this needs a way to estimated bytes/rows.
+    let (inferred_schema, rows_read, bytes_read) = infer_file_schema(
+        &reader_bytes,
+        csv_options.separator,
+        csv_options.infer_schema_length,
+        csv_options.has_header,
+        csv_options.schema_overwrite.as_deref(),
+        &mut csv_options.skip_rows,
+        csv_options.skip_rows_after_header,
+        csv_options.comment_prefix.as_ref(),
+        csv_options.quote_char,
+        csv_options.eol_char,
+        csv_options.null_values.as_ref(),
+        csv_options.try_parse_dates,
+        csv_options.raise_if_empty,
+        &mut csv_options.n_threads,
+    )?;
+
+    let mut schema = csv_options
+        .schema
+        .clone()
+        .unwrap_or_else(|| Arc::new(inferred_schema));
+
+    if let Some(rc) = &file_options.row_index {
+        let schema = Arc::make_mut(&mut schema);
+        schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?;
+    }
+
+    let n_bytes = reader_bytes.len();
+    let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize;
+
+    csv_options.skip_rows += csv_options.skip_rows_after_header;
+    Ok(FileInfo::new(schema, None, (None, estimated_n_rows)))
+}
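Both `parquet_file_info` and `ipc_file_info` prepend the optional row-index column at position 0 before wrapping the schema in an `Arc` (via `prepare_schema`), and `csv_file_info` does the same through `Arc::make_mut`. A condensed, standalone illustration of that pattern (this is not the crate-internal helper itself; the column name is made up):

```rust
use std::sync::Arc;

use polars_core::prelude::*;

fn with_row_index(mut schema: Schema, row_index_name: Option<&str>) -> SchemaRef {
    // Mirrors prepare_schema: the requested row-index column becomes the
    // first column of the scanned schema, using the index dtype.
    if let Some(name) = row_index_name {
        let _ = schema.insert_at_index(0, name.into(), IDX_DTYPE);
    }
    Arc::new(schema)
}

fn main() {
    let schema = Schema::from_iter([Field::new("a", DataType::Int64)]);
    let schema = with_row_index(schema, Some("row_nr"));
    // The row-index column now leads the schema.
    println!("{schema:?}");
}
```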
diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs
index 7f343f471458..b388a69e4b89 100644
--- a/crates/polars-plan/src/logical_plan/format.rs
+++ b/crates/polars-plan/src/logical_plan/format.rs
@@ -12,7 +12,7 @@ fn write_scan(
     path: &[PathBuf],
     indent: usize,
     n_columns: i64,
-    total_columns: usize,
+    total_columns: Option<usize>,
     predicate: &Option<Expr>,
     n_rows: Option<usize>,
 ) -> fmt::Result {
@@ -28,6 +28,9 @@ fn write_scan(
             path[0].to_string_lossy()
         )),
     };
+    let total_columns = total_columns
+        .map(|v| format!("{v}"))
+        .unwrap_or_else(|| "?".to_string());
 
     write!(f, "{:indent$}{name} SCAN {path_fmt}", "")?;
     if n_columns > 0 {
@@ -58,7 +61,7 @@ impl DslPlan {
         match self {
             #[cfg(feature = "python")]
             PythonScan { options } => {
-                let total_columns = options.schema.len();
+                let total_columns = Some(options.schema.len());
                 let n_columns = options
                     .with_columns
                     .as_ref()
@@ -136,7 +139,7 @@ impl DslPlan {
                     paths,
                     sub_indent,
                     n_columns,
-                    file_info.schema.len(),
+                    file_info.as_ref().map(|fi| fi.schema.len()),
                     predicate,
                     file_options.n_rows,
                 )
diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs
index 84ce1b812e4f..4d3669198e57 100644
--- a/crates/polars-plan/src/logical_plan/mod.rs
+++ b/crates/polars-plan/src/logical_plan/mod.rs
@@ -84,7 +84,8 @@ pub enum DslPlan {
     },
     Scan {
         paths: Arc<[PathBuf]>,
-        file_info: FileInfo,
+        // Option as this is mostly materialized on the IR phase.
+        file_info: Option<FileInfo>,
         predicate: Option<Expr>,
         file_options: FileScanOptions,
         scan_type: FileScan,
diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs
index c646be5d869f..8137ead2b6a3 100644
--- a/crates/polars-plan/src/logical_plan/options.rs
+++ b/crates/polars-plan/src/logical_plan/options.rs
@@ -26,20 +26,24 @@ pub type FileCount = u32;
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct CsvParserOptions {
-    pub separator: u8,
     pub comment_prefix: Option<CommentPrefix>,
     pub quote_char: Option<u8>,
-    pub eol_char: u8,
-    pub has_header: bool,
     pub skip_rows: usize,
-    pub low_memory: bool,
-    pub ignore_errors: bool,
-    pub null_values: Option<NullValues>,
     pub encoding: CsvEncoding,
+    pub skip_rows_after_header: usize,
+    pub infer_schema_length: Option<usize>,
+    pub n_threads: Option<usize>,
     pub try_parse_dates: bool,
     pub raise_if_empty: bool,
     pub truncate_ragged_lines: bool,
-    pub n_threads: Option<usize>,
+    pub low_memory: bool,
+    pub ignore_errors: bool,
+    pub has_header: bool,
+    pub eol_char: u8,
+    pub separator: u8,
+    pub schema_overwrite: Option<SchemaRef>,
+    pub schema: Option<SchemaRef>,
+    pub null_values: Option<NullValues>,
 }
 
 #[cfg(feature = "parquet")]
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index e01882bc2657..751ae0262dba 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -198,7 +198,7 @@ impl PyLazyFrame {
             .with_skip_rows(skip_rows)
             .with_n_rows(n_rows)
             .with_cache(cache)
-            .with_dtype_overwrite(overwrite_dtype.as_ref())
+            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
             .with_schema(schema.map(|schema| Arc::new(schema.0)))
             .low_memory(low_memory)
             .with_comment_prefix(comment_prefix)
diff --git a/py-polars/tests/unit/datatypes/test_enum.py b/py-polars/tests/unit/datatypes/test_enum.py
index acae415af237..be01c60a5e66 100644
--- a/py-polars/tests/unit/datatypes/test_enum.py
+++ b/py-polars/tests/unit/datatypes/test_enum.py
@@ -509,3 +509,23 @@ def test_category_comparison_subset() -> None:
     assert out["dt1"].dtype == pl.Enum(["a"])
     assert out["dt2"].dtype == pl.Enum(["a", "b"])
     assert out["dt1"].dtype != out["dt2"].dtype
+
+
+@pytest.mark.parametrize(
+    "dt",
+    [
+        pl.UInt8,
+        pl.UInt16,
+        pl.UInt32,
+        pl.UInt64,
+        pl.Int8,
+        pl.Int16,
+        pl.Int32,
+        pl.Int64,
+    ],
+)
+def test_integer_cast_to_enum_15738(dt: pl.DataType) -> None:
+    s = pl.Series([0, 1, 2], dtype=dt).cast(pl.Enum(["a", "b", "c"]))
+    assert s.to_list() == ["a", "b", "c"]
+    expected_s = pl.Series(["a", "b", "c"], dtype=pl.Enum(["a", "b", "c"]))
+    assert_series_equal(s, expected_s)
diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py
index febe3736dabf..b1de9102644e 100644
--- a/py-polars/tests/unit/io/test_csv.py
+++ b/py-polars/tests/unit/io/test_csv.py
@@ -570,7 +570,7 @@ def test_compressed_csv(io_files_path: Path) -> None:
         ComputeError,
         match="cannot scan compressed csv; use `read_csv` for compressed data",
     ):
-        pl.scan_csv(csv_file)
+        pl.scan_csv(csv_file).collect()
 
     out = pl.read_csv(str(csv_file), truncate_ragged_lines=True)
     assert_frame_equal(out, expected)
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 535f1412eb7b..c5d3377df5f3 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -182,7 +182,7 @@ def test_hive_partitioned_err(io_files_path: Path, tmp_path: Path) -> None:
     df.write_parquet(root / "file.parquet")
 
     with pytest.raises(pl.DuplicateError, match="invalid Hive partition schema"):
-        pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True)
+        pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True).collect()
 
 
 @pytest.mark.write_disk()
diff --git a/py-polars/tests/unit/io/test_other.py b/py-polars/tests/unit/io/test_other.py
index 6041cbc08ed2..2bc36d66e5ae 100644
--- a/py-polars/tests/unit/io/test_other.py
+++ b/py-polars/tests/unit/io/test_other.py
@@ -30,8 +30,12 @@ def test_read_missing_file(read_function: Callable[[Any], pl.DataFrame]) -> None:
     if sys.platform == "linux":
         match = "No such file or directory " + match
 
-    with pytest.raises(FileNotFoundError, match=match):
-        read_function("fake_file_path")
+    if "scan" in read_function.__name__:
+        with pytest.raises(FileNotFoundError, match=match):
+            read_function("fake_file_path").collect()  # type: ignore[attr-defined]
+    else:
+        with pytest.raises(FileNotFoundError, match=match):
+            read_function("fake_file_path")
 
 
 @pytest.mark.parametrize(