feat: Decompress in CSV / NDJSON scan (#17841)

nameexhaustion authored Jul 26, 2024
1 parent de92ea6 commit 1f14e4a
Showing 17 changed files with 148 additions and 82 deletions.
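The upshot: `scan_csv` and `scan_ndjson` now detect gzip/zlib/zstd magic bytes and decompress during the scan, where they previously errored. A minimal sketch of the user-visible effect (hypothetical file name; assumes a polars build with the `lazy`, `csv`, and `decompress` features):

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        // Previously this failed with "cannot scan compressed csv; use
        // `read_csv` for compressed data"; now the scan decompresses
        // transparently after sniffing the magic bytes.
        let df = LazyCsvReader::new("data/events.csv.gz").finish()?.collect()?;
        println!("{df}");
        Ok(())
    }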
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion crates/polars-io/src/csv/read/mod.rs
@@ -30,4 +30,3 @@ pub use parser::count_rows;
 pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
 pub use reader::CsvReader;
 pub use schema_inference::infer_file_schema;
-pub use utils::is_compressed;
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/read_impl.rs
@@ -21,10 +21,10 @@ use super::schema_inference::{check_decimal_comma, infer_file_schema};
 #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
 use super::utils::decompress;
 use super::utils::get_file_chunks;
-#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
-use super::utils::is_compressed;
 use crate::mmap::ReaderBytes;
 use crate::predicates::PhysicalIoExpr;
+#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
+use crate::utils::is_compressed;
 use crate::utils::update_row_counts;
 use crate::RowIndex;
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/reader.rs
@@ -91,7 +91,7 @@ impl CsvReadOptions
         );

         let path = resolve_homedir(self.path.as_ref().unwrap());
-        let reader = polars_utils::open_file(path)?;
+        let reader = polars_utils::open_file(&path)?;
         let options = self;

         Ok(CsvReader {
17 changes: 1 addition & 16 deletions crates/polars-io/src/csv/read/utils.rs
@@ -45,22 +45,6 @@ pub(crate) fn get_file_chunks(
     offsets
 }

-// magic numbers
-const GZIP: [u8; 2] = [31, 139];
-const ZLIB0: [u8; 2] = [0x78, 0x01];
-const ZLIB1: [u8; 2] = [0x78, 0x9C];
-const ZLIB2: [u8; 2] = [0x78, 0xDA];
-const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
-
-/// check if csv file is compressed
-pub fn is_compressed(bytes: &[u8]) -> bool {
-    bytes.starts_with(&ZLIB0)
-        || bytes.starts_with(&ZLIB1)
-        || bytes.starts_with(&ZLIB2)
-        || bytes.starts_with(&GZIP)
-        || bytes.starts_with(&ZSTD)
-}
-
 #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
 fn decompress_impl<R: Read>(
     decoder: &mut R,

@@ -145,6 +129,7 @@ pub(crate) fn decompress(
     quote_char: Option<u8>,
     eol_char: u8,
 ) -> Option<Vec<u8>> {
+    use crate::utils::compression::magic::*;
     if bytes.starts_with(&GZIP) {
         let mut decoder = flate2::read::MultiGzDecoder::new(bytes);
         decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
19 changes: 19 additions & 0 deletions crates/polars-io/src/utils/compression.rs
@@ -0,0 +1,19 @@
+// magic numbers
+pub mod magic {
+    pub const GZIP: [u8; 2] = [31, 139];
+    pub const ZLIB0: [u8; 2] = [0x78, 0x01];
+    pub const ZLIB1: [u8; 2] = [0x78, 0x9C];
+    pub const ZLIB2: [u8; 2] = [0x78, 0xDA];
+    pub const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
+}
+
+/// check if csv file is compressed
+pub fn is_compressed(bytes: &[u8]) -> bool {
+    use magic::*;
+
+    bytes.starts_with(&ZLIB0)
+        || bytes.starts_with(&ZLIB1)
+        || bytes.starts_with(&ZLIB2)
+        || bytes.starts_with(&GZIP)
+        || bytes.starts_with(&ZSTD)
+}
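These are the standard stream prefixes: gzip members begin with 0x1F 0x8B, zlib streams with 0x78 followed by a check byte (0x01, 0x9C, or 0xDA for common compression levels), and zstd frames with 0x28 0xB5 0x2F 0xFD. An illustrative check of the new helper (byte values chosen here, not taken from the diff):

    #[test]
    fn sniffs_magic_bytes() {
        use polars_io::utils::is_compressed;
        assert!(is_compressed(&[0x1F, 0x8B, 0x08, 0x00])); // gzip member header
        assert!(is_compressed(&[0x28, 0xB5, 0x2F, 0xFD])); // zstd frame header
        assert!(!is_compressed(b"name,age\nalice,30\n")); // plain CSV is untouched
    }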
2 changes: 2 additions & 0 deletions crates/polars-io/src/utils/mod.rs
@@ -1,5 +1,7 @@
+pub mod compression;
 mod other;

+pub use compression::is_compressed;
 pub use other::*;

 pub const URL_ENCODE_CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
47 changes: 47 additions & 0 deletions crates/polars-io/src/utils/other.rs
@@ -6,6 +6,7 @@ use once_cell::sync::Lazy;
 use polars_core::prelude::*;
 #[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
 use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
+use polars_error::to_compute_err;
 use regex::{Regex, RegexBuilder};

 use crate::mmap::{MmapBytesReader, ReaderBytes};

@@ -41,6 +42,52 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
     }
 }

+/// Decompress `bytes` if compression is detected, otherwise simply return it.
+/// An `out` vec must be given for ownership of the decompressed data.
+///
+/// # Safety
+/// The `out` vec outlives `bytes` (declare `out` first).
+pub unsafe fn maybe_decompress_bytes<'a>(
+    bytes: &'a [u8],
+    out: &'a mut Vec<u8>,
+) -> PolarsResult<&'a [u8]> {
+    assert!(out.is_empty());
+    use crate::prelude::is_compressed;
+    let is_compressed = bytes.len() >= 4 && is_compressed(bytes);
+
+    if is_compressed {
+        #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
+        {
+            use crate::utils::compression::magic::*;
+
+            if bytes.starts_with(&GZIP) {
+                flate2::read::MultiGzDecoder::new(bytes)
+                    .read_to_end(out)
+                    .map_err(to_compute_err)?;
+            } else if bytes.starts_with(&ZLIB0)
+                || bytes.starts_with(&ZLIB1)
+                || bytes.starts_with(&ZLIB2)
+            {
+                flate2::read::ZlibDecoder::new(bytes)
+                    .read_to_end(out)
+                    .map_err(to_compute_err)?;
+            } else if bytes.starts_with(&ZSTD) {
+                zstd::Decoder::new(bytes)?.read_to_end(out)?;
+            } else {
+                polars_bail!(ComputeError: "unimplemented compression format")
+            }
+
+            Ok(out)
+        }
+        #[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
+        {
+            panic!("cannot decompress without 'decompress' or 'decompress-fast' feature")
+        }
+    } else {
+        Ok(bytes)
+    }
+}
+
 /// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read
 /// concurrently/parallel
 ///
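`maybe_decompress_bytes` is the core of the change: when no magic bytes match it returns the input slice untouched (keeping uncompressed scans zero-copy), and it only allocates into `out` when it actually inflates. Because the returned slice may borrow from either `bytes` or `out`, callers must declare `out` before the mapping so it is dropped last, which is what the safety note asks for. A usage sketch under those rules (hypothetical helper, not part of the commit):

    use polars_error::PolarsResult;

    fn read_maybe_compressed(path: &std::path::Path) -> PolarsResult<Vec<u8>> {
        // `out` is declared before the mmap so it outlives the borrowed slice.
        let mut out = vec![];
        let file = std::fs::File::open(path)?;
        let mmap = unsafe { memmap::Mmap::map(&file) }?;
        let bytes = unsafe { polars_io::utils::maybe_decompress_bytes(mmap.as_ref(), &mut out) }?;
        Ok(bytes.to_vec())
    }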
1 change: 1 addition & 0 deletions crates/polars-mem-engine/Cargo.toml
@@ -11,6 +11,7 @@ description = "In memory engine of the Polars project."
 [dependencies]
 arrow = { workspace = true }
 futures = { workspace = true, optional = true }
+memmap = { workspace = true }
 polars-core = { workspace = true, features = ["lazy"] }
 polars-error = { workspace = true }
 polars-expr = { workspace = true }
27 changes: 18 additions & 9 deletions crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -65,14 +65,18 @@ impl CsvExec {
         let mut df = if run_async {
             #[cfg(feature = "cloud")]
             {
+                let file = polars_io::file_cache::FILE_CACHE
+                    .get_entry(path.to_str().unwrap())
+                    // Safety: This was initialized by schema inference.
+                    .unwrap()
+                    .try_open_assume_latest()?;
+                let owned = &mut vec![];
+                let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
+
                 options
-                    .into_reader_with_file_handle(
-                        polars_io::file_cache::FILE_CACHE
-                            .get_entry(path.to_str().unwrap())
-                            // Safety: This was initialized by schema inference.
-                            .unwrap()
-                            .try_open_assume_latest()?,
-                    )
+                    .into_reader_with_file_handle(std::io::Cursor::new(unsafe {
+                        maybe_decompress_bytes(mmap.as_ref(), owned)
+                    }?))
                     ._with_predicate(predicate.clone())
                     .finish()
             }

@@ -81,9 +85,14 @@
                 panic!("required feature `cloud` is not enabled")
             }
         } else {
+            let file = polars_utils::open_file(path)?;
+            let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
+            let owned = &mut vec![];
+
             options
-                .try_into_reader_with_file_path(Some(path.clone()))
-                .unwrap()
+                .into_reader_with_file_handle(std::io::Cursor::new(unsafe {
+                    maybe_decompress_bytes(mmap.as_ref(), owned)
+                }?))
                 ._with_predicate(predicate.clone())
                 .finish()
         }?;
46 changes: 27 additions & 19 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
@@ -69,32 +69,40 @@ impl JsonExec {
                     return None;
                 }

-                let reader = if run_async {
-                    JsonLineReader::new({
-                        #[cfg(feature = "cloud")]
-                        {
-                            match polars_io::file_cache::FILE_CACHE
-                                .get_entry(p.to_str().unwrap())
-                                // Safety: This was initialized by schema inference.
-                                .unwrap()
-                                .try_open_assume_latest()
-                            {
-                                Ok(v) => v,
-                                Err(e) => return Some(Err(e)),
-                            }
-                        }
-                        #[cfg(not(feature = "cloud"))]
-                        {
-                            panic!("required feature `cloud` is not enabled")
-                        }
-                    })
+                let file = if run_async {
+                    #[cfg(feature = "cloud")]
+                    {
+                        match polars_io::file_cache::FILE_CACHE
+                            .get_entry(p.to_str().unwrap())
+                            // Safety: This was initialized by schema inference.
+                            .unwrap()
+                            .try_open_assume_latest()
+                        {
+                            Ok(v) => v,
+                            Err(e) => return Some(Err(e)),
+                        }
+                    }
+                    #[cfg(not(feature = "cloud"))]
+                    {
+                        panic!("required feature `cloud` is not enabled")
+                    }
                 } else {
-                    match JsonLineReader::from_path(p) {
-                        Ok(r) => r,
+                    match polars_utils::open_file(p.as_ref()) {
+                        Ok(v) => v,
                         Err(e) => return Some(Err(e)),
                     }
                 };

+                let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
+                let owned = &mut vec![];
+                let curs = std::io::Cursor::new(
+                    match unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) } {
+                        Ok(v) => v,
+                        Err(e) => return Some(Err(e)),
+                    },
+                );
+                let reader = JsonLineReader::new(curs);
+
                 let row_index = self.file_scan_options.row_index.as_mut();

                 let df = reader
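Both scan executors now follow the same recipe: obtain a `File` (from the cloud file cache or via `polars_utils::open_file`), memory-map it, pass the bytes through `maybe_decompress_bytes`, and hand the reader a `std::io::Cursor` over the resulting slice. For uncompressed input, `owned` stays empty and the cursor borrows the mmap directly, so the common path adds no extra copy.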
@@ -64,7 +64,7 @@ impl Source for GroupBySource {
         if partition_dir.exists() {
             for file in std::fs::read_dir(partition_dir).expect("should be there") {
                 let spilled = file.unwrap().path();
-                let file = polars_utils::open_file(spilled)?;
+                let file = polars_utils::open_file(&spilled)?;
                 let reader = IpcReader::new(file);
                 let spilled = reader.finish().unwrap();
                 if spilled.n_chunks() > 1 {
1 change: 1 addition & 0 deletions crates/polars-plan/Cargo.toml
@@ -32,6 +32,7 @@ ciborium = { workspace = true, optional = true }
 either = { workspace = true }
 futures = { workspace = true, optional = true }
 hashbrown = { workspace = true }
+memmap = { workspace = true }
 once_cell = { workspace = true }
 percent-encoding = { workspace = true }
 pyo3 = { workspace = true, optional = true }
36 changes: 19 additions & 17 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -135,7 +135,6 @@ pub(super) fn csv_file_info(
     use std::io::{Read, Seek};

     use polars_core::{config, POOL};
-    use polars_io::csv::read::is_compressed;
     use polars_io::csv::read::schema_inference::SchemaInferenceResult;
     use polars_io::utils::get_reader_bytes;
     use rayon::iter::{IntoParallelIterator, ParallelIterator};

@@ -173,7 +172,7 @@
     };

     let infer_schema_func = |i| {
-        let mut file = if run_async {
+        let file = if run_async {
             #[cfg(feature = "cloud")]
             {
                 let entry: &Arc<polars_io::file_cache::FileCacheEntry> =

@@ -185,24 +184,22 @@
                 panic!("required feature `cloud` is not enabled")
             }
         } else {
-            polars_utils::open_file(paths.get(i).unwrap())?
+            let p: &PathBuf = &paths[i];
+            polars_utils::open_file(p.as_ref())?
         };

-        let mut magic_nr = [0u8; 4];
-        let res_len = file.read(&mut magic_nr)?;
-        if res_len < 2 {
-            if csv_options.raise_if_empty {
-                polars_bail!(NoData: "empty CSV")
-            }
-        } else {
-            polars_ensure!(
-                !is_compressed(&magic_nr),
-                ComputeError: "cannot scan compressed csv; use `read_csv` for compressed data",
-            );
+        let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
+        let owned = &mut vec![];
+
+        let mut curs =
+            std::io::Cursor::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?);
+
+        if curs.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty {
+            polars_bail!(NoData: "empty CSV")
         }
+        curs.rewind()?;

-        file.rewind()?;
-        let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");
+        let reader_bytes = get_reader_bytes(&mut curs).expect("could not mmap file");

         // this needs a way to estimated bytes/rows.
         let si_result =

@@ -323,7 +320,12 @@ pub(super) fn ndjson_file_info(
     } else {
         polars_utils::open_file(first_path)?
     };
-    let mut reader = std::io::BufReader::new(f);
+
+    let owned = &mut vec![];
+    let mmap = unsafe { memmap::Mmap::map(&f).unwrap() };
+
+    let mut reader =
+        std::io::BufReader::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?);

     let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
         if file_options.row_index.is_none() {
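Note the effect on schema inference: the old code sniffed four magic bytes and bailed with "cannot scan compressed csv"; the new code decompresses first, keeps the empty-file check (now run against the cursor), and rewinds before inferring the schema, so compressed files flow through the same path as plain ones.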
14 changes: 4 additions & 10 deletions crates/polars-utils/src/io.rs
@@ -19,16 +19,10 @@ pub fn _limit_path_len_io_err(path: &Path, err: io::Error) -> PolarsError {
     io::Error::new(err.kind(), msg).into()
 }

-pub fn open_file<P>(path: P) -> PolarsResult<File>
-where
-    P: AsRef<Path>,
-{
-    File::open(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
+pub fn open_file(path: &Path) -> PolarsResult<File> {
+    File::open(path).map_err(|err| _limit_path_len_io_err(path, err))
 }

-pub fn create_file<P>(path: P) -> PolarsResult<File>
-where
-    P: AsRef<Path>,
-{
-    File::create(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
+pub fn create_file(path: &Path) -> PolarsResult<File> {
+    File::create(path).map_err(|err| _limit_path_len_io_err(path, err))
 }
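The helpers drop the `P: AsRef<Path>` generic in favor of a concrete `&Path`, trading a little call-site convenience for one less monomorphized copy per caller. A sketch of the call-site adaptation (the spill path below is made up for illustration):

    use std::path::PathBuf;

    // String literals now need an explicit conversion...
    let f1 = polars_utils::open_file("data/simple.parquet".as_ref())?;

    // ...and owned `PathBuf`s are passed by reference (deref-coerced to `&Path`).
    let spilled = PathBuf::from("spill/part-0.ipc");
    let f2 = polars_utils::open_file(&spilled)?;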
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/write/mod.rs
@@ -291,7 +291,7 @@ async fn test_column_async(column: &str, compression: CompressionOptions) -> Par
 #[test]
 fn test_parquet() {
     // In CI: This test will be skipped because the file does not exist.
-    if let Ok(r) = polars_utils::open_file("data/simple.parquet") {
+    if let Ok(r) = polars_utils::open_file("data/simple.parquet".as_ref()) {
         let reader = ParquetReader::new(r);
         let df = reader.finish().unwrap();
         assert_eq!(df.get_column_names(), ["a", "b"]);
(1 remaining changed file not rendered)