
How to best optimize reading from S3? #278

Open · stevbear opened this issue Feb 12, 2025 · 7 comments

@stevbear

Describe the usage question you have. Please include as many useful details as possible.

Hi!
I have a use case of reading certain row groups from S3.
I see that there is an option BufferedStreamEnabled.
When I set BufferedStreamEnabled to false, it seems to read all of the data for a column of a row group at once, which unfortunately results in OOM for us.
When I set BufferedStreamEnabled to true, the library seems to read the row group page by page, which is not optimal for cloud usage.
How can I improve this? I imagine the best way to improve this would be to read multiple pages in one read() call?
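
For reference, a minimal sketch of the setup being discussed, assuming `src` is any parquet.ReaderAtSeeker (io.ReaderAt + io.Seeker) backed by the S3 object, e.g. an adapter that issues ranged GETs; the adapter and the function/package names are placeholders, not part of the library:

```go
package s3read // hypothetical package name

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/file"
)

// ReadOneColumnChunk opens the file with buffered page streaming enabled and
// touches a single column chunk of a single row group.
func ReadOneColumnChunk(src parquet.ReaderAtSeeker, rowGroup, column int) error {
	props := parquet.NewReaderProperties(memory.DefaultAllocator)
	// Stream pages through a bounded buffer instead of loading the whole column chunk.
	// The buffer defaults to a small size (16KB per the discussion below).
	props.BufferedStreamEnabled = true

	rdr, err := file.NewParquetReader(src, file.WithReadProps(props))
	if err != nil {
		return err
	}
	defer rdr.Close()

	col, err := rdr.RowGroup(rowGroup).Column(column)
	if err != nil {
		return err
	}
	fmt.Println("physical type:", col.Type())
	return nil
}
```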

Component(s)

Parquet

@zeroshade
Member

Hi @stevbear

Optimizing the reading from S3 has been on my list for a while and I just hadn't gotten around to it. It didn't get prioritized because no one had filed any issues concerning it, so thank you for filing this!

When I set BufferedStreamEnabled to true, the library seems to be reading the row group page by page, which is not optimal for cloud usage.

This is likely because the default buffer size is 16KB. Have you tried increasing the BufferSize member of the ReaderProperties so that it buffers more pages at a time? Perhaps try a buffer of a few MB?
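
A hedged sketch of that suggestion, reusing the imports and the assumed S3-backed `src` from the earlier snippet; the 4 MiB figure is just an illustration to tune, not a library default:

```go
// OpenWithLargeBuffer raises BufferSize so a single buffered read from S3 can
// cover many pages rather than roughly one page per round trip.
func OpenWithLargeBuffer(src parquet.ReaderAtSeeker) (*file.Reader, error) {
	props := parquet.NewReaderProperties(memory.DefaultAllocator)
	props.BufferedStreamEnabled = true
	props.BufferSize = 4 << 20 // 4 MiB per buffered column-chunk stream, up from the 16KB default
	return file.NewParquetReader(src, file.WithReadProps(props))
}
```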

I have a few ideas to further optimize via pre-buffering (each with different trade-offs), so can you give me a bit more context to make sure your use case would be helped and to identify which idea would work best for you?

Specifically: if reading an entire column for a single row group gives you OOM, you either have a significantly large row group, or I'm guessing it's string data with a lot of large strings? That leads to the question of what you're doing with the column data after you read it from the row group. If you can't hold the entire column from a single row group in memory, are you streaming the data somewhere? Are you reading only a single column at a time, or multiple columns from the row group? Can you give me more of an idea of the sizes of the columns / row groups in the file and the memory limitations of your system?

Is the issue the copy that happens when decoding/decompressing the column data? etc.

The more information the better, so we can figure out a good solution here. It also gives me the opportunity to improve the memory usage of the parquet package like I've been wanting to! 😄

@stevbear
Author

stevbear commented Feb 13, 2025

Hi @zeroshade, thanks for getting back to me.
The OOM is not my concern at the moment, as I was able to get away with using BufferSize and specifying a reasonable batch size. My concern is more about efficiency. If I'm reading the source code correctly, the BufferSize inside ReaderProperties doesn't matter that much, as a new buffer is created when reading the page data (https://github.com/apache/arrow-go/blob/main/parquet/file/page_reader.go#L543).
We have a somewhat unusual use case of using parquet files for point reads, meaning that on each read we will only be reading a few hundred rows, i.e. one or 2-3 consecutive pages. Looking at the implementation, I will be able to use column indexes and page indexes to narrow down the pages I need (thanks to your recent changes). What I have observed in the current implementation is that each of those pages requires one read for the page header and another read for the content (the second read sometimes doesn't happen if reading the page header already covers the content). I was thinking that, since S3 has high latency, it would be more efficient to support a mechanism for reading 2 or more pages at the same time, provided they fit in memory.
Thanks again!
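
For illustration, a minimal sketch of the bounded-batch pattern mentioned above (not the project's own example): it assumes an INT64, non-repeated column and reuses the imports from the earlier snippet; `process` is a placeholder for whatever is done with the decoded values:

```go
// ScanInt64Column reads one column chunk in fixed-size batches instead of
// materializing the whole chunk, keeping peak memory bounded.
func ScanInt64Column(rdr *file.Reader, rowGroup, column int, process func([]int64)) error {
	cc, err := rdr.RowGroup(rowGroup).Column(column)
	if err != nil {
		return err
	}
	int64Rdr, ok := cc.(*file.Int64ColumnChunkReader)
	if !ok {
		return fmt.Errorf("expected an INT64 column, got %s", cc.Type())
	}

	const batchSize = 1024 // the "reasonable batch size" knob
	values := make([]int64, batchSize)
	defLvls := make([]int16, batchSize)

	for int64Rdr.HasNext() {
		_, read, err := int64Rdr.ReadBatch(batchSize, values, defLvls, nil)
		if err != nil {
			return err
		}
		process(values[:read])
	}
	return nil
}
```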

@zeroshade
Member

The BufferSize matters because it controls the underlying BufferedReader that is being read from (see https://github.com/apache/arrow-go/blob/main/internal/utils/buf_reader.go#L72). When the page reader attempts to read the page header and data from S3, the underlying BufferedReader will first read BufferSize bytes from S3 to fill its buffer, and the buffers that are created/allocated merely copy from that internal buffer.

In the current implementation, what I have observed is that, for those pages, it will require 1 read for the page header, and another read for the content (sometimes this doesn't happen if reading the page header covers the content). I was thinking that, since S3 has high latency, it would be more efficient to support a mechanism of reading 2 or more pages at the same time, given that memory will fit.

Assuming the BufferSize is large enough and BufferedStreamEnabled is true, it should read multiple pages at once for you. Another option would be to support providing a read-cache that you could pre-populate using the OffsetIndex and PageLocations if you know which pages you want. That said, the current APIs don't make it easy (or really possible) to have the page reader skip entire pages or read specific pages and take advantage of such a read-cache, so in addition to the read-cache I'd have to add changes to make that possible.

Does that make sense?

@stevbear
Author

stevbear commented Feb 13, 2025

Thanks! That makes complete sense!
Do you have an example handy of using column indexes and offset indexes and reading pages? I assume that will be using the PageReader?

@zeroshade
Member

That's where the current problem is: as it currently stands, there isn't a good way to actually use the column and offset indexes when reading pages, because there is no way to tell the page reader to skip an entire page or to use the page location/offset information to jump to a specific page.

It would be helpful if you had an example of your use-case scenario; we could potentially work together to figure out what a good new API would look like for leveraging the indexes to skip pages, etc.

@stevbear
Author

I see. Thanks for that explanation.
Here's an example: given a row group containing pages:

  • page 0 (min: 1, max: 5)
  • page 1 (min: 6, max: 9)
  • page 2 (min: 10, max: 16)
  • page 3 (min: 17, max: 22)

We would like to read a range of data from 7 to 14. We should be able to use column indexes and page indexes to narrow the pages down to just page 1 and page 2, and read those pages.
I think parquet-go (https://github.com/parquet-go/parquet-go) implemented an API to seek to a row number within the row group and an API to read x number of rows from there.
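
A self-contained sketch of that narrowing step, using plain placeholder structs rather than the library's ColumnIndex/OffsetIndex types, just to show the overlap test on the example above:

```go
package main

import "fmt"

type pageStats struct {
	Index    int
	Min, Max int64 // per-page min/max from the column index
	Offset   int64 // page byte offset from the offset index
}

// pagesOverlapping returns the pages whose [Min, Max] intersects [lo, hi].
func pagesOverlapping(pages []pageStats, lo, hi int64) []pageStats {
	var out []pageStats
	for _, p := range pages {
		if p.Max >= lo && p.Min <= hi {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pages := []pageStats{
		{Index: 0, Min: 1, Max: 5},
		{Index: 1, Min: 6, Max: 9},
		{Index: 2, Min: 10, Max: 16},
		{Index: 3, Min: 17, Max: 22},
	}
	// Query range 7..14 keeps pages 1 and 2, matching the example above.
	fmt.Println(pagesOverlapping(pages, 7, 14))
}
```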

@zeroshade
Member

I started implementing this a bit and realized that once you start dealing with repeated columns, a SeekToRow doesn't make much sense with the low-level APIs (distinguishing rows vs. values is not easy when skipping within a page).

It makes more sense with the Arrow column and record readers in the pqarrow packages. What do you think?
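
For context, a hedged sketch of what the pqarrow route could look like with the existing entry points, assuming pqarrow.NewFileReader and FileReader.GetRecordReader as currently shaped; it also assumes imports for context, github.com/apache/arrow-go/v18/arrow, and github.com/apache/arrow-go/v18/parquet/pqarrow in addition to the earlier ones, and the column/row-group indices are placeholders that would come from index-based narrowing:

```go
// ReadSelectedRowGroups wraps an already-open file.Reader with pqarrow and pulls
// Arrow records for just the requested columns and row groups.
func ReadSelectedRowGroups(ctx context.Context, rdr *file.Reader,
	cols, rowGroups []int, handle func(arrow.Record)) error {

	arrowRdr, err := pqarrow.NewFileReader(rdr,
		pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
	if err != nil {
		return err
	}

	recRdr, err := arrowRdr.GetRecordReader(ctx, cols, rowGroups)
	if err != nil {
		return err
	}
	defer recRdr.Release()

	for recRdr.Next() {
		handle(recRdr.Record()) // each record holds only the selected columns
	}
	return nil
}
```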
