feat: reduce memory impact of chunked reads
jaidisido committed Nov 15, 2024
1 parent 5d51c45 commit fecdbd4
Showing 1 changed file with 22 additions and 29 deletions.
awswrangler/s3/_read_parquet.py
@@ -247,40 +247,33 @@ def _read_parquet_chunked(
         if pq_file is None:
             continue
 
-        schema = pq_file.schema.to_arrow_schema()
+        metadata = pq_file.metadata
+        schema = metadata.schema.to_arrow_schema()
         if columns:
             schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
 
         use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
-        iterate_at_least_once = False
-        for chunk in pq_file.iter_batches(
-            batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
-        ):
-            iterate_at_least_once = True
-            table = _add_table_partitions(
-                table=pa.Table.from_batches([chunk], schema=schema),
-                path=path,
-                path_root=path_root,
-            )
-            df = _table_to_df(table=table, kwargs=arrow_kwargs)
-            if chunked is True:
-                yield df
-            else:
-                if next_slice is not None:
-                    df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
-                while len(df.index) >= chunked:
-                    yield df.iloc[:chunked, :].copy()
-                    df = df.iloc[chunked:, :]
-                if df.empty:
-                    next_slice = None
-                else:
-                    next_slice = df
-        if not iterate_at_least_once:
-            table = _add_table_partitions(
-                table=pa.Table.from_batches([], schema=schema),
-                path=path,
-                path_root=path_root,
-            )
+        table_kwargs = {"path": path, "path_root": path_root}
+        if metadata.num_rows > 0:
+            for chunk in pq_file.iter_batches(
+                batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
+            ):
+                table = _add_table_partitions(table=pa.Table.from_batches([chunk], schema=schema), **table_kwargs)
+                df = _table_to_df(table=table, kwargs=arrow_kwargs)
+                if chunked is True:
+                    yield df
+                else:
+                    if next_slice is not None:
+                        df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
+                    while len(df.index) >= chunked:
+                        yield df.iloc[:chunked, :].copy()
+                        df = df.iloc[chunked:, :]
+                    if df.empty:
+                        next_slice = None
+                    else:
+                        next_slice = df
+        else:
+            table = _add_table_partitions(table=pa.Table.from_batches([], schema=schema), **table_kwargs)
             df = _table_to_df(table=table, kwargs=arrow_kwargs)
             yield df
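For context, here is a minimal standalone sketch (not the library's code) of the pattern the new version adopts: read only the Parquet footer metadata up front, derive the Arrow schema from it, and enter the batch loop only when the file actually contains rows. The file name and batch size below are hypothetical.

    # Minimal sketch of the commit's pattern, assuming a local file named
    # "example.parquet" (hypothetical). Only the footer metadata is read up
    # front; no row data is materialized until iter_batches() is consumed.
    import pyarrow as pa
    import pyarrow.parquet as pq

    pq_file = pq.ParquetFile("example.parquet")
    metadata = pq_file.metadata                 # footer only, cheap to read
    schema = metadata.schema.to_arrow_schema()  # schema without touching row groups

    if metadata.num_rows > 0:
        # Non-empty file: stream record batches and convert each one.
        for batch in pq_file.iter_batches(batch_size=65_536):
            df = pa.Table.from_batches([batch], schema=schema).to_pandas()
            print(len(df))  # process each chunk here
    else:
        # Empty file: produce a single empty, schema-correct DataFrame
        # instead of spinning up the batch reader at all.
        df = pa.Table.from_batches([], schema=schema).to_pandas()
        print(df.dtypes)

From the caller's side the observable behavior should be unchanged: wr.s3.read_parquet(path, chunked=True) still yields one DataFrame per record batch, an integer chunked still yields frames of exactly that many rows via the next_slice buffering kept in the diff, and an empty file still yields a single empty frame with the right columns, now selected by checking metadata.num_rows rather than tracking whether the loop ran.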

