fix: Read partitioned parquet files from relative paths (#3470)
jeffkinnison authored Aug 1, 2023
1 parent 079429e commit d146799
Showing 2 changed files with 74 additions and 2 deletions.
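For context, issue #3440 reports that `ray.data.read_parquet` raises `pyarrow.lib.ArrowInvalid` when the Parquet data is multi-file, the path is relative, and an explicit filesystem object is passed. A minimal repro sketch, assuming a local fsspec filesystem and a writable cwd (names are illustrative, not taken from this PR):

# Hypothetical repro for https://github.com/ludwig-ai/ludwig/issues/3440.
import dask.dataframe as dd
import fsspec
import pandas as pd
from pyarrow.fs import FSSpecHandler, PyFileSystem
from ray.data import read_parquet

# Write a multi-file (partitioned) Parquet dataset into the cwd.
df = dd.from_pandas(pd.DataFrame({"a": list(range(1000))}), chunksize=100)
df.to_parquet("data.parquet", engine="pyarrow")

fs = PyFileSystem(FSSpecHandler(fsspec.filesystem("file")))
read_parquet("data.parquet", filesystem=fs)  # relative path + filesystem: expected to raise ArrowInvalid
read_parquet("data.parquet")  # no filesystem object: expected to succeed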
16 changes: 15 additions & 1 deletion ludwig/data/dataset/ray.py
@@ -27,6 +27,7 @@
 import torch
 from packaging import version
 from pyarrow.fs import FSSpecHandler, PyFileSystem
+from pyarrow.lib import ArrowInvalid
 from ray.data import read_parquet
 from ray.data.dataset_pipeline import DatasetPipeline

@@ -55,7 +56,20 @@
 @default_retry()
 def read_remote_parquet(path: str):
     fs, path = get_fs_and_path(path)
-    return read_parquet(path, filesystem=PyFileSystem(FSSpecHandler(fs)))
+
+    # Fix for https://github.com/ludwig-ai/ludwig/issues/3440
+    # Parquet file reads will fail with `pyarrow.lib.ArrowInvalid` under the following conditions:
+    # 1) The Parquet data is in multi-file format
+    # 2) A relative filepath is passed to the read function
+    # 3) A filesystem object is passed to the read function
+    # The issue can be resolved by either:
+    # 1) Passing an absolute filepath
+    # 2) Not passing a filesystem object
+    try:
+        df = read_parquet(path, filesystem=PyFileSystem(FSSpecHandler(fs)))
+    except ArrowInvalid:
+        df = read_parquet(path)
+    return df


 @DeveloperAPI
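With the fallback in place, `read_remote_parquet` accepts either path form. A minimal usage sketch (the dataset paths are illustrative, not taken from this PR):

# Usage sketch, assuming a partitioned dataset in the cwd named data.training.parquet.
from ludwig.data.dataset.ray import read_remote_parquet

ds_rel = read_remote_parquet("data.training.parquet")  # relative path: first attempt may raise ArrowInvalid, then falls back
ds_abs = read_remote_parquet("/abs/path/data.training.parquet")  # hypothetical absolute path: first attempt succeeds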
60 changes: 59 additions & 1 deletion tests/ludwig/data/test_ray_data.py
@@ -1,11 +1,15 @@
+import os
+import shutil
 from unittest import mock

 import pandas as pd
 import pytest

+# Skip these tests if Ray is not installed
 ray = pytest.importorskip("ray")  # noqa
+dask = pytest.importorskip("dask")  # noqa

-from ludwig.data.dataset.ray import RayDatasetBatcher  # noqa
+from ludwig.data.dataset.ray import RayDatasetBatcher, read_remote_parquet  # noqa

 # Mark the entire module as distributed
 pytestmark = pytest.mark.distributed
@@ -31,3 +35,57 @@ def test_async_reader_error():
         samples_per_epoch=100,
         ignore_last=False,
     )
+
+
+@pytest.fixture(scope="module")
+def parquet_file(ray_cluster_2cpu) -> str:
+    """Write a multi-file parquet dataset to the cwd.
+
+    Returns:
+        The path to the parquet dataset.
+    """
+    # The data needs to be written to a multi-file parquet format, otherwise the issue doesn't repro. To do this, we
+    # partition a test dataframe with dask and then write to file.
+    df = pd.DataFrame({"col1": list(range(1000)), "col2": list(range(1000))})
+    df = dask.dataframe.from_pandas(df, chunksize=100)
+
+    # Typically we would write test data to a temporary directory, but the issue this was set up to test only happens
+    # when using relative filepaths.
+    cwd = os.getcwd()
+    filepath = os.path.join(cwd, "data.training.parquet")
+    df.to_parquet(filepath, engine="pyarrow")
+
+    yield filepath
+
+    # Clean up the data
+    shutil.rmtree(filepath)
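# Not part of the diff: dask writes one file per partition, so the
# "data.training.parquet" path above is a directory of part files, not a single
# file. A quick check while the fixture is alive (part-file names are assumed
# from dask's pyarrow writer):
import os

print(sorted(os.listdir("data.training.parquet")))  # e.g. ['part.0.parquet', 'part.1.parquet', ...]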
+
+
+@pytest.fixture(scope="module", params=["absolute", "relative"])
+def parquet_filepath(parquet_file: str, request: "pytest.FixtureRequest") -> str:
+    """Convert a filepath in the CWD to either an absolute or relative path.
+
+    Args:
+        parquet_file: Absolute path to a parquet file in the CWD
+        request: pytest request fixture with the fixture parameters
+
+    Returns:
+        Either the absolute or relative path of the parquet file.
+    """
+    filepath_type = request.param
+    return parquet_file if filepath_type == "absolute" else os.path.basename(parquet_file)
+
+
+def test_read_remote_parquet(parquet_filepath: str):
+    """Test for the fix to https://github.com/ludwig-ai/ludwig/issues/3440.
+
+    Parquet file reads will fail with `pyarrow.lib.ArrowInvalid` under the following conditions:
+    1) The Parquet data is in multi-file format
+    2) A relative filepath is passed to the read function
+    3) A filesystem object is passed to the read function
+
+    The issue can be resolved by either:
+    1) Passing an absolute filepath
+    2) Not passing a filesystem object
+    """
+    read_remote_parquet(parquet_filepath)
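The test passes as long as the call does not raise. A hypothetical extension, not part of this commit, could also assert that the data round-trips, since the fixture writes exactly 1000 rows:

# Hypothetical follow-up test (not in this commit): validate the row count as well.
def test_read_remote_parquet_contents(parquet_filepath: str):
    ds = read_remote_parquet(parquet_filepath)
    assert ds.count() == 1000  # the fixture dataframe has 1000 rows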
