
feat(rust): Scan and read datasets from remote object stores (parquet only) #11256

Closed
wants to merge 9 commits

Conversation

@chitralverma (Contributor) commented Sep 22, 2023

[I'll write a better description once this is out of the experiments phase]

Aims to replace the current architecture for scanning and reading all file-based datasets (parquet only in this PR).

Targeting the following changes (also from the Discord chats); a rough sketch of the proposed abstraction follows the list:

  • Create a new abstraction FileFormat that knows how to read a file's content and its metadata (schema inference, etc.).
  • This abstraction will deal with async under the hood.
  • The file-format-related code is currently scattered across multiple crates; it can be aggregated under polars-io via the above abstractions.
  • Create an object listing API which internally handles all the async machinery.
  • The object listing API should allow inferring an operator to connect to remote stores.
  • The object listing API should support full-scale path globbing and recursive paths.
  • Publicly it should expose finish() -> DataFrame and get_batches() -> Vec<DataFrame> so that streaming can also use it.
  • For local files the existing approach (mmap readers) will be used, so there is no functional change; for remote stores the new async readers will be used.
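A rough sketch of what the FileFormat abstraction could look like (the trait name comes from the list above; the method signatures and the async-trait/futures plumbing are placeholders for illustration, not a final design):

```rust
use async_trait::async_trait;
use futures::io::{AsyncRead, AsyncSeek};
use polars_core::prelude::{DataFrame, PolarsResult, Schema};

/// Anything we can read bytes from, local (mmap-backed) or remote.
pub trait AsyncReadSeek: AsyncRead + AsyncSeek + Send + Unpin {}
impl<T: AsyncRead + AsyncSeek + Send + Unpin> AsyncReadSeek for T {}

/// Placeholder sketch: one implementation per file format (parquet first).
/// All async IO stays behind this trait; callers never see an executor.
#[async_trait]
pub trait FileFormat: Send + Sync {
    /// Schema inference, e.g. from the parquet footer.
    async fn infer_schema(&self, reader: &mut dyn AsyncReadSeek) -> PolarsResult<Schema>;

    /// Materialize the whole object into a DataFrame.
    async fn finish(&self, reader: &mut dyn AsyncReadSeek) -> PolarsResult<DataFrame>;

    /// Batched reads so the streaming engine can consume incrementally.
    async fn get_batches(&self, reader: &mut dyn AsyncReadSeek) -> PolarsResult<Vec<DataFrame>>;
}
```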

Notes for review:

  • This implementation relies on opendal, a part of the Apache org backed by an active community. It provides a lot of backends out of the box and looks like a great fit for polars.
    • Also, object_store does not have arrow2-compatible async readers (see more here); opendal provides these out of the box.
    • object_store supports fewer backends and works better with the arrow-rs crate instead; using it would involve writing async readers for each format, as done by datafusion.
    • The opendal community is working on building a bridge to object_store, so if object_store is required later it can still be used.
  • To avoid bloating this PR, I haven't removed the older implementation based on object_store. Feature gating has not been done yet for the same reason.
  • Once the idea is validated at a high level, I will add one new feature flag, something like cloud-datasets, which gates all the async dependencies (tokio, etc.).

@chitralverma chitralverma changed the title (feat:rust) Scan and read datasets from remote object stores feat(rust): Scan and read datasets from remote object stores Sep 22, 2023
@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature rust Related to Rust Polars labels Sep 22, 2023
Signed-off-by: Chitral Verma <[email protected]>
@chitralverma chitralverma changed the title feat(rust): Scan and read datasets from remote object stores feat(rust): Scan and read datasets from remote object stores (parquet only) Sep 22, 2023
@chitralverma (Contributor, Author) commented Sep 22, 2023

@ritchie46 the big PR for that old discussion is finally incoming.

I'm building a prototype alongside the existing code, starting with the scan-side changes for parquet; I will proceed to the eager side after this.
(In my local tests I'm able to glob thousands of parquet files from S3 and the local fs.)

I have 2 questions about this. Say a glob pattern a/b/c/*.pq is given which resolves to 2 files, file1.pq and file2.pq:

  1. On the scan side these may have different schemas, and concat_impl does not check for this. Should this check live on the scan side or on the eager side (execution phase)?
  2. Multiple schemas can be handled in many ways: a strict check that all the schemas are identical, merging them into a new schema (where possible), or picking the schema of the first file (the current implementation). A rough sketch of these options follows below.
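For illustration, the three options from question 2 could look roughly like this (SchemaStrategy and resolve_schemas are hypothetical names; try_get_supertype is the existing supertype helper in polars-core, and the exact Schema method signatures vary by polars version):

```rust
use polars_core::prelude::{DataType, PolarsError, PolarsResult, Schema};
use polars_core::utils::try_get_supertype;

/// Hypothetical knob for handling mismatched schemas across globbed files.
pub enum SchemaStrategy {
    /// Error unless every file has an identical schema.
    Strict,
    /// Widen columns to a common supertype where possible.
    Merge,
    /// Trust the first file (the current implementation).
    First,
}

fn resolve_schemas(schemas: &[Schema], strategy: SchemaStrategy) -> PolarsResult<Schema> {
    let (first, rest) = schemas
        .split_first()
        .expect("at least one file matched the glob");
    match strategy {
        SchemaStrategy::First => Ok(first.clone()),
        SchemaStrategy::Strict => {
            if rest.iter().any(|s| s != first) {
                return Err(PolarsError::ComputeError(
                    "globbed files have different schemas".into(),
                ));
            }
            Ok(first.clone())
        }
        SchemaStrategy::Merge => {
            let mut merged = first.clone();
            for schema in rest {
                for (name, dtype) in schema.iter() {
                    let dt: DataType = match merged.get(name) {
                        // Column present in both: widen to a supertype.
                        Some(existing) => try_get_supertype(existing, dtype)?,
                        // Column only in a later file: append it.
                        None => dtype.clone(),
                    };
                    merged.with_column(name.clone(), dt);
                }
            }
            Ok(merged)
        }
    }
}
```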

@ritchie46 (Member) commented:

@chitralverma I don't exactly follow what this adds beyond the current globbing. I just exposed globbing to Python this week and was thinking of supporting dataset/ as well. But I believe it can reuse most of the architecture that is already in place.

@chitralverma (Contributor, Author) replied:

> @chitralverma I don't exactly follow what this adds beyond the current globbing. I just exposed globbing to Python this week and was thinking of supporting dataset/ as well. But I believe it can reuse most of the architecture that is already in place.

Yes, the globbing is just an add-on; this PR is more about refactoring and aggregating all the file-format/async code under polars-io per the new abstractions. The way I see it, the current implementation can be simplified a lot more.

This refactoring will also feed directly into the pluggable/user-defined data sources idea.

It also paves the way for partitioned datasets and their pruning across all file formats once things are standardized.

Some things that were sequential before (like schema inference) are parallel now.

The second major change is the ability to work with the many more backends that opendal provides, so that's about versatility (see the sketch below).

Finally, the changes in this PR are just for parquet, so functionally everything is more or less the same. But if and when this gets merged, it can easily be extended to other formats like csv, avro, ipc, json, etc.
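To make the operator-inference point concrete, here is a rough sketch of mapping a URL scheme onto an opendal Operator (operator_for is a hypothetical helper; Operator::via_map and the string config keys are opendal's, though exact signatures vary across its releases):

```rust
use std::collections::HashMap;

use opendal::{Operator, Scheme};

/// Hypothetical helper: pick an opendal backend from the URL scheme and
/// pass backend options (bucket, region, ...) as a string map.
fn operator_for(url: &url::Url) -> opendal::Result<Operator> {
    let scheme = match url.scheme() {
        "s3" | "s3a" => Scheme::S3,
        "gs" | "gcs" => Scheme::Gcs,
        "az" | "abfs" => Scheme::Azblob,
        _ => Scheme::Fs,
    };
    let mut opts = HashMap::new();
    if let Some(host) = url.host_str() {
        // For S3-style URLs the host component is the bucket name.
        opts.insert("bucket".to_string(), host.to_string());
    }
    Operator::via_map(scheme, opts)
}

// Usage (inside an async context):
// let op = operator_for(&url::Url::parse("s3://my-bucket/a/b/c/file1.pq")?)?;
// let bytes = op.read("a/b/c/file1.pq").await?;
```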

@tustvold commented Sep 25, 2023

> This implementation relies on opendal, a part of the Apache org

I'm sorry to hear about this, but understand if that is the direction you wish to take. Do let me know if there is any functionality that would help sway you back to the object_store fold 😅

> This will involve writing async readers for each format as done by datafusion.

FWIW the readers in DataFusion just layer on DataFusion-specific functionality like predicate pushdown, schema adaptation, etc.; the core IO lives in arrow-rs/parquet and would be usable by polars.
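For example, reading parquet asynchronously from any ObjectStore needs only the parquet crate (with its async and object_store features enabled); a minimal sketch, with the local filesystem standing in for a remote store:

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any ObjectStore implementation works here; LocalFileSystem keeps the
    // example self-contained. Swap in an S3/GCS/Azure store for remote reads.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new_with_prefix(".")?);
    let path = Path::from("data/file1.pq");
    let meta = store.head(&path).await?;

    // The core async parquet IO lives in arrow-rs's `parquet` crate.
    let reader = ParquetObjectReader::new(store, meta);
    let stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```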

> opendal provides this out of the box.

I feel something has been lost in translation here; object_store doesn't provide these because object stores themselves don't provide them. A round trip to an object store is on the order of 100-200 ms, so even with pre-fetching heuristics and a very predictable read pattern (something that isn't really true for parquet), you're going to pay a very high price for this API.

The approach taken by arrow-rs/DataFusion is instead to learn the lessons from the Hadoop ecosystem (apache/datafusion#2205 (comment)) and start with a vectored IO abstraction from the get go.
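Concretely, that vectored abstraction is ObjectStore::get_ranges, which fetches several byte ranges (e.g. the column chunks a parquet footer points at) in one call rather than paying a round trip per seek+read. A minimal sketch with illustrative offsets:

```rust
use object_store::{path::Path, ObjectStore};

/// Fetch several byte ranges of one object in a single vectored call.
/// The offsets below are illustrative, not real parquet chunk locations.
async fn fetch_column_chunks(
    store: &dyn ObjectStore,
    location: &Path,
) -> object_store::Result<()> {
    let ranges = vec![0..4096, 1_048_576..1_052_672];
    let chunks = store.get_ranges(location, &ranges).await?;
    for (range, bytes) in ranges.iter().zip(&chunks) {
        println!("fetched {:?}: {} bytes", range, bytes.len());
    }
    Ok(())
}
```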

That being said, we could possibly add an AsyncRead + AsyncSeek utility to object_store to facilitate integration with the arrow2 IO readers; I'll have a play.

Edit: see apache/arrow-rs#4857

@chitralverma (Contributor, Author) commented Sep 27, 2023

Thanks @tustvold, I'll check it out.

@ritchie46 do these changes still make sense to you, or shall I drop the idea? While this PR covers only parquet, the same pattern would be used for other formats later.

@stinodego (Member) commented:

We have this feature available now in Polars, so I'm closing this PR.

@stinodego closed this on Feb 10, 2024