diff --git a/alluxiofs/client/config.py b/alluxiofs/client/config.py new file mode 100644 index 0000000..8745e6e --- /dev/null +++ b/alluxiofs/client/config.py @@ -0,0 +1,89 @@ +from typing import Optional + +import humanfriendly + +from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE +from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE +from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE +from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE + + +class AlluxioClientConfig: + """ + Class responsible for creating the configuration for Alluxio Client. + """ + + def __init__( + self, + etcd_hosts: Optional[str] = None, + worker_hosts: Optional[str] = None, + etcd_port=2379, + worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE, + etcd_refresh_workers_interval=120, + page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE, + hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, + cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE, + etcd_username: Optional[str] = None, + etcd_password: Optional[str] = None, + concurrency=64, + **kwargs, + ): + """ + Initializes Alluxio client configuration. + Args: + etcd_hosts (Optional[str], optional): The hostnames of ETCD to get worker addresses from + in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both. + worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format. + Either etcd_hosts or worker_hosts should be provided, not both. + concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64. + etcd_port (int, optional): The port of each etcd server. + worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node. + etcd_refresh_workers_interval (int, optional): The interval to refresh worker list from ETCD membership service periodically. + All negative values mean the service is disabled. + """ + + assert ( + etcd_hosts or worker_hosts + ), "Must supply either 'etcd_hosts' or 'worker_hosts'" + + assert not ( + etcd_hosts and worker_hosts + ), "Supply either 'etcd_hosts' or 'worker_hosts', not both" + + assert isinstance(etcd_port, int) and ( + 1 <= etcd_port <= 65535 + ), "'etcd_port' should be an integer in the range 1-65535" + + assert isinstance(worker_http_port, int) and ( + 1 <= worker_http_port <= 65535 + ), "'worker_http_port' should be an integer in the range 1-65535" + + assert ( + isinstance(concurrency, int) and concurrency > 0 + ), "'concurrency' should be a positive integer" + + assert isinstance( + etcd_refresh_workers_interval, int + ), "'etcd_refresh_workers_interval' should be an integer" + + self.etcd_hosts = etcd_hosts + self.worker_hosts = worker_hosts + self.etcd_port = etcd_port + self.worker_http_port = worker_http_port + self.etcd_refresh_workers_interval = etcd_refresh_workers_interval + + assert ( + isinstance(hash_node_per_worker, int) and hash_node_per_worker > 0 + ), "'hash_node_per_worker' should be a positive integer" + + self.hash_node_per_worker = hash_node_per_worker + self.page_size = humanfriendly.parse_size(page_size, binary=True) + self.cluster_name = cluster_name + + assert (etcd_username is None) == ( + etcd_password is None + ), "Both ETCD username and password must be set or both should be unset." + + self.etcd_username = etcd_username + self.etcd_password = etcd_password + self.concurrency = concurrency diff --git a/alluxiofs/client/core.py b/alluxiofs/client/core.py index dafd1c9..72f7473 100644 --- a/alluxiofs/client/core.py +++ b/alluxiofs/client/core.py @@ -32,15 +32,13 @@ "[WARNING]pkg 'alluxiocommon' not installed, relative modules unable to invoke." ) +from .config import AlluxioClientConfig from .const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE from .const import ALLUXIO_COMMON_EXTENSION_ENABLE -from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY1 -from .const import ALLUXIO_HASH_NODE_PER_WORKER_KEY2 from .const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE from .const import ALLUXIO_PAGE_SIZE_KEY from .const import ALLUXIO_SUCCESS_IDENTIFIER -from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE from .const import FULL_PAGE_URL_FORMAT from .const import GET_FILE_STATUS_URL_FORMAT from .const import LIST_URL_FORMAT @@ -119,14 +117,7 @@ class AlluxioClient: def __init__( self, - etcd_hosts=None, - worker_hosts=None, - options=None, - concurrency=64, - etcd_port=2379, - worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE, - etcd_refresh_workers_interval=120, - test_options=None, + **kwargs, ): """ Inits Alluxio file system. @@ -150,101 +141,23 @@ def __init__( The interval to refresh worker list from ETCD membership service periodically. All negative values mean the service is disabled. """ - # TODO(lu/chunxu) change to ETCD endpoints in format of 'http://etcd_host:port, http://etcd_host:port' & worker hosts in 'host:port, host:port' format - if not (etcd_hosts or worker_hosts): - raise ValueError( - "Must supply either 'etcd_hosts' or 'worker_hosts'" - ) - if etcd_hosts and worker_hosts: - raise ValueError( - "Supply either 'etcd_hosts' or 'worker_hosts', not both" - ) - if not etcd_hosts: - logger.warning( - "'etcd_hosts' not supplied. An etcd cluster is required for dynamic cluster changes." - ) - if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535): - raise ValueError( - "'etcd_port' should be an integer in the range 1-65535" - ) - if not isinstance(worker_http_port, int) or not ( - 1 <= worker_http_port <= 65535 - ): - raise ValueError( - "'worker_http_port' should be an integer in the range 1-65535" - ) - if not isinstance(concurrency, int) or concurrency <= 0: - raise ValueError("'concurrency' should be a positive integer") - if concurrency < 10 or concurrency > 128: - logger.warning( - f"'concurrency' value of {concurrency} is outside the recommended range (10-128). " - "This may lead to suboptimal performance or resource utilization.", - ) - if not isinstance(etcd_refresh_workers_interval, int): - raise ValueError( - "'etcd_refresh_workers_interval' should be an integer" - ) - - self.session = self._create_session(concurrency) - # parse options - page_size = ALLUXIO_PAGE_SIZE_DEFAULT_VALUE - hash_node_per_worker = ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE + self.config = AlluxioClientConfig(**kwargs) + self.session = self._create_session(self.config.concurrency) + self.hash_provider = ConsistentHashProvider(self.config) self.data_manager = None - if options: - if ALLUXIO_PAGE_SIZE_KEY in options: - page_size = options[ALLUXIO_PAGE_SIZE_KEY] - logger.debug(f"Page size is set to {page_size}") - if ALLUXIO_HASH_NODE_PER_WORKER_KEY1 in options: - hash_node_per_worker = int( - options[ALLUXIO_HASH_NODE_PER_WORKER_KEY1] - ) - logger.debug( - f"Hash node per worker is set to {hash_node_per_worker}" - ) - if ALLUXIO_HASH_NODE_PER_WORKER_KEY2 in options: - hash_node_per_worker = int( - options[ALLUXIO_HASH_NODE_PER_WORKER_KEY2] - ) - logger.debug( - f"Hash node per worker is set to {hash_node_per_worker}" - ) - if ( - ALLUXIO_COMMON_EXTENSION_ENABLE in options - and options[ALLUXIO_COMMON_EXTENSION_ENABLE].lower() == "true" - ): - print("alluxiocommon extension enabled.") - logger.info("alluxiocommon extension enabled.") - ondemand_pool_disabled = ( - ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE in options - and options[ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE].lower() - == "true" - ) - self.data_manager = _DataManager( - concurrency, ondemand_pool_disabled=ondemand_pool_disabled - ) - if ( - not isinstance(hash_node_per_worker, int) - or hash_node_per_worker <= 0 - ): - raise ValueError( - "'hash_node_per_worker' should be a positive integer" + if kwargs.get(ALLUXIO_COMMON_EXTENSION_ENABLE, False): + logger.info("alluxiocommon extension enabled.") + self.data_manager = _DataManager( + self.config.concurrency, + ondemand_pool_disabled=kwargs.get( + ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE, False + ), ) - self.page_size = humanfriendly.parse_size(page_size, binary=True) - test_options = test_options or {} + test_options = kwargs.get("test_options", {}) set_log_level(logger, test_options) - self.hash_provider = ConsistentHashProvider( - etcd_hosts=etcd_hosts, - etcd_port=etcd_port, - worker_hosts=worker_hosts, - worker_http_port=worker_http_port, - hash_node_per_worker=hash_node_per_worker, - options=options, - etcd_refresh_workers_interval=etcd_refresh_workers_interval, - ) - def listdir(self, path): """ Lists the directory. @@ -655,7 +568,7 @@ def _all_page_generator_alluxiocommon( yield pages_content if ( len(pages_content) - < fetching_pages_num_each_round * self.page_size + < fetching_pages_num_each_round * self.config.page_size ): break except Exception as e: @@ -683,7 +596,7 @@ def _all_page_generator(self, worker_host, worker_http_port, path_id): if not page_content: break yield page_content - if len(page_content) < self.page_size: # last page + if len(page_content) < self.config.page_size: # last page break page_index += 1 @@ -693,13 +606,13 @@ def _range_page_generator_alluxiocommon( read_urls = [] start = offset while start < offset + length: - page_index = start // self.page_size - inpage_off = start % self.page_size + page_index = start // self.config.page_size + inpage_off = start % self.config.page_size inpage_read_len = min( - self.page_size - inpage_off, offset + length - start + self.config.page_size - inpage_off, offset + length - start ) page_url = None - if inpage_off == 0 and inpage_read_len == self.page_size: + if inpage_off == 0 and inpage_read_len == self.config.page_size: page_url = FULL_PAGE_URL_FORMAT.format( worker_host=worker_host, http_port=worker_http_port, @@ -723,23 +636,23 @@ def _range_page_generator_alluxiocommon( def _range_page_generator( self, worker_host, worker_http_port, path_id, offset, length ): - start_page_index = offset // self.page_size - start_page_offset = offset % self.page_size + start_page_index = offset // self.config.page_size + start_page_offset = offset % self.config.page_size - end_page_index = (offset + length - 1) // self.page_size - end_page_read_to = ((offset + length - 1) % self.page_size) + 1 + end_page_index = (offset + length - 1) // self.config.page_size + end_page_read_to = ((offset + length - 1) % self.config.page_size) + 1 page_index = start_page_index while True: try: read_offset = 0 - read_length = self.page_size + read_length = self.config.page_size if page_index == start_page_index: read_offset = start_page_offset if start_page_index == end_page_index: read_length = end_page_read_to - start_page_offset else: - read_length = self.page_size - start_page_offset + read_length = self.config.page_size - start_page_offset elif page_index == end_page_index: read_length = end_page_read_to @@ -1004,13 +917,14 @@ def __init__( logger.debug(f"Page size is set to {page_size}") self.page_size = humanfriendly.parse_size(page_size, binary=True) self.hash_provider = ConsistentHashProvider( - etcd_hosts=etcd_hosts, - etcd_port=int(etcd_port), - worker_hosts=worker_hosts, - worker_http_port=int(http_port), - hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, - options=options, - etcd_refresh_workers_interval=120, + AlluxioClientConfig( + etcd_hosts=etcd_hosts, + etcd_port=int(etcd_port), + worker_hosts=worker_hosts, + worker_http_port=int(http_port), + hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE, + etcd_refresh_workers_interval=120, + ) ) self.http_port = http_port self._loop = loop or asyncio.get_event_loop() diff --git a/alluxiofs/client/worker_ring.py b/alluxiofs/client/worker_ring.py index 1f0a6de..4cae097 100644 --- a/alluxiofs/client/worker_ring.py +++ b/alluxiofs/client/worker_ring.py @@ -21,10 +21,8 @@ import mmh3 from sortedcontainers import SortedDict +from .config import AlluxioClientConfig from .const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE -from .const import ALLUXIO_CLUSTER_NAME_KEY -from .const import ALLUXIO_ETCD_PASSWORD_KEY -from .const import ALLUXIO_ETCD_USERNAME_KEY from .const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE from .const import ETCD_PREFIX_FORMAT from .utils import set_log_level @@ -150,31 +148,37 @@ def from_host_and_port(worker_host, worker_http_port): class EtcdClient: - def __init__(self, host="localhost", port=2379, options=None): + def __init__(self, config: AlluxioClientConfig, host, port): self._host = host self._port = port - - # Parse options self._etcd_username = None self._etcd_password = None self._prefix = ETCD_PREFIX_FORMAT.format( cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE ) - if options: - if ALLUXIO_ETCD_USERNAME_KEY in options: - self._etcd_username = options[ALLUXIO_ETCD_USERNAME_KEY] - if ALLUXIO_ETCD_PASSWORD_KEY in options: - self._etcd_password = options[ALLUXIO_ETCD_PASSWORD_KEY] - if ALLUXIO_CLUSTER_NAME_KEY in options: - self._prefix = ETCD_PREFIX_FORMAT.format( - cluster_name=options[ALLUXIO_CLUSTER_NAME_KEY] - ) + + if self._host is None: + raise ValueError( + "ETCD host is None. ETCD host must be provided to use ETCD." + ) + + if self._port is None: + raise ValueError( + "ETCD port is None. ETCD port must be provided to use ETCD." + ) + + self._etcd_username = config.etcd_username + self._etcd_password = config.etcd_password if (self._etcd_username is None) != (self._etcd_password is None): raise ValueError( "Both ETCD username and password must be set or both should be unset." ) + self._prefix = ETCD_PREFIX_FORMAT.format( + cluster_name=config.cluster_name + ) + def get_worker_entities(self) -> Set[WorkerEntity]: """ Retrieve worker entities from etcd using the specified prefix. @@ -216,36 +220,29 @@ def _get_etcd_client(self): class ConsistentHashProvider: def __init__( self, - etcd_hosts=None, - etcd_port=None, - worker_hosts=None, - worker_http_port=None, - options=None, - etcd_refresh_workers_interval=None, - hash_node_per_worker=None, + config: AlluxioClientConfig, max_attempts=100, test_options=None, ): - self._etcd_hosts = etcd_hosts - self._etcd_port = etcd_port - self._options = options - self._hash_node_per_worker = hash_node_per_worker + self._config = config self._max_attempts = max_attempts self._lock = threading.Lock() self._is_ring_initialized = False self._worker_info_map = {} - self._etcd_refresh_workers_interval = etcd_refresh_workers_interval - if worker_hosts: + + if self._config.worker_hosts is not None: self._update_hash_ring( - self._generate_worker_info_map(worker_hosts, worker_http_port) + self._generate_worker_info_map( + self._config.worker_hosts, self._config.worker_http_port + ) ) - if self._etcd_hosts: + if self._config.etcd_hosts is not None: self._fetch_workers_and_update_ring() - if self._etcd_refresh_workers_interval > 0: + if self._config.etcd_refresh_workers_interval > 0: self._shutdown_background_update_ring_event = threading.Event() self._background_thread = None self._start_background_update_ring( - self._etcd_refresh_workers_interval + self._config.etcd_refresh_workers_interval ) test_options = test_options or {} set_log_level(logger, test_options) @@ -309,7 +306,10 @@ def update_loop(): self._background_thread.start() def shutdown_background_update_ring(self): - if self._etcd_hosts and self._etcd_refresh_workers_interval > 0: + if ( + self._config.etcd_hosts + and self._config.etcd_refresh_workers_interval > 0 + ): self._shutdown_background_update_ring_event.set() if self._background_thread: self._background_thread.join() @@ -318,26 +318,29 @@ def __del__(self): self.shutdown_background_update_ring() def _fetch_workers_and_update_ring(self): - etcd_hosts_list = self._etcd_hosts.split(",") + etcd_hosts_list = self._config.etcd_hosts.split(",") random.shuffle(etcd_hosts_list) worker_entities: Set[WorkerEntity] = set() for host in etcd_hosts_list: try: worker_entities = EtcdClient( - host=host, port=self._etcd_port, options=self._options + host=host, + port=self._config.etcd_port, + config=self._config, ).get_worker_entities() + break except Exception: continue if not worker_entities: if self._is_ring_initialized: logger.info( - f"Failed to achieve worker info list from ETCD servers:{self._etcd_hosts}" + f"Failed to achieve worker info list from ETCD servers:{self._config.etcd_hosts}" ) return else: raise Exception( - f"Failed to achieve worker info list from ETCD servers:{self._etcd_hosts}" + f"Failed to achieve worker info list from ETCD servers:{self._config.etcd_hosts}" ) worker_info_map = {} @@ -366,7 +369,7 @@ def _update_hash_ring( with self._lock: hash_ring = SortedDict() for worker_identity in worker_info_map.keys(): - for i in range(self._hash_node_per_worker): + for i in range(self._config.hash_node_per_worker): hash_key = self._hash_worker_identity(worker_identity, i) hash_ring[hash_key] = worker_identity self.hash_ring = hash_ring diff --git a/alluxiofs/core.py b/alluxiofs/core.py index d74f14f..2656a24 100644 --- a/alluxiofs/core.py +++ b/alluxiofs/core.py @@ -46,17 +46,10 @@ class AlluxioFileSystem(AbstractFileSystem): def __init__( self, - etcd_hosts=None, - worker_hosts=None, - options=None, - concurrency=64, - etcd_port=2379, - worker_http_port=28080, preload_path=None, target_protocol=None, target_options=None, fs=None, - test_options=None, **kwargs, ): """ @@ -102,7 +95,7 @@ def __init__( "provided. Will not fall back to under file systems when " "accessed files are not in Alluxiofs" ) - self.kwargs = target_options or {} + self.target_options = target_options or {} self.fs = None self.target_protocol = None if fs is not None: @@ -118,21 +111,15 @@ def __init__( + self.fs.protocol ) elif target_protocol is not None: - self.fs = filesystem(target_protocol, **self.kwargs) + self.fs = filesystem(target_protocol, **self.target_options) self.target_protocol = target_protocol - test_options = test_options or {} + test_options = kwargs.get("test_options", {}) set_log_level(logger, test_options) if test_options.get("skip_alluxio") is True: self.alluxio = None else: self.alluxio = AlluxioClient( - etcd_hosts=etcd_hosts, - worker_hosts=worker_hosts, - options=options, - concurrency=concurrency, - etcd_port=etcd_port, - worker_http_port=worker_http_port, - test_options=test_options, + **kwargs, ) if preload_path is not None: self.alluxio.load(preload_path) diff --git a/tests/client/test_worker_hash_ring.py b/tests/client/test_worker_hash_ring.py index a112917..89e6aa8 100644 --- a/tests/client/test_worker_hash_ring.py +++ b/tests/client/test_worker_hash_ring.py @@ -1,6 +1,7 @@ import json import os +from alluxiofs.client.config import AlluxioClientConfig from alluxiofs.client.worker_ring import ConsistentHashProvider from alluxiofs.client.worker_ring import WorkerIdentity from alluxiofs.client.worker_ring import WorkerNetAddress @@ -14,9 +15,11 @@ def test_hash_ring(): worker_hostnames = json.load(file) hash_provider = ConsistentHashProvider( - worker_hosts=", ".join(worker_hostnames), - hash_node_per_worker=5, - etcd_refresh_workers_interval=100000000, + AlluxioClientConfig( + worker_hosts=", ".join(worker_hostnames), + hash_node_per_worker=5, + etcd_refresh_workers_interval=100000000, + ) ) hash_ring_path = os.path.join(hash_res_dir, "activeNodesMap.json")