From 1f14e4aa87b68dd931db0d1230dea44c4345d148 Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Fri, 26 Jul 2024 23:41:53 +1000 Subject: [PATCH] feat: Decompress in CSV / NDJSON scan (#17841) --- Cargo.lock | 2 + crates/polars-io/src/csv/read/mod.rs | 1 - crates/polars-io/src/csv/read/read_impl.rs | 4 +- crates/polars-io/src/csv/read/reader.rs | 2 +- crates/polars-io/src/csv/read/utils.rs | 17 +------ crates/polars-io/src/utils/compression.rs | 19 ++++++++ crates/polars-io/src/utils/mod.rs | 2 + crates/polars-io/src/utils/other.rs | 47 +++++++++++++++++++ crates/polars-mem-engine/Cargo.toml | 1 + .../src/executors/scan/csv.rs | 27 +++++++---- .../src/executors/scan/ndjson.rs | 46 ++++++++++-------- .../sinks/group_by/generic/source.rs | 2 +- crates/polars-plan/Cargo.toml | 1 + .../polars-plan/src/plans/conversion/scans.rs | 36 +++++++------- crates/polars-utils/src/io.rs | 14 ++---- .../polars/tests/it/io/parquet/write/mod.rs | 2 +- py-polars/tests/unit/io/test_csv.py | 7 +-- 17 files changed, 148 insertions(+), 82 deletions(-) create mode 100644 crates/polars-io/src/utils/compression.rs diff --git a/Cargo.lock b/Cargo.lock index 25fff4ef566a..6af2da134e8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3185,6 +3185,7 @@ name = "polars-mem-engine" version = "0.41.3" dependencies = [ "futures", + "memmap2", "polars-arrow", "polars-core", "polars-error", @@ -3305,6 +3306,7 @@ dependencies = [ "futures", "hashbrown", "libloading", + "memmap2", "once_cell", "percent-encoding", "polars-arrow", diff --git a/crates/polars-io/src/csv/read/mod.rs b/crates/polars-io/src/csv/read/mod.rs index aced5f377208..969be1a58908 100644 --- a/crates/polars-io/src/csv/read/mod.rs +++ b/crates/polars-io/src/csv/read/mod.rs @@ -30,4 +30,3 @@ pub use parser::count_rows; pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader}; pub use reader::CsvReader; pub use schema_inference::infer_file_schema; -pub use utils::is_compressed; diff --git a/crates/polars-io/src/csv/read/read_impl.rs b/crates/polars-io/src/csv/read/read_impl.rs index 50ba63dd668a..51ba097b2133 100644 --- a/crates/polars-io/src/csv/read/read_impl.rs +++ b/crates/polars-io/src/csv/read/read_impl.rs @@ -21,10 +21,10 @@ use super::schema_inference::{check_decimal_comma, infer_file_schema}; #[cfg(any(feature = "decompress", feature = "decompress-fast"))] use super::utils::decompress; use super::utils::get_file_chunks; -#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))] -use super::utils::is_compressed; use crate::mmap::ReaderBytes; use crate::predicates::PhysicalIoExpr; +#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))] +use crate::utils::is_compressed; use crate::utils::update_row_counts; use crate::RowIndex; diff --git a/crates/polars-io/src/csv/read/reader.rs b/crates/polars-io/src/csv/read/reader.rs index 0fbea139c845..ba883f946fc3 100644 --- a/crates/polars-io/src/csv/read/reader.rs +++ b/crates/polars-io/src/csv/read/reader.rs @@ -91,7 +91,7 @@ impl CsvReadOptions { ); let path = resolve_homedir(self.path.as_ref().unwrap()); - let reader = polars_utils::open_file(path)?; + let reader = polars_utils::open_file(&path)?; let options = self; Ok(CsvReader { diff --git a/crates/polars-io/src/csv/read/utils.rs b/crates/polars-io/src/csv/read/utils.rs index 61a14399237a..33c6a6c8f290 100644 --- a/crates/polars-io/src/csv/read/utils.rs +++ b/crates/polars-io/src/csv/read/utils.rs @@ -45,22 +45,6 @@ pub(crate) fn get_file_chunks( offsets } -// magic numbers -const GZIP: [u8; 2] = [31, 139]; -const ZLIB0: [u8; 2] = [0x78, 0x01]; -const ZLIB1: [u8; 2] = [0x78, 0x9C]; -const ZLIB2: [u8; 2] = [0x78, 0xDA]; -const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD]; - -/// check if csv file is compressed -pub fn is_compressed(bytes: &[u8]) -> bool { - bytes.starts_with(&ZLIB0) - || bytes.starts_with(&ZLIB1) - || bytes.starts_with(&ZLIB2) - || bytes.starts_with(&GZIP) - || bytes.starts_with(&ZSTD) -} - #[cfg(any(feature = "decompress", feature = "decompress-fast"))] fn decompress_impl( decoder: &mut R, @@ -145,6 +129,7 @@ pub(crate) fn decompress( quote_char: Option, eol_char: u8, ) -> Option> { + use crate::utils::compression::magic::*; if bytes.starts_with(&GZIP) { let mut decoder = flate2::read::MultiGzDecoder::new(bytes); decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char) diff --git a/crates/polars-io/src/utils/compression.rs b/crates/polars-io/src/utils/compression.rs new file mode 100644 index 000000000000..d771b4c6ca1e --- /dev/null +++ b/crates/polars-io/src/utils/compression.rs @@ -0,0 +1,19 @@ +// magic numbers +pub mod magic { + pub const GZIP: [u8; 2] = [31, 139]; + pub const ZLIB0: [u8; 2] = [0x78, 0x01]; + pub const ZLIB1: [u8; 2] = [0x78, 0x9C]; + pub const ZLIB2: [u8; 2] = [0x78, 0xDA]; + pub const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD]; +} + +/// check if csv file is compressed +pub fn is_compressed(bytes: &[u8]) -> bool { + use magic::*; + + bytes.starts_with(&ZLIB0) + || bytes.starts_with(&ZLIB1) + || bytes.starts_with(&ZLIB2) + || bytes.starts_with(&GZIP) + || bytes.starts_with(&ZSTD) +} diff --git a/crates/polars-io/src/utils/mod.rs b/crates/polars-io/src/utils/mod.rs index 4711fab55609..a940870c9270 100644 --- a/crates/polars-io/src/utils/mod.rs +++ b/crates/polars-io/src/utils/mod.rs @@ -1,5 +1,7 @@ +pub mod compression; mod other; +pub use compression::is_compressed; pub use other::*; pub const URL_ENCODE_CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index 648c678bf814..9ab9772d955e 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -6,6 +6,7 @@ use once_cell::sync::Lazy; use polars_core::prelude::*; #[cfg(any(feature = "ipc_streaming", feature = "parquet"))] use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref}; +use polars_error::to_compute_err; use regex::{Regex, RegexBuilder}; use crate::mmap::{MmapBytesReader, ReaderBytes}; @@ -41,6 +42,52 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>( } } +/// Decompress `bytes` if compression is detected, otherwise simply return it. +/// An `out` vec must be given for ownership of the decompressed data. +/// +/// # Safety +/// The `out` vec outlives `bytes` (declare `out` first). +pub unsafe fn maybe_decompress_bytes<'a>( + bytes: &'a [u8], + out: &'a mut Vec, +) -> PolarsResult<&'a [u8]> { + assert!(out.is_empty()); + use crate::prelude::is_compressed; + let is_compressed = bytes.len() >= 4 && is_compressed(bytes); + + if is_compressed { + #[cfg(any(feature = "decompress", feature = "decompress-fast"))] + { + use crate::utils::compression::magic::*; + + if bytes.starts_with(&GZIP) { + flate2::read::MultiGzDecoder::new(bytes) + .read_to_end(out) + .map_err(to_compute_err)?; + } else if bytes.starts_with(&ZLIB0) + || bytes.starts_with(&ZLIB1) + || bytes.starts_with(&ZLIB2) + { + flate2::read::ZlibDecoder::new(bytes) + .read_to_end(out) + .map_err(to_compute_err)?; + } else if bytes.starts_with(&ZSTD) { + zstd::Decoder::new(bytes)?.read_to_end(out)?; + } else { + polars_bail!(ComputeError: "unimplemented compression format") + } + + Ok(out) + } + #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))] + { + panic!("cannot decompress without 'decompress' or 'decompress-fast' feature") + } + } else { + Ok(bytes) + } +} + /// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read /// concurrently/parallel /// diff --git a/crates/polars-mem-engine/Cargo.toml b/crates/polars-mem-engine/Cargo.toml index f50a8aee4240..aa3d7dc5fd0a 100644 --- a/crates/polars-mem-engine/Cargo.toml +++ b/crates/polars-mem-engine/Cargo.toml @@ -11,6 +11,7 @@ description = "In memory engine of the Polars project." [dependencies] arrow = { workspace = true } futures = { workspace = true, optional = true } +memmap = { workspace = true } polars-core = { workspace = true, features = ["lazy"] } polars-error = { workspace = true } polars-expr = { workspace = true } diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index 348eebfe332f..4f9dd90c3376 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -65,14 +65,18 @@ impl CsvExec { let mut df = if run_async { #[cfg(feature = "cloud")] { + let file = polars_io::file_cache::FILE_CACHE + .get_entry(path.to_str().unwrap()) + // Safety: This was initialized by schema inference. + .unwrap() + .try_open_assume_latest()?; + let owned = &mut vec![]; + let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; + options - .into_reader_with_file_handle( - polars_io::file_cache::FILE_CACHE - .get_entry(path.to_str().unwrap()) - // Safety: This was initialized by schema inference. - .unwrap() - .try_open_assume_latest()?, - ) + .into_reader_with_file_handle(std::io::Cursor::new(unsafe { + maybe_decompress_bytes(mmap.as_ref(), owned) + }?)) ._with_predicate(predicate.clone()) .finish() } @@ -81,9 +85,14 @@ impl CsvExec { panic!("required feature `cloud` is not enabled") } } else { + let file = polars_utils::open_file(path)?; + let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; + let owned = &mut vec![]; + options - .try_into_reader_with_file_path(Some(path.clone())) - .unwrap() + .into_reader_with_file_handle(std::io::Cursor::new(unsafe { + maybe_decompress_bytes(mmap.as_ref(), owned) + }?)) ._with_predicate(predicate.clone()) .finish() }?; diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index fb898eb4ce31..23fb6511ad8a 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -69,32 +69,40 @@ impl JsonExec { return None; } - let reader = if run_async { - JsonLineReader::new({ - #[cfg(feature = "cloud")] + let file = if run_async { + #[cfg(feature = "cloud")] + { + match polars_io::file_cache::FILE_CACHE + .get_entry(p.to_str().unwrap()) + // Safety: This was initialized by schema inference. + .unwrap() + .try_open_assume_latest() { - match polars_io::file_cache::FILE_CACHE - .get_entry(p.to_str().unwrap()) - // Safety: This was initialized by schema inference. - .unwrap() - .try_open_assume_latest() - { - Ok(v) => v, - Err(e) => return Some(Err(e)), - } + Ok(v) => v, + Err(e) => return Some(Err(e)), } - #[cfg(not(feature = "cloud"))] - { - panic!("required feature `cloud` is not enabled") - } - }) + } + #[cfg(not(feature = "cloud"))] + { + panic!("required feature `cloud` is not enabled") + } } else { - match JsonLineReader::from_path(p) { - Ok(r) => r, + match polars_utils::open_file(p.as_ref()) { + Ok(v) => v, Err(e) => return Some(Err(e)), } }; + let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; + let owned = &mut vec![]; + let curs = std::io::Cursor::new( + match unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) } { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }, + ); + let reader = JsonLineReader::new(curs); + let row_index = self.file_scan_options.row_index.as_mut(); let df = reader diff --git a/crates/polars-pipe/src/executors/sinks/group_by/generic/source.rs b/crates/polars-pipe/src/executors/sinks/group_by/generic/source.rs index 4ebaf073525b..530225457350 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/generic/source.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/generic/source.rs @@ -64,7 +64,7 @@ impl Source for GroupBySource { if partition_dir.exists() { for file in std::fs::read_dir(partition_dir).expect("should be there") { let spilled = file.unwrap().path(); - let file = polars_utils::open_file(spilled)?; + let file = polars_utils::open_file(&spilled)?; let reader = IpcReader::new(file); let spilled = reader.finish().unwrap(); if spilled.n_chunks() > 1 { diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index 79422dd644a1..a05b19720be6 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -32,6 +32,7 @@ ciborium = { workspace = true, optional = true } either = { workspace = true } futures = { workspace = true, optional = true } hashbrown = { workspace = true } +memmap = { workspace = true } once_cell = { workspace = true } percent-encoding = { workspace = true } pyo3 = { workspace = true, optional = true } diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs index 9ecfa85382a2..19ba780f5a25 100644 --- a/crates/polars-plan/src/plans/conversion/scans.rs +++ b/crates/polars-plan/src/plans/conversion/scans.rs @@ -135,7 +135,6 @@ pub(super) fn csv_file_info( use std::io::{Read, Seek}; use polars_core::{config, POOL}; - use polars_io::csv::read::is_compressed; use polars_io::csv::read::schema_inference::SchemaInferenceResult; use polars_io::utils::get_reader_bytes; use rayon::iter::{IntoParallelIterator, ParallelIterator}; @@ -173,7 +172,7 @@ pub(super) fn csv_file_info( }; let infer_schema_func = |i| { - let mut file = if run_async { + let file = if run_async { #[cfg(feature = "cloud")] { let entry: &Arc = @@ -185,24 +184,22 @@ pub(super) fn csv_file_info( panic!("required feature `cloud` is not enabled") } } else { - polars_utils::open_file(paths.get(i).unwrap())? + let p: &PathBuf = &paths[i]; + polars_utils::open_file(p.as_ref())? }; - let mut magic_nr = [0u8; 4]; - let res_len = file.read(&mut magic_nr)?; - if res_len < 2 { - if csv_options.raise_if_empty { - polars_bail!(NoData: "empty CSV") - } - } else { - polars_ensure!( - !is_compressed(&magic_nr), - ComputeError: "cannot scan compressed csv; use `read_csv` for compressed data", - ); + let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; + let owned = &mut vec![]; + + let mut curs = + std::io::Cursor::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?); + + if curs.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty { + polars_bail!(NoData: "empty CSV") } + curs.rewind()?; - file.rewind()?; - let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file"); + let reader_bytes = get_reader_bytes(&mut curs).expect("could not mmap file"); // this needs a way to estimated bytes/rows. let si_result = @@ -323,7 +320,12 @@ pub(super) fn ndjson_file_info( } else { polars_utils::open_file(first_path)? }; - let mut reader = std::io::BufReader::new(f); + + let owned = &mut vec![]; + let mmap = unsafe { memmap::Mmap::map(&f).unwrap() }; + + let mut reader = + std::io::BufReader::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?); let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() { if file_options.row_index.is_none() { diff --git a/crates/polars-utils/src/io.rs b/crates/polars-utils/src/io.rs index 7b52ebb0ab67..d472c4a8186d 100644 --- a/crates/polars-utils/src/io.rs +++ b/crates/polars-utils/src/io.rs @@ -19,16 +19,10 @@ pub fn _limit_path_len_io_err(path: &Path, err: io::Error) -> PolarsError { io::Error::new(err.kind(), msg).into() } -pub fn open_file

(path: P) -> PolarsResult -where - P: AsRef, -{ - File::open(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err)) +pub fn open_file(path: &Path) -> PolarsResult { + File::open(path).map_err(|err| _limit_path_len_io_err(path, err)) } -pub fn create_file

(path: P) -> PolarsResult -where - P: AsRef, -{ - File::create(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err)) +pub fn create_file(path: &Path) -> PolarsResult { + File::create(path).map_err(|err| _limit_path_len_io_err(path, err)) } diff --git a/crates/polars/tests/it/io/parquet/write/mod.rs b/crates/polars/tests/it/io/parquet/write/mod.rs index 52c534d370b9..c0e37be12a68 100644 --- a/crates/polars/tests/it/io/parquet/write/mod.rs +++ b/crates/polars/tests/it/io/parquet/write/mod.rs @@ -291,7 +291,7 @@ async fn test_column_async(column: &str, compression: CompressionOptions) -> Par #[test] fn test_parquet() { // In CI: This test will be skipped because the file does not exist. - if let Ok(r) = polars_utils::open_file("data/simple.parquet") { + if let Ok(r) = polars_utils::open_file("data/simple.parquet".as_ref()) { let reader = ParquetReader::new(r); let df = reader.finish().unwrap(); assert_eq!(df.get_column_names(), ["a", "b"]); diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py index 33bbcf67f1ae..b4ccbed97db8 100644 --- a/py-polars/tests/unit/io/test_csv.py +++ b/py-polars/tests/unit/io/test_csv.py @@ -579,11 +579,8 @@ def test_compressed_csv(io_files_path: Path, monkeypatch: pytest.MonkeyPatch) -> # zstd compressed file csv_file = io_files_path / "zstd_compressed.csv.zst" - with pytest.raises( - ComputeError, - match="cannot scan compressed csv; use `read_csv` for compressed data", - ): - pl.scan_csv(csv_file).collect() + out = pl.scan_csv(csv_file, truncate_ragged_lines=True).collect() + assert_frame_equal(out, expected) out = pl.read_csv(str(csv_file), truncate_ragged_lines=True) assert_frame_equal(out, expected)