Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC][wip] faster DefaultEngine parquet reads #595

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

zachschuermann
Copy link
Collaborator

@zachschuermann zachschuermann commented Dec 13, 2024

TLDR

This PR is a POC/exploration on speeding up DefaultEngine parquet reads. The current implementation is unfortunately relatively complicated and despite a solid amount of async code appears to be serially reading all parquet files. There are two main outcomes from this exploration:

  1. Empirically showed that the DefaultParquetHandler::read_parquet_files implementation is indeed serial but can be trivially made async/concurrent using typical async code and tokio tasks. In this case we can fire off all IO requests (up to some limit) and then bridge the async-to-sync boundary with an mpsc channel.
  2. Within our Scan::execute implementation we only ever pass a singular FileMeta to read_parquet_files thereby immediately limiting any concurrency implemented in the engine. This wasn't explored further in this PR but likely takes some more design work since there is a requirement to colocate a parquet file's partition values with the outcome of the parquet read. This doesn't seem to fit nicely in the existing API.

Details

Need for a better DefaultParquetHandler::read_parquet_files

The POC here gives an alternative (strawman) as AsyncParquetHandler which simply launches a tokio task for each parquet file to read. The rudimentary tests (that only work on my machine lol) show that the existing implementation serially reads each file (despite readahead = 10) and the new implementation indeed fires of all IO immediately. (tests simulated high IO latency via 'sleep')

Future work

  • understand why the existing implementation is serial (obviously a bug and not intended)
  • consider productionization of something like the AsyncParquetHandler
  • include benchmarks/other substantiation of changes made in this area
  • consider if/when/how to integrate with various async runtimes. if the consumer of the kernel is also an async rust user, it may be beneficial to propagate a runtime handle so that we don't end up with competing runtimes.
  • consider overall execution design in the DefaultEngine: do we want multiple runtimes? (IO runtime and CPU-bound runtime?)

Need for a better Scan::execute implementation

Currently, within Scan::execute we take the ScanFile iterator and sequentially call read_parquet_files() on each one:

    let result = scan_files_iter
        .map(move |scan_file| -> DeltaResult<_> {
            // ... [snip] ...
            let read_result_iter = engine.get_parquet_handler().read_parquet_files(
                &[meta],
                global_state.physical_schema.clone(),
                physical_predicate.clone(),
            )?;
            // ... [snip] ...
            Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
                // to transform the physical data into the correct logical form
                let logical = transform_to_logical_internal(...);
                // ... [snip] ...
                Ok(result)
            }))
        })
        .flatten_ok()
        .map(|x| x?);

Instead, ideally we would pass many parquet files required for the scan at once and then let the engine decide how to schedule IO. This is unfortunately not easy to coordinate since we have per-file state that must propagate 'through' the read_parquet_file API. That is, if we colocate all parquet files in the scan (spanning multiple partitions) we must somehow align all the data we read with the corresponding partition values. I didn't delve too much further here but flagging this for optimization soon.

Future work

  • consider all the spots we do parquet file reads, do we pass in multiple files to leverage concurrency implemented by the engine? No: everywhere except for reading all checkpoint parts just passes a single file. Note a future use case is reading all sidecars in v2Checkpoint
  • how can we fix the execute implementation to allow for reading all parquet files at once and propagating necessary per-file information?

@github-actions github-actions bot added the breaking-change Change that will require a version bump label Dec 13, 2024
Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, this is clearly simpler and probably better.

Couple of things:

  1. Do you have any rough benchmarks for perf differences?
  2. yeah execute is pretty dumb here. we could consider a with_batch_size arg or something that tells it to try and collect a certain number of files to read before firing off to the parquet handler, I don't think that would be too hard to implement.

@@ -137,6 +137,9 @@ fn try_main() -> DeltaResult<()> {
}
})
.try_collect()?;
print_batches(&batches)?;
// print_batches(&batches)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for experimenting, i'd suggest using the multi-threaded reader. although i guess this does help determine how much a single call can read. regardless, read-table-multi-threaded has a --limit option for this case so you can see that some data got returned but not print it all, but it does tell you the total row count. maybe add that as an option here too :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep thanks I ended up playing with both but yea the --limit is nicer :)


