From cc487506f51970501947cafd4c4036d74574c0fa Mon Sep 17 00:00:00 2001
From: Frederic Kayser
Date: Mon, 11 Nov 2024 18:16:43 +0100
Subject: [PATCH 1/3] fix: read parquet file in chunked mode per row group

---
 awswrangler/s3/_read_parquet.py | 47 ++++++++++++++++-----------------
 1 file changed, 23 insertions(+), 24 deletions(-)

diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index f90c25b69..b160ae4fd 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -248,32 +248,31 @@ def _read_parquet_chunked(
                 continue
 
             use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
-            chunks = pq_file.iter_batches(
+            for chunk in pq_file.iter_batches(
                 batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
-            )
-
-            schema = pq_file.schema.to_arrow_schema()
-            if columns:
-                schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
-
-            table = _add_table_partitions(
-                table=pa.Table.from_batches(chunks, schema=schema),
-                path=path,
-                path_root=path_root,
-            )
-            df = _table_to_df(table=table, kwargs=arrow_kwargs)
-            if chunked is True:
-                yield df
-            else:
-                if next_slice is not None:
-                    df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
-                while len(df.index) >= chunked:
-                    yield df.iloc[:chunked, :].copy()
-                    df = df.iloc[chunked:, :]
-                if df.empty:
-                    next_slice = None
+            ):
+                schema = pq_file.schema.to_arrow_schema()
+                if columns:
+                    schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
+
+                table = _add_table_partitions(
+                    table=pa.Table.from_batches([chunk], schema=schema),
+                    path=path,
+                    path_root=path_root,
+                )
+                df = _table_to_df(table=table, kwargs=arrow_kwargs)
+                if chunked is True:
+                    yield df
                 else:
-                    next_slice = df
+                    if next_slice is not None:
+                        df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
+                    while len(df.index) >= chunked:
+                        yield df.iloc[:chunked, :].copy()
+                        df = df.iloc[chunked:, :]
+                    if df.empty:
+                        next_slice = None
+                    else:
+                        next_slice = df
     if next_slice is not None:
         yield next_slice
 
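For illustration only (not part of the patch): the change above stops collecting every record batch into one large Arrow table before converting to pandas; each batch is now wrapped in its own single-batch table and converted right away. A minimal standalone sketch of that pattern with plain pyarrow/pandas, using a placeholder local path ("example.parquet") instead of awswrangler's S3 helpers:

import pyarrow as pa
import pyarrow.parquet as pq

# Placeholder path; the real code reads from S3 via open_s3_object.
pq_file = pq.ParquetFile("example.parquet")
schema = pq_file.schema.to_arrow_schema()

for batch in pq_file.iter_batches(batch_size=65_536, use_pandas_metadata=False):
    # One single-batch Table at a time, so only one batch worth of rows
    # is materialized before conversion to pandas.
    df = pa.Table.from_batches([batch], schema=schema).to_pandas()
    print(df.shape)
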
From 5d51c451cb51436d82bebdceb1f4349843db5eb4 Mon Sep 17 00:00:00 2001
From: Frederic Kayser
Date: Mon, 11 Nov 2024 20:27:44 +0100
Subject: [PATCH 2/3] fix: Fix test_empty_parquet

---
 awswrangler/s3/_read_parquet.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index b160ae4fd..ca28916d7 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -247,14 +247,16 @@ def _read_parquet_chunked(
             if pq_file is None:
                 continue
 
+            schema = pq_file.schema.to_arrow_schema()
+            if columns:
+                schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
+
             use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
+            iterate_at_least_once = False
             for chunk in pq_file.iter_batches(
                 batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
             ):
-                schema = pq_file.schema.to_arrow_schema()
-                if columns:
-                    schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
-
+                iterate_at_least_once = True
                 table = _add_table_partitions(
                     table=pa.Table.from_batches([chunk], schema=schema),
                     path=path,
@@ -273,6 +275,15 @@ def _read_parquet_chunked(
                         next_slice = None
                     else:
                         next_slice = df
+            if not iterate_at_least_once:
+                table = _add_table_partitions(
+                    table=pa.Table.from_batches([], schema=schema),
+                    path=path,
+                    path_root=path_root,
+                )
+                df = _table_to_df(table=table, kwargs=arrow_kwargs)
+                yield df
+
     if next_slice is not None:
         yield next_slice
 

From fecdbd47ad5d4586160dc68b964eceacc7c6c5f2 Mon Sep 17 00:00:00 2001
From: Abdel Jaidi
Date: Fri, 15 Nov 2024 13:07:52 +0000
Subject: [PATCH 3/3] feat: reduce memory impact of chunked reads

---
 awswrangler/s3/_read_parquet.py | 51 ++++++++++++++------------------
 1 file changed, 22 insertions(+), 29 deletions(-)

diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index ca28916d7..fa697d013 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -247,40 +247,33 @@ def _read_parquet_chunked(
             if pq_file is None:
                 continue
 
-            schema = pq_file.schema.to_arrow_schema()
+            metadata = pq_file.metadata
+            schema = metadata.schema.to_arrow_schema()
             if columns:
                 schema = pa.schema([schema.field(column) for column in columns], schema.metadata)
 
             use_threads_flag: bool = use_threads if isinstance(use_threads, bool) else bool(use_threads > 1)
-            iterate_at_least_once = False
-            for chunk in pq_file.iter_batches(
-                batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
-            ):
-                iterate_at_least_once = True
-                table = _add_table_partitions(
-                    table=pa.Table.from_batches([chunk], schema=schema),
-                    path=path,
-                    path_root=path_root,
-                )
-                df = _table_to_df(table=table, kwargs=arrow_kwargs)
-                if chunked is True:
-                    yield df
-                else:
-                    if next_slice is not None:
-                        df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
-                    while len(df.index) >= chunked:
-                        yield df.iloc[:chunked, :].copy()
-                        df = df.iloc[chunked:, :]
-                    if df.empty:
-                        next_slice = None
+            table_kwargs = {"path": path, "path_root": path_root}
+            if metadata.num_rows > 0:
+                for chunk in pq_file.iter_batches(
+                    batch_size=batch_size, columns=columns, use_threads=use_threads_flag, use_pandas_metadata=False
+                ):
+                    table = _add_table_partitions(table=pa.Table.from_batches([chunk], schema=schema), **table_kwargs)
+                    df = _table_to_df(table=table, kwargs=arrow_kwargs)
+                    if chunked is True:
+                        yield df
                     else:
-                        next_slice = df
-            if not iterate_at_least_once:
-                table = _add_table_partitions(
-                    table=pa.Table.from_batches([], schema=schema),
-                    path=path,
-                    path_root=path_root,
-                )
+                        if next_slice is not None:
+                            df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
+                        while len(df.index) >= chunked:
+                            yield df.iloc[:chunked, :].copy()
+                            df = df.iloc[chunked:, :]
+                        if df.empty:
+                            next_slice = None
+                        else:
+                            next_slice = df
+            else:
+                table = _add_table_partitions(table=pa.Table.from_batches([], schema=schema), **table_kwargs)
                 df = _table_to_df(table=table, kwargs=arrow_kwargs)
                 yield df
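
For illustration only (not part of the patch): the final change skips batch iteration entirely when the Parquet footer reports zero rows, while still yielding an empty, correctly typed frame built from the (possibly column-pruned) schema. A rough standalone sketch of that guard with plain pyarrow/pandas, again using a placeholder local path rather than the awswrangler S3 helpers:

import pyarrow as pa
import pyarrow.parquet as pq

# Placeholder path; the real code reads from S3 and also prunes the schema
# to the requested columns before building the empty table.
pq_file = pq.ParquetFile("example.parquet")
metadata = pq_file.metadata
schema = metadata.schema.to_arrow_schema()

if metadata.num_rows > 0:
    for batch in pq_file.iter_batches(batch_size=65_536):
        df = pa.Table.from_batches([batch], schema=schema).to_pandas()
        print(df.shape)
else:
    # from_batches([]) needs an explicit schema; the result keeps the column
    # names and dtypes, so downstream code sees an empty frame instead of
    # no frame at all.
    df = pa.Table.from_batches([], schema=schema).to_pandas()
    print(df.shape)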