Skip to content

Commit

Permalink
improve file pre-fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 28, 2023
1 parent de8a5ab commit b0153eb
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
4 changes: 4 additions & 0 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ impl BatchedParquetReader {
self.row_group_offset >= self.n_row_groups
}

pub fn finishes_this_batch(&self, n: usize) -> bool {
self.row_group_offset + n > self.n_row_groups
}

pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
if self.limit == 0 && self.has_returned {
return Ok(None);
Expand Down
27 changes: 22 additions & 5 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,32 @@ impl Source for ParquetSource {
}
let reader = self.batched_reader.as_mut().unwrap();

let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;
// We branch, because if we know the reader finishes after this batch we already want to start downloading the new batches.
// We can only do that if we don't have to update the row_count/limit with the result of this iteration.
let (reset, limit_reached, batches) =
if reader.finishes_this_batch(self.n_threads) && self.file_options.n_rows.is_none() {
// Take the reader, and immediately start the new download, before we await this one.
let mut reader = self.batched_reader.take().unwrap();
// Already ensure the new downloads are started
self.init_reader()?;
let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;

(false, reader.limit_reached(), batches)
} else {
let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;

(reader.is_finished(), reader.limit_reached(), batches)
};

Ok(match batches {
None => {
if reader.limit_reached() {
if limit_reached {
return Ok(SourceResult::Finished);
}
// Set the new the reader.
self.init_reader()?;
if reset {
// Set the new the reader.
self.init_reader()?;
}
return self.get_batches(_context);
},
Some(batches) => {
Expand All @@ -193,7 +210,7 @@ impl Source for ParquetSource {
);

// Already start downloading the new files before we push the data into the engine.
if reader.is_finished() {
if reset {
self.init_reader()?
}
source_result
Expand Down

0 comments on commit b0153eb

Please sign in to comment.