From d17d7819ddc100cba797bcf84b03aa765830eb4a Mon Sep 17 00:00:00 2001
From: ritchie <ritchie46@gmail.com>
Date: Sat, 28 Oct 2023 08:50:58 +0200
Subject: [PATCH] fix

---
 crates/polars-io/Cargo.toml                |  2 +-
 crates/polars-io/src/parquet/async_impl.rs | 60 ++++++++++++++--------
 crates/polars-io/src/parquet/read_impl.rs  |  6 +--
 3 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml
index 97999970e27a..f3fd0059fb51 100644
--- a/crates/polars-io/Cargo.toml
+++ b/crates/polars-io/Cargo.toml
@@ -43,7 +43,7 @@ serde_json = { version = "1", default-features = false, features = ["alloc", "ra
 simd-json = { workspace = true, optional = true }
 simdutf8 = { workspace = true, optional = true }
 smartstring = { workspace = true }
-tokio = { workspace = true, features = ["net", "rt-multi-thread", "time", "sync"], optional = true }
+tokio = { workspace = true, features = ["net", "rt-multi-thread", "time"], optional = true }
 tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
 url = { workspace = true, optional = true }
 
diff --git a/crates/polars-io/src/parquet/async_impl.rs b/crates/polars-io/src/parquet/async_impl.rs
index d3d935558dc5..1ddeb8ecf979 100644
--- a/crates/polars-io/src/parquet/async_impl.rs
+++ b/crates/polars-io/src/parquet/async_impl.rs
@@ -161,9 +161,11 @@ async fn download_projection(
 }
 
 type DownloadedRowGroup = Vec<Vec<(u64, Bytes)>>;
+type QueuePayload = (usize, DownloadedRowGroup);
 
 pub struct FetchRowGroupsFromObjectStore {
-    row_groups: Arc<Mutex<Receiver<PolarsResult<DownloadedRowGroup>>>>,
+    rg_q: Arc<Mutex<Receiver<PolarsResult<QueuePayload>>>>,
+    prefetched_rg: PlHashMap<usize, DownloadedRowGroup>,
 }
 
 impl FetchRowGroupsFromObjectStore {
@@ -193,14 +195,26 @@ impl FetchRowGroupsFromObjectStore {
         let row_groups_end = compute_row_group_range(0, row_groups.len(), limit, row_groups);
         let row_groups = &row_groups[0..row_groups_end];
 
+        let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();
+
         let row_groups = if let Some(pred) = predicate.as_deref() {
             row_groups
                 .iter()
-                .filter(|rg| matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true)))
-                .cloned()
+                .enumerate()
+                .filter(|(i, rg)| {
+                    let should_be_read =
+                        matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
+
+                    // Already add the row groups that will be skipped to the prefetched data.
+                    if !should_be_read {
+                        prefetched.insert(*i, Default::default());
+                    }
+                    should_be_read
+                })
+                .map(|(i, rg)| (i, rg.clone()))
                 .collect::<Vec<_>>()
         } else {
-            row_groups.to_vec()
+            row_groups.iter().cloned().enumerate().collect()
         };
 
         let reader = Arc::new(reader);
@@ -208,12 +222,12 @@ impl FetchRowGroupsFromObjectStore {
 
         let _ = std::thread::spawn(move || {
             get_runtime().block_on(async {
-                'loop_rg: for rg in row_groups {
+                'loop_rg: for (i, rg) in row_groups {
                     let fetched = download_projection(&projected_fields, &[rg], &reader).await;
                     match fetched {
                         Ok(fetched) => {
-                            let payload = PolarsResult::Ok(fetched);
+                            let payload = PolarsResult::Ok((i, fetched));
                             if snd.send(payload).is_err() {
                                 break 'loop_rg;
                             }
                         },
@@ -229,7 +243,8 @@ impl FetchRowGroupsFromObjectStore {
             });
         });
 
         Ok(FetchRowGroupsFromObjectStore {
-            row_groups: Arc::new(Mutex::new(rcv)),
+            rg_q: Arc::new(Mutex::new(rcv)),
+            prefetched_rg: Default::default(),
         })
     }
@@ -237,23 +252,24 @@ pub(crate) fn fetch_row_groups(
         &mut self,
         row_groups: Range<usize>,
     ) -> PolarsResult<ColumnStore> {
-        let mut received = PlHashMap::new();
-        let guard = self.row_groups.lock().unwrap();
-        for _ in row_groups {
-            let downloaded = guard.recv().unwrap()?;
-
-            if received.is_empty() {
-                let downloaded_per_filepos = downloaded
-                    .into_iter()
-                    .flat_map(|rg| rg.into_iter())
-                    .collect::<PlHashMap<_, _>>();
-                received = downloaded_per_filepos
-            } else {
-                let downloaded_per_filepos = downloaded.into_iter().flat_map(|rg| rg.into_iter());
+        let guard = self.rg_q.lock().unwrap();
 
-                received.extend(downloaded_per_filepos)
-            }
+        while !row_groups
+            .clone()
+            .all(|i| self.prefetched_rg.contains_key(&i))
+        {
+            let Ok(fetched) = guard.recv() else { break };
+            let (rg_i, payload) = fetched?;
+
+            self.prefetched_rg.insert(rg_i, payload);
         }
+
+        let received = row_groups
+            .flat_map(|i| self.prefetched_rg.remove(&i))
+            .flat_map(|rg| rg.into_iter())
+            .flat_map(|v| v.into_iter())
+            .collect::<PlHashMap<_, _>>();
+
         Ok(ColumnStore::Fetched(received))
     }
 }
diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index ff558a237eaf..a02b1392eeb6 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -476,7 +476,7 @@ impl From<FetchRowGroupsFromObjectStore> for RowGroupFetcher {
 }
 
 impl RowGroupFetcher {
-    async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
+    fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
         match self {
             RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
             #[cfg(feature = "cloud")]
@@ -597,8 +597,7 @@ impl BatchedParquetReader {
 
             let store = self
                 .row_group_fetcher
-                .fetch_row_groups(row_group_start..row_group_end)
-                .await?;
+                .fetch_row_groups(row_group_start..row_group_end)?;
 
             let dfs = rg_to_dfs(
                 &store,
@@ -617,6 +616,7 @@ impl BatchedParquetReader {
             )?;
 
             self.row_group_offset += n;
+
             // case where there is no data in the file
             // the streaming engine needs at least a single chunk
             if self.rows_read == 0 && dfs.is_empty() {