perf: support multiple files in a single scan parquet node. (#11922)
ritchie46 authored Oct 22, 2023
1 parent b3f6c82 commit b51d194
Showing 30 changed files with 491 additions and 228 deletions.
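The commit title is the only description of the change here, so a minimal sketch of what it enables may help (the paths, column names, and the `lazy` + `parquet` feature flags are assumptions for illustration, not part of this diff): a glob that matches several parquet files is now planned as a single parquet scan node rather than a union of per-file scans.

use polars::prelude::*;

fn scan_many_files() -> PolarsResult<DataFrame> {
    // One lazy scan over every file matched by the glob; projection and
    // row-limit pushdown apply to the whole set of files.
    LazyFrame::scan_parquet("data/part-*.parquet", ScanArgsParquet::default())?
        .select([col("id"), col("value")])
        .collect()
}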
4 changes: 4 additions & 0 deletions .github/workflows/test-python.yml
@@ -72,6 +72,10 @@ jobs:
if: github.ref_name != 'main'
run: pytest --cov -n auto --dist loadgroup -m "not benchmark and not docs"

- name: Run async reader tests
if: github.ref_name != 'main'
run: POLARS_FORCE_ASYNC=1 pytest -m "not benchmark and not docs" tests/unit/io/

- name: Run doctests
if: github.ref_name != 'main'
run: |
7 changes: 2 additions & 5 deletions crates/polars-core/src/config.rs
@@ -22,12 +22,9 @@ pub(crate) const DECIMAL_ACTIVE: &str = "POLARS_ACTIVATE_DECIMAL";

#[cfg(feature = "dtype-decimal")]
pub(crate) fn decimal_is_active() -> bool {
match std::env::var(DECIMAL_ACTIVE) {
Ok(val) => val == "1",
_ => false,
}
std::env::var(DECIMAL_ACTIVE).as_deref().unwrap_or("") == "1"
}

pub fn verbose() -> bool {
std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("0") == "1"
std::env::var("POLARS_VERBOSE").as_deref().unwrap_or("") == "1"
}
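The config.rs change above (and the polars-error change below) standardize on a parse-once idiom for boolean environment flags. A standalone sketch of that idiom, with an illustrative variable name:

// A flag counts as enabled only when the variable is set to exactly "1";
// unset variables and any other value are treated as disabled.
fn env_flag(name: &str) -> bool {
    std::env::var(name).as_deref().unwrap_or("") == "1"
}

fn main() {
    std::env::set_var("EXAMPLE_FLAG", "1");
    assert!(env_flag("EXAMPLE_FLAG"));
    assert!(!env_flag("EXAMPLE_FLAG_UNSET"));
}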
2 changes: 1 addition & 1 deletion crates/polars-error/src/lib.rs
@@ -18,7 +18,7 @@ where
T: Into<Cow<'static, str>>,
{
fn from(msg: T) -> Self {
if env::var("POLARS_PANIC_ON_ERR").is_ok() {
if env::var("POLARS_PANIC_ON_ERR").as_deref().unwrap_or("") == "1" {
panic!("{}", msg.into())
} else {
ErrString(msg.into())
14 changes: 9 additions & 5 deletions crates/polars-io/src/cloud/glob.rs
@@ -86,18 +86,16 @@ pub struct CloudLocation {
}

impl CloudLocation {
/// Parse a CloudLocation from a URL.
pub fn new(url: &str) -> PolarsResult<CloudLocation> {
let parsed = Url::parse(url).map_err(to_compute_err)?;
pub fn from_url(parsed: &Url) -> PolarsResult<CloudLocation> {
let is_local = parsed.scheme() == "file";
let (bucket, key) = if is_local {
("".into(), url[7..].into())
("".into(), parsed.path())
} else {
let key = parsed.path();
let bucket = parsed
.host()
.ok_or_else(
|| polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", url),
|| polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed),
)?
.to_string();
(bucket, key)
@@ -117,6 +115,12 @@ impl CloudLocation {
expansion,
})
}

/// Parse a CloudLocation from a URL.
pub fn new(url: &str) -> PolarsResult<CloudLocation> {
let parsed = Url::parse(url).map_err(to_compute_err)?;
Self::from_url(&parsed)
}
}

/// Return a full url from a key relative to the given location.
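A small usage sketch of the refactored constructors (the bucket and key are illustrative, and the same imports as glob.rs are assumed): `from_url` consumes an already-parsed `Url`, so callers that also need the URL elsewhere parse it only once, while `new` stays as the string-based convenience wrapper.

use url::Url;

fn example() -> PolarsResult<()> {
    // Parse once, then reuse the parsed `Url` wherever it is needed.
    let parsed = Url::parse("s3://my-bucket/some/key.parquet").map_err(to_compute_err)?;
    let _location = CloudLocation::from_url(&parsed)?;

    // The old entry point still works and now simply delegates to `from_url`.
    let _same = CloudLocation::new("s3://my-bucket/some/key.parquet")?;
    Ok(())
}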
2 changes: 0 additions & 2 deletions crates/polars-io/src/cloud/mod.rs
@@ -3,8 +3,6 @@
#[cfg(feature = "cloud")]
use std::borrow::Cow;
#[cfg(feature = "cloud")]
use std::str::FromStr;
#[cfg(feature = "cloud")]
use std::sync::Arc;

#[cfg(feature = "cloud")]
15 changes: 9 additions & 6 deletions crates/polars-io/src/cloud/object_store_setup.rs
@@ -1,5 +1,6 @@
use once_cell::sync::Lazy;
pub use options::*;
use polars_error::to_compute_err;
use tokio::sync::RwLock;

use super::*;
@@ -25,7 +26,8 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {

/// Build an [`ObjectStore`] based on the URL and the passed-in options. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
let cloud_location = CloudLocation::new(url)?;
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;

let options = options.cloned();
let key = (url.to_string(), options);
@@ -39,17 +41,14 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
}
}

let cloud_type = CloudType::from_str(url)?;
let options = key
.1
.as_ref()
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(Default::default()));

let cloud_type = CloudType::from_url(&parsed)?;
let store = match cloud_type {
CloudType::File => {
let local = LocalFileSystem::new();
Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
},
CloudType::Aws => {
#[cfg(feature = "aws")]
{
@@ -79,6 +78,10 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
#[cfg(not(feature = "azure"))]
return err_missing_feature("azure", &cloud_location.scheme);
},
CloudType::File => {
let local = LocalFileSystem::new();
Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
},
}?;
let mut cache = OBJECT_STORE_CACHE.write().await;
*cache = Some((key, store.clone()));
35 changes: 30 additions & 5 deletions crates/polars-io/src/cloud/options.rs
@@ -83,12 +83,9 @@ pub enum CloudType {
Gcp,
}

impl FromStr for CloudType {
type Err = PolarsError;

impl CloudType {
#[cfg(feature = "cloud")]
fn from_str(url: &str) -> Result<Self, Self::Err> {
let parsed = Url::parse(url).map_err(to_compute_err)?;
pub(crate) fn from_url(parsed: &Url) -> PolarsResult<Self> {
Ok(match parsed.scheme() {
"s3" | "s3a" => Self::Aws,
"az" | "azure" | "adl" | "abfs" | "abfss" => Self::Azure,
@@ -97,6 +94,34 @@ impl FromStr for CloudType {
_ => polars_bail!(ComputeError: "unknown url scheme"),
})
}
}

#[cfg(feature = "cloud")]
pub(crate) fn parse_url(url: &str) -> std::result::Result<Url, url::ParseError> {
match Url::parse(url) {
Err(err) => match err {
url::ParseError::RelativeUrlWithoutBase => {
let parsed = Url::parse(&format!(
"file://{}",
std::env::current_dir().unwrap().to_string_lossy()
))
.unwrap();
parsed.join(url)
},
err => Err(err),
},
parsed => parsed,
}
}

impl FromStr for CloudType {
type Err = PolarsError;

#[cfg(feature = "cloud")]
fn from_str(url: &str) -> Result<Self, Self::Err> {
let parsed = parse_url(url).map_err(to_compute_err)?;
Self::from_url(&parsed)
}

#[cfg(not(feature = "cloud"))]
fn from_str(_s: &str) -> Result<Self, Self::Err> {
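A sketch of what the new `parse_url` fallback buys (the paths are illustrative and `parse_url` is assumed to be in scope): a plain relative path has no scheme, so `Url::parse` alone fails with `RelativeUrlWithoutBase`; `parse_url` instead resolves it against a `file://` base built from the current working directory, so local paths and cloud URLs go through the same code path.

fn example() -> Result<(), url::ParseError> {
    // A proper URL passes straight through unchanged.
    assert_eq!(parse_url("s3://bucket/key.parquet")?.scheme(), "s3");

    // A bare relative path is turned into a file:// URL rooted at the
    // current working directory instead of returning a parse error.
    let local = parse_url("data/file.parquet")?;
    assert_eq!(local.scheme(), "file");
    Ok(())
}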
34 changes: 17 additions & 17 deletions crates/polars-io/src/parquet/read.rs
@@ -4,8 +4,7 @@ use std::sync::Arc;
use arrow::io::parquet::read;
use arrow::io::parquet::write::FileMetaData;
use polars_core::prelude::*;
#[cfg(feature = "cloud")]
use polars_core::utils::concat_df;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

@@ -17,10 +16,8 @@ use crate::mmap::MmapBytesReader;
use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
#[cfg(feature = "cloud")]
use crate::parquet::async_impl::ParquetObjectStore;
use crate::parquet::read_impl::read_parquet;
pub use crate::parquet::read_impl::BatchedParquetReader;
#[cfg(feature = "cloud")]
use crate::predicates::apply_predicate;
use crate::parquet::read_impl::{materialize_hive_partitions, read_parquet};
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::RowCount;
@@ -334,12 +331,15 @@ impl ParquetAsyncReader {

pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.reader.get_metadata().await?.clone();
let schema = self.schema().await?;
let schema = match self.schema {
Some(schema) => schema,
None => self.schema().await?,
};
// the row group fetcher deals with projection
let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
self.reader,
&metadata,
self.schema.unwrap(),
schema.clone(),
self.projection.as_deref(),
self.predicate.clone(),
)?
@@ -364,25 +364,25 @@

pub async fn finish(mut self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;
let metadata = self.get_metadata().await?.clone();
let schema = self.schema().await?;
let hive_partition_columns = self.hive_partition_columns.clone();

let predicate = self.predicate.clone();
// batched reader deals with slice pushdown
let reader = self.batched(usize::MAX).await?;
let mut iter = reader.iter(16);
let n_batches = metadata.row_groups.len();
let mut iter = reader.iter(n_batches);

let mut chunks = Vec::with_capacity(16);
let mut chunks = Vec::with_capacity(n_batches);
while let Some(result) = iter.next_().await {
let out = result.and_then(|mut df| {
apply_predicate(&mut df, predicate.as_deref(), true)?;
Ok(df)
})?;
chunks.push(out)
chunks.push(result?)
}
if chunks.is_empty() {
return Ok(DataFrame::from(schema.as_ref()));
let mut df = DataFrame::from(schema.as_ref());
materialize_hive_partitions(&mut df, hive_partition_columns.as_deref(), 0);
return Ok(df);
}
let mut df = concat_df(&chunks)?;
let mut df = accumulate_dataframes_vertical_unchecked(chunks);

if rechunk {
df.as_single_chunk_par();
16 changes: 9 additions & 7 deletions crates/polars-io/src/parquet/read_impl.rs
@@ -99,7 +99,7 @@ pub(super) fn array_iter_to_series(
/// We have a special num_rows arg, as df can be empty when a projection contains
/// only hive partition columns.
/// Safety: num_rows equals the height of the df when the df height is non-zero.
fn materialize_hive_partitions(
pub(crate) fn materialize_hive_partitions(
df: &mut DataFrame,
hive_partition_columns: Option<&[Series]>,
num_rows: usize,
@@ -342,6 +342,13 @@ pub fn read_parquet<R: MmapBytesReader>(
use_statistics: bool,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
// Fast path.
if limit == 0 && hive_partition_columns.is_none() {
let mut df = DataFrame::from(schema.as_ref());
materialize_hive_partitions(&mut df, hive_partition_columns, 0);
return Ok(df);
}

let file_metadata = metadata
.map(Ok)
.unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
@@ -409,12 +416,7 @@ pub fn read_parquet<R: MmapBytesReader>(
Cow::Borrowed(schema)
};
let mut df = DataFrame::from(schema.as_ref().as_ref());
if let Some(parts) = hive_partition_columns {
for s in parts {
// SAFETY: length is equal
unsafe { df.with_column_unchecked(s.clear()) };
}
}
materialize_hive_partitions(&mut df, hive_partition_columns, 0);
Ok(df)
} else {
accumulate_dataframes_vertical(dfs)
24 changes: 24 additions & 0 deletions crates/polars-io/src/utils.rs
@@ -159,6 +159,30 @@ pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
}
}

/// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read
/// the files concurrently/in parallel.
///
/// This takes an iterator over the number of rows per file.
pub fn get_sequential_row_statistics<I>(
iter: I,
mut total_rows_to_read: usize,
) -> Vec<(usize, usize)>
where
I: Iterator<Item = usize>,
{
let mut cumulative_read = 0;
iter.map(|rows_this_file| {
let remaining_rows_to_read = total_rows_to_read;
total_rows_to_read = total_rows_to_read.saturating_sub(rows_this_file);

let current_cumulative_read = cumulative_read;
cumulative_read += rows_this_file;

(remaining_rows_to_read, current_cumulative_read)
})
.collect()
}

#[cfg(feature = "json")]
pub(crate) fn overwrite_schema(
schema: &mut Schema,
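A worked example of the new helper (the row counts and limit are illustrative): for per-file row counts of 10, 20 and 30 and a global limit of 25 rows, each file learns up front how many rows it may still contribute and at which cumulative row offset it starts, which is what allows the files to be read concurrently.

fn main() {
    // Assumes get_sequential_row_statistics from utils.rs is in scope.
    let stats = get_sequential_row_statistics([10usize, 20, 30].into_iter(), 25);
    // Each entry is (remaining_rows_to_read, cumulative_rows_before_this_file):
    // file 0 may still read up to 25 rows and starts at offset 0,
    // file 1 may still read up to 15 rows and starts at offset 10,
    // file 2 has nothing left to read and starts at offset 30.
    assert_eq!(stats, vec![(25, 0), (15, 10), (0, 30)]);
}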
3 changes: 2 additions & 1 deletion crates/polars-lazy/Cargo.toml
@@ -10,6 +10,7 @@ description = "Lazy query engine for the Polars DataFrame library"

[dependencies]
arrow = { workspace = true }
futures = { workspace = true, optional = true }
polars-core = { workspace = true, features = ["lazy", "zip_with", "random"] }
polars-io = { workspace = true, features = ["lazy"] }
polars-json = { workspace = true, optional = true }
@@ -43,7 +44,7 @@ async = [
"polars-io/cloud",
"polars-pipe?/async",
]
cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio"]
cloud = ["async", "polars-pipe?/cloud", "polars-plan/cloud", "tokio", "futures"]
cloud_write = ["cloud"]
ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc"]
json = ["polars-io/json", "polars-plan/json", "polars-json"]
5 changes: 2 additions & 3 deletions crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -12,17 +12,16 @@ pub struct IpcExec {

impl IpcExec {
fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let (file, projection, n_rows, predicate) = prepare_scan_args(
let (file, projection, predicate) = prepare_scan_args(
&self.path,
&self.predicate,
&mut self.file_options.with_columns,
&mut self.schema,
self.file_options.n_rows,
self.file_options.row_count.is_some(),
None,
);
IpcReader::new(file.unwrap())
.with_n_rows(n_rows)
.with_n_rows(self.file_options.n_rows)
.with_row_count(std::mem::take(&mut self.file_options.row_count))
.set_rechunk(self.file_options.rechunk)
.with_projection(projection)
8 changes: 2 additions & 6 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -30,8 +30,6 @@ use crate::prelude::*;
#[cfg(any(feature = "ipc", feature = "parquet"))]
type Projection = Option<Vec<usize>>;
#[cfg(any(feature = "ipc", feature = "parquet"))]
type StopNRows = Option<usize>;
#[cfg(any(feature = "ipc", feature = "parquet"))]
type Predicate = Option<Arc<dyn PhysicalIoExpr>>;

#[cfg(any(feature = "ipc", feature = "parquet"))]
@@ -40,10 +38,9 @@ fn prepare_scan_args(
predicate: &Option<Arc<dyn PhysicalExpr>>,
with_columns: &mut Option<Arc<Vec<String>>>,
schema: &mut SchemaRef,
n_rows: Option<usize>,
has_row_count: bool,
hive_partitions: Option<&[Series]>,
) -> (Option<std::fs::File>, Projection, StopNRows, Predicate) {
) -> (Option<std::fs::File>, Projection, Predicate) {
let file = std::fs::File::open(path).ok();

let with_columns = mem::take(with_columns);
Expand All @@ -56,10 +53,9 @@ fn prepare_scan_args(
has_row_count,
);

let n_rows = _set_n_rows_for_scan(n_rows);
let predicate = predicate.clone().map(phys_expr_to_io_expr);

(file, projection, n_rows, predicate)
(file, projection, predicate)
}

/// Producer of an in memory DataFrame