Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Alluxio configuration approach #23

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions alluxiofs/client/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from typing import Optional

import humanfriendly

from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE


class AlluxioClientConfig:
"""
Class responsible for creating the configuration for Alluxio Client.
"""

def __init__(
self,
etcd_hosts: Optional[str] = None,
worker_hosts: Optional[str] = None,
etcd_port=2379,
worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE,
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE,
etcd_username: Optional[str] = None,
etcd_password: Optional[str] = None,
concurrency=64,
**kwargs,
):
"""
Initializes Alluxio client configuration.
Args:
etcd_hosts (Optional[str], optional): The hostnames of ETCD to get worker addresses from
in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both.
worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format.
Either etcd_hosts or worker_hosts should be provided, not both.
concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64.
etcd_port (int, optional): The port of each etcd server.
worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node.
etcd_refresh_workers_interval (int, optional): The interval to refresh worker list from ETCD membership service periodically.
All negative values mean the service is disabled.
"""
if not (etcd_hosts or worker_hosts):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I feel like a lot of these can just use assert

raise ValueError(
"Must supply either 'etcd_hosts' or 'worker_hosts'"
)
if etcd_hosts and worker_hosts:
raise ValueError(
"Supply either 'etcd_hosts' or 'worker_hosts', not both"
)
if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535):
raise ValueError(
"'etcd_port' should be an integer in the range 1-65535"
)
if not isinstance(worker_http_port, int) or not (
1 <= worker_http_port <= 65535
):
raise ValueError(
"'worker_http_port' should be an integer in the range 1-65535"
)
if not isinstance(concurrency, int) or concurrency <= 0:
raise ValueError("'concurrency' should be a positive integer")
if not isinstance(etcd_refresh_workers_interval, int):
raise ValueError(
"'etcd_refresh_workers_interval' should be an integer"
)
self.etcd_hosts = etcd_hosts
self.worker_hosts = worker_hosts
self.etcd_port = etcd_port
self.worker_http_port = worker_http_port
self.etcd_refresh_workers_interval = etcd_refresh_workers_interval
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
):
raise ValueError(
"'hash_node_per_worker' should be a positive integer"
)

self.hash_node_per_worker = hash_node_per_worker
self.page_size = humanfriendly.parse_size(page_size, binary=True)
self.cluster_name = cluster_name

if (etcd_username is None) != (etcd_password is None):
raise ValueError(
"Both ETCD username and password must be set or both should be unset."
)
self.etcd_username = etcd_username
self.etcd_password = etcd_password
self.concurrency = concurrency
6 changes: 0 additions & 6 deletions alluxiofs/client/const.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
ALLUXIO_CLUSTER_NAME_KEY = "alluxio.cluster.name"
ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE = "DefaultAlluxioCluster"
ALLUXIO_ETCD_USERNAME_KEY = "alluxio.etcd.username"
ALLUXIO_ETCD_PASSWORD_KEY = "alluxio.etcd.password"
ALLUXIO_PAGE_SIZE_KEY = "alluxio.worker.page.store.page.size"
ALLUXIO_PAGE_SIZE_DEFAULT_VALUE = "1MB"
ALLUXIO_HASH_NODE_PER_WORKER_KEY = (
"alluxio.user.consistent.hash.virtual.node.count.per.worker"
)
ALLUXIO_WORKER_HTTP_SERVER_PORT_KEY = "alluxio.worker.http.server.port"
ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE = 28080
ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE = 5
Expand Down
169 changes: 25 additions & 144 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
import requests
from requests.adapters import HTTPAdapter

from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY
from .config import AlluxioClientConfig
from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from .const import ALLUXIO_PAGE_SIZE_KEY
from .const import ALLUXIO_SUCCESS_IDENTIFIER
from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE
from .const import FULL_PAGE_URL_FORMAT
from .const import GET_FILE_STATUS_URL_FORMAT
from .const import LIST_URL_FORMAT
Expand Down Expand Up @@ -74,139 +72,21 @@ class OpType(Enum):

class AlluxioClient:
"""
Access Alluxio file system

Examples
--------
>>> # Launch Alluxio with ETCD as service discovery
>>> alluxio = AlluxioClient(etcd_hosts="localhost")
>>> # Or launch Alluxio with user provided worker list
>>> alluxio = AlluxioClient(worker_hosts="host1,host2,host3")

>>> print(alluxio.listdir("s3://mybucket/mypath/dir"))
[
{
type: "file",
name: "my_file_name",
path: '/my_file_name',
ufs_path: 's3://example-bucket/my_file_name',
last_modification_time_ms: 0,
length: 77542,
human_readable_file_size: '75.72KB'
},

]
>>> print(alluxio.read("s3://mybucket/mypath/dir/myfile"))
my_file_content
An AlluxioClient for interacting with Alluxio servers.
"""

