Skip to content

Commit

Permalink
refactor(rust): Fix new-streaming parquet on empty parquet (pola-rs#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Sep 17, 2024
1 parent 55e7fec commit f631502
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 16 deletions.
15 changes: 6 additions & 9 deletions crates/polars-stream/src/nodes/parquet_source/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,6 @@ impl ParquetSourceNode {
/// * `self.projected_arrow_schema`
/// * `self.physical_predicate`
pub(super) fn init_row_group_decoder(&self) -> RowGroupDecoder {
assert!(
!self.projected_arrow_schema.is_empty()
|| self.file_options.with_columns.as_deref() == Some(&[])
);
assert_eq!(self.predicate.is_some(), self.physical_predicate.is_some());

let scan_sources = self.scan_sources.clone();
Expand All @@ -282,7 +278,7 @@ impl ParquetSourceNode {
.map(|x| x[0].get_statistics().column_stats().len())
.unwrap_or(0);
let include_file_paths = self.file_options.include_file_paths.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
let row_index = self.file_options.row_index.clone();
let physical_predicate = self.physical_predicate.clone();
let ideal_morsel_size = get_ideal_morsel_size();
Expand Down Expand Up @@ -378,7 +374,7 @@ impl ParquetSourceNode {
.unwrap_left()
.clone();

self.projected_arrow_schema =
self.projected_arrow_schema = Some(
if let Some(columns) = self.file_options.with_columns.as_deref() {
Arc::new(
columns
Expand All @@ -391,12 +387,13 @@ impl ParquetSourceNode {
)
} else {
reader_schema.clone()
};
},
);

if self.verbose {
eprintln!(
"[ParquetSource]: {} columns to be projected from {} files",
self.projected_arrow_schema.len(),
"[ParquetSource]: {:?} columns to be projected from {} files",
self.projected_arrow_schema.as_ref().map(|x| x.len()),
self.scan_sources.len(),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ impl ParquetSourceNode {
let verbose = self.verbose;
let io_runtime = polars_io::pl_async::get_runtime();

assert!(
!self.projected_arrow_schema.is_empty()
|| self.file_options.with_columns.as_deref() == Some(&[])
);
let projected_arrow_schema = self.projected_arrow_schema.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();

let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) =
tokio::sync::oneshot::channel();
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct ParquetSourceNode {
config: Config,
verbose: bool,
physical_predicate: Option<Arc<dyn PhysicalIoExpr>>,
projected_arrow_schema: Arc<ArrowSchema>,
projected_arrow_schema: Option<Arc<ArrowSchema>>,
byte_source_builder: DynByteSourceBuilder,
memory_prefetch_func: fn(&[u8]) -> (),
// This permit blocks execution until the first morsel is requested.
Expand Down Expand Up @@ -112,7 +112,7 @@ impl ParquetSourceNode {
},
verbose,
physical_predicate: None,
projected_arrow_schema: Arc::new(ArrowSchema::default()),
projected_arrow_schema: None,
byte_source_builder,
memory_prefetch_func,

Expand Down

0 comments on commit f631502

Please sign in to comment.