Add rust extension lib alluxiocommon to the repo #39

Merged
merged 34 commits into from
Apr 29, 2024
Changes from 13 commits
Commits
22c7f42
Port changes from https://github.com/lucyge2022/alluxiofs/tree/multit…
lucyge2022 Apr 12, 2024
814629d
missing changes from port
lucyge2022 Apr 12, 2024
8913fa8
fix the bug of inpage_read_len calculation
lucyge2022 Apr 18, 2024
e321c84
make ondemand pool default and enable it by flag during instantiation
lucyge2022 Apr 23, 2024
1d5d5f8
cleanup fixes
lucyge2022 Apr 23, 2024
5b20fac
WIP add fixture to test with alluxio client with alluxiocommon lib en…
lucyge2022 Apr 24, 2024
e9bc8c0
for read pages if we got any data just return without throwing any fo…
lucyge2022 Apr 24, 2024
9efb902
add _all_page_generator with alluxiocommon
lucyge2022 Apr 25, 2024
a226893
1. add pytest fixture for alluxiocommon enabled to use in test_docker…
lucyge2022 Apr 25, 2024
9cda7cf
remove unnecessary files
lucyge2022 Apr 25, 2024
9d3c31a
try adding a ci yaml file with alluxiocommon enabled as dependency
lucyge2022 Apr 26, 2024
c585c8a
cleanup fixes
lucyge2022 Apr 26, 2024
b517162
add to ci.yml of alluxiocommon dependency
lucyge2022 Apr 26, 2024
b5b5b8b
add rust/alluxiocommon/tests/
lucyge2022 Apr 26, 2024
a24652f
fixes
lucyge2022 Apr 26, 2024
647e3ea
remove changes in ci.yml as it wont run
lucyge2022 Apr 26, 2024
083a2a5
try add alluxiocommon pyo3 build
lucyge2022 Apr 26, 2024
5fe1c1d
fix pyo3 related
lucyge2022 Apr 26, 2024
3eccac5
pre-commit style changes + manually install openssl
lucyge2022 Apr 27, 2024
c9c1dbc
changes
lucyge2022 Apr 27, 2024
f44d550
try to use rustls instead of rust-native-tls as reqwest dependency as…
lucyge2022 Apr 28, 2024
b93e580
enable run alluxiocommon tests job in ci.yml
lucyge2022 Apr 28, 2024
0b23d38
reorder to have alluxiocommon install after alluxiocommon wheel building
lucyge2022 Apr 28, 2024
e509554
try fix pip install alluxiocommon from dist/
lucyge2022 Apr 28, 2024
006ffd8
add --no-index
lucyge2022 Apr 28, 2024
e0a62b4
add ls dist/ to debug
lucyge2022 Apr 28, 2024
b50cd32
add pyo3 with python version when building
lucyge2022 Apr 28, 2024
7571b50
merge pytest for alluxiocommon and alluxiofs together
lucyge2022 Apr 28, 2024
06fd446
syntax fix
lucyge2022 Apr 28, 2024
c8d9743
syntax fix
lucyge2022 Apr 28, 2024
b2deaff
try reorder
lucyge2022 Apr 29, 2024
03a16da
name change
lucyge2022 Apr 29, 2024
1de7808
modify rust/alluxiocommon/README.md and cleanup ci.yml
lucyge2022 Apr 29, 2024
8f73b59
remove comments for debugging purpose before
lucyge2022 Apr 29, 2024
60 changes: 60 additions & 0 deletions .github/workflows/ci.yml
@@ -7,7 +7,67 @@ defaults:
    shell: bash -l -eo pipefail {0}

jobs:
  alluxiocommon:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      # matrix:
      #   platform:
      #     - runner: ubuntu-latest
      #       target: x86_64
      #     - runner: ubuntu-latest
      #       target: x86
      #     - runner: ubuntu-latest
      #       target: aarch64
      #     - runner: ubuntu-latest
      #       target: armv7
      #     - runner: ubuntu-latest
      #       target: s390x
      #     - runner: ubuntu-latest
      #       target: ppc64le
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.10'
      - name: Build wheels
        uses: PyO3/maturin-action@v1
        with:
          target: ${{ matrix.platform.target }}
          args: --release --out dist --find-interpreter --manifest-path rust/alluxiocommon/Cargo.toml
          sccache: 'true'
          manylinux: auto
      # - name: Upload wheels
      #   uses: actions/upload-artifact@v4
      #   with:
      #     name: wheels-linux-${{ matrix.platform.target }}
      #     path: dist
      # - name: pytest
      #   if: ${{ startsWith(matrix.platform.target, 'x86_64') }}
      #   shell: bash
      #   run: |
      #     set -e
      #     pip install alluxiocommon --find-links dist --force-reinstall
      #     pip install pytest
      #     cd /root/github/alluxiofs/rust/alluxiocommon && pytest
      # - name: pytest
      #   if: ${{ !startsWith(matrix.platform.target, 'x86') && matrix.platform.target != 'ppc64' }}
      #   uses: uraimo/[email protected]
      #   with:
      #     arch: ${{ matrix.platform.target }}
      #     distro: ubuntu22.04
      #     githubToken: ${{ github.token }}
      #     install: |
      #       apt-get update
      #       apt-get install -y --no-install-recommends python3 python3-pip
      #       pip3 install -U pip pytest
      #     run: |
      #       set -e
      #       pip3 install alluxiocommon --find-links dist --force-reinstall
      #       cd /root/github/alluxiofs/rust/alluxiocommon && pytest

  test:
    needs: alluxiocommon
    name: Test
    runs-on: ubuntu-latest
    timeout-minutes: 10
2 changes: 2 additions & 0 deletions alluxiofs/client/const.py
@@ -12,6 +12,8 @@
ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE = 28080
ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE = 5
ALLUXIO_SUCCESS_IDENTIFIER = "success"
ALLUXIO_COMMON_EXTENSION_ENABLE = "alluxio.common.extension.enable"
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE = "alluxio.common.ondemandpool.disable"
LIST_URL_FORMAT = "http://{worker_host}:{http_port}/v1/files"
FULL_PAGE_URL_FORMAT = (
"http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}"
122 changes: 113 additions & 9 deletions alluxiofs/client/core.py
@@ -15,7 +15,16 @@
import requests
from requests.adapters import HTTPAdapter

try:
    from alluxiocommon import _DataManager
except ModuleNotFoundError:
    print(
        "[WARNING] pkg 'alluxiocommon' not installed; related modules cannot be invoked."
    )

from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY1
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY2
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
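
Reviewer sketch (not part of the PR): as the guard above is written, a failed import only prints a warning and leaves `_DataManager` unbound, so turning on the extension option later would presumably surface as a `NameError`. One possible alternative, shown under the assumption that a `None` sentinel plus an explicit error is acceptable, is below. The helper names `load_optional` and `require_data_manager` are hypothetical and do not exist in alluxiofs.

```python
import importlib


def load_optional(module_name, attr):
    """Return `attr` from `module_name`, or None if the module is not installed."""
    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError:
        return None
    return getattr(module, attr, None)


def require_data_manager(data_manager_cls):
    """Fail with a clear message when the extension is requested but missing."""
    if data_manager_cls is None:
        raise RuntimeError(
            "alluxiocommon extension requested but the package is not installed"
        )
    return data_manager_cls


# `_DataManager` is the name imported in the diff; here it is simply None
# whenever the alluxiocommon wheel has not been installed.
_DataManager = load_optional("alluxiocommon", "_DataManager")
```

With this shape, the constructor could call `require_data_manager(_DataManager)` only when the enable flag is set, so users who never opt in are unaffected.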
@@ -177,6 +186,7 @@ def __init__(
# parse options
page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
self.data_manager = None
if options:
if ALLUXIO_PAGE_SIZE_KEY in options:
page_size = options[ALLUXIO_PAGE_SIZE_KEY]
@@ -195,6 +205,20 @@ def __init__(
self.logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
            if (
                ALLUXIO_COMMON_EXTENSION_ENABLE in options
                and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true"
            ):
                print("Using alluxiocommon extension..")
                self.logger.debug("alluxiocommon extension enabled.")
                ondemand_pool_disabled = (
                    ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options
                    and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower()
                    == "true"
                )
                self.data_manager = _DataManager(
                    concurrency, ondemand_pool_disabled=ondemand_pool_disabled
                )
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
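
Reviewer sketch (not part of the PR): the option handling in this hunk can be read as two string flags, where the on-demand pool flag is only consulted once the extension itself is enabled. The standalone version below mirrors that logic for illustration. `parse_extension_flags` and `_flag` are hypothetical names, and the `str()` coercion is an added assumption: the diff calls `.lower()` directly, so a boolean `True` value would fail there.

```python
ALLUXIO_COMMON_EXTENSION_ENABLE = "alluxio.common.extension.enable"
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE = "alluxio.common.ondemandpool.disable"


def _flag(options, key):
    """True only when `key` is present and its value reads as 'true' (any case)."""
    # str() is an assumption added here so that boolean values are tolerated.
    return key in options and str(options[key]).lower() == "true"


def parse_extension_flags(options):
    """Return (extension_enabled, ondemand_pool_disabled) for an options dict."""
    options = options or {}
    enabled = _flag(options, ALLUXIO_COMMON_EXTENSION_ENABLE)
    # As in the diff, the pool flag only matters when the extension is enabled.
    ondemand_pool_disabled = enabled and _flag(
        options, ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
    )
    return enabled, ondemand_pool_disabled


if __name__ == "__main__":
    print(parse_extension_flags({ALLUXIO_COMMON_EXTENSION_ENABLE: "True"}))
```

Absent keys leave both flags off, which matches the commit message about making the on-demand pool the default.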
@@ -482,11 +506,18 @@ def read(self, file_path):
)
path_id = self._get_path_hash(file_path)
try:
return b"".join(
self._all_page_generator(
worker_host, worker_http_port, path_id
if self.data_manager:
return b"".join(
self._all_page_generator_alluxiocommon(
worker_host, worker_http_port, path_id
)
)
else:
return b"".join(
self._all_page_generator(
worker_host, worker_http_port, path_id
)
)
)
except Exception as e:
raise Exception(
f"Error when reading file {file_path}: error {e}"
@@ -504,6 +535,7 @@ def read_range(self, file_path, offset, length):
Returns:
file content (str): The file content with length from offset
"""
self.logger.debug(f"read_range,off:{offset}:length:{length}")
self._validate_path(file_path)
if not isinstance(offset, int) or offset < 0:
raise ValueError("Offset must be a non-negative integer")
@@ -528,15 +560,20 @@ def read_range(self, file_path, offset, length):
path_id = self._get_path_hash(file_path)

try:
return b"".join(
self._range_page_generator(
if self.data_manager:
return self._range_page_generator_alluxiocommon(
worker_host, worker_http_port, path_id, offset, length
)
)
else:
return b"".join(
self._range_page_generator(
worker_host, worker_http_port, path_id, offset, length
)
)
except Exception as e:
raise Exception(
f"Error when reading file {file_path}: error {e}: "
f"worker_host{worker_host}, worker_http_port:{worker_http_port}"
f"Error when reading file:{file_path}: error:{e}: "
f"worker_host:{worker_host}, worker_http_port:{worker_http_port}"
) from e

def write_page(self, file_path, page_index, page_bytes):
@@ -574,6 +611,39 @@ def write_page(self, file_path, page_index, page_bytes):
f"Error writing to file {file_path} at page {page_index}: {e}"
)

    def _all_page_generator_alluxiocommon(
        self, worker_host, worker_http_port, path_id
    ):
        page_index = 0
        fetching_pages_num_each_round = 4
        while True:
            read_urls = []
            try:
                for _ in range(fetching_pages_num_each_round):
                    page_url = FULL_PAGE_URL_FORMAT.format(
                        worker_host=worker_host,
                        http_port=worker_http_port,
                        path_id=path_id,
                        page_index=page_index,
                    )
                    read_urls.append(page_url)
                    page_index += 1
                pages_content = self.data_manager.make_multi_http_req(
                    read_urls
                )
                yield pages_content
                if (
                    len(pages_content)
                    < fetching_pages_num_each_round * self.page_size
                ):
                    break
            except Exception as e:
                # data_manager does not raise if at least some leading content was retrieved,
                # so any exception it does raise is always propagated upwards
                raise Exception(
                    f"Error when reading all pages of {path_id}: error {e}"
                ) from e
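
Reviewer sketch (not part of the PR): the loop in `_all_page_generator_alluxiocommon` requests a fixed number of pages per round and stops as soon as a round returns fewer bytes than a full batch. The simulation below reproduces only that termination rule against an in-memory byte string. `read_all_batched` and `make_fake_fetcher` are hypothetical helpers, and the fake fetcher returns empty bytes for pages past the end of the data, which is an assumption about behaviour the diff does not show.

```python
def read_all_batched(fetch_batch, page_size, pages_per_round=4):
    """Yield the bytes of each batch until a batch comes back short."""
    page_index = 0
    while True:
        indices = list(range(page_index, page_index + pages_per_round))
        page_index += pages_per_round
        chunk = fetch_batch(indices)
        yield chunk
        # A short batch means the end of the file was reached in this round.
        if len(chunk) < pages_per_round * page_size:
            break


def make_fake_fetcher(data, page_size):
    """Stand-in for the HTTP fetch: slice pages out of an in-memory buffer."""
    def fetch_batch(indices):
        return b"".join(
            data[i * page_size:(i + 1) * page_size] for i in indices
        )
    return fetch_batch


if __name__ == "__main__":
    payload = bytes(range(10))
    fetcher = make_fake_fetcher(payload, page_size=2)
    print(b"".join(read_all_batched(fetcher, page_size=2)) == payload)
```

Note that when the file length is an exact multiple of the batch size, this rule costs one extra round whose result is empty; how a real worker answers that extra round is not covered by this sketch.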

def _all_page_generator(self, worker_host, worker_http_port, path_id):
page_index = 0
while True:
@@ -596,6 +666,40 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
break
page_index += 1

    def _range_page_generator_alluxiocommon(
        self, worker_host, worker_http_port, path_id, offset, length
    ):
        read_urls = []
        start = offset
        while start < offset + length:
            page_index = start // self.page_size
            inpage_off = start % self.page_size
            inpage_read_len = min(
                self.page_size - inpage_off, offset + length - start
            )
            page_url = None
            if inpage_off == 0 and inpage_read_len == self.page_size:
                page_url = FULL_PAGE_URL_FORMAT.format(
                    worker_host=worker_host,
                    http_port=worker_http_port,
                    path_id=path_id,
                    page_index=page_index,
                )
            else:
                page_url = PAGE_URL_FORMAT.format(
                    worker_host=worker_host,
                    http_port=worker_http_port,
                    path_id=path_id,
                    page_index=page_index,
                    page_offset=inpage_off,
                    page_length=inpage_read_len,
                )
            read_urls.append(page_url)
            start += inpage_read_len
        self.logger.debug(f"read_urls:{read_urls}")
        data = self.data_manager.make_multi_http_req(read_urls)
        return data
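
Reviewer sketch (not part of the PR): the arithmetic in `_range_page_generator_alluxiocommon` splits a byte range into one request per touched page, using the full-page URL only when a page is read from offset 0 for its whole length. The function below isolates that arithmetic without any HTTP calls so the `inpage_read_len` computation can be checked by hand. `split_range` is a hypothetical name, and the input validation is an added assumption.

```python
def split_range(offset, length, page_size):
    """Return (page_index, in_page_offset, read_len, is_full_page) per touched page."""
    if offset < 0 or length < 0 or page_size <= 0:
        raise ValueError("offset and length must be >= 0 and page_size > 0")
    parts = []
    start = offset
    end = offset + length
    while start < end:
        page_index = start // page_size
        in_page_off = start % page_size
        # Read to the end of this page or the end of the range, whichever is first.
        read_len = min(page_size - in_page_off, end - start)
        is_full_page = in_page_off == 0 and read_len == page_size
        parts.append((page_index, in_page_off, read_len, is_full_page))
        start += read_len
    return parts


if __name__ == "__main__":
    for part in split_range(offset=3, length=10, page_size=4):
        print(part)
```

For `offset=3, length=10, page_size=4` this yields a 1-byte tail of page 0, two full pages, and a 1-byte head of page 3, which sums to the requested 10 bytes.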

def _range_page_generator(
self, worker_host, worker_http_port, path_id, offset, length
):
1 change: 1 addition & 0 deletions alluxiofs/core.py
@@ -164,6 +164,7 @@ def fallback_wrapper(self, path, *args, **kwargs):
path = self._strip_alluxio_protocol(path)
try:
if self.alluxio:
self.logger.debug(f"calling {alluxio_impl.__name__}")
return alluxio_impl(self, path, *args, **kwargs)
except Exception as e:
if not isinstance(e, NotImplementedError):
5 changes: 5 additions & 0 deletions benchmark/AbstractBench.py
@@ -79,9 +79,14 @@ def get_protocol(self, full_path: str) -> str:

def init(self):
# protocol = self.get_protocol(self.args.path)
alluxio_options = {
# "alluxio.common.extension.enable": "True",
"alluxio.worker.page.store.page.size": "1MB"
}
self.alluxio_fs = AlluxioFileSystem(
etcd_hosts=self.args.etcd_hosts,
worker_hosts=self.args.worker_hosts,
options=alluxio_options,
# target_protocol=protocol
)
self.traverse(self.args.path)
14 changes: 14 additions & 0 deletions rust/README.md
@@ -0,0 +1,14 @@
## A common native lib for the Alluxio Python client
To build and try it out:

Create and activate a virtualenv (an isolated Python environment), then build the extension with maturin:

    python3 -m venv .env
    source .env/bin/activate
    maturin develop

Then, from a Python shell:

    python3
    >>> import alluxiocommon
    >>> alluxiocommon.multi_http_requests(["http://google.com"],[(0,0)])