a clippy pass
ritchie46 committed Oct 21, 2023
1 parent c733cd1 commit ff6a5d2
Showing 4 changed files with 40 additions and 96 deletions.
7 changes: 2 additions & 5 deletions crates/polars-core/src/config.rs
@@ -22,12 +22,9 @@ pub(crate) const DECIMAL_ACTIVE: &str = "POLARS_ACTIVATE_DECIMAL";
 
 #[cfg(feature = "dtype-decimal")]
 pub(crate) fn decimal_is_active() -> bool {
-    match std::env::var(DECIMAL_ACTIVE) {
-        Ok(val) => val == "1",
-        _ => false,
-    }
+    std::env::var(DECIMAL_ACTIVE).as_deref().unwrap_or("") == "1"
 }
 
 pub fn verbose() -> bool {
-    std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("0") == "1"
+    std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("") == "1"
 }
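
Both functions above now read an environment flag with the same one-liner instead of a match on the Result. A minimal standalone sketch of that idiom (the flag_is_set helper is made up for illustration; the variable name comes from the diff):

use std::env;

// Returns true only when the variable is set to exactly "1".
// An unset (or non-UTF-8) variable falls back to "" and therefore compares false.
fn flag_is_set(name: &str) -> bool {
    env::var(name).as_deref().unwrap_or("") == "1"
}

fn main() {
    // With POLARS_VERBOSE=1 exported in the shell this prints "true".
    println!("{}", flag_is_set("POLARS_VERBOSE"));
}
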
7 changes: 5 additions & 2 deletions crates/polars-io/src/parquet/read.rs
@@ -334,12 +334,15 @@ impl ParquetAsyncReader {
 
     pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
         let metadata = self.reader.get_metadata().await?.clone();
-        let schema = self.schema().await?;
+        let schema = match self.schema {
+            Some(schema) => schema,
+            None => self.schema().await?,
+        };
         // row group fetched deals with projection
         let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
             self.reader,
             &metadata,
-            self.schema.unwrap(),
+            schema.clone(),
             self.projection.as_deref(),
             self.predicate.clone(),
         )?
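
The change above prefers an already-populated schema over a second self.schema().await? round trip, and drops the unwrap() on the field. A rough sketch of the same use-the-cache-or-fetch shape, with hypothetical MetadataCache / fetch_schema names, a plain String standing in for the schema, and a synchronous call standing in for the awaited fetch:

struct MetadataCache {
    schema: Option<String>,
}

impl MetadataCache {
    fn fetch_schema(&self) -> String {
        // Stand-in for an expensive call (a cloud round trip in the real reader).
        "schema-from-remote".to_string()
    }

    fn schema(mut self) -> String {
        // Reuse the cached value when present; fetch it otherwise,
        // instead of always fetching and then unwrap()-ing the field.
        match self.schema.take() {
            Some(schema) => schema,
            None => self.fetch_schema(),
        }
    }
}

fn main() {
    let cached = MetadataCache { schema: Some("cached-schema".to_string()) };
    assert_eq!(cached.schema(), "cached-schema");

    let empty = MetadataCache { schema: None };
    assert_eq!(empty.schema(), "schema-from-remote");
}
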
120 changes: 32 additions & 88 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -1,6 +1,5 @@
 use std::path::{Path, PathBuf};
 
-use arrow::io::parquet::read::read_metadata_async;
 use polars_core::utils::accumulate_dataframes_vertical;
 use polars_core::utils::arrow::io::parquet::read::FileMetaData;
 use polars_io::cloud::CloudOptions;
@@ -60,7 +59,7 @@ impl ParquetExec {
             .as_ref()
             .map(|hive| hive.materialize_partition_columns());
 
-        let (file, projection, n_rows, predicate) = prepare_scan_args(
+        let (file, projection, _, predicate) = prepare_scan_args(
             path,
             &self.predicate,
             &mut self.file_options.with_columns.clone(),
@@ -135,30 +134,42 @@ impl ParquetExec {
                 }?;
                 let read = df.height();
                 base_offset += read as IdxSize;
-                n_rows_total.as_mut().map(|total| *total -= read);
+                if let Some(total) = n_rows_total.as_mut() {
+                    *total -= read
+                }
                 Ok(df)
             })
             .collect()
     }
 
     #[cfg(feature = "cloud")]
     async fn read_async(&mut self) -> PolarsResult<Vec<DataFrame>> {
-        let mut n_rows_total = self.file_options.n_rows;
-
-        let iter = self.paths.iter().map(|path| async {
+        let first_schema = &self.file_info.reader_schema;
+        let first_metadata = &self.metadata;
+        let cloud_options = self.cloud_options.as_ref();
+        // First initialize the readers and get the metadata concurrently.
+        let iter = self.paths.iter().enumerate().map(|(i, path)| async move {
+            // use the cached one as this saves a cloud call
+            let (schema, metadata) = if i == 0 {
+                (Some(first_schema.clone()), first_metadata.clone())
+            } else {
+                (None, None)
+            };
             let mut reader = ParquetAsyncReader::from_uri(
                 &path.to_string_lossy(),
-                self.cloud_options.as_ref(),
-                None,
-                None,
+                cloud_options,
+                schema,
+                metadata,
             )
             .await?;
             let num_rows = reader.num_rows().await?;
             PolarsResult::Ok((num_rows, reader))
         });
         let readers_and_metadata = futures::future::try_join_all(iter).await?;
 
-        let mut n_rows_to_read = n_rows_total.unwrap_or(usize::MAX);
+        // Then compute `n_rows` to be taken per file up front, so we can actually read concurrently
+        // after this.
+        let mut n_rows_to_read = self.file_options.n_rows.unwrap_or(usize::MAX);
         let mut cumulative_read = 0;
         let row_counts = readers_and_metadata
             .iter()
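
Two things happen in this hunk: the first file's schema and metadata are reused so only the remaining files cost a cloud call, and clippy's option_map_unit_fn fix replaces a .map() used purely for a side effect with if let. A minimal standalone sketch of the lint fix alone, with made-up values:

fn main() {
    let mut n_rows_total: Option<usize> = Some(10);
    let read = 3;

    // Clippy (option_map_unit_fn) warns on this form: `.map()` is called only for its
    // side effect and the resulting Option<()> is discarded.
    //
    //     n_rows_total.as_mut().map(|total| *total -= read);
    //
    // The suggested form makes the side effect explicit:
    if let Some(total) = n_rows_total.as_mut() {
        *total -= read;
    }

    assert_eq!(n_rows_total, Some(7));
}
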
@@ -171,18 +182,14 @@ impl ParquetExec {
             })
             .collect::<Vec<_>>();
 
-        let base_row_count = std::mem::take(&mut self.file_options.row_count);
-        let base_row_count = &base_row_count;
-        let mut file_info = std::mem::take(&mut self.file_info);
-        let file_info = &file_info;
-        let reader_schema = file_info.reader_schema.clone();
-        let reader_schema = &reader_schema;
-        let file_options = std::mem::take(&mut self.file_options);
-        let file_options = &file_options;
-        let paths = self.paths.clone();
+        // Now read the actual data.
+        let base_row_count = &self.file_options.row_count;
+        let file_info = &self.file_info;
+        let reader_schema = &file_info.reader_schema;
+        let file_options = &self.file_options;
+        let paths = &self.paths;
         let use_statistics = self.options.use_statistics;
-        let predicate = self.predicate.clone();
-        let predicate = &predicate;
+        let predicate = &self.predicate;
 
         let iter = readers_and_metadata
             .into_iter()
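
The rewritten block borrows the fields directly instead of std::mem::take-ing them out of self (or cloning them) and then re-borrowing; the values are only read afterwards, so shared references are enough. A small sketch of that simplification on a hypothetical Exec/Options pair:

struct Options {
    n_rows: Option<usize>,
    with_columns: Option<Vec<String>>,
}

struct Exec {
    options: Options,
}

impl Exec {
    fn plan(&mut self) -> usize {
        // Before: move the field out, then immediately re-borrow it.
        //
        //     let options = std::mem::take(&mut self.options);
        //     let options = &options;
        //
        // After: borrow in place; `self.options` stays intact for later calls.
        let options = &self.options;
        options.n_rows.unwrap_or(0) + options.with_columns.as_ref().map_or(0, |c| c.len())
    }
}

fn main() {
    let mut exec = Exec {
        options: Options { n_rows: Some(5), with_columns: Some(vec!["a".into()]) },
    };
    assert_eq!(exec.plan(), 6);
    // The options were not consumed by `plan`, so a second call still sees them.
    assert_eq!(exec.plan(), 6);
}
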
@@ -203,7 +210,7 @@
                 };
                 let row_count = base_row_count.as_ref().map(|rc| RowCount {
                     name: rc.name.clone(),
-                    offset: rc.offset + row_count as IdxSize,
+                    offset: rc.offset + *cumulative_read as IdxSize,
                 });
 
                 file_info.update_hive_partitions(path);
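
The offset fix above advances each file's row-count offset by the rows taken from the preceding files (the cumulative_read running total) instead of reusing a shadowed local. A standalone sketch of computing such per-file starting offsets (the row counts are made-up numbers):

fn main() {
    // Rows that will be read from each parquet file, in path order.
    let row_counts = [100usize, 250, 80];

    // Each file's row-count column should start where the previous file stopped.
    let offsets: Vec<usize> = row_counts
        .iter()
        .scan(0usize, |cumulative_read, n| {
            let start = *cumulative_read;
            *cumulative_read += *n;
            Some(start)
        })
        .collect();

    assert_eq!(offsets, vec![0, 100, 350]);
}
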
@@ -215,7 +222,7 @@
 
                 let (_, projection, n_rows, predicate) = prepare_scan_args(
                     Path::new(""),
-                    &predicate,
+                    predicate,
                     &mut file_options.with_columns.clone(),
                     &mut reader_schema.clone(),
                     n_rows,
@@ -242,10 +249,12 @@
 
     fn read(&mut self) -> PolarsResult<DataFrame> {
         let is_cloud = is_cloud_url(self.paths[0].as_path());
+        let force_async = std::env::var("POLARS_FORCE_ASYNC").as_deref().unwrap_or("") == "1";
 
         let out = if self.file_options.n_rows.is_some()
             || self.file_options.row_count.is_some()
             || is_cloud
+            || force_async
         {
             if is_cloud {
                 #[cfg(not(feature = "cloud"))]
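
The new POLARS_FORCE_ASYNC variable routes local scans through the async code path as well, which is mainly useful for exercising that path in tests. A sketch of the dispatch shape under the same guard; the helper functions below are stand-ins, not the polars API:

fn read_async_path() -> &'static str { "async reader" }
fn read_parallel_local() -> &'static str { "parallel local reader" }

fn main() {
    let is_cloud = false; // stand-in for is_cloud_url(&paths[0])
    let n_rows: Option<usize> = None;
    let row_count_requested = false;
    let force_async = std::env::var("POLARS_FORCE_ASYNC").as_deref().unwrap_or("") == "1";

    // Mirrors the guard above: any of these conditions selects the async / row-limited
    // path instead of the plain parallel local read.
    let out = if n_rows.is_some() || row_count_requested || is_cloud || force_async {
        read_async_path()
    } else {
        read_parallel_local()
    };
    println!("{out}");
}
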
@@ -265,71 +274,6 @@
             self.read_par()?
         };
 
-        // let mut out = Vec::with_capacity(self.paths.len());
-        //
-        // for path in self.paths.iter() {
-        //     self.file_info.update_hive_partitions(path);
-        //
-        //     let hive_partitions = self
-        //         .file_info
-        //         .hive_parts
-        //         .as_ref()
-        //         .map(|hive| hive.materialize_partition_columns());
-        //
-        //     let (file, projection, n_rows, predicate) = prepare_scan_args(
-        //         path,
-        //         &self.predicate,
-        //         &mut self.file_options.with_columns.clone(),
-        //         &mut self.file_info.schema.clone(),
-        //         self.file_options.n_rows,
-        //         self.file_options.row_count.is_some(),
-        //         hive_partitions.as_deref(),
-        //     );
-        //
-        //     let df = if let Some(file) = file {
-        //         ParquetReader::new(file)
-        //             .with_schema(Some(self.file_info.reader_schema.clone()))
-        //             .with_n_rows(n_rows)
-        //             .read_parallel(self.options.parallel)
-        //             .with_row_count(self.file_options.row_count.clone())
-        //             .set_rechunk(self.file_options.rechunk)
-        //             .set_low_memory(self.options.low_memory)
-        //             .use_statistics(self.options.use_statistics)
-        //             .with_hive_partition_columns(hive_partitions)
-        //             ._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref()))
-        //     } else if is_cloud_url(path.as_path()) {
-        //         #[cfg(feature = "cloud")]
-        //         {
-        //             polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
-        //                 let reader = ParquetAsyncReader::from_uri(
-        //                     &path.to_string_lossy(),
-        //                     self.cloud_options.as_ref(),
-        //                     Some(self.file_info.reader_schema.clone()),
-        //                     self.metadata.clone(),
-        //                 )
-        //                 .await?
-        //                 .with_n_rows(n_rows)
-        //                 .with_row_count(self.file_options.row_count.clone())
-        //                 .with_projection(projection)
-        //                 .use_statistics(self.options.use_statistics)
-        //                 .with_predicate(predicate)
-        //                 .with_hive_partition_columns(hive_partitions);
-        //
-        //                 reader.finish().await
-        //             })
-        //         }
-        //         #[cfg(not(feature = "cloud"))]
-        //         {
-        //             panic!("activate cloud feature")
-        //         }
-        //     } else {
-        //         polars_bail!(ComputeError: "could not read {}", path.display())
-        //     }?;
-        //     out.push(df)
-        // }
-        // out
-        // };
-
         accumulate_dataframes_vertical(out)
     }
 }
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
@@ -65,7 +65,7 @@ impl LazyFileListReader for LazyParquetReader {
         self.finish_no_glob()
     }
 
-    fn finish_no_glob(mut self) -> PolarsResult<LazyFrame> {
+    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
         let row_count = self.args.row_count;
 
         let paths = if self.paths.is_empty() {
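
The only change here drops the mut from mut self, which the compiler's unused_mut lint flags once the method no longer mutates self. A tiny sketch of the same cleanup on a hypothetical reader type:

struct LazyReader {
    paths: Vec<String>,
}

impl LazyReader {
    // Taking `self` by value is fine; `mut` is only needed if the body actually
    // mutates it, otherwise the `unused_mut` lint warns.
    fn finish_no_glob(self) -> usize {
        self.paths.len()
    }
}

fn main() {
    let reader = LazyReader { paths: vec!["data.parquet".to_string()] };
    assert_eq!(reader.finish_no_glob(), 1);
}
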
