
Commit

fix async reader
ritchie46 committed Oct 22, 2023
1 parent 66adbde commit 45d9292
Showing 8 changed files with 58 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-python.yml
@@ -74,7 +74,7 @@ jobs:

       - name: Run tests async reader tests
         if: github.ref_name != 'main'
-        run: POLARS_FORCE_ASYNC=1 pytest --cov -n auto --dist loadgroup -m "not benchmark and not docs" ../.venv/bin/pytest tests/unit/io/
+        run: POLARS_FORCE_ASYNC=1 pytest -m "not benchmark and not docs" tests/unit/io/

       - name: Run doctests
         if: github.ref_name != 'main'
2 changes: 1 addition & 1 deletion crates/polars-error/src/lib.rs
@@ -18,7 +18,7 @@ where
     T: Into<Cow<'static, str>>,
 {
     fn from(msg: T) -> Self {
-        if env::var("POLARS_PANIC_ON_ERR").is_ok() {
+        if env::var("POLARS_PANIC_ON_ERR").as_deref().unwrap_or("") == "1" {
             panic!("{}", msg.into())
         } else {
             ErrString(msg.into())
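The old check fired whenever POLARS_PANIC_ON_ERR was set at all, since `env::var(..).is_ok()` only tests that the variable exists; the new check requires the value to be exactly `1`. A minimal standalone sketch of the difference (the helper names below are illustrative, not part of the Polars codebase):

```rust
use std::env;

/// Old behaviour: fires as soon as the variable exists, regardless of value.
fn panics_old() -> bool {
    env::var("POLARS_PANIC_ON_ERR").is_ok()
}

/// New behaviour: fires only when the variable is set to exactly "1".
fn panics_new() -> bool {
    env::var("POLARS_PANIC_ON_ERR").as_deref().unwrap_or("") == "1"
}

fn main() {
    // Mutating the process environment here is only for demonstration.
    env::set_var("POLARS_PANIC_ON_ERR", "0");
    assert!(panics_old()); // set at all => old check fires
    assert!(!panics_new()); // "0" is not "1" => new check does not

    env::set_var("POLARS_PANIC_ON_ERR", "1");
    assert!(panics_new());
}
```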
7 changes: 5 additions & 2 deletions crates/polars-io/src/parquet/read.rs
@@ -16,8 +16,8 @@ use crate::mmap::MmapBytesReader;
 use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
 #[cfg(feature = "cloud")]
 use crate::parquet::async_impl::ParquetObjectStore;
-use crate::parquet::read_impl::read_parquet;
 pub use crate::parquet::read_impl::BatchedParquetReader;
+use crate::parquet::read_impl::{materialize_hive_partitions, read_parquet};
 #[cfg(feature = "cloud")]
 use crate::predicates::apply_predicate;
 use crate::predicates::PhysicalIoExpr;
@@ -368,6 +368,7 @@ impl ParquetAsyncReader {
         let rechunk = self.rechunk;
         let metadata = self.get_metadata().await?.clone();
         let schema = self.schema().await?;
+        let hive_partition_columns = self.hive_partition_columns.clone();

         let predicate = self.predicate.clone();
         // batched reader deals with slice pushdown
@@ -384,7 +385,9 @@
             chunks.push(out)
         }
         if chunks.is_empty() {
-            return Ok(DataFrame::from(schema.as_ref()));
+            let mut df = DataFrame::from(schema.as_ref());
+            materialize_hive_partitions(&mut df, hive_partition_columns.as_deref(), 0);
+            return Ok(df);
         }
         let mut df = accumulate_dataframes_vertical_unchecked(chunks);

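This hunk is the core of the async-reader fix: when the batched reader yields no chunks, the old code returned a DataFrame built from the reader schema alone, so hive partition columns were missing and the frame could not be stacked with frames from non-empty files. The new code appends the hive columns (at zero rows) before returning. A rough sketch of that empty-frame case using `polars_core` directly; the function below is a simplified stand-in for the real `materialize_hive_partitions`, which also broadcasts the partition values to `num_rows` for non-empty frames:

```rust
use polars_core::prelude::*;

/// Simplified stand-in for `materialize_hive_partitions`, covering only the
/// empty-DataFrame case: append each hive partition column cleared to zero
/// rows, so the schema matches frames read from non-empty files.
fn append_empty_hive_columns(
    df: &mut DataFrame,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<()> {
    if let Some(columns) = hive_partition_columns {
        for s in columns {
            // `Series::clear` keeps the name and dtype but drops all values.
            df.with_column(s.clear())?;
        }
    }
    Ok(())
}

fn main() -> PolarsResult<()> {
    // A frame built from the reader schema only: zero rows, no hive columns.
    let mut df = DataFrame::new(vec![Series::new_empty("a", &DataType::Int64)])?;
    // A hive partition column as it would look for a single file.
    let hive = vec![Series::new("year", &[2023i32])];

    append_empty_hive_columns(&mut df, Some(&hive))?;
    assert_eq!(df.get_column_names(), &["a", "year"]);
    assert_eq!(df.height(), 0);
    Ok(())
}
```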
9 changes: 2 additions & 7 deletions crates/polars-io/src/parquet/read_impl.rs
@@ -99,7 +99,7 @@ pub(super) fn array_iter_to_series(
 /// We have a special num_rows arg, as df can be empty when a projection contains
 /// only hive partition columns.
 /// Safety: num_rows equals the height of the df when the df height is non-zero.
-fn materialize_hive_partitions(
+pub(crate) fn materialize_hive_partitions(
     df: &mut DataFrame,
     hive_partition_columns: Option<&[Series]>,
     num_rows: usize,
@@ -416,12 +416,7 @@ pub fn read_parquet<R: MmapBytesReader>(
             Cow::Borrowed(schema)
         };
         let mut df = DataFrame::from(schema.as_ref().as_ref());
-        if let Some(parts) = hive_partition_columns {
-            for s in parts {
-                // SAFETY: length is equal
-                unsafe { df.with_column_unchecked(s.clear()) };
-            }
-        }
+        materialize_hive_partitions(&mut df, hive_partition_columns, 0);
         Ok(df)
     } else {
         accumulate_dataframes_vertical(dfs)
39 changes: 29 additions & 10 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -48,6 +48,32 @@ impl ParquetExec {
         };
         let n_rows = self.file_options.n_rows;

+        // self.paths.iter().map(|path| {
+        //     let (file, projection, predicate) = prepare_scan_args(
+        //         path,
+        //         &self.predicate,
+        //         &mut self.file_options.with_columns.clone(),
+        //         &mut self.file_info.schema.clone(),
+        //         self.file_options.row_count.is_some(),
+        //         hive_partitions.as_deref(),
+        //     );
+        //
+        //     let reader = if let Some(file) = file {
+        //         ParquetReader::new(file)
+        //             .with_schema(Some(self.file_info.reader_schema.clone()))
+        //             .read_parallel(parallel)
+        //             .set_low_memory(self.options.low_memory)
+        //             .use_statistics(self.options.use_statistics)
+        //             .with_hive_partition_columns(hive_partitions);
+        //     )
+        //     } else {
+        //         polars_bail!(ComputeError: "could not read {}", path.display())
+        //     }?;
+        //
+        //
+        //
+        // })
+
         POOL.install(|| {
             self.paths
                 .par_iter()
@@ -150,15 +176,12 @@ impl ParquetExec {
         // First initialize the readers and get the metadata concurrently.
         let iter = self.paths.iter().enumerate().map(|(i, path)| async move {
             // use the cached one as this saves a cloud call
-            let (schema, metadata) = if i == 0 {
-                (Some(first_schema.clone()), first_metadata.clone())
-            } else {
-                (None, None)
-            };
+            let metadata = if i == 0 { first_metadata.clone() } else { None };
             let mut reader = ParquetAsyncReader::from_uri(
                 &path.to_string_lossy(),
                 cloud_options,
-                schema,
+                // Schema must be the same for all files. The hive partitions are included in this schema.
+                Some(first_schema.clone()),
                 metadata,
             )
             .await?;
@@ -196,10 +219,6 @@
                 )| async move {
                     let mut file_info = file_info.clone();
                     let remaining_rows_to_read = *remaining_rows_to_read;
-                    if remaining_rows_to_read == 0 {
-                        return Ok(None);
-                    }
-
                     let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read {
                         None
                     } else {
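The async path now hands every file the same cached schema (hive columns included) and only reuses the cached parquet metadata for the first file; later files fetch their own. A small self-contained sketch of that pattern with placeholder types (the real code uses `SchemaRef` and the cached parquet file metadata):

```rust
use std::sync::Arc;

// Placeholder types standing in for the real schema and parquet metadata.
type Schema = Arc<Vec<(String, String)>>;
type Metadata = Arc<Vec<u8>>;

struct Reader {
    schema: Schema,
    metadata: Option<Metadata>,
}

fn build_readers(paths: &[&str], first_schema: &Schema, first_metadata: &Option<Metadata>) -> Vec<Reader> {
    paths
        .iter()
        .enumerate()
        .map(|(i, _path)| Reader {
            // The schema must be identical for every file, so the cached one is reused for all.
            schema: first_schema.clone(),
            // Only the first file's metadata is cached; the rest fetch their own later.
            metadata: if i == 0 { first_metadata.clone() } else { None },
        })
        .collect()
}

fn main() {
    let schema: Schema = Arc::new(vec![("a".into(), "i64".into())]);
    let meta = Some(Arc::new(vec![1u8, 2, 3]));
    let readers = build_readers(&["f1.parquet", "f2.parquet"], &schema, &meta);
    assert!(readers[0].metadata.is_some());
    assert!(readers[1].metadata.is_none());
    assert_eq!(readers[0].schema, readers[1].schema);
}
```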
10 changes: 10 additions & 0 deletions crates/polars-plan/src/logical_plan/file_scan.rs
@@ -50,6 +50,16 @@ impl PartialEq for FileScan {
 }

 impl FileScan {
+    pub(crate) fn remove_metadata(&mut self) {
+        match self {
+            #[cfg(feature = "parquet")]
+            Self::Parquet { metadata, .. } => {
+                *metadata = None;
+            },
+            _ => {},
+        }
+    }
+
     pub(crate) fn skip_rows(&self) -> usize {
         #[allow(unreachable_patterns)]
         match self {
@@ -228,7 +228,7 @@ impl<'a> PredicatePushDown<'a> {
                 mut paths,
                 mut file_info,
                 predicate,
-                scan_type,
+                mut scan_type,
                 file_options: options,
                 output_schema
             } => {
@@ -250,8 +250,11 @@
                     }
                 }

-                if self.verbose && paths.len() != new_paths.len() {
-                    eprintln!("hive partitioning: skipped {} files, first file : {}", paths.len() - new_paths.len(), paths[0].display())
+                if paths.len() != new_paths.len() {
+                    if self.verbose {
+                        eprintln!("hive partitioning: skipped {} files, first file : {}", paths.len() - new_paths.len(), paths[0].display())
+                    }
+                    scan_type.remove_metadata();
                 }
                 if paths.is_empty() {
                     let schema = output_schema.as_ref().unwrap_or(&file_info.schema);
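`remove_metadata` exists so that when hive-partition pruning drops files from the scan, a previously cached metadata entry (which belonged to the original first file) is discarded rather than reused for a path it may no longer describe. A minimal mirror of that pattern, with a `String` standing in for the real parquet metadata:

```rust
// Minimal mirror of the `FileScan::remove_metadata` pattern from this commit.
#[allow(dead_code)]
#[derive(Debug)]
enum FileScan {
    Parquet { metadata: Option<String> }, // `String` stands in for the real metadata type
    Other,
}

impl FileScan {
    fn remove_metadata(&mut self) {
        if let FileScan::Parquet { metadata } = self {
            *metadata = None;
        }
    }
}

fn main() {
    let paths = vec!["a.parquet", "b.parquet"];
    let new_paths = vec!["b.parquet"]; // pretend hive pruning skipped `a.parquet`
    let mut scan = FileScan::Parquet {
        metadata: Some("metadata of a.parquet".to_string()),
    };

    if paths.len() != new_paths.len() {
        // The cached metadata belonged to `a.parquet`; drop it so the reader
        // fetches fresh metadata for whatever file now comes first.
        scan.remove_metadata();
    }
    assert!(matches!(scan, FileScan::Parquet { metadata: None }));
}
```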
8 changes: 4 additions & 4 deletions crates/polars-plan/src/logical_plan/schema.rs
@@ -45,11 +45,11 @@ impl LogicalPlan {
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct FileInfo {
     pub schema: SchemaRef,
-    // Stores the schema used for the reader, as the main schema can contain
-    // extra hive columns.
+    /// Stores the schema used for the reader, as the main schema can contain
+    /// extra hive columns.
     pub reader_schema: SchemaRef,
-    // - known size
-    // - estimated size
+    /// - known size
+    /// - estimated size
     pub row_estimation: (Option<usize>, usize),
     pub hive_parts: Option<Arc<hive::HivePartitions>>,
 }
