Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Alluxio Configuration Approach #52

Merged
merged 8 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions alluxiofs/client/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Optional

import humanfriendly

from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE


class _AlluxioConfigError(ValueError, AssertionError):
    """Raised when an ``AlluxioClientConfig`` argument fails validation.

    Derives from both ``ValueError`` and ``AssertionError`` so that callers
    catching either exception type keep working.
    """


class AlluxioClientConfig:
    """
    Class responsible for creating the configuration for Alluxio Client.
    """

    def __init__(
        self,
        etcd_hosts: Optional[str] = None,
        worker_hosts: Optional[str] = None,
        etcd_port=2379,
        worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
        etcd_refresh_workers_interval=120,
        page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE,
        hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
        cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE,
        etcd_username: Optional[str] = None,
        etcd_password: Optional[str] = None,
        concurrency=64,
        **kwargs,
    ):
        """
        Initializes Alluxio client configuration.

        Args:
            etcd_hosts (Optional[str], optional): The hostnames of ETCD to get worker addresses from
                in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both.
            worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format.
                Either etcd_hosts or worker_hosts should be provided, not both.
            etcd_port (int, optional): The port of each etcd server. Must be in the range 1-65535.
            worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node.
                Must be in the range 1-65535.
            etcd_refresh_workers_interval (int, optional): The interval to refresh worker list from ETCD
                membership service periodically. All negative values mean the service is disabled.
            page_size: Human-readable size; it is parsed with
                ``humanfriendly.parse_size(page_size, binary=True)`` and the parsed result is stored
                on ``self.page_size``.
            hash_node_per_worker (int, optional): Must be a positive integer. Stored unchanged.
            cluster_name: Stored unchanged on ``self.cluster_name``; not validated here.
            etcd_username (Optional[str], optional): ETCD username. Must be given together with
                ``etcd_password`` or not at all.
            etcd_password (Optional[str], optional): ETCD password. Must be given together with
                ``etcd_username`` or not at all.
            concurrency (int, optional): The maximum number of concurrent operations for HTTP requests,
                default to 64. Must be a positive integer.
            **kwargs: Accepted so callers can forward extra options; they are ignored by this class.

        Raises:
            _AlluxioConfigError: If any argument fails validation. This is a subclass of both
                ``ValueError`` and ``AssertionError``.
        """
        # Explicit raises instead of ``assert`` statements: asserts are removed
        # when Python runs with -O, which would silently disable every check.
        self._require(
            etcd_hosts or worker_hosts,
            "Must supply either 'etcd_hosts' or 'worker_hosts'",
        )
        self._require(
            not (etcd_hosts and worker_hosts),
            "Supply either 'etcd_hosts' or 'worker_hosts', not both",
        )
        self._require(
            isinstance(etcd_port, int) and (1 <= etcd_port <= 65535),
            "'etcd_port' should be an integer in the range 1-65535",
        )
        self._require(
            isinstance(worker_http_port, int)
            and (1 <= worker_http_port <= 65535),
            "'worker_http_port' should be an integer in the range 1-65535",
        )
        self._require(
            isinstance(concurrency, int) and concurrency > 0,
            "'concurrency' should be a positive integer",
        )
        self._require(
            isinstance(etcd_refresh_workers_interval, int),
            "'etcd_refresh_workers_interval' should be an integer",
        )

        self.etcd_hosts = etcd_hosts
        self.worker_hosts = worker_hosts
        self.etcd_port = etcd_port
        self.worker_http_port = worker_http_port
        self.etcd_refresh_workers_interval = etcd_refresh_workers_interval

        self._require(
            isinstance(hash_node_per_worker, int) and hash_node_per_worker > 0,
            "'hash_node_per_worker' should be a positive integer",
        )

        self.hash_node_per_worker = hash_node_per_worker
        # parse_size may raise its own error for an unparsable size; that
        # error is propagated unchanged.
        self.page_size = humanfriendly.parse_size(page_size, binary=True)
        self.cluster_name = cluster_name

        # Credentials are all-or-nothing: one without the other is rejected.
        self._require(
            (etcd_username is None) == (etcd_password is None),
            "Both ETCD username and password must be set or both should be unset.",
        )

        self.etcd_username = etcd_username
        self.etcd_password = etcd_password
        self.concurrency = concurrency

    @staticmethod
    def _require(condition, message):
        """Raise ``_AlluxioConfigError(message)`` when ``condition`` is falsy."""
        if not condition:
            raise _AlluxioConfigError(message)
