Skip to content

Commit

Permalink
always read in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 22, 2023
1 parent 0c59c04 commit c0c236b
Showing 1 changed file with 78 additions and 117 deletions.
195 changes: 78 additions & 117 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,92 +46,18 @@ impl ParquetExec {
},
identity => identity,
};
let n_rows = self.file_options.n_rows;

// self.paths.iter().map(|path| {
// let (file, projection, predicate) = prepare_scan_args(
// path,
// &self.predicate,
// &mut self.file_options.with_columns.clone(),
// &mut self.file_info.schema.clone(),
// self.file_options.row_count.is_some(),
// hive_partitions.as_deref(),
// );
//
// let reader = if let Some(file) = file {
// ParquetReader::new(file)
// .with_schema(Some(self.file_info.reader_schema.clone()))
// .read_parallel(parallel)
// .set_low_memory(self.options.low_memory)
// .use_statistics(self.options.use_statistics)
// .with_hive_partition_columns(hive_partitions);
// )
// } else {
// polars_bail!(ComputeError: "could not read {}", path.display())
// }?;
//
//
//
// })

POOL.install(|| {
self.paths
.par_iter()
.map(|path| {
let mut file_info = self.file_info.clone();
file_info.update_hive_partitions(path);

let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());

let (file, projection, predicate) = prepare_scan_args(
path,
&self.predicate,
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
self.file_options.row_count.is_some(),
hive_partitions.as_deref(),
);

let df = if let Some(file) = file {
ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.read_parallel(parallel)
.with_n_rows(n_rows)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
.with_hive_partition_columns(hive_partitions)
._finish_with_scan_ops(
predicate,
projection.as_ref().map(|v| v.as_ref()),
)
} else {
polars_bail!(ComputeError: "could not read {}", path.display())
}?;
Ok(df)
})
.collect()
})
}

fn read_seq(&mut self) -> PolarsResult<Vec<DataFrame>> {
let mut base_offset = 0 as IdxSize;
let mut n_rows_total = self.file_options.n_rows;

self.paths
// First initialize the readers, predicates and metadata.
// This will be used to determine the slices. That way we can actually read all the
// files in parallel even when we add row counts or slices.
let readers_and_metadata = self
.paths
.iter()
.map(|path| {
let row_count = self.file_options.row_count.as_ref().map(|rc| RowCount {
name: rc.name.clone(),
offset: rc.offset + base_offset,
});
let mut file_info = self.file_info.clone();
file_info.update_hive_partitions(path);

self.file_info.update_hive_partitions(path);

let hive_partitions = self
.file_info
let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());
Expand All @@ -145,27 +71,66 @@ impl ParquetExec {
hive_partitions.as_deref(),
);

let df = if let Some(file) = file {
ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.with_n_rows(n_rows_total)
.read_parallel(self.options.parallel)
.with_row_count(row_count)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
.with_hive_partition_columns(hive_partitions)
._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref()))
let mut reader = if let Some(file) = file {
PolarsResult::Ok(
ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.read_parallel(parallel)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
.set_rechunk(false)
.with_hive_partition_columns(hive_partitions),
)
} else {
polars_bail!(ComputeError: "could not read {}", path.display())
}?;
let read = df.height();
base_offset += read as IdxSize;
if let Some(total) = n_rows_total.as_mut() {
*total -= read
}
Ok(df)

reader
.num_rows()
.map(|num_rows| (reader, num_rows, predicate, projection))
})
.collect()
.collect::<PolarsResult<Vec<_>>>()?;

let n_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
let iter = readers_and_metadata
.iter()
.map(|(_, num_rows, _, _)| *num_rows);

let rows_statistics = get_sequential_row_statistics(iter, n_rows_to_read);
let base_row_count = &self.file_options.row_count;

POOL.install(|| {
readers_and_metadata
.into_par_iter()
.zip(rows_statistics.par_iter())
.map(
|(
(reader, num_rows_this_file, predicate, projection),
(remaining_rows_to_read, cumulative_read),
)| {
let remaining_rows_to_read = *remaining_rows_to_read;
let remaining_rows_to_read = if num_rows_this_file < remaining_rows_to_read
{
None
} else {
Some(remaining_rows_to_read)
};
let row_count = base_row_count.as_ref().map(|rc| RowCount {
name: rc.name.clone(),
offset: rc.offset + *cumulative_read as IdxSize,
});

reader
.with_n_rows(remaining_rows_to_read)
.with_row_count(row_count)
._finish_with_scan_ops(
predicate.clone(),
projection.as_ref().map(|v| v.as_ref()),
)
},
)
.collect()
})
}

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -251,6 +216,7 @@ impl ParquetExec {
.with_projection(projection)
.use_statistics(use_statistics)
.with_predicate(predicate)
.set_rechunk(false)
.with_hive_partition_columns(hive_partitions)
.finish()
.await
Expand All @@ -266,30 +232,25 @@ impl ParquetExec {
let is_cloud = is_cloud_url(self.paths[0].as_path());
let force_async = std::env::var("POLARS_FORCE_ASYNC").as_deref().unwrap_or("") == "1";

let out = if self.file_options.n_rows.is_some()
|| self.file_options.row_count.is_some()
|| is_cloud
|| force_async
{
if is_cloud || force_async {
#[cfg(not(feature = "cloud"))]
{
panic!("activate cloud feature")
}
let out = if is_cloud || force_async {
#[cfg(not(feature = "cloud"))]
{
panic!("activate cloud feature")
}

#[cfg(feature = "cloud")]
{
polars_io::pl_async::get_runtime()
.block_on_potential_spawn(self.read_async())?
}
} else {
self.read_seq()?
#[cfg(feature = "cloud")]
{
polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())?
}
} else {
self.read_par()?
};

accumulate_dataframes_vertical(out)
let mut out = accumulate_dataframes_vertical(out)?;
if self.file_options.rechunk {
out.as_single_chunk_par();
}
Ok(out)
}
}

Expand Down

0 comments on commit c0c236b

Please sign in to comment.