Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 28, 2023
1 parent 2354aa0 commit d17d781
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 26 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ serde_json = { version = "1", default-features = false, features = ["alloc", "ra
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
smartstring = { workspace = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "time", "sync"], optional = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "time"], optional = true }
tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
url = { workspace = true, optional = true }

Expand Down
60 changes: 38 additions & 22 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,11 @@ async fn download_projection(
}

type DownloadedRowGroup = Vec<Vec<(u64, Bytes)>>;
type QueuePayload = (usize, DownloadedRowGroup);

pub struct FetchRowGroupsFromObjectStore {
row_groups: Arc<Mutex<Receiver<PolarsResult<DownloadedRowGroup>>>>,
rg_q: Arc<Mutex<Receiver<PolarsResult<QueuePayload>>>>,
prefetched_rg: PlHashMap<usize, DownloadedRowGroup>,
}

impl FetchRowGroupsFromObjectStore {
Expand Down Expand Up @@ -193,27 +195,39 @@ impl FetchRowGroupsFromObjectStore {
let row_groups_end = compute_row_group_range(0, row_groups.len(), limit, row_groups);
let row_groups = &row_groups[0..row_groups_end];

let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

let row_groups = if let Some(pred) = predicate.as_deref() {
row_groups
.iter()
.filter(|rg| matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true)))
.cloned()
.enumerate()
.filter(|(i, rg)| {
let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));

// Row groups rejected by the predicate are filtered out of the download list; record an empty entry for each of them in `prefetched` right away.
if !should_be_read {
prefetched.insert(*i, Default::default());
}
should_be_read
})
.map(|(i, rg)| (i, rg.clone()))
.collect::<Vec<_>>()
} else {
row_groups.to_vec()
row_groups.iter().cloned().enumerate().collect()
};
let reader = Arc::new(reader);

let (snd, rcv) = sync_channel(5);

let _ = std::thread::spawn(move || {
get_runtime().block_on(async {
'loop_rg: for rg in row_groups {
'loop_rg: for (i, rg) in row_groups {
let fetched = download_projection(&projected_fields, &[rg], &reader).await;

match fetched {
Ok(fetched) => {
let payload = PolarsResult::Ok(fetched);
let payload = PolarsResult::Ok((i, fetched));
if snd.send(payload).is_err() {
break 'loop_rg;
}
Expand All @@ -229,31 +243,33 @@ impl FetchRowGroupsFromObjectStore {
});

Ok(FetchRowGroupsFromObjectStore {
row_groups: Arc::new(Mutex::new(rcv)),
rg_q: Arc::new(Mutex::new(rcv)),
prefetched_rg: Default::default(),
})
}

pub(crate) fn fetch_row_groups(
&mut self,
row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
let mut received = PlHashMap::new();
let guard = self.row_groups.lock().unwrap();
for _ in row_groups {
let downloaded = guard.recv().unwrap()?;

if received.is_empty() {
let downloaded_per_filepos = downloaded
.into_iter()
.flat_map(|rg| rg.into_iter())
.collect::<PlHashMap<_, _>>();
received = downloaded_per_filepos
} else {
let downloaded_per_filepos = downloaded.into_iter().flat_map(|rg| rg.into_iter());
let guard = self.rg_q.lock().unwrap();

received.extend(downloaded_per_filepos)
}
while !row_groups
.clone()
.all(|i| self.prefetched_rg.contains_key(&i))
{
let Ok(fetched) = guard.recv() else { break };
let (rg_i, payload) = fetched?;

self.prefetched_rg.insert(rg_i, payload);
}

let received = row_groups
.flat_map(|i| self.prefetched_rg.remove(&i))
.flat_map(|rg| rg.into_iter())
.flat_map(|v| v.into_iter())
.collect::<PlHashMap<_, _>>();

Ok(ColumnStore::Fetched(received))
}
}
6 changes: 3 additions & 3 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
}

impl RowGroupFetcher {
async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
match self {
RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -597,8 +597,7 @@ impl BatchedParquetReader {

let store = self
.row_group_fetcher
.fetch_row_groups(row_group_start..row_group_end)
.await?;
.fetch_row_groups(row_group_start..row_group_end)?;

let dfs = rg_to_dfs(
&store,
Expand All @@ -617,6 +616,7 @@ impl BatchedParquetReader {
)?;

self.row_group_offset += n;

// Case where there is no data in the file:
// the streaming engine needs at least a single chunk.
if self.rows_read == 0 && dfs.is_empty() {
Expand Down

0 comments on commit d17d781

Please sign in to comment.