149 changes: 32 additions & 117 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@
"[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke."
)

from .config import AlluxioClientConfig
from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY1
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY2
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from .const import ALLUXIO_PAGE_SIZE_KEY
from .const import ALLUXIO_SUCCESS_IDENTIFIER
from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE
from .const import FULL_PAGE_URL_FORMAT
from .const import GET_FILE_STATUS_URL_FORMAT
from .const import LIST_URL_FORMAT
Expand Down Expand Up @@ -119,14 +117,8 @@ class AlluxioClient:

def __init__(
self,
etcd_hosts=None,
worker_hosts=None,
options=None,
concurrency=64,
etcd_port=2379,
worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
test_options=None,
SibylYang marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
"""
Inits Alluxio file system.
Expand All @@ -150,101 +142,23 @@ def __init__(
The interval to refresh worker list from ETCD membership service periodically. All negative values mean the service is disabled.

"""
# TODO(lu/chunxu) change to ETCD endpoints in format of 'http://etcd_host:port, http://etcd_host:port' & worker hosts in 'host:port, host:port' format
if not (etcd_hosts or worker_hosts):
raise ValueError(
"Must supply either 'etcd_hosts' or 'worker_hosts'"
)
if etcd_hosts and worker_hosts:
raise ValueError(
"Supply either 'etcd_hosts' or 'worker_hosts', not both"
)
if not etcd_hosts:
logger.warning(
"'etcd_hosts' not supplied. An etcd cluster is required for dynamic cluster changes."
)
if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535):
raise ValueError(
"'etcd_port' should be an integer in the range 1-65535"
)
if not isinstance(worker_http_port, int) or not (
1 <= worker_http_port <= 65535
):
raise ValueError(
"'worker_http_port' should be an integer in the range 1-65535"
)
if not isinstance(concurrency, int) or concurrency <= 0:
raise ValueError("'concurrency' should be a positive integer")
if concurrency < 10 or concurrency > 128:
logger.warning(
f"'concurrency' value of {concurrency} is outside the recommended range (10-128). "
"This may lead to suboptimal performance or resource utilization.",
)
if not isinstance(etcd_refresh_workers_interval, int):
raise ValueError(
"'etcd_refresh_workers_interval' should be an integer"
)

self.session = self._create_session(concurrency)

