Skip to content

Commit

Permalink
fix usage of iter_internal_ref_bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonLuttenberger committed Aug 8, 2024
1 parent 28caac8 commit 7a3d65a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/modin/_data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def pyarrow_types_from_pandas_distributed(
) -> dict[str, pa.DataType]:
"""Extract the related Pyarrow data types from a pandas DataFrame."""
func = ray_remote()(pyarrow_types_from_pandas)
- first_block_object_ref = next(_ray_dataset_from_df(df).iter_internal_ref_bundles())
+ first_block_object_ref = next(_ray_dataset_from_df(df).iter_internal_ref_bundles()).block_refs[0]
return ray_get( # type: ignore[no-any-return]
func(
df=first_block_object_ref,
Expand Down
11 changes: 8 additions & 3 deletions awswrangler/distributed/ray/modin/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,21 @@ def _to_modin(

return from_partitions(
partitions=[
- _block_to_df(block=block, to_pandas_kwargs=_to_pandas_kwargs)
- for block in dataset.iter_internal_ref_bundles()
+ _block_to_df(block=block_ref, to_pandas_kwargs=_to_pandas_kwargs)
+ for ref_bundle in dataset.iter_internal_ref_bundles()
+ for block_ref in ref_bundle.block_refs
],
axis=0,
index=index,
)


def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> list[ObjectRef[Any]]:
- object_refs: list[ObjectRef[Any]] = list(_ray_dataset_from_df(df).iter_internal_ref_bundles())
+ object_refs: list[ObjectRef[Any]] = [
+ block_ref
+ for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
+ for block_ref in ref_bundle.block_refs
+ ]
return object_refs


Expand Down
6 changes: 5 additions & 1 deletion awswrangler/distributed/ray/modin/s3/_write_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ def write_partitions(df: pd.DataFrame, block_index: int) -> tuple[list[str], dic
)
return paths, partitions_values

- block_object_refs = _ray_dataset_from_df(df).iter_internal_ref_bundles()
+ block_object_refs = (
+ block_ref
+ for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
+ for block_ref in ref_bundle.block_refs
+ )
result = ray_get(
[write_partitions(object_ref, block_index) for block_index, object_ref in enumerate(block_object_refs)]
)
Expand Down

0 comments on commit 7a3d65a

Please sign in to comment.