Skip to content

Commit

Permalink
Add rust extension lib alluxiocommon to the repo (#39)
Browse files Browse the repository at this point in the history
* Port changes from https://github.com/lucyge2022/alluxiofs/tree/multithread

5f80059 small fixes for REST and metrics
1c426c2 add access debug logs for all fsspec ops & add quick sanity unit tests for read
7dcb833 checkin multi process http benchmark for worker http server
4b4ebce use alluxiocommon pyo3 package to try to do multithread http req
523f748 add rust pyo3 extension
8979d2b remove first
0367659 add related files
4d0eebe small fixes
a0519b3 if any error on any thread running result, raise exception back to python directly

* missing changes from port

* fix the bug of inpage_read_len calculation

* make ondemand pool default and enable it by flag during instantiation

* cleanup fixes

* WIP add fixture to test with alluxio client with alluxiocommon lib enabled

* for read pages if we got any data just return without throwing any following exception

* add _all_page_generator with alluxiocommon

* 1. add pytest fixture for alluxiocommon enabled to use in test_docker_alluxio_fsspec.py
2. make tests/client/test_read_range_docker.py (now named as test_read_docker.py
to handle full read as well
add alluxiocommon fixture in it too

* remove unnecessary files

* try adding a ci yaml file with alluxiocommon enabled as dependency

* cleanup fixes

* add to ci.yml of alluxiocommon dependency

* add rust/alluxiocommon/tests/

* fixes

* remove changes in ci.yml as it wont run

* try add alluxiocommon pyo3 build

* fix pyo3 related

* pre-commit style changes + manually install openssl

* chanegs

* try to use rustls instead of rust-native-tls as reqwest dependency as rust-native-tls requires dynamic link of tls in runtime env

* enable run alluxiocommon tests job in ci.yml

* reorder to have alluxiocommon install after alluxiocommon wheel building

* try fix pip install alluxiocommon from dist/

* add --no-index

* add ls dist/ to debug

* add pyo3 with python version when buildoing

* merge pytest for alluxiocommon and alluxiofs together

* syntax fix

* syntax fix

* try reorder

* name change

* modify rust/alluxiocommon/README.md and cleanup ci.yml

* remove comments for debugging purpose before
  • Loading branch information
lucyge2022 authored Apr 29, 2024
1 parent 8c6dc03 commit 08429c4
Show file tree
Hide file tree
Showing 16 changed files with 2,050 additions and 18 deletions.
33 changes: 25 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.8","3.9"]
target: [x86_64]

steps:
- name: Checkout source
uses: actions/checkout@v4
# - name: Checkout source
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Setup Conda Environment
uses: mamba-org/setup-micromamba@v1
Expand All @@ -34,15 +38,28 @@ jobs:
conda list
conda --version
- name: Install dependencies
run: |
pip install .[tests]
# alluxiocommon related
- name: Build alluxiocommon wheels
uses: PyO3/maturin-action@v1
with:
target: ${{ matrix.target }}
args: --release -i python${{ matrix.python-version }} --out dist -m rust/alluxiocommon/Cargo.toml
manylinux: auto

- name: Run Alluxio FileSystem tests
- name: Install all packages and Run AlluxioCommon tests
shell: bash
run: |
set -e
pip install .[tests]
pip install alluxiocommon --no-index --find-links=dist/ --force-reinstall
pip install pytest
pytest -vv \
--log-format="%(asctime)s %(levelname)s %(message)s" \
--log-date-format="%H:%M:%S" \
rust/alluxiocommon/tests/
pytest -vv \
--log-format="%(asctime)s %(levelname)s %(message)s" \
--log-date-format="%H:%M:%S" \
--log-format="%(asctime)s %(levelname)s %(message)s" \
--log-date-format="%H:%M:%S" \
tests/
lint:
Expand Down
2 changes: 2 additions & 0 deletions alluxiofs/client/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE = 28080
ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE = 5
ALLUXIO_SUCCESS_IDENTIFIER = "success"
ALLUXIO_COMMON_EXTENSION_ENABLE = "alluxio.common.extension.enable"
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE = "alluxio.common.ondemandpool.disable"
LIST_URL_FORMAT = "http://{worker_host}:{http_port}/v1/files"
FULL_PAGE_URL_FORMAT = (
"http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}"
Expand Down
122 changes: 113 additions & 9 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@
import requests
from requests.adapters import HTTPAdapter

try:
from alluxiocommon import _DataManager
except ModuleNotFoundError:
print(
"[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke."
)

from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY1
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY2
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
Expand Down Expand Up @@ -177,6 +186,7 @@ def __init__(
# parse options
page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
self.data_manager = None
if options:
if ALLUXIO_PAGE_SIZE_KEY in options:
page_size = options[ALLUXIO_PAGE_SIZE_KEY]
Expand All @@ -195,6 +205,20 @@ def __init__(
self.logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
if (
ALLUXIO_COMMON_EXTENSION_ENABLE in options
and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true"
):
print("Using alluxiocommon extension..")
self.logger.debug("alluxiocommon extension enabled.")
ondemand_pool_disabled = (
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options
and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower()
== "true"
)
self.data_manager = _DataManager(
concurrency, ondemand_pool_disabled=ondemand_pool_disabled
)
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
Expand Down Expand Up @@ -482,11 +506,18 @@ def read(self, file_path):
)
path_id = self._get_path_hash(file_path)
try:
return b"".join(
self._all_page_generator(
worker_host, worker_http_port, path_id
if self.data_manager:
return b"".join(
self._all_page_generator_alluxiocommon(
worker_host, worker_http_port, path_id
)
)
else:
return b"".join(
self._all_page_generator(
worker_host, worker_http_port, path_id
)
)
)
except Exception as e:
raise Exception(
f"Error when reading file {file_path}: error {e}"
Expand All @@ -504,6 +535,7 @@ def read_range(self, file_path, offset, length):
Returns:
file content (str): The file content with length from offset
"""
self.logger.debug(f"read_range,off:{offset}:length:{length}")
self._validate_path(file_path)
if not isinstance(offset, int) or offset < 0:
raise ValueError("Offset must be a non-negative integer")
Expand All @@ -528,15 +560,20 @@ def read_range(self, file_path, offset, length):
path_id = self._get_path_hash(file_path)

try:
return b"".join(
self._range_page_generator(
if self.data_manager:
return self._range_page_generator_alluxiocommon(
worker_host, worker_http_port, path_id, offset, length
)
)
else:
return b"".join(
self._range_page_generator(
worker_host, worker_http_port, path_id, offset, length
)
)
except Exception as e:
raise Exception(
f"Error when reading file {file_path}: error {e}: "
f"worker_host{worker_host}, worker_http_port:{worker_http_port}"
f"Error when reading file:{file_path}: error:{e}: "
f"worker_host:{worker_host}, worker_http_port:{worker_http_port}"
) from e

def write_page(self, file_path, page_index, page_bytes):
Expand Down Expand Up @@ -574,6 +611,39 @@ def write_page(self, file_path, page_index, page_bytes):
f"Error writing to file {file_path} at page {page_index}: {e}"
)

def _all_page_generator_alluxiocommon(
self, worker_host, worker_http_port, path_id
):
page_index = 0
fetching_pages_num_each_round = 4
while True:
read_urls = []
try:
for _ in range(fetching_pages_num_each_round):
page_url = FULL_PAGE_URL_FORMAT.format(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
page_index=page_index,
)
read_urls.append(page_url)
page_index += 1
pages_content = self.data_manager.make_multi_http_req(
read_urls
)
yield pages_content
if (
len(pages_content)
< fetching_pages_num_each_round * self.page_size
):
break
except Exception as e:
# data_manager won't throw exception if there are any first few content retrieved
# hence we always propagte exception from data_manager upwards
raise Exception(
f"Error when reading all pages of {path_id}: error {e}"
) from e

def _all_page_generator(self, worker_host, worker_http_port, path_id):
page_index = 0
while True:
Expand All @@ -596,6 +666,40 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
break
page_index += 1

def _range_page_generator_alluxiocommon(
self, worker_host, worker_http_port, path_id, offset, length
):
read_urls = []
start = offset
while start < offset + length:
page_index = start // self.page_size
inpage_off = start % self.page_size
inpage_read_len = min(
self.page_size - inpage_off, offset + length - start
)
page_url = None
if inpage_off == 0 and inpage_read_len == self.page_size:
page_url = FULL_PAGE_URL_FORMAT.format(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
page_index=page_index,
)
else:
page_url = PAGE_URL_FORMAT.format(
worker_host=worker_host,
http_port=worker_http_port,
path_id=path_id,
page_index=page_index,
page_offset=inpage_off,
page_length=inpage_read_len,
)
read_urls.append(page_url)
start += inpage_read_len
self.logger.debug(f"read_urls:{read_urls}")
data = self.data_manager.make_multi_http_req(read_urls)
return data

def _range_page_generator(
self, worker_host, worker_http_port, path_id, offset, length
):
Expand Down
1 change: 1 addition & 0 deletions alluxiofs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def fallback_wrapper(self, path, *args, **kwargs):
path = self._strip_alluxio_protocol(path)
try:
if self.alluxio:
self.logger.debug(f"calling {alluxio_impl.__name__}")
return alluxio_impl(self, path, *args, **kwargs)
except Exception as e:
if not isinstance(e, NotImplementedError):
Expand Down
5 changes: 5 additions & 0 deletions benchmark/AbstractBench.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,14 @@ def get_protocol(self, full_path: str) -> str:

def init(self):
# protocol = self.get_protocol(self.args.path)
alluxio_options = {
# "alluxio.common.extension.enable": "True",
"alluxio.worker.page.store.page.size": "1MB"
}
self.alluxio_fs = AlluxioFileSystem(
etcd_hosts=self.args.etcd_hosts,
worker_hosts=self.args.worker_hosts,
options=alluxio_options,
# target_protocol=protocol
)
self.traverse(self.args.path)
Expand Down
36 changes: 36 additions & 0 deletions rust/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
## A PyO3 based common native extension lib for alluxio python client

### Developer Prerequisites:
- Install Rust:
https://www.rust-lang.org/tools/install
- Install maturin:
https://www.maturin.rs/installation


### To build developer version locally and play:

1) create virtualenv, (a tool used to create isolated Python environments):


python3 -m venv .env
source .env/bin/activate
maturin develop

2) then can start using:


python3
>>> from alluxiocommon import _DataManager
>>> dm = _DataManager(4)
>>> # do something with dm...
### To build wheel package and install with pip:

#in rust/alluxiocommon dir:
$ maturin build --out <alluxiofs_home>/dist -m <alluxiofs_home>/rust/alluxiocommon/Cargo.toml -i python<x.y> (python version such as 3.8)
#then find .whl package in <alluxiofs_home>/dist:
[root@ip-XXX-XX-XX-XX alluxiofs]# ls -l dist/
total 21848
-rw-r--r--. 1 root root 22318133 Apr 28 05:31 alluxiocommon-0.1.0-cp38-cp38-linux_x86_64.whl
#install with pip
$ pip install dist/alluxiocommon-0.1.0-cp38-cp38-linux_x86_64.whl --force-reinstall
Loading

0 comments on commit 08429c4

Please sign in to comment.