# parse options
page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
self.config = AlluxioClientConfig(**kwargs)
self.session = self._create_session(self.config.concurrency)
self.hash_provider = ConsistentHashProvider(self.config)
self.data_manager = None
if options:
if ALLUXIO_PAGE_SIZE_KEY in options:
page_size = options[ALLUXIO_PAGE_SIZE_KEY]
logger.debug(f"Page size is set to {page_size}")
if ALLUXIO_HASH_NODE_PER_WORKER_KEY1 in options:
hash_node_per_worker = int(
options[ALLUXIO_HASH_NODE_PER_WORKER_KEY1]
)
logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
if ALLUXIO_HASH_NODE_PER_WORKER_KEY2 in options:
hash_node_per_worker = int(
options[ALLUXIO_HASH_NODE_PER_WORKER_KEY2]
)
logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
if (
ALLUXIO_COMMON_EXTENSION_ENABLE in options
and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true"
):
print("alluxiocommon extension enabled.")
logger.info("alluxiocommon extension enabled.")
ondemand_pool_disabled = (
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options
and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower()
== "true"
)
self.data_manager = _DataManager(
concurrency, ondemand_pool_disabled=ondemand_pool_disabled
)
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
):
raise ValueError(
"'hash_node_per_worker' should be a positive integer"
if kwargs.get(ALLUXIO_COMMON_EXTENSION_ENABLE, False):
logger.info("alluxiocommon extension enabled.")
self.data_manager = _DataManager(
self.config.concurrency,
ondemand_pool_disabled=kwargs.get(
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE, False
),
)

self.page_size = humanfriendly.parse_size(page_size, binary=True)
test_options = test_options or {}
set_log_level(logger, test_options)

self.hash_provider = ConsistentHashProvider(
etcd_hosts=etcd_hosts,
etcd_port=etcd_port,
worker_hosts=worker_hosts,
worker_http_port=worker_http_port,
hash_node_per_worker=hash_node_per_worker,
options=options,
etcd_refresh_workers_interval=etcd_refresh_workers_interval,
)

def listdir(self, path):
"""
Lists the directory.
Expand Down Expand Up @@ -655,7 +569,7 @@ def _all_page_generator_alluxiocommon(
yield pages_content
if (
len(pages_content)
< fetching_pages_num_each_round * self.page_size
< fetching_pages_num_each_round * self.config.page_size
):
break
except Exception as e:
Expand Down Expand Up @@ -683,7 +597,7 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
if not page_content:
break
yield page_content
if len(page_content) < self.page_size: # last page
if len(page_content) < self.config.page_size: # last page
break
page_index += 1

Expand All @@ -693,13 +607,13 @@ def _range_page_generator_alluxiocommon(
read_urls = []
start = offset
while start < offset + length:
page_index = start // self.page_size
inpage_off = start % self.page_size
page_index = start // self.config.page_size
inpage_off = start % self.config.page_size
inpage_read_len = min(
self.page_size - inpage_off, offset + length - start
self.config.page_size - inpage_off, offset + length - start
)
page_url = None
if inpage_off == 0 and inpage_read_len == self.page_size:
if inpage_off == 0 and inpage_read_len == self.config.page_size:
page_url = FULL_PAGE_URL_FORMAT.format(
worker_host=worker_host,
http_port=worker_http_port,
Expand All @@ -723,23 +637,23 @@ def _range_page_generator_alluxiocommon(
def _range_page_generator(
self, worker_host, worker_http_port, path_id, offset, length
):
start_page_index = offset // self.page_size
start_page_offset = offset % self.page_size
start_page_index = offset // self.config.page_size
start_page_offset = offset % self.config.page_size

end_page_index = (offset + length - 1) // self.page_size
end_page_read_to = ((offset + length - 1) % self.page_size) + 1
end_page_index = (offset + length - 1) // self.config.page_size
end_page_read_to = ((offset + length - 1) % self.config.page_size) + 1

page_index = start_page_index
while True:
try:
read_offset = 0
read_length = self.page_size
read_length = self.config.page_size
if page_index == start_page_index:
read_offset = start_page_offset
if start_page_index == end_page_index:
read_length = end_page_read_to - start_page_offset
else:
read_length = self.page_size - start_page_offset
read_length = self.config.page_size - start_page_offset
elif page_index == end_page_index:
read_length = end_page_read_to

Expand Down Expand Up @@ -1004,13 +918,14 @@ def __init__(
logger.debug(f"Page size is set to {page_size}")
self.page_size = humanfriendly.parse_size(page_size, binary=True)
self.hash_provider = ConsistentHashProvider(
etcd_hosts=etcd_hosts,
etcd_port=int(etcd_port),
worker_hosts=worker_hosts,
worker_http_port=int(http_port),
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
options=options,
etcd_refresh_workers_interval=120,
AlluxioClientConfig(
etcd_hosts=etcd_hosts,
etcd_port=int(etcd_port),
worker_hosts=worker_hosts,
worker_http_port=int(http_port),
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
)
)
self.http_port = http_port
self._loop = loop or asyncio.get_event_loop()
Expand Down
Loading
Loading