Skip to content

Commit

Permalink
also add prefetching in multiple files
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 28, 2023
1 parent cae4487 commit de8a5ab
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 48 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl FetchRowGroupsFromObjectStore {
}

// Wait for n - 3 tasks, so we already start the next downloads earlier.
for task in handles.drain(..handles.len() - 3) {
for task in handles.drain(..handles.len().saturating_sub(3)) {
let succeeded = task.await.unwrap();
if !succeeded {
return;
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,10 @@ impl BatchedParquetReader {
&self.schema
}

/// Whether this reader has scheduled every row group of the file,
/// i.e. the row-group cursor has advanced past the last group.
pub fn is_finished(&self) -> bool {
    self.n_row_groups <= self.row_group_offset
}

pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
if self.limit == 0 && self.has_returned {
return Ok(None);
Expand Down
92 changes: 45 additions & 47 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::ops::Deref;
use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::Arc;

Expand All @@ -23,8 +23,8 @@ pub struct ParquetSource {
n_threads: usize,
processed_paths: usize,
chunk_index: IdxSize,
paths: std::slice::Iter<'static, PathBuf>,
_paths_lifetime: Arc<[PathBuf]>,
iter: Range<usize>,
paths: Arc<[PathBuf]>,
options: ParquetOptions,
file_options: FileScanOptions,
#[allow(dead_code)]
Expand All @@ -35,13 +35,13 @@ pub struct ParquetSource {
}

impl ParquetSource {
// Delay initializing the reader;
// otherwise all files would be opened during construction of the pipeline,
// leading to a "too many open files" error.
fn init_reader(&mut self) -> PolarsResult<()> {
let Some(path) = self.paths.next() else {
self.batched_reader = None;
self.metadata = None;
let Some(index) = self.iter.next() else {
return Ok(());
};
let path = &self.paths[index];
let options = self.options;
let file_options = self.file_options.clone();
let schema = self.file_info.schema.clone();
Expand Down Expand Up @@ -135,71 +135,69 @@ impl ParquetSource {
) -> PolarsResult<Self> {
let n_threads = POOL.current_num_threads();

// extend lifetime as it will be bound to parquet source
let iter = unsafe {
std::mem::transmute::<std::slice::Iter<'_, PathBuf>, std::slice::Iter<'static, PathBuf>>(
paths.iter(),
)
};
let iter = 0..paths.len();

Ok(ParquetSource {
let mut source = ParquetSource {
batched_reader: None,
n_threads,
chunk_index: 0,
processed_paths: 0,
options,
file_options,
paths: iter,
_paths_lifetime: paths,
iter,
paths,
cloud_options,
metadata,
file_info,
verbose,
})
};
source.init_reader()?;
Ok(source)
}
}

impl Source for ParquetSource {
// NOTE(review): this region is a unified-diff rendering with the +/- markers
// stripped, so pre- and post-commit variants of several statements appear
// fused below: duplicate early-return logic (L114-L122), two `let batches`
// bindings, two fused `if` conditions sharing one closing brace, and two
// `Some(batches)` match arms. As written this text does not parse; the
// intended single (post-commit) version must be recovered from the commit
// itself — TODO confirm against commit de8a5ab.
fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
if self.batched_reader.is_none() {
self.init_reader()?;

// If there was no new reader, we depleted all of them and are finished.
if self.batched_reader.is_none() {
return Ok(SourceResult::Finished);
}
// NOTE(review): post-commit variant — with init_reader() now called in the
// constructor, a missing reader means all files are depleted.
return Ok(SourceResult::Finished);
}
// NOTE(review): old form of the fetch (repeated unwrap of batched_reader) —
// superseded by the `reader` binding below.
let batches = get_runtime().block_on(
self.batched_reader
.as_mut()
.unwrap()
.next_batches(self.n_threads),
)?;
let reader = self.batched_reader.as_mut().unwrap();

let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;

Ok(match batches {
None => {
// NOTE(review): old and new condition fused — only one closing brace
// follows, so exactly one of these two `if` lines belongs in the code.
if self.batched_reader.as_ref().unwrap().limit_reached() {
if reader.limit_reached() {
return Ok(SourceResult::Finished);
}
// reset the reader
self.batched_reader = None;
self.metadata = None;
// Set the new reader.
self.init_reader()?;
// Recurse so the freshly initialized reader (next file) is polled.
return self.get_batches(_context);
},
// NOTE(review): pre-commit `Some` arm — replaced by the block-bodied arm
// below that additionally prefetches the next file.
Some(batches) => SourceResult::GotMoreData(
batches
.into_iter()
.map(|data| {
// Keep the row limit updated so the next reader will have a correct limit.
if let Some(n_rows) = &mut self.file_options.n_rows {
*n_rows = n_rows.saturating_sub(data.height())
}

let chunk_index = self.chunk_index;
self.chunk_index += 1;
DataChunk { chunk_index, data }
})
.collect(),
),
// Post-commit `Some` arm: build the result first, then kick off the next
// file's download before handing data to the engine.
Some(batches) => {
let source_result = SourceResult::GotMoreData(
batches
.into_iter()
.map(|data| {
// Keep the row limit updated so the next reader will have a correct limit.
if let Some(n_rows) = &mut self.file_options.n_rows {
*n_rows = n_rows.saturating_sub(data.height())
}

let chunk_index = self.chunk_index;
self.chunk_index += 1;
DataChunk { chunk_index, data }
})
.collect(),
);

// Already start downloading the new files before we push the data into the engine.
if reader.is_finished() {
self.init_reader()?
}
source_result
},
})
}
fn fmt(&self) -> &str {
Expand Down

0 comments on commit de8a5ab

Please sign in to comment.