def __init__(
self,
etcd_hosts=None,
worker_hosts=None,
options=None,
logger=None,
concurrency=64,
etcd_port=2379,
worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
**kwargs,
):
"""
Inits Alluxio file system.

Args:
etcd_hosts (str, optional):
The hostnames of ETCD to get worker addresses from
The hostnames in host1,host2,host3 format. Either etcd_hosts or worker_hosts should be provided, not both.
worker_hosts (str, optional):
The worker hostnames in host1,host2,host3 format. Either etcd_hosts or worker_hosts should be provided, not both.
options (dict, optional):
A dictionary of Alluxio property key and values.
Note that Alluxio Python API only support a limited set of Alluxio properties.
logger (Logger, optional):
A logger instance for logging messages.
concurrency (int, optional):
The maximum number of concurrent operations for HTTP requests. Default to 64.
etcd_port (int, optional):
The port of each etcd server.
worker_http_port (int, optional):
The port of the HTTP server on each Alluxio worker node.
etcd_refresh_workers_interval(int, optional):
The interval to refresh worker list from ETCD membership service periodically. All negative values mean the service is disabled.

Inits Alluxio Client with Alluxio client arguments.
See AlluxioClientConfig for configurations.
"""
# TODO(lu/chunxu) change to ETCD endpoints in format of 'http://etcd_host:port, http://etcd_host:port' & worker hosts in 'host:port, host:port' format
self.logger = logger or logging.getLogger("AlluxioPython")
if not (etcd_hosts or worker_hosts):
raise ValueError(
"Must supply either 'etcd_hosts' or 'worker_hosts'"
)
if etcd_hosts and worker_hosts:
raise ValueError(
"Supply either 'etcd_hosts' or 'worker_hosts', not both"
)
if not etcd_hosts:
self.logger.warning(
"'etcd_hosts' not supplied. An etcd cluster is required for dynamic cluster changes."
)
if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535):
raise ValueError(
"'etcd_port' should be an integer in the range 1-65535"
)
if not isinstance(worker_http_port, int) or not (
1 <= worker_http_port <= 65535
):
raise ValueError(
"'worker_http_port' should be an integer in the range 1-65535"
)
if not isinstance(concurrency, int) or concurrency <= 0:
raise ValueError("'concurrency' should be a positive integer")
if concurrency < 10 or concurrency > 128:
self.logger.warning(
f"'concurrency' value of {concurrency} is outside the recommended range (10-128). "
"This may lead to suboptimal performance or resource utilization.",
)
if not isinstance(etcd_refresh_workers_interval, int):
raise ValueError(
"'etcd_refresh_workers_interval' should be an integer"
)

self.session = self._create_session(concurrency)

# parse options
page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
if options:
if ALLUXIO_PAGE_SIZE_KEY in options:
page_size = options[ALLUXIO_PAGE_SIZE_KEY]
self.logger.debug(f"Page size is set to {page_size}")
if ALLUXIO_HASH_NODE_PER_WORKER_KEY in options:
hash_node_per_worker = int(
options[ALLUXIO_HASH_NODE_PER_WORKER_KEY]
)
self.logger.debug(
f"Hash node per worker is set to {hash_node_per_worker}"
)
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
):
raise ValueError(
"'hash_node_per_worker' should be a positive integer"
)

self.page_size = humanfriendly.parse_size(page_size, binary=True)

self.hash_provider = ConsistentHashProvider(
etcd_hosts=etcd_hosts,
etcd_port=etcd_port,
worker_hosts=worker_hosts,
worker_http_port=worker_http_port,
hash_node_per_worker=hash_node_per_worker,
options=options,
logger=self.logger,
etcd_refresh_workers_interval=etcd_refresh_workers_interval,
)
self.logger = kwargs.get("logger", logging.getLogger("Alluxiofs"))
self.config = AlluxioClientConfig(**kwargs)
self.session = self._create_session(self.config.concurrency)
self.hash_provider = ConsistentHashProvider(self.config, self.logger)

def listdir(self, path):
"""
Expand Down Expand Up @@ -583,30 +463,30 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id):
if not page_content:
break
yield page_content
if len(page_content) < self.page_size: # last page
if len(page_content) < self.config.page_size: # last page
break
page_index += 1

def _range_page_generator(
self, worker_host, worker_http_port, path_id, offset, length
):
start_page_index = offset // self.page_size
start_page_offset = offset % self.page_size
start_page_index = offset // self.config.page_size
start_page_offset = offset % self.config.page_size

end_page_index = (offset + length - 1) // self.page_size
end_page_read_to = ((offset + length - 1) % self.page_size) + 1
end_page_index = (offset + length - 1) // self.config.page_size
end_page_read_to = ((offset + length - 1) % self.config.page_size) + 1

page_index = start_page_index
while True:
try:
read_offset = 0
read_length = self.page_size
read_length = self.config.page_size
if page_index == start_page_index:
read_offset = start_page_offset
if start_page_index == end_page_index:
read_length = end_page_read_to - start_page_offset
else:
read_length = self.page_size - start_page_offset
read_length = self.config.page_size - start_page_offset
elif page_index == end_page_index:
read_length = end_page_read_to

Expand Down Expand Up @@ -867,14 +747,15 @@ def __init__(
self.logger.debug(f"Page size is set to {page_size}")
self.page_size = humanfriendly.parse_size(page_size, binary=True)
self.hash_provider = ConsistentHashProvider(
etcd_hosts=etcd_hosts,
etcd_port=int(etcd_port),
worker_hosts=worker_hosts,
worker_http_port=int(http_port),
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
options=options,
logger=self.logger,
etcd_refresh_workers_interval=120,
AlluxioClientConfig(
etcd_hosts=etcd_hosts,
etcd_port=int(etcd_port),
worker_hosts=worker_hosts,
worker_http_port=int(http_port),
etcd_refresh_workers_interval=120,
page_size=page_size,
),
self.logger,
)
self.http_port = http_port
self._loop = loop or asyncio.get_event_loop()
Expand Down
Loading
Loading