Commit

Merge branch 'main' into dependabot/pip/development-dependencies-8400…e7d1a7

LeonLuttenberger authored Aug 12, 2024
2 parents 44400ec + 108148c commit 03a9cb0
Showing 10 changed files with 16 additions and 23 deletions.
@@ -243,7 +243,6 @@ def __init__( # noqa: PLR0912,PLR0915
paths,
**dataset_kwargs,
filesystem=filesystem,
- use_legacy_dataset=False,
)
except OSError as e:
_handle_read_os_error(e, paths)
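
Note: recent pyarrow releases dropped the use_legacy_dataset argument from pq.ParquetDataset, which is why the call above no longer passes it. A minimal sketch of the resulting call shape, with a local path and filesystem standing in for the real paths and dataset_kwargs (illustrative only, not the surrounding awswrangler code):

import pyarrow.fs as pa_fs
import pyarrow.parquet as pq

# use_legacy_dataset is simply omitted; recent pyarrow versions no longer accept it.
dataset = pq.ParquetDataset(
    "path/to/parquet_dir",               # placeholder for `paths`
    filesystem=pa_fs.LocalFileSystem(),  # placeholder for `filesystem`
)
table = dataset.read()
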
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/modin/_data_types.py
@@ -15,7 +15,7 @@ def pyarrow_types_from_pandas_distributed(
) -> dict[str, pa.DataType]:
"""Extract the related Pyarrow data types from a pandas DataFrame."""
func = ray_remote()(pyarrow_types_from_pandas)
- first_block_object_ref = _ray_dataset_from_df(df).get_internal_block_refs()[0]
+ first_block_object_ref = next(_ray_dataset_from_df(df).iter_internal_ref_bundles()).block_refs[0]
return ray_get( # type: ignore[no-any-return]
func(
df=first_block_object_ref,
10 changes: 8 additions & 2 deletions awswrangler/distributed/ray/modin/_utils.py
@@ -51,15 +51,21 @@ def _to_modin(

return from_partitions(
partitions=[
- _block_to_df(block=block, to_pandas_kwargs=_to_pandas_kwargs) for block in dataset.get_internal_block_refs()
+ _block_to_df(block=block_ref, to_pandas_kwargs=_to_pandas_kwargs)
+ for ref_bundle in dataset.iter_internal_ref_bundles()
+ for block_ref in ref_bundle.block_refs
],
axis=0,
index=index,
)


def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> list[ObjectRef[Any]]:
- object_refs: list[ObjectRef[Any]] = _ray_dataset_from_df(df).get_internal_block_refs()
+ object_refs: list[ObjectRef[Any]] = [
+ block_ref
+ for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
+ for block_ref in ref_bundle.block_refs
+ ]
return object_refs
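
Note: both changes in this file replace Ray's Dataset.get_internal_block_refs(), which newer Ray releases deprecate, with Dataset.iter_internal_ref_bundles(), where each RefBundle carries one or more block object refs. A minimal standalone sketch of the old vs. new way to collect block refs, assuming a recent Ray release that exposes iter_internal_ref_bundles (the DataFrame is illustrative):

import pandas as pd
import ray

ray.init(ignore_reinit_error=True)
ds = ray.data.from_pandas(pd.DataFrame({"c0": range(100)}))

# Old API (deprecated in newer Ray releases):
# block_refs = ds.get_internal_block_refs()

# New API: flatten the block refs out of each RefBundle.
block_refs = [
    block_ref
    for ref_bundle in ds.iter_internal_ref_bundles()
    for block_ref in ref_bundle.block_refs
]
blocks = ray.get(block_refs)  # pyarrow Tables or pandas DataFrames, depending on the Ray version
print(len(block_refs), type(blocks[0]).__name__)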


6 changes: 5 additions & 1 deletion awswrangler/distributed/ray/modin/s3/_write_dataset.py
@@ -145,7 +145,11 @@ def write_partitions(df: pd.DataFrame, block_index: int) -> tuple[list[str], dic
)
return paths, partitions_values

- block_object_refs = _ray_dataset_from_df(df).get_internal_block_refs()
+ block_object_refs = (
+ block_ref
+ for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
+ for block_ref in ref_bundle.block_refs
+ )
result = ray_get(
[write_partitions(object_ref, block_index) for block_index, object_ref in enumerate(block_object_refs)]
)
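
Note: here the block refs are collected lazily in a generator expression and one write_partitions task is dispatched per block. A hedged sketch of the same per-block fan-out with plain Ray; describe_block is an illustrative stand-in for awswrangler's write_partitions, and the dataset is a toy example:

import pandas as pd
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def describe_block(block, block_index: int) -> str:
    # Ray resolves the block ref before calling; the block may be a pyarrow Table
    # or a pandas DataFrame depending on the Ray version.
    return f"block {block_index}: {type(block).__name__} with {len(block)} rows"

ds = ray.data.from_pandas(pd.DataFrame({"c0": range(10)}))
block_refs = (
    block_ref
    for ref_bundle in ds.iter_internal_ref_bundles()
    for block_ref in ref_bundle.block_refs
)
print(ray.get([describe_block.remote(ref, i) for i, ref in enumerate(block_refs)]))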
2 changes: 2 additions & 0 deletions awswrangler/s3/_select.py
@@ -17,6 +17,7 @@
from awswrangler import _data_types, _utils, exceptions
from awswrangler._distributed import engine
from awswrangler._executor import _BaseExecutor, _get_executor
+ from awswrangler.annotations import Deprecated
from awswrangler.distributed.ray import ray_get
from awswrangler.s3._describe import size_objects
from awswrangler.s3._list import _path2list
@@ -148,6 +149,7 @@ def _select_query(
)


+ @Deprecated
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session"],
)
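
Note: the @Deprecated decorator imported in the first hunk is applied here, flagging the decorated S3 Select function as deprecated. The actual implementation in awswrangler.annotations is not shown in this diff; a minimal sketch of how such a decorator is commonly written (an assumption, not the library's code):

import functools
import warnings
from typing import Any, Callable, TypeVar

FunctionType = TypeVar("FunctionType", bound=Callable[..., Any])


def Deprecated(func: FunctionType) -> FunctionType:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        warnings.warn(
            f"`{func.__name__}` is deprecated and may be removed in a future release.",
            DeprecationWarning,
            stacklevel=2,
        )
        return func(*args, **kwargs)

    return wrapper  # type: ignore[return-value]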
5 changes: 0 additions & 5 deletions tests/unit/test_athena.py
@@ -135,7 +135,6 @@ def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database,
assert len(wr.s3.list_objects(path=path3)) == 0


- @pytest.mark.modin_index
def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, glue_database, glue_ctas_database):
df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
wr.s3.to_parquet(
@@ -1013,7 +1012,6 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table):
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


- @pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@pytest.mark.parametrize(
"dtype",
@@ -1102,7 +1100,6 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table):
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


- @pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@pytest.mark.parametrize(
"dtype",
@@ -1168,7 +1165,6 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data,
assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


- @pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2, 3], [False, True, False, True], ["b", "c", "d", "e"]])
def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, glue_table, bucketing_data):
nb_of_buckets = 2
@@ -1296,7 +1292,6 @@ def test_combined_bucketing_partitioning_csv_dataset(path, glue_database, glue_t
assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


- @pytest.mark.modin_index
def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_table):
nb_of_buckets = 2
df = pd.DataFrame({"c0": [0, 1, 2, 3], "c1": [4, 6, 5, 7], "c2": ["foo", "bar", "baz", "boo"]})
1 change: 0 additions & 1 deletion tests/unit/test_athena_csv.py
@@ -372,7 +372,6 @@ def test_athena_csv_types(path, glue_database, glue_table):
ensure_data_types_csv(df2)


- @pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [True, False])
@pytest.mark.parametrize("ctas_approach", [True, False])
@pytest.mark.parametrize("line_count", [1, 2])
1 change: 0 additions & 1 deletion tests/unit/test_athena_parquet.py
@@ -613,7 +613,6 @@ def test_schema_evolution_disabled(path, glue_table, glue_database):
assert df2.c0.sum() == 3


- @pytest.mark.modin_index
def test_date_cast(path, glue_table, glue_database):
df = pd.DataFrame(
{
8 changes: 0 additions & 8 deletions tests/unit/test_s3_parquet.py
@@ -410,7 +410,6 @@ def test_index_recovery_simple_str(path, use_threads):
assert_pandas_equals(df, df2)


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -447,7 +446,6 @@ def test_range_index_recovery_simple(path, use_threads):
assert_pandas_equals(df.reset_index(level=0), df2.reset_index(level=0))


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -498,7 +496,6 @@ def test_multi_index_recovery_nameless(path, use_threads):
assert_pandas_equals(df.reset_index(), df2.reset_index())


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=(wr.exceptions.InvalidArgumentCombination, AssertionError),
reason="Named index not working when partitioning to a single file",
@@ -535,7 +532,6 @@ def test_index_schema_validation(path, glue_database, glue_table, index):
assert_pandas_equals(pd.concat([df, df]), df2)


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -625,7 +621,6 @@ def test_to_parquet_dataset_sanitize(path):
assert df2.par.to_list() == ["a", "b"]


- @pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [False, True, 2])
def test_timezone_file(path, use_threads):
file_path = f"{path}0.parquet"
@@ -636,7 +631,6 @@ def test_timezone_file(path, use_threads):
assert_pandas_equals(df, df2)


- @pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [True, False, 2])
def test_timezone_file_columns(path, use_threads):
file_path = f"{path}0.parquet"
@@ -690,7 +684,6 @@ def test_validate_columns(path, partition_cols) -> None:
wr.s3.read_parquet(path, columns=["a", "b", "c"], dataset=True, validate_schema=True)


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -715,7 +708,6 @@ def test_mixed_types_column(path) -> None:
wr.s3.to_parquet(df, path, dataset=True, partition_cols=["par"])


- @pytest.mark.modin_index
@pytest.mark.parametrize("compression", [None, "snappy", "gzip", "zstd"])
def test_parquet_compression(path, compression) -> None:
df = pd.DataFrame({"id": [1, 2, 3]}, dtype="Int64")
3 changes: 0 additions & 3 deletions tests/unit/test_s3_text.py
@@ -188,7 +188,6 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table):
assert df_res.equals(dfs[-1])


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -205,7 +204,6 @@ def test_json(path):
assert df1.equals(wr.s3.read_json(path=[path0, path1], use_threads=True))


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -366,7 +364,6 @@ def test_csv_line_terminator(path, line_terminator):
assert df.equals(df2)


- @pytest.mark.modin_index
def test_read_json_versioned(path) -> None:
path_file = f"{path}0.json"
dfs = [
