fix: Read partitioned parquet files from relative paths #3470

Merged: 7 commits, Aug 1, 2023
16 changes: 15 additions & 1 deletion ludwig/data/dataset/ray.py
@@ -27,6 +27,7 @@
 import torch
 from packaging import version
 from pyarrow.fs import FSSpecHandler, PyFileSystem
+from pyarrow.lib import ArrowInvalid
 from ray.data import read_parquet
 from ray.data.dataset_pipeline import DatasetPipeline

@@ -55,7 +56,20 @@
 @default_retry()
 def read_remote_parquet(path: str):
     fs, path = get_fs_and_path(path)
-    return read_parquet(path, filesystem=PyFileSystem(FSSpecHandler(fs)))
+
+    # Fix for https://github.com/ludwig-ai/ludwig/issues/3440
+    # Parquet file reads will fail with `pyarrow.lib.ArrowInvalid` under the following conditions:
+    #   1) The Parquet data is in multi-file format
+    #   2) A relative filepath is passed to the read function
+    #   3) A filesystem object is passed to the read function
+    # The issue can be resolved by either:
+    #   1) Passing an absolute filepath
+    #   2) Not passing a filesystem object
+    try:
+        df = read_parquet(path, filesystem=PyFileSystem(FSSpecHandler(fs)))
+    except ArrowInvalid:
+        df = read_parquet(path)
+    return df


 @DeveloperAPI
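For context on the change above, the failure mode can be reproduced outside of Ludwig. Below is a minimal sketch, assuming a partitioned parquet directory named data.training.parquet exists in the current working directory (the path and the local fsspec filesystem are illustrative assumptions, not part of this PR):

import os

import fsspec
from pyarrow.fs import FSSpecHandler, PyFileSystem
from pyarrow.lib import ArrowInvalid
from ray.data import read_parquet

# Wrap an fsspec filesystem in a pyarrow filesystem, as read_remote_parquet
# does with the filesystem returned by get_fs_and_path.
fs = PyFileSystem(FSSpecHandler(fsspec.filesystem("file")))

try:
    # Multi-file parquet + relative path + filesystem object raises ArrowInvalid.
    ds = read_parquet("data.training.parquet", filesystem=fs)
except ArrowInvalid:
    # Workaround 1: pass an absolute filepath.
    ds = read_parquet(os.path.abspath("data.training.parquet"), filesystem=fs)
    # Workaround 2: omit the filesystem object, which is the fallback the fix uses.
    ds = read_parquet("data.training.parquet")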
60 changes: 59 additions & 1 deletion tests/ludwig/data/test_ray_data.py
@@ -1,11 +1,15 @@
+import os
+import shutil
 from unittest import mock

 import pandas as pd
 import pytest

 # Skip these tests if Ray is not installed
 ray = pytest.importorskip("ray")  # noqa
+dask = pytest.importorskip("dask")  # noqa
+
-from ludwig.data.dataset.ray import RayDatasetBatcher  # noqa
+from ludwig.data.dataset.ray import RayDatasetBatcher, read_remote_parquet  # noqa

 # Mark the entire module as distributed
 pytestmark = pytest.mark.distributed
@@ -31,3 +35,57 @@ def test_async_reader_error():
         samples_per_epoch=100,
         ignore_last=False,
     )


+@pytest.fixture(scope="module")
+def parquet_file(ray_cluster_2cpu) -> str:
+    """Write a multi-file parquet dataset to the cwd.
+
+    Returns:
+        The path to the parquet dataset.
+    """
+    # The data needs to be written to a multi-file parquet format, otherwise the issue doesn't repro. To do
+    # this, we partition a test dataframe with dask and then write to file.
+    df = pd.DataFrame({"col1": list(range(1000)), "col2": list(range(1000))})
+    df = dask.dataframe.from_pandas(df, chunksize=100)
+
+    # Typically we would write test data to a temporary directory, but the issue this was set up to test
+    # only happens when using relative filepaths.
+    cwd = os.getcwd()
+    filepath = os.path.join(cwd, "data.training.parquet")
+    df.to_parquet(filepath, engine="pyarrow")
+
+    yield filepath
+
+    # Clean up the data
+    shutil.rmtree(filepath)
+
+
+@pytest.fixture(scope="module", params=["absolute", "relative"])
+def parquet_filepath(parquet_file: str, request: "pytest.FixtureRequest") -> str:
+    """Convert a filepath in the CWD to either an absolute or relative path.
+
+    Args:
+        parquet_file: Absolute path to a parquet file in the CWD.
+        request: pytest request fixture with the fixture parameters.
+
+    Returns:
+        Either the absolute or relative path of the parquet file.
+    """
+    filepath_type = request.param
+    return parquet_file if filepath_type == "absolute" else os.path.basename(parquet_file)
+
+
+def test_read_remote_parquet(parquet_filepath: str):
+    """Test for the fix to https://github.com/ludwig-ai/ludwig/issues/3440.
+
+    Parquet file reads will fail with `pyarrow.lib.ArrowInvalid` under the following conditions:
+        1) The Parquet data is in multi-file format
+        2) A relative filepath is passed to the read function
+        3) A filesystem object is passed to the read function
+
+    The issue can be resolved by either:
+        1) Passing an absolute filepath
+        2) Not passing a filesystem object
+    """
+    read_remote_parquet(parquet_filepath)
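As an aside on the parquet_file fixture: dask writes one part file per partition, so chunksize=100 over 1,000 rows yields a directory of ten parquet files rather than a single file, which is the "multi-file format" precondition the test relies on. A standalone sketch of that layout check follows (not part of the PR; the part-file naming assumes dask's default pyarrow writer):

import os

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"col1": list(range(1000)), "col2": list(range(1000))})
ddf = dd.from_pandas(df, chunksize=100)  # 10 partitions of 100 rows each

ddf.to_parquet("data.training.parquet", engine="pyarrow")

# The output is a directory containing one part file per partition
# (e.g. part.0.parquet ... part.9.parquet under dask's default naming).
parts = [f for f in os.listdir("data.training.parquet") if f.endswith(".parquet")]
assert len(parts) == 10

With the fix in place, test_read_remote_parquet passes for both the "absolute" and "relative" parametrizations; without it, the relative case surfaced the ArrowInvalid error described in issue #3440.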