Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: read parquet file in chunked mode per row group #3016

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 26 additions & 23 deletions awswrangler/s3/_read_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,33 +247,36 @@ def _read_parquet_chunked(
if pq_file is None:
continue

use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
chunks = pq_file.iter_batches(
batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
)

schema = pq_file.schema.to_arrow_schema()
metadata = pq_file.metadata
schema = metadata.schema.to_arrow_schema()
if columns:
schema = pa.schema([schema.field(column) for column in columns], schema.metadata)

table = _add_table_partitions(
table=pa.Table.from_batches(chunks, schema=schema),
path=path,
path_root=path_root,
)
df = _table_to_df(table=table, kwargs=arrow_kwargs)
if chunked is True:
yield df
use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
table_kwargs = {"path": path, "path_root": path_root}
if metadata.num_rows > 0:
for chunk in pq_file.iter_batches(
batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
):
table = _add_table_partitions(table=pa.Table.from_batches([chunk], schema=schema), **table_kwargs)
df = _table_to_df(table=table, kwargs=arrow_kwargs)
if chunked is True:
yield df
else:
if next_slice is not None:
df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
while len(df.index) >= chunked:
yield df.iloc[:chunked, :].copy()
df = df.iloc[chunked:, :]
if df.empty:
next_slice = None
else:
next_slice = df
else:
if next_slice is not None:
df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
while len(df.index) >= chunked:
yield df.iloc[:chunked, :].copy()
df = df.iloc[chunked:, :]
if df.empty:
next_slice = None
else:
next_slice = df
table = _add_table_partitions(table=pa.Table.from_batches([], schema=schema), **table_kwargs)
df = _table_to_df(table=table, kwargs=arrow_kwargs)
yield df

if next_slice is not None:
yield next_slice

Expand Down
Loading