Skip to content

Commit

Permalink
use vecdeque
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 28, 2023
1 parent b0153eb commit a4b33e6
Showing 1 changed file with 33 additions and 37 deletions.
70 changes: 33 additions & 37 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -19,7 +20,7 @@ use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
use crate::pipeline::determine_chunk_size;

pub struct ParquetSource {
batched_reader: Option<BatchedParquetReader>,
batched_readers: VecDeque<BatchedParquetReader>,
n_threads: usize,
processed_paths: usize,
chunk_index: IdxSize,
Expand All @@ -35,9 +36,12 @@ pub struct ParquetSource {
}

impl ParquetSource {
fn init_reader(&mut self) -> PolarsResult<()> {
self.batched_reader = None;
/// Prepare the reader for the next parquet path in the scan.
///
/// Clears the cached per-file `metadata` (it belongs to the previous file)
/// before delegating to `init_reader`, which — presumably — advances the
/// path iterator and pushes a fresh `BatchedParquetReader` onto
/// `batched_readers`; its body is not fully visible here, so confirm.
fn init_next_reader(&mut self) -> PolarsResult<()> {
// Invalidate stale metadata so the next file's metadata is loaded fresh.
self.metadata = None;
self.init_reader()
}

fn init_reader(&mut self) -> PolarsResult<()> {
let Some(index) = self.iter.next() else {
return Ok(());
};
Expand Down Expand Up @@ -118,7 +122,7 @@ impl ParquetSource {
if self.processed_paths >= 1 {
polars_ensure!(batched_reader.schema().as_ref() == self.file_info.reader_schema.as_ref().unwrap().as_ref(), ComputeError: "schema of all files in a single scan_parquet must be equal");
}
self.batched_reader = Some(batched_reader);
self.batched_readers.push_back(batched_reader);
self.processed_paths += 1;
Ok(())
}
Expand All @@ -138,7 +142,7 @@ impl ParquetSource {
let iter = 0..paths.len();

let mut source = ParquetSource {
batched_reader: None,
batched_readers: VecDeque::new(),
n_threads,
chunk_index: 0,
processed_paths: 0,
Expand All @@ -151,49 +155,42 @@ impl ParquetSource {
file_info,
verbose,
};
source.init_reader()?;
// Already start downloading when we deal with cloud urls.
if !source.paths.first().unwrap().is_file() {
source.init_reader()?;
}
Ok(source)
}
}

impl Source for ParquetSource {
fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
if self.batched_reader.is_none() {
// If there was no new reader, we depleted all of them and are finished.
return Ok(SourceResult::Finished);
// We already start downloading the next file, we can only do that if we don't have a limit.
// In the case of a limit we first must update the row count with the batch results.
if self.batched_readers.len() < 3 && self.file_options.n_rows.is_none()
|| self.batched_readers.is_empty()
{
self.init_next_reader()?
}
let reader = self.batched_reader.as_mut().unwrap();

// We branch, because if we know the reader finishes after this batch we already want to start downloading the new batches.
// We can only do that if we don't have to update the row_count/limit with the result of this iteration.
let (reset, limit_reached, batches) =
if reader.finishes_this_batch(self.n_threads) && self.file_options.n_rows.is_none() {
// Take the reader, and immediately start the new download, before we await this one.
let mut reader = self.batched_reader.take().unwrap();
// Already ensure the new downloads are started
self.init_reader()?;
let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;

(false, reader.limit_reached(), batches)
} else {
let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;

(reader.is_finished(), reader.limit_reached(), batches)
};
let Some(mut reader) = self.batched_readers.pop_front() else {
// If there was no new reader, we depleted all of them and are finished.
return Ok(SourceResult::Finished);
};

let batches = get_runtime().block_on(reader.next_batches(self.n_threads))?;
Ok(match batches {
None => {
if limit_reached {
if reader.limit_reached() {
return Ok(SourceResult::Finished);
}
if reset {
// Set the new the reader.
self.init_reader()?;
}

// reset the reader
self.init_next_reader()?;
return self.get_batches(_context);
},
Some(batches) => {
let source_result = SourceResult::GotMoreData(
let result = SourceResult::GotMoreData(
batches
.into_iter()
.map(|data| {
Expand All @@ -208,12 +205,11 @@ impl Source for ParquetSource {
})
.collect(),
);
// We are not yet done with this reader.
// Ensure it is used in next iteration.
self.batched_readers.push_front(reader);

// Already start downloading the new files before we push the data into the engine.
if reset {
self.init_reader()?
}
source_result
result
},
})
}
Expand Down

0 comments on commit a4b33e6

Please sign in to comment.