let file_opener: Arc<dyn FileOpener + Send + Sync> = Arc::from(file_opener);
let len = files.len();
runtime.block_on(async {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can run basically all the code in here not inside the block_on except the join. So you'd do something like:

let files = files.to_vec();
let mut handles = Vec::with_capacity(len);
for file in files.into_iter() {
  [same code]
}

runtime.block_on(async {
  join_all(handles).await;
});

Just a little more clear what's going on I think


for file in files.into_iter() {
// let permit = semaphore.clone().acquire_owned().await.unwrap();
let tx_clone = tx.clone();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just call it tx


Ok(Box::pin(async move {
// TODO avoid IO by converting passed file meta to ObjectMeta
let meta = store.head(&path).await?;
let mut reader = ParquetObjectReader::new(store, meta);
if let Some(handle) = handle {
reader = reader.with_runtime(handle);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does setting this do?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new in arrow 53.3 i think - lets you push down a runtime for them to schedule their IO on. This has gotten me thinking about various ways to enable this sort of 'runtime passthrough' ourselves..

Perform IO on the provided tokio runtime
Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding, on the same tokio runtime can lead to degraded throughput, dropped connections and other issues. For more information see here.

see with_runtime

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Yeah, seems similar to what you're doing.

@zachschuermann
Copy link
Collaborator Author

nice, this is clearly simpler and probably better.

Couple of things:

  1. Do you have any rough benchmarks for perf differences?
  2. yeah execute is pretty dumb here. we could consider a with_batch_size arg or something that tells it to try and collect a certain number of files to read before firing off to the parquet handler, I don't think that would be too hard to implement.

Note

100MB test table with 100 files in latest snapshot (on S3) read goes from 50s to 7.58s (6.6x speedup) on my M1 mac

woo! and yea i just did some heavy-handed hacking to get a somewhat useful benchmark running. I hacked around to get Scan::execute to just hand all the files at once to the reader. The hard part with execute is 'lining up' all the chunks of data.. that is, if we have 3 parquet files and each read them in two chunks then in our current API we have 6 chunks yielded from the parquet reader, and would need to somehow 'attach' the appropriate per-file data to each set (e.g. if the first file is partition part=1 then we need to make sure that part=1 is applied to the first two chunks). Anyways, I totally hacked it out and i'm not applying any DVs or partition values appropriately, but just did a simple bake-off to validate. Table size is ~100MB in the latest snapshot split across 100 files (1MB each):

before my changes (old read_parquet_files)

[20:02] [databricks] read-table-single-threaded ➜ time ../../../target/release/read-table-single-threaded "s3://zach-tables/test_table_100/100_file_table" --public --region us-west-2
Reading s3://zach-tables/test_table_100/100_file_table/
Total rows read: 10000000

________________________________________________________
Executed in   50.02 secs    fish           external
   usr time    1.38 secs    0.14 millis    1.38 secs
   sys time    0.88 secs    3.30 millis    0.88 secs

after my changes (new read_parquet_files)

[20:03] [databricks] read-table-single-threaded ➜ time ../../../target/release/read-table-single-threaded "s3://zach-tables/test_table_100/100_file_table" --public --region us-west-2
Reading s3://zach-tables/test_table_100/100_file_table/
Total rows read: 10000000

________________________________________________________
Executed in    7.58 secs      fish           external
   usr time  968.55 millis    0.12 millis  968.43 millis
   sys time  880.15 millis    2.65 millis  877.50 millis

@OussamaSaoudi-db
Copy link
Collaborator

@zachschuermann You mention in the PR description:

Instead, ideally we would pass many parquet files required for the scan at once and then let the engine decide how to schedule IO. This is unfortunately not easy to coordinate since we have per-file state that must propagate 'through' the read_parquet_file API. That is, if we colocate all parquet files in the scan (spanning multiple partitions) we must somehow align all the data we read with the corresponding partition values. I didn't delve too much further here but flagging this for optimization soon.

I'm not sure I fully get this. ScanFile already holds the partition value for that file, so even if there are multiple parquet files that belong to a partition, each of them will have their own ScanFile telling them what their partition value is. Given that, it should be pretty easy to parallelize: (GlobalScanState + ScanFile) is everything you need.

@scovich
Copy link
Collaborator

scovich commented Jan 6, 2025

@zachschuermann You mention in the PR description:

Instead, ideally we would pass many parquet files required for the scan at once and then let the engine decide how to schedule IO. This is unfortunately not easy to coordinate since we have per-file state that must propagate 'through' the read_parquet_file API. That is, if we colocate all parquet files in the scan (spanning multiple partitions) we must somehow align all the data we read with the corresponding partition values. I didn't delve too much further here but flagging this for optimization soon.

I'm not sure I fully get this. ScanFile already holds the partition value for that file, so even if there are multiple parquet files that belong to a partition, each of them will have their own ScanFile telling them what their partition value is. Given that, it should be pretty easy to parallelize: (GlobalScanState + ScanFile) is everything you need.

This was also my understanding of how scans worked -- or at least how the original design wanted them to work? Engine receives a ScanFile for each file that contains everything kernel needs to know about that file. It is largely opaque to engine, but required to stay "attached" to the file across splits and iterators, and should pass back to kernel when the time comes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change Change that will require a version bump
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants