Skip to content

Commit

Permalink
feat(rust, python): add truncate_ragged_lines (#10660)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Aug 22, 2023
1 parent 97ab4cd commit dc2e617
Show file tree
Hide file tree
Showing 24 changed files with 135 additions and 24 deletions.
3 changes: 3 additions & 0 deletions crates/polars-error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ description = "Error definitions for the Polars DataFrame library"
arrow = { workspace = true }
regex = { workspace = true, optional = true }
thiserror = { workspace = true }

[features]
python = []
10 changes: 10 additions & 0 deletions crates/polars-error/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//! Constants that help with creating error messages dependent on the host language.
//!
//! Error text sometimes needs to spell boolean literals the way the host
//! language prints them: with the `python` feature enabled we render
//! `True`/`False`, otherwise the lowercase Rust spelling is used.
#[cfg(feature = "python")]
pub static TRUE: &'static str = "True";
#[cfg(feature = "python")]
pub static FALSE: &'static str = "False";

#[cfg(not(feature = "python"))]
pub static TRUE: &'static str = "true";
#[cfg(not(feature = "python"))]
pub static FALSE: &'static str = "false";
1 change: 1 addition & 0 deletions crates/polars-error/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod constants;
mod warning;

use std::borrow::Cow;
Expand Down
1 change: 1 addition & 0 deletions crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ gcp = ["object_store/gcp", "cloud", "polars-core/gcp"]
partition = ["polars-core/partition_by"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
simd = []
python = ["polars-error/python"]

[package.metadata.docs.rs]
all-features = true
Expand Down
16 changes: 14 additions & 2 deletions crates/polars-io/src/csv/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,12 @@ pub(super) fn parse_lines<'a>(
comment_char: Option<u8>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<&NullValuesCompiled>,
missing_is_null: bool,
ignore_errors: bool,
mut truncate_ragged_lines: bool,
null_values: Option<&NullValuesCompiled>,
projection: &[usize],
buffers: &mut [Buffer<'a>],
ignore_errors: bool,
n_lines: usize,
// length of original schema
schema_len: usize,
Expand All @@ -368,6 +369,12 @@ pub(super) fn parse_lines<'a>(
!projection.is_empty(),
"at least one column should be projected"
);
// During projection pushdown we are not checking other csv fields.
// This would be very expensive and we don't care as we only want
// the projected columns.
if projection.len() != schema_len {
truncate_ragged_lines = true
}

// we use the pointers to track the no of bytes read.
let start = bytes.as_ptr() as usize;
Expand Down Expand Up @@ -487,6 +494,11 @@ pub(super) fn parse_lines<'a>(
if bytes.get(read_sol - 1) == Some(&eol_char) {
bytes = &bytes[read_sol..];
} else {
if !truncate_ragged_lines && read_sol < bytes.len() {
polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
}
let bytes_rem = skip_this_line(
&bytes[read_sol - 1..],
quote_char,
Expand Down
23 changes: 16 additions & 7 deletions crates/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ where
{
/// File or Stream object
reader: R,
/// Aggregates chunk afterwards to a single chunk.
rechunk: bool,
/// Stop reading from the csv after this number of rows is reached
n_rows: Option<usize>,
// used by error ignore logic
Expand All @@ -112,8 +110,6 @@ where
/// Optional column names to project/ select.
columns: Option<Vec<String>>,
delimiter: Option<u8>,
has_header: bool,
ignore_errors: bool,
pub(crate) schema: Option<SchemaRef>,
encoding: CsvEncoding,
n_threads: Option<usize>,
Expand All @@ -122,17 +118,22 @@ where
dtype_overwrite: Option<&'a [DataType]>,
sample_size: usize,
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValues>,
missing_is_null: bool,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
quote_char: Option<u8>,
skip_rows_after_header: usize,
try_parse_dates: bool,
row_count: Option<RowCount>,
/// Aggregates chunk afterwards to a single chunk.
rechunk: bool,
raise_if_empty: bool,
truncate_ragged_lines: bool,
missing_is_null: bool,
low_memory: bool,
has_header: bool,
ignore_errors: bool,
eol_char: u8,
}

impl<'a, R> CsvReader<'a, R>
Expand Down Expand Up @@ -324,6 +325,12 @@ where
self.predicate = predicate;
self
}

/// Truncate lines that are longer than the schema.
pub fn truncate_ragged_lines(mut self, toggle: bool) -> Self {
self.truncate_ragged_lines = toggle;
self
}
}

impl<'a> CsvReader<'a, File> {
Expand Down Expand Up @@ -374,6 +381,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
std::mem::take(&mut self.row_count),
self.try_parse_dates,
self.raise_if_empty,
self.truncate_ragged_lines,
)
}

Expand Down Expand Up @@ -558,6 +566,7 @@ where
try_parse_dates: false,
row_count: None,
raise_if_empty: true,
truncate_ragged_lines: false,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/polars-io/src/csv/read_impl/batched_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl<'a> CoreReader<'a> {
missing_is_null: self.missing_is_null,
to_cast: self.to_cast,
ignore_errors: self.ignore_errors,
truncate_ragged_lines: self.truncate_ragged_lines,
n_rows: self.n_rows,
encoding: self.encoding,
delimiter: self.delimiter,
Expand All @@ -186,6 +187,7 @@ pub struct BatchedCsvReaderMmap<'a> {
eol_char: u8,
null_values: Option<NullValuesCompiled>,
missing_is_null: bool,
truncate_ragged_lines: bool,
to_cast: Vec<Field>,
ignore_errors: bool,
n_rows: Option<usize>,
Expand Down Expand Up @@ -244,6 +246,7 @@ impl<'a> BatchedCsvReaderMmap<'a> {
self.encoding,
self.null_values.as_ref(),
self.missing_is_null,
self.truncate_ragged_lines,
self.chunk_size,
stop_at_nbytes,
self.starting_point_offset,
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-io/src/csv/read_impl/batched_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ impl<'a> CoreReader<'a> {
missing_is_null: self.missing_is_null,
to_cast: self.to_cast,
ignore_errors: self.ignore_errors,
truncate_ragged_lines: self.truncate_ragged_lines,
n_rows: self.n_rows,
encoding: self.encoding,
delimiter: self.delimiter,
Expand Down Expand Up @@ -271,6 +272,7 @@ pub struct BatchedCsvReaderRead<'a> {
missing_is_null: bool,
to_cast: Vec<Field>,
ignore_errors: bool,
truncate_ragged_lines: bool,
n_rows: Option<usize>,
encoding: CsvEncoding,
delimiter: u8,
Expand Down Expand Up @@ -341,6 +343,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
self.encoding,
self.null_values.as_ref(),
self.missing_is_null,
self.truncate_ragged_lines,
self.chunk_size,
stop_at_n_bytes,
self.starting_point_offset,
Expand Down
20 changes: 14 additions & 6 deletions crates/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub(crate) struct CoreReader<'a> {
predicate: Option<Arc<dyn PhysicalIoExpr>>,
to_cast: Vec<Field>,
row_count: Option<RowCount>,
truncate_ragged_lines: bool,
}

impl<'a> fmt::Debug for CoreReader<'a> {
Expand Down Expand Up @@ -206,6 +207,7 @@ impl<'a> CoreReader<'a> {
row_count: Option<RowCount>,
try_parse_dates: bool,
raise_if_empty: bool,
truncate_ragged_lines: bool,
) -> PolarsResult<CoreReader<'a>> {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
let mut reader_bytes = reader_bytes;
Expand Down Expand Up @@ -303,6 +305,7 @@ impl<'a> CoreReader<'a> {
predicate,
to_cast,
row_count,
truncate_ragged_lines,
})
}

Expand Down Expand Up @@ -609,11 +612,12 @@ impl<'a> CoreReader<'a> {
self.comment_char,
self.quote_char,
self.eol_char,
self.null_values.as_ref(),
self.missing_is_null,
self.truncate_ragged_lines,
ignore_errors,
self.null_values.as_ref(),
projection,
&mut buffers,
ignore_errors,
chunk_size,
self.schema.len(),
&self.schema,
Expand Down Expand Up @@ -683,6 +687,7 @@ impl<'a> CoreReader<'a> {
self.encoding,
self.null_values.as_ref(),
self.missing_is_null,
self.truncate_ragged_lines,
usize::MAX,
stop_at_nbytes,
starting_point_offset,
Expand Down Expand Up @@ -725,11 +730,12 @@ impl<'a> CoreReader<'a> {
self.comment_char,
self.quote_char,
self.eol_char,
self.null_values.as_ref(),
self.missing_is_null,
self.ignore_errors,
self.truncate_ragged_lines,
self.null_values.as_ref(),
&projection,
&mut buffers,
self.ignore_errors,
remaining_rows - 1,
self.schema.len(),
self.schema.as_ref(),
Expand Down Expand Up @@ -811,6 +817,7 @@ fn read_chunk(
encoding: CsvEncoding,
null_values: Option<&NullValuesCompiled>,
missing_is_null: bool,
truncate_ragged_lines: bool,
chunk_size: usize,
stop_at_nbytes: usize,
starting_point_offset: Option<usize>,
Expand Down Expand Up @@ -842,11 +849,12 @@ fn read_chunk(
comment_char,
quote_char,
eol_char,
null_values,
missing_is_null,
ignore_errors,
truncate_ragged_lines,
null_values,
projection,
&mut buffers,
ignore_errors,
chunk_size,
schema.len(),
schema,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ list_eval = []
cumulative_eval = []
chunked_ids = ["polars-plan/chunked_ids", "polars-core/chunked_ids"]
list_to_struct = ["polars-plan/list_to_struct"]
python = ["pyo3", "polars-plan/python", "polars-core/python"]
python = ["pyo3", "polars-plan/python", "polars-core/python", "polars-io/python"]
row_hash = ["polars-plan/row_hash"]
string_justify = ["polars-plan/string_justify"]
string_from_radix = ["polars-plan/string_from_radix"]
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct LazyCsvReader<'a> {
eol_char: u8,
null_values: Option<NullValues>,
missing_is_null: bool,
truncate_ragged_lines: bool,
infer_schema_length: Option<usize>,
rechunk: bool,
skip_rows_after_header: usize,
Expand Down Expand Up @@ -61,6 +62,7 @@ impl<'a> LazyCsvReader<'a> {
row_count: None,
try_parse_dates: false,
raise_if_empty: true,
truncate_ragged_lines: false,
}
}

Expand Down Expand Up @@ -208,6 +210,13 @@ impl<'a> LazyCsvReader<'a> {
self
}

/// Truncate lines that are longer than the schema.
#[must_use]
pub fn truncate_ragged_lines(mut self, toggle: bool) -> Self {
self.truncate_ragged_lines = toggle;
self
}

/// Modify a schema before we run the lazy scanning.
///
/// Important! Run this function latest in the builder!
Expand Down Expand Up @@ -280,6 +289,7 @@ impl LazyFileListReader for LazyCsvReader<'_> {
self.row_count,
self.try_parse_dates,
self.raise_if_empty,
self.truncate_ragged_lines,
)?
.build()
.into();
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl CsvExec {
.with_rechunk(self.file_options.rechunk)
.with_row_count(std::mem::take(&mut self.file_options.row_count))
.with_try_parse_dates(self.options.try_parse_dates)
.truncate_ragged_lines(self.options.truncate_ragged_lines)
.raise_if_empty(self.options.raise_if_empty)
.finish()
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-pipe/src/executors/sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl CsvSource {
.with_chunk_size(chunk_size)
.with_row_count(file_options.row_count)
.with_try_parse_dates(options.try_parse_dates)
.truncate_ragged_lines(options.truncate_ragged_lines)
.raise_if_empty(options.raise_if_empty);

let reader = Box::new(reader);
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ impl LogicalPlanBuilder {
row_count: Option<RowCount>,
try_parse_dates: bool,
raise_if_empty: bool,
truncate_ragged_lines: bool,
) -> PolarsResult<Self> {
let path = path.into();
let mut file = polars_utils::open_file(&path).map_err(|e| {
Expand Down Expand Up @@ -346,6 +347,7 @@ impl LogicalPlanBuilder {
encoding,
try_parse_dates,
raise_if_empty,
truncate_ragged_lines,
},
},
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct CsvParserOptions {
pub encoding: CsvEncoding,
pub try_parse_dates: bool,
pub raise_if_empty: bool,
pub truncate_ragged_lines: bool,
}

#[cfg(feature = "parquet")]
Expand Down
9 changes: 6 additions & 3 deletions crates/polars/tests/it/io/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ fn test_comment_lines() -> PolarsResult<()> {
#[test]
fn test_null_values_argument() -> PolarsResult<()> {
let csv = r"1,a,foo
null-value,b,bar,
null-value,b,bar
3,null-value,ham
";

Expand Down Expand Up @@ -826,7 +826,10 @@ fn test_scientific_floats() -> PolarsResult<()> {
fn test_tsv_header_offset() -> PolarsResult<()> {
let csv = "foo\tbar\n\t1000011\t1\n\t1000026\t2\n\t1000949\t2";
let file = Cursor::new(csv);
let df = CsvReader::new(file).with_delimiter(b'\t').finish()?;
let df = CsvReader::new(file)
.truncate_ragged_lines(true)
.with_delimiter(b'\t')
.finish()?;

assert_eq!(df.shape(), (3, 2));
assert_eq!(df.dtypes(), &[DataType::Utf8, DataType::Int64]);
Expand Down Expand Up @@ -925,7 +928,7 @@ foo,bar
.finish()?;
assert_eq!(df.get_column_names(), &["foo", "bar"]);
assert_eq!(df.shape(), (1, 2));
let df = CsvReader::new(file).finish()?;
let df = CsvReader::new(file).truncate_ragged_lines(true).finish()?;
assert_eq!(df.shape(), (5, 1));

Ok(())
Expand Down
Loading

0 comments on commit dc2e617

Please sign in to comment.