Commit: async and par

ritchie46 committed Oct 21, 2023
1 parent def2982 commit c733cd1
Showing 9 changed files with 308 additions and 85 deletions.
crates/polars-lazy/Cargo.toml (3 changes: 2 additions & 1 deletion)
@@ -10,6 +10,7 @@ description = "Lazy query engine for the Polars DataFrame library"

[dependencies]
arrow = { workspace = true }
futures = { workspace = true, optional = true }
polars-core = { workspace = true, features = ["lazy", "zip_with", "random"] }
polars-io = { workspace = true, features = ["lazy"] }
polars-json = { workspace = true, optional = true }
@@ -43,7 +44,7 @@ async = [
"polars-io/cloud",
"polars-pipe?/async",
]
cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio"]
cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio", "futures"]
cloud_write = ["cloud"]
ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc"]
json = ["polars-io/json", "polars-plan/json", "polars-json"]
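The new optional `futures` dependency wired into the `cloud` feature above is what supplies `futures::future::try_join_all`, which the async parquet scan further down uses to await one future per file. As a standalone illustration of that pattern only (not Polars code: the `fetch_len` helper is hypothetical, and the example assumes the `futures` and `tokio` crates with their default and macro features):

use futures::future::try_join_all;

// Hypothetical per-file task standing in for "open a reader and ask for its row count".
async fn fetch_len(path: &str) -> Result<usize, std::io::Error> {
    Ok(path.len())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let paths = ["a.parquet", "b.parquet", "c.parquet"];
    // All futures are polled concurrently; the first error cancels the whole batch.
    let lens = try_join_all(paths.iter().map(|p| fetch_len(p))).await?;
    println!("{lens:?}");
    Ok(())
}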
crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs (338 changes: 283 additions & 55 deletions)
@@ -1,9 +1,10 @@
- use std::path::PathBuf;
- use polars_core::utils::accumulate_dataframes_vertical;
use std::path::{Path, PathBuf};

use arrow::io::parquet::read::read_metadata_async;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::utils::arrow::io::parquet::read::FileMetaData;
use polars_io::cloud::CloudOptions;
- use polars_io::is_cloud_url;
use polars_io::{is_cloud_url, RowCount};

use super::*;

@@ -39,69 +40,296 @@ impl ParquetExec {
}
}

- fn read(&mut self) -> PolarsResult<DataFrame> {
- let mut out = Vec::with_capacity(self.paths.len());
fn read_par(&mut self) -> PolarsResult<Vec<DataFrame>> {
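// Read the files in parallel, one rayon task per path; when there are more
// files than pool threads, `Auto` is pinned to row-group parallelism up front,
// otherwise the requested strategy is passed through to each reader.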
let parallel = match self.options.parallel {
ParallelStrategy::Auto if self.paths.len() > POOL.current_num_threads() => {
ParallelStrategy::RowGroups
},
identity => identity,
};

- for path in self.paths.iter() {
POOL.install(|| {
self.paths
.par_iter()
.map(|path| {
let mut file_info = self.file_info.clone();
file_info.update_hive_partitions(path);

- let hive_partitions = self
- .file_info
- .hive_parts
- .as_ref()
- .map(|hive| hive.materialize_partition_columns());

- let (file, projection, n_rows, predicate) = prepare_scan_args(
- path,
- &self.predicate,
- &mut self.file_options.with_columns,
- &mut self.file_info.schema.clone(),
- self.file_options.n_rows,
- self.file_options.row_count.is_some(),
- hive_partitions.as_deref(),
- );
let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());

- let df = if let Some(file) = file {
- ParquetReader::new(file)
- .with_schema(Some(self.file_info.reader_schema.clone()))
- .with_n_rows(n_rows)
- .read_parallel(self.options.parallel)
- .with_row_count(mem::take(&mut self.file_options.row_count))
- .set_rechunk(self.file_options.rechunk)
- .set_low_memory(self.options.low_memory)
- .use_statistics(self.options.use_statistics)
- .with_hive_partition_columns(hive_partitions)
- ._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref()))
- } else if is_cloud_url(path.as_path()) {
- #[cfg(feature = "cloud")]
- {
- polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
- let reader = ParquetAsyncReader::from_uri(
- &path.to_string_lossy(),
- self.cloud_options.as_ref(),
- Some(self.file_info.reader_schema.clone()),
- self.metadata.clone(),
- )
- .await?
- .with_n_rows(n_rows)
- .with_row_count(mem::take(&mut self.file_options.row_count))
- .with_projection(projection)
let (file, projection, n_rows, predicate) = prepare_scan_args(
path,
&self.predicate,
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
None,
self.file_options.row_count.is_some(),
hive_partitions.as_deref(),
);

let df = if let Some(file) = file {
ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.read_parallel(parallel)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
- .with_predicate(predicate)
- .with_hive_partition_columns(hive_partitions);
.with_hive_partition_columns(hive_partitions)
._finish_with_scan_ops(
predicate,
projection.as_ref().map(|v| v.as_ref()),
)
} else {
polars_bail!(ComputeError: "could not read {}", path.display())
}?;
Ok(df)
})
.collect()
})
}

- reader.finish().await
- })
- }
fn read_seq(&mut self) -> PolarsResult<Vec<DataFrame>> {
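// Sequential fallback used when a row limit or row-count column is requested:
// `base_offset` advances the row-count column across files and `n_rows_total`
// is the remaining row budget, shrunk by however many rows each file produced.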
let mut base_offset = 0 as IdxSize;
let mut n_rows_total = self.file_options.n_rows;

self.paths
.iter()
.map(|path| {
let row_count = self.file_options.row_count.as_ref().map(|rc| RowCount {
name: rc.name.clone(),
offset: rc.offset + base_offset,
});

self.file_info.update_hive_partitions(path);

let hive_partitions = self
.file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());

let (file, projection, n_rows, predicate) = prepare_scan_args(
path,
&self.predicate,
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
n_rows_total,
self.file_options.row_count.is_some(),
hive_partitions.as_deref(),
);

let df = if let Some(file) = file {
ParquetReader::new(file)
.with_schema(Some(self.file_info.reader_schema.clone()))
.with_n_rows(n_rows)
.read_parallel(self.options.parallel)
.with_row_count(row_count)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
.with_hive_partition_columns(hive_partitions)
._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref()))
} else {
polars_bail!(ComputeError: "could not read {}", path.display())
}?;
let read = df.height();
base_offset += read as IdxSize;
n_rows_total.as_mut().map(|total| *total -= read);
Ok(df)
})
.collect()
}

#[cfg(feature = "cloud")]
async fn read_async(&mut self) -> PolarsResult<Vec<DataFrame>> {
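// Cloud path: first open every file and fetch its row count concurrently, then
// derive per-file row limits and row-count offsets from those counts, and
// finally read all files at once via `futures::future::try_join_all`.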
let mut n_rows_total = self.file_options.n_rows;

let iter = self.paths.iter().map(|path| async {
let mut reader = ParquetAsyncReader::from_uri(
&path.to_string_lossy(),
self.cloud_options.as_ref(),
None,
None,
)
.await?;
let num_rows = reader.num_rows().await?;
PolarsResult::Ok((num_rows, reader))
});
let readers_and_metadata = futures::future::try_join_all(iter).await?;

let mut n_rows_to_read = n_rows_total.unwrap_or(usize::MAX);
let mut cumulative_read = 0;
let row_counts = readers_and_metadata
.iter()
.map(|(num_rows, _)| {
let n_rows = n_rows_to_read;
n_rows_to_read -= n_rows_to_read.saturating_sub(*num_rows);
cumulative_read += num_rows;

(n_rows, cumulative_read)
})
.collect::<Vec<_>>();

let base_row_count = std::mem::take(&mut self.file_options.row_count);
let base_row_count = &base_row_count;
let mut file_info = std::mem::take(&mut self.file_info);
let file_info = &file_info;
let reader_schema = file_info.reader_schema.clone();
let reader_schema = &reader_schema;
let file_options = std::mem::take(&mut self.file_options);
let file_options = &file_options;
let paths = self.paths.clone();
let use_statistics = self.options.use_statistics;
let predicate = self.predicate.clone();
let predicate = &predicate;
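// `mem::take` the per-scan state and reborrow it as shared references so the
// `async move` closures below each capture a copy of a reference instead of `&mut self`.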

let iter = readers_and_metadata
.into_iter()
.zip(row_counts.iter())
.zip(paths.as_ref().iter())
.map(
|(((row_count, reader), (n_rows, cumulative_read)), path)| async move {
let mut file_info = file_info.clone();
let n_rows = *n_rows;
if n_rows == 0 {
return Ok(None);
}

let n_rows = if row_count < n_rows {
None
} else {
Some(n_rows)
};
let row_count = base_row_count.as_ref().map(|rc| RowCount {
name: rc.name.clone(),
offset: rc.offset + row_count as IdxSize,
});

file_info.update_hive_partitions(path);

let hive_partitions = file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());

let (_, projection, n_rows, predicate) = prepare_scan_args(
Path::new(""),
&predicate,
&mut file_options.with_columns.clone(),
&mut reader_schema.clone(),
n_rows,
row_count.is_some(),
hive_partitions.as_deref(),
);

reader
.with_n_rows(n_rows)
.with_row_count(row_count)
.with_projection(projection)
.use_statistics(use_statistics)
.with_predicate(predicate)
.with_hive_partition_columns(hive_partitions)
.finish()
.await
.map(Some)
},
);

let dfs = futures::future::try_join_all(iter).await?;
Ok(dfs.into_iter().flatten().collect())
}

fn read(&mut self) -> PolarsResult<DataFrame> {
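// Dispatch: cloud paths go through the async reader, local scans with a row
// limit or row-count column use the sequential path (offsets depend on rows
// read so far), and everything else is read file-parallel on the rayon pool.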
let is_cloud = is_cloud_url(self.paths[0].as_path());

let out = if self.file_options.n_rows.is_some()
|| self.file_options.row_count.is_some()
|| is_cloud
{
if is_cloud {
#[cfg(not(feature = "cloud"))]
{
panic!("activate cloud feature")
}

#[cfg(feature = "cloud")]
{
polars_io::pl_async::get_runtime()
.block_on_potential_spawn(self.read_async())?
}
} else {
- polars_bail!(ComputeError: "could not read {}", path.display())
- }?;
- out.push(df)
self.read_seq()?
}
} else {
self.read_par()?
};

// let mut out = Vec::with_capacity(self.paths.len());
//
// for path in self.paths.iter() {
// self.file_info.update_hive_partitions(path);
//
// let hive_partitions = self
// .file_info
// .hive_parts
// .as_ref()
// .map(|hive| hive.materialize_partition_columns());
//
// let (file, projection, n_rows, predicate) = prepare_scan_args(
// path,
// &self.predicate,
// &mut self.file_options.with_columns.clone(),
// &mut self.file_info.schema.clone(),
// self.file_options.n_rows,
// self.file_options.row_count.is_some(),
// hive_partitions.as_deref(),
// );
//
// let df = if let Some(file) = file {
// ParquetReader::new(file)
// .with_schema(Some(self.file_info.reader_schema.clone()))
// .with_n_rows(n_rows)
// .read_parallel(self.options.parallel)
// .with_row_count(self.file_options.row_count.clone())
// .set_rechunk(self.file_options.rechunk)
// .set_low_memory(self.options.low_memory)
// .use_statistics(self.options.use_statistics)
// .with_hive_partition_columns(hive_partitions)
// ._finish_with_scan_ops(predicate, projection.as_ref().map(|v| v.as_ref()))
// } else if is_cloud_url(path.as_path()) {
// #[cfg(feature = "cloud")]
// {
// polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
// let reader = ParquetAsyncReader::from_uri(
// &path.to_string_lossy(),
// self.cloud_options.as_ref(),
// Some(self.file_info.reader_schema.clone()),
// self.metadata.clone(),
// )
// .await?
// .with_n_rows(n_rows)
// .with_row_count(self.file_options.row_count.clone())
// .with_projection(projection)
// .use_statistics(self.options.use_statistics)
// .with_predicate(predicate)
// .with_hive_partition_columns(hive_partitions);
//
// reader.finish().await
// })
// }
// #[cfg(not(feature = "cloud"))]
// {
// panic!("activate cloud feature")
// }
// } else {
// polars_bail!(ComputeError: "could not read {}", path.display())
// }?;
// out.push(df)
// }
// out
// };

- }
accumulate_dataframes_vertical(out)
}
}
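Taken together, the dispatch above means a plain local scan goes through `read_par`, a scan with `n_rows` or a `row_count` column goes through `read_seq`, and a cloud URL goes through `read_async`. A minimal user-level sketch that would exercise the row-limited path (assuming the `polars` facade crate with the `lazy` and `parquet` features; the path glob and column names are placeholders):

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // The row limit routes this local multi-file scan through the sequential path;
    // an s3:// (or other cloud) URL would take the async branch instead.
    let args = ScanArgsParquet {
        n_rows: Some(1_000),
        ..Default::default()
    };
    let df = LazyFrame::scan_parquet("data/*.parquet", args)?
        .select([col("a"), col("b")])
        .collect()?;
    println!("{df}");
    Ok(())
}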
