Merge branch 'dev' of github.com:fsspec/alluxiofs into changeConfig
LuQQiu committed Mar 12, 2024
2 parents 0022f6b + 444f0e4 commit f807b99
Showing 18 changed files with 326 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -38,7 +38,7 @@ jobs:
run: |
pip install .[tests]
- name: Run tests
- name: Run Alluxio FileSystem tests
run: |
pytest -vv \
--log-format="%(asctime)s %(levelname)s %(message)s" \
151 changes: 139 additions & 12 deletions README.md
@@ -1,30 +1,157 @@
# Alluxio FileSystem

Alluxio filesystem spec implementation
This quickstart shows how you can use the FSSpec interface to connect to [Alluxio](https://github.com/Alluxio/alluxio).
For more information on what to expect, please read the blog [Accelerate data loading in large scale ML training with Ray and Alluxio](https://www.alluxio.io/blog/accelerating-data-loading-in-large-scale-ml-training-with-ray-and-alluxio/).

## Dependencies

* Launch Alluxio servers from https://github.com/Alluxio/alluxio.
* Install Alluxio Python Library https://github.com/Alluxio/alluxio-py.
### A running Alluxio server with ETCD membership service

## Install alluxiofs
Alluxio version >= 309

Launch Alluxio clusters with the example configuration
```config
# only one master, one worker are running in this example
alluxio.master.hostname=localhost
alluxio.worker.hostname=localhost
# Critical properties for this example
# UFS address (e.g., the src of data to cache), change it to your bucket
alluxio.dora.client.ufs.root=s3://example_bucket/datasets/
# storage dir
alluxio.worker.page.store.dirs=/tmp/page_ufs
# size of storage dir
alluxio.worker.page.store.sizes=10GB
# use etcd to keep consistent hashing ring
alluxio.worker.membership.manager.type=ETCD
# default etcd endpoint
alluxio.etcd.endpoints=http://localhost:2379
# number of vnodes per worker on the ring
alluxio.user.consistent.hash.virtual.node.count.per.worker=5
# Other optional settings, good to have
alluxio.job.batch.size=200
alluxio.master.journal.type=NOOP
alluxio.master.scheduler.initial.wait.time=10s
alluxio.network.netty.heartbeat.timeout=5min
alluxio.underfs.io.threads=50
```
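
To make the role of `alluxio.user.consistent.hash.virtual.node.count.per.worker` more concrete, here is a minimal, generic sketch of a consistent hash ring with virtual nodes. It is not Alluxio's implementation: the `HashRing` class, the MD5-based hash, and the worker names are assumptions chosen only for illustration.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    # Illustrative hash (MD5 truncated to 64 bits); not Alluxio's actual hash function.
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Hypothetical consistent hash ring with virtual nodes."""

    def __init__(self, workers, vnodes_per_worker=5):
        # Place every worker on the ring vnodes_per_worker times.
        self._ring = sorted(
            (_hash(f"{worker}#{i}"), worker)
            for worker in workers
            for i in range(vnodes_per_worker)
        )
        self._points = [point for point, _ in self._ring]

    def worker_for(self, path: str) -> str:
        # Pick the first virtual node clockwise from the path's hash, wrapping around.
        idx = bisect.bisect_right(self._points, _hash(path)) % len(self._ring)
        return self._ring[idx][1]


workers = ["worker-a", "worker-b", "worker-c"]  # hypothetical worker names
ring = HashRing(workers, vnodes_per_worker=5)
owner = ring.worker_for("s3://example_bucket/datasets/example.csv")
print(owner)
```

More virtual nodes per worker generally spread paths more evenly across workers, at the cost of a larger ring to keep in memory.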
cd alluxiofs && python3 setup.py bdist_wheel && pip3 install dist/alluxiofs-0.1-py3-none-any.whl

### Python Dependencies

- Python 3.8, 3.9, or 3.10
- ray >= 2.8.2
- fsspec released after 2023.6

#### Install fsspec implementation for underlying data storage

Alluxio fsspec acts as a cache on top of an existing underlying data lake storage connection.
The fsspec implementation corresponding to the underlying data lake storage needs to be installed.
In the Alluxio configuration example above, Amazon S3 is the data lake storage from which the dataset is read.

To connect to an existing underlying storage, there are two requirements:
- Install the fsspec implementation for the underlying storage
  - For all [built-in storage fsspec](https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations), no extra Python libraries need to be installed.
  - For all [third-party storage fsspec](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations), the third-party fsspec Python libraries need to be installed.
- Set credentials for the underlying data lake storage

Example: Deploy S3 as the underlying data lake storage
[Install third-party S3 fsspec](https://s3fs.readthedocs.io/en/latest/)

```commandline
pip install s3fs
```
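
As a quick sanity check before creating the filesystem, the following sketch reports whether the third-party package for a given storage protocol can be imported. The `BACKEND_PACKAGES` mapping and the `is_installed` helper are illustrative names, not part of alluxiofs or fsspec, and the mapping only lists a few commonly used packages.

```python
import importlib.util

# Illustrative mapping from storage protocol to the package that provides it.
BACKEND_PACKAGES = {"s3": "s3fs", "gcs": "gcsfs", "abfs": "adlfs"}


def is_installed(package: str) -> bool:
    """Return True if `package` can be found on the current Python path."""
    return importlib.util.find_spec(package) is not None


for protocol, package in BACKEND_PACKAGES.items():
    if is_installed(package):
        print(f"{protocol}: {package} is installed")
    else:
        print(f"{protocol}: {package} is missing (try: pip install {package})")
```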

## Launch Alluxio FileSystem
#### Install alluxiofs

Minimum requirements to launch Alluxio on top of s3
Directly install the latest published alluxiofs
```
pip install alluxiofs
```

[Optional] Install from the source code
```commandline
git clone git@github.com:fsspec/alluxiofs.git
cd alluxiofs && python3 setup.py bdist_wheel && \
pip3 install dist/alluxiofs-<alluxiofs_version>-py3-none-any.whl
```

## Running a Hello World Example

### Load the dataset

#### Load dataset using Alluxio CLI load command

````commandline
bin/alluxio job load --path s3://example_bucket/datasets/ --submit
````
This will trigger a load job asynchronously with a job ID specified. You can wait until the load finishes or check the progress of this loading process using the following command:

````commandline
bin/alluxio job load --path s3://example_bucket/datasets/ --progress
````
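
If you drive the load from Python, a small helper can assemble the two commands shown above. This is only a sketch: `build_load_command` is a hypothetical helper, it assumes it is run from the Alluxio installation directory, and it uses only the `--path`, `--submit`, and `--progress` flags that appear in this README.

```python
import shlex
from typing import List


def build_load_command(path: str, action: str) -> List[str]:
    """Build the argument list for `bin/alluxio job load` (hypothetical helper)."""
    if action not in ("submit", "progress"):
        raise ValueError(f"unsupported action: {action}")
    return ["bin/alluxio", "job", "load", "--path", path, f"--{action}"]


submit_cmd = build_load_command("s3://example_bucket/datasets/", "submit")
progress_cmd = build_load_command("s3://example_bucket/datasets/", "progress")
print(shlex.join(submit_cmd))
print(shlex.join(progress_cmd))
# To actually run a command, pass the list to subprocess.run(cmd, check=True).
```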

### Create an AlluxioFS (backed by S3)

Create the Alluxio filesystem with data backed by S3:

```
import fsspec
from alluxiofs import AlluxioFileSystem
# Register Alluxio to fsspec
fsspec.register_implementation("alluxio", AlluxioFileSystem, clobber=True)
alluxio = fsspec.filesystem("alluxio", etcd_host=args.etcd_host, target_protocol="s3")
# Create Alluxio filesystem
alluxio_fs = fsspec.filesystem("alluxio", etcd_hosts="localhost", etcd_port=2379, target_protocol="s3")
```

### Run Alluxio FileSystem operations

These operations are similar to the [fsspec examples](https://filesystem-spec.readthedocs.io/en/latest/usage.html#use-a-file-system) and the [alluxiofs](https://github.com/fsspec/alluxiofs/blob/main/tests/test_alluxio_fsspec.py) examples.
Note that read operations succeed only if the parent folder has already been loaded into Alluxio.
```
See concrete options descriptions at [Alluxio filesystem initialization description](alluxiofs/core.py)
See a more concrete example at [tests/test_alluxio_fsspec.py](tests/alluxiofs/test_alluxio_fsspec.py)
# list files
contents = alluxio_fs.ls("s3://apc999/datasets/nyc-taxi-csv/green-tripdata/", detail=True)
## Development
# Read files
with alluxio_fs.open("s3://apc999/datasets/nyc-taxi-csv/green-tripdata/green_tripdata_2021-01.csv", "rb") as f:
    data = f.read()
```

### Running an example with Ray

See [Contributions](CONTRIBUTING.md) for guidelines around making new contributions and reviewing them.
```
import fsspec
import ray
from alluxiofs import AlluxioFileSystem
# Register the Alluxio fsspec implementation
fsspec.register_implementation("alluxio", AlluxioFileSystem, clobber=True)
alluxio_fs = fsspec.filesystem(
    "alluxio", etcd_hosts="localhost", target_protocol="s3"
)
# Pass the initialized Alluxio filesystem to Ray and read the NYC taxi ride data set
ds = ray.data.read_csv("s3://example_bucket/datasets/example.csv", filesystem=alluxio_fs)
# Get a count of the number of records in the single CSV file
ds.count()
# Display the schema derived from the CSV file header record
ds.schema()
# Display the header record
ds.take(1)
# Display the first data record
ds.take(2)
# Read multiple CSV files:
ds2 = ray.data.read_csv("s3://apc999/datasets/csv_dir/", filesystem=alluxio_fs)
# Get a count of the number of records in the twelve CSV files
ds2.count()
# End of Python example
```
Empty file.
3 changes: 3 additions & 0 deletions pytest.ini
@@ -0,0 +1,3 @@
[pytest]
filterwarnings =
    ignore::DeprecationWarning
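
For readers less familiar with warning filters, the sketch below shows roughly what this `filterwarnings` entry does, using only Python's standard `warnings` module. The `count_visible_warnings` helper is a hypothetical name used for illustration; pytest applies its own filter handling, so treat this as an analogy rather than a description of pytest internals.

```python
import warnings


def count_visible_warnings(ignore_deprecation: bool) -> int:
    """Emit two warnings and count how many pass the active filters."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        if ignore_deprecation:
            # Counterpart of `ignore::DeprecationWarning` in pytest.ini.
            warnings.filterwarnings("ignore", category=DeprecationWarning)
        warnings.warn("old API", DeprecationWarning)
        warnings.warn("something else", UserWarning)
        return len(caught)


print(count_visible_warnings(False))  # both warnings are recorded
print(count_visible_warnings(True))   # only the UserWarning is recorded
```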
9 changes: 9 additions & 0 deletions tests/client/test_docker_alluxio.py
@@ -0,0 +1,9 @@
from tests.conftest import TEST_ROOT


def test_simple(alluxio_client):
    alluxio_client.listdir(TEST_ROOT)  # no error


def test_simple_etcd(etcd_alluxio_client):
    etcd_alluxio_client.listdir(TEST_ROOT)  # no error
66 changes: 66 additions & 0 deletions tests/client/test_metadata_docker.py
@@ -0,0 +1,66 @@
import os

from alluxiofs import AlluxioClient
from tests.conftest import TEST_DIR
from tests.conftest import TEST_ROOT


def validate_list_get_status(
    alluxio_client: AlluxioClient, alluxio_path, local_path
):
    validate_get_status(alluxio_client, alluxio_path, local_path)
    if not os.path.isdir(local_path):
        return
    alluxio_res = alluxio_client.listdir(alluxio_path)
    alluxio_listing = {item.name: item for item in alluxio_res}
    local_listing = list(os.scandir(local_path))
    assert len(local_listing) == len(
        alluxio_listing
    ), "Local listing result has different length compared to alluxio_listing"
    for entry in local_listing:
        assert (
            entry.name in alluxio_listing
        ), f"{entry.name} is missing in Alluxio listing"
        alluxio_entry = alluxio_listing[entry.name]
        expected_type = "directory" if entry.is_dir() else "file"
        assert (
            expected_type == alluxio_entry.type
        ), f"Type mismatch for {entry.name}: expected {expected_type}, got {alluxio_entry.type}"
        local_mod_time_ms = int(entry.stat().st_mtime * 1000)
        alluxio_mod_time_ms = alluxio_entry.last_modification_time_ms
        assert (
            abs(local_mod_time_ms - alluxio_mod_time_ms) <= 1000
        ), f"Last modification time mismatch for {entry.name}: expected {local_mod_time_ms}, got {alluxio_mod_time_ms}"
        if expected_type == "file":
            assert (
                entry.stat().st_size == alluxio_entry.length
            ), f"Size mismatch for {entry.name}: expected {entry.stat().st_size}, got {alluxio_entry.length}"
        validate_list_get_status(
            alluxio_client,
            os.path.join(alluxio_path, alluxio_entry.name),
            os.path.join(local_path, entry.name),
        )


def validate_get_status(
    alluxio_client: AlluxioClient, alluxio_path: str, local_path: str
):
    alluxio_status = alluxio_client.get_file_status(alluxio_path)
    local_status = os.stat(local_path)
    expected_type = "directory" if os.path.isdir(local_path) else "file"
    assert (
        expected_type == alluxio_status.type
    ), f"Type mismatch for {alluxio_path}: expected {expected_type}, got {alluxio_status.type}"
    local_mod_time_ms = int(local_status.st_mtime * 1000)
    assert (
        abs(local_mod_time_ms - alluxio_status.last_modification_time_ms)
        <= 1000
    ), f"Last modification time mismatch for {alluxio_path}"
    if expected_type == "file":
        assert (
            local_status.st_size == alluxio_status.length
        ), f"Size mismatch for {alluxio_path}: expected {local_status.st_size}, got {alluxio_status.length}"


def test_list_get_status(alluxio_client: AlluxioClient):
    validate_list_get_status(alluxio_client, TEST_ROOT, TEST_DIR)
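
The modification-time checks above allow a difference of up to 1000 ms between the local file system and what Alluxio reports. The standalone sketch below isolates that comparison so it can be run without an Alluxio cluster; `mtime_ms`, `within_tolerance`, and the simulated `reported_ms` value are hypothetical names and data, not part of the test suite.

```python
import os
import tempfile

TOLERANCE_MS = 1000  # same bound used by the assertions above


def mtime_ms(path: str) -> int:
    """Return the file's last modification time in whole milliseconds."""
    return int(os.stat(path).st_mtime * 1000)


def within_tolerance(a_ms: int, b_ms: int, tolerance_ms: int = TOLERANCE_MS) -> bool:
    """Return True if the two timestamps differ by at most tolerance_ms."""
    return abs(a_ms - b_ms) <= tolerance_ms


with tempfile.NamedTemporaryFile() as tmp:
    local_ms = mtime_ms(tmp.name)

reported_ms = local_ms + 400  # simulated value standing in for a remote timestamp
print(within_tolerance(local_ms, reported_ms))
print(within_tolerance(local_ms, local_ms + 1500))
```

A tolerance is useful here because the two sides may store timestamps at different precisions, so an exact equality check could fail spuriously.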
4 changes: 4 additions & 0 deletions tests/client/test_read_range_docker.py
@@ -110,3 +110,7 @@ def test_alluxio_client(alluxio_client: AlluxioClient):
length,
)
LOGGER.debug("Passed corner test cases")


def test_etcd_alluxio_client(etcd_alluxio_client: AlluxioClient):
    test_alluxio_client(etcd_alluxio_client)