From a94f97d95d4884c9428fac168a1d6576b8a886f7 Mon Sep 17 00:00:00 2001 From: Archit Kulkarni Date: Thu, 7 Sep 2023 10:31:02 -0700 Subject: [PATCH] [Ray Core] Adds in Google Cloud TPUs as a native Resource (#38669) (#39352) * [Ray Core] Adds in Google Cloud TPUs as a native Resource (#38669) The issue below has more details, but at a high level this change addresses the feature request of adding in TPUs as a native resource within Ray. --------- Signed-off-by: allenwang28 Signed-off-by: Archit Kulkarni Co-authored-by: Archit Kulkarni * Fix docstring Signed-off-by: Archit Kulkarni --------- Signed-off-by: allenwang28 Signed-off-by: Archit Kulkarni Co-authored-by: Allen Wang --- python/ray/_private/accelerator.py | 219 ++++++++++++++---- python/ray/_private/ray_constants.py | 30 +++ python/ray/_private/ray_option_utils.py | 101 ++++++-- python/ray/_private/resource_spec.py | 11 +- python/ray/_private/utils.py | 100 ++++++-- python/ray/_private/worker.py | 9 +- python/ray/_raylet.pyx | 2 +- python/ray/autoscaler/_private/gcp/config.py | 9 +- .../ray/autoscaler/_private/load_metrics.py | 2 + python/ray/autoscaler/_private/monitor.py | 6 +- python/ray/runtime_context.py | 9 +- python/ray/tests/test_accelerator.py | 210 +++++++++++++++-- .../tests/test_autoscaler_fake_multinode.py | 16 ++ python/ray/util/accelerators/__init__.py | 6 + python/ray/util/accelerators/accelerators.py | 3 + src/ray/common/ray_config_def.h | 8 +- src/ray/common/test/scheduling_ids_test.cc | 3 +- 17 files changed, 623 insertions(+), 121 deletions(-) diff --git a/python/ray/_private/accelerator.py b/python/ray/_private/accelerator.py index 2a98e7048b41..b47202df6fe4 100644 --- a/python/ray/_private/accelerator.py +++ b/python/ray/_private/accelerator.py @@ -1,79 +1,118 @@ import json import os +import glob import subprocess import sys -from typing import Optional +import requests +import logging +import ray._private.ray_constants as ray_constants +import ray._private.utils as utils +import re 
+from typing import Iterable, Optional def update_resources_with_accelerator_type(resources: dict): """Update the resources dictionary with the accelerator type and custom resources. - Currently, we support AWS NeuronCore (neuron_cores / - accelerator_type:aws-neuron-core) detection and configuration. + Currently, we support detection and configuration of: + - AWS NeuronCore (neuron_cores / accelerator_type:aws-neuron-core) + - Google Cloud TPUs (TPU / accelerator_type:TPU-V*) Args: resources: Resources dictionary to be updated with accelerator type and custom resources. """ - _detect_and_configure_aws_neuron_core(resources) + # Autodetect AWS NeuronCore + _detect_and_configure_custom_accelerator( + resources=resources, + accelerator_key=ray_constants.NEURON_CORES, + accelerator_type=utils.get_neuron_core_constraint_name(), + visible_ids=utils.get_aws_neuron_core_visible_ids(), + autodetected_accelerators=_autodetect_aws_neuron_cores(), + visible_devices_env_variable=ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, + ) + # Autodetect Google Cloud TPUs + _detect_and_configure_custom_accelerator( + resources=resources, + accelerator_key=ray_constants.TPU, + accelerator_type=_autodetect_tpu_version(), + visible_ids=utils.get_tpu_visible_chips(), + autodetected_accelerators=_autodetect_num_tpus(), + visible_devices_env_variable=ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR, + ) -def _detect_and_configure_aws_neuron_core(resources: dict): - """Configuration and auto-detection of AWS NeuronCore accelerator type - and number of NeuronCore (neuron_cores). +def _detect_and_configure_custom_accelerator( + resources: dict, + accelerator_key: str, + accelerator_type: str, + visible_ids: Optional[Iterable[str]], + autodetected_accelerators: int, + visible_devices_env_variable: str, +): + """Configure and autodetect custom accelerators counts and types. - If the number of NeuronCore is not specified in the resources, this - function will try to detect the number of NeuronCore. 
+ If the number of accelerators is not specified in the resources, this + function will try to detect the number of accelerators. - If the number of NeuronCore is specified in the resources, this - function will check if the number of NeuronCore is greater than the - number of visible NeuronCore and raise an error if it is true. + If the number of accelerators is specified in the resources, this + function will check if the number of accelerators is greater than the + number of visible devices and raise an error if it is true. - If the number of NeuronCore is greater than the number of visible - NeuronCore, this function will raise an error. + If the number of accelerators is greater than the number of visible + devices, this function will raise an error. - Lastly, update accelerator_type and neuron_cores in resources. + Lastly, update accelerator_type and number of accelerators in resources. Args: - resources: Resources dictionary to be updated with - NeuronCore accelerator type and custom resources(neuron_cores). + resources: Resources dictionary to be updated with the custom + accelerator type and resource count. + accelerator_key: The key used to access the number of accelerators + within `resources`. This can be: + ray_constants.NEURON_CORES or ray_constants.TPU + accelerator_type: The name of the accelerator type. This + is the unique identifier of the accelerator version, e.g. + ray_constants.AWS_NEURON_CORE or ray_constants.GOOGLE_TPU_V4. + visible_ids: The visible IDs specified by the user. This is typically + controlled by an environment variable, e.g. NEURON_RT_VISIBLE_CORES + or TPU_VISIBLE_CHIPS. + autodetected_accelerators: The number of accelerators autodetected + on the machine. + visible_devices_env_variable: The environment variable a user uses + to specify which devices are visible. Raises: - ValueError: If the number of NeuronCore is greater than the number of - visible NeuronCore. 
+ ValueError: If the number of requested accelerator chips is greater + than the number of visible accelerator chips. """ - import ray._private.ray_constants as ray_constants - import ray._private.utils as utils - - # AWS NeuronCore detection and configuration - # 1. Check if the user specified neuron_cores in resources - neuron_cores = resources.get(ray_constants.NEURON_CORES, None) - # 2. Check if the user specified NEURON_RT_VISIBLE_CORES - neuron_core_ids = utils.get_aws_neuron_core_visible_ids() + # Custom accelerator detection and configuration + # 1. Check if the user specified accelerator_count in resources + accelerator_count = resources.get(accelerator_key, None) + # 2. Check if the user specified visible cores/chips (within `visible_ids`) if ( - neuron_cores is not None - and neuron_core_ids is not None - and neuron_cores > len(neuron_core_ids) + accelerator_count is not None + and visible_ids is not None + and accelerator_count > len(visible_ids) ): raise ValueError( - f"Attempting to start raylet with {neuron_cores} " - f"neuron cores, but NEURON_RT_VISIBLE_CORES contains " - f"{neuron_core_ids}." + f"Attempting to start raylet with {accelerator_count} " + f"{accelerator_key}, but f{visible_devices_env_variable} " + f"contains {visible_ids}." ) - # 3. Auto-detect neuron_cores if not specified in resources - if neuron_cores is None: - neuron_cores = _autodetect_aws_neuron_cores() - # Don't use more neuron cores than allowed by NEURON_RT_VISIBLE_CORES. - if neuron_cores is not None and neuron_core_ids is not None: - neuron_cores = min(neuron_cores, len(neuron_core_ids)) - if neuron_cores is not None: - # 4. Update accelerator_type and neuron_cores with - # number of neuron cores detected or configured. + # 3. Auto-detect accelerator_count if not specified in resources + if accelerator_count is None: + accelerator_count = autodetected_accelerators + # Don't use more resources than allowed by the user's pre-set values. 
+ if accelerator_count is not None and visible_ids is not None: + accelerator_count = min(accelerator_count, len(visible_ids)) + if accelerator_count is not None: + # 4. Update accelerator_type and accelerator_count with + # number of accelerators detected or configured. resources.update( { - ray_constants.NEURON_CORES: neuron_cores, - utils.get_neuron_core_constraint_name(): neuron_cores, + accelerator_key: accelerator_count, + accelerator_type: accelerator_count, } ) @@ -109,3 +148,95 @@ def _get_neuron_core_count() -> int: for neuron_device in json_out: nc_count += neuron_device.get("nc_count", 0) return nc_count + + +def _autodetect_num_tpus() -> int: + """Attempt to detect the number of TPUs on this machine. + + TPU chips are represented as devices within `/dev/`, either as + `/dev/accel*` or `/dev/vfio/*`. + + Returns: + The number of TPUs if any were detected, otherwise 0. + """ + accel_files = glob.glob("/dev/accel*") + if accel_files: + return len(accel_files) + + try: + vfio_entries = os.listdir("/dev/vfio") + numeric_entries = [int(entry) for entry in vfio_entries if entry.isdigit()] + return len(numeric_entries) + except FileNotFoundError as e: + logging.info("Failed to detect number of TPUs: %s", e) + return 0 + + +def _autodetect_tpu_version() -> Optional[str]: + """Attempt to detect the TPU version. + + Individual TPU VMs within a TPU pod must know what type + of pod they are a part of. This is necessary for the + ML framework to work properly. + + The logic is different if the TPU was provisioned via: + ``` + gcloud tpus tpu-vm create ... + ``` + (i.e. a GCE VM), vs through GKE: + - GCE VMs will always have a metadata server to poll this info + - GKE VMs will have environment variables preset. + + Returns: + A string representing the TPU version, + e.g. "TPU-V2", "TPU-V3", "TPU-V4" if applicable, else None. 
+ + """ + + def accelerator_type_to_version(accelerator_type: str) -> str: + assert_tpu_accelerator_type(accelerator_type) + return "TPU-" + str(accelerator_type.split("-")[0]).upper() + + # GKE-based check + accelerator_type = os.getenv( + ray_constants.RAY_GKE_TPU_ACCELERATOR_TYPE_ENV_VAR, None + ) + if accelerator_type is not None: + return accelerator_type_to_version(accelerator_type) + + # GCE-based VM check + try: + accelerator_type_request = requests.get( + ray_constants.RAY_GCE_TPU_ACCELERATOR_ENDPOINT, + headers=ray_constants.RAY_GCE_TPU_HEADERS, + ) + if accelerator_type_request.status_code == 200: + return accelerator_type_to_version(accelerator_type_request.text) + except requests.RequestException as e: + logging.info("Unable to poll TPU GCE metadata: %s", e) + + return None + + +def assert_tpu_accelerator_type(accelerator_type: str): + """Assert that the provided accelerator_type is formatted correctly. + + The accelerator_type field follows a form of v{generation}-{cores/chips}. + + See the following for more information: + https://cloud.google.com/sdk/gcloud/reference/compute/tpus/tpu-vm/accelerator-types/describe + + Args: + accelerator_type: The string representation of the accelerator type + to be asserted for validity. + + Raises: + ValueError: If the provided accelerator_type is malformed. + + """ + expected_pattern = re.compile(r"^v\d+[a-zA-Z]*-\d+$") + if not expected_pattern.match(accelerator_type): + raise ValueError( + "`acceleratorType` should match v(generation)-(cores/chips). " + f"Got {accelerator_type}." 
+ ) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 9ff5ba3745f1..91641b19d003 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -400,10 +400,16 @@ def env_set_by_user(key): "RAY_EXPERIMENTAL_NOSET_NEURON_RT_VISIBLE_CORES" ) NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES" +NOSET_TPU_VISIBLE_CHIPS_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_TPU_VISIBLE_CHIPS" + CUDA_VISIBLE_DEVICES_ENV_VAR = "CUDA_VISIBLE_DEVICES" NEURON_RT_VISIBLE_CORES_ENV_VAR = "NEURON_RT_VISIBLE_CORES" +TPU_VISIBLE_CHIPS_ENV_VAR = "TPU_VISIBLE_CHIPS" + NEURON_CORES = "neuron_cores" GPU = "GPU" +TPU = "TPU" + # https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/arch/neuron-hardware/inf2-arch.html#aws-inf2-arch # https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/arch/neuron-hardware/trn1-arch.html#aws-trn1-arch # Subject to removal after the information is available via public API @@ -479,9 +485,33 @@ def gcs_actor_scheduling_enabled(): RAY_DEFAULT_LABEL_KEYS_PREFIX = "ray.io/" RAY_TPU_MAX_CONCURRENT_CONNECTIONS_ENV_VAR = "RAY_TPU_MAX_CONCURRENT_ACTIVE_CONNECTIONS" +RAY_GKE_TPU_ACCELERATOR_TYPE_ENV_VAR = "TPU_ACCELERATOR_TYPE" + +# Constants for accessing the `accelerator-type` from TPU VM +# instance metadata. +# See https://cloud.google.com/compute/docs/metadata/overview +# for more details about VM instance metadata. +RAY_GCE_TPU_ACCELERATOR_ENDPOINT = ( + "http://metadata.google.internal/computeMetadata/" + "v1/instance/attributes/accelerator-type" +) +RAY_GCE_TPU_HEADERS = {"Metadata-Flavor": "Google"} # TPU VMs come with 4 chips per host and 2 tensorcores per chip. # For more details: https://cloud.google.com/tpu/docs/system-architecture-tpu-vm RAY_TPU_NUM_CHIPS_PER_HOST = 4 RAY_TPU_CORES_PER_CHIP = 2 + +# The following defines environment variables that allow +# us to access a subset of TPU visible chips. 
+# +# See: https://github.com/google/jax/issues/14977 for an example/more details. +TPU_VALID_CHIP_OPTIONS = (1, 2, 4) +TPU_CHIPS_PER_HOST_BOUNDS_ENV_VAR = "TPU_CHIPS_PER_HOST_BOUNDS" +TPU_CHIPS_PER_HOST_BOUNDS_1_CHIP_CONFIG = "1,1,1" +TPU_CHIPS_PER_HOST_BOUNDS_2_CHIP_CONFIG = "1,2,1" + +TPU_HOST_BOUNDS_ENV_VAR = "TPU_HOST_BOUNDS" +TPU_SINGLE_HOST_BOUNDS = "1,1,1" + RAY_NODE_IP_FILENAME = "node_ip_address.json" diff --git a/python/ray/_private/ray_option_utils.py b/python/ray/_private/ray_option_utils.py index 4d15fa7f9711..a014ec8dc771 100644 --- a/python/ray/_private/ray_option_utils.py +++ b/python/ray/_private/ray_option_utils.py @@ -1,8 +1,9 @@ """Manage, parse and validate options for Ray tasks, actors and actor methods.""" import warnings from dataclasses import dataclass -from typing import Any, Callable, Dict, Optional, Tuple, Union +from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union +import logging import ray from ray._private import ray_constants from ray._private.utils import get_ray_doc_version @@ -108,9 +109,35 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]: return None -def _validate_neuron_core_accelerator(options: Dict[str, Any]): - """Validate options for NeuronCore accelerator/ neuron_cores and GPU, - supports only one or the other (Either NeuronCore or GPU). +def _maybe_warn_misconfigured_tpu_chips(resources: Dict[str, Any]): + """Possibly warn against misconfigured TPU chip configuration.""" + num_tpus = resources.get(ray_constants.TPU, 0) + if num_tpus not in ray_constants.TPU_VALID_CHIP_OPTIONS: + logging.warning( + f"The number of requested 'TPU' was set to {num_tpus} which " + "is not a supported chip configuration. Supported configs: " + f"{ray_constants.TPU_VALID_CHIP_OPTIONS}" + ) + + +def _validate_accelerators(options: Dict[str, Any]): + """Validate options for accelerators - support only one out of multiple options. 
+ + GPUs, NeuronCore accelerators (neuron_cores), and TPUs are valid options, but + individual nodes do not support heterogeneous accelerators. This function + guards against this setting. + + The control flow is as follows: + - For each accelerator, determine if the options indicate usage of + a particular accelerator, captured by booleans. + - For GPUs, this is set if num_gpus > 0. + - For custom resources, this is set if the + resource name ("neuron_cores" or "TPU") or + accelerator_type ("aws-neuron-core", "TPU-V2", "TPU-V3", etc.) + is requested. + - If we identify that >1 resource type is requested, + we raise an error indicating that heterogeneous raylets + are not supported. Args: options: The options to be validated. @@ -119,21 +146,57 @@ def _validate_neuron_core_accelerator(options: Dict[str, Any]): ValueError: If the options are invalid. """ num_gpus = options.get("num_gpus", None) - if num_gpus is not None and num_gpus > 0: - resources = options["resources"] if "resources" in options else None + non_zero_gpus = num_gpus is not None and num_gpus > 0 + resources = options["resources"] if "resources" in options else None + + def non_zero_custom_resource( + resource_id: str, accelerator_ids: Iterable[str] + ) -> bool: + """Return whether or not the options includes >0 of a custom resource.""" + result = False accelerator_type_value: str = options.get("accelerator_type", "") if resources is not None: - neuron_cores: int = resources.get(ray_constants.NEURON_CORES, 0) - if neuron_cores > 0: - raise ValueError( - "'num_gpus' cannot be used together with " - "neuron_cores/accelerator_type:aws-neuron-core." - ) - elif accelerator_type_value == accelerators.AWS_NEURON_CORE: - raise ValueError( - "'num_gpus' cannot be used together with " - "neuron_cores/accelerator_type:aws-neuron-core." 
- ) + num_resources: int = resources.get(resource_id, 0) + if num_resources > 0: + result = True + if accelerator_type_value in accelerator_ids: + result = True + return result + + non_zero_neuron_cores = non_zero_custom_resource( + resource_id=ray_constants.NEURON_CORES, + accelerator_ids=[accelerators.AWS_NEURON_CORE], + ) + non_zero_tpus = non_zero_custom_resource( + resource_id=ray_constants.TPU, + accelerator_ids=[ + accelerators.GOOGLE_TPU_V2, + accelerators.GOOGLE_TPU_V3, + accelerators.GOOGLE_TPU_V4, + ], + ) + + if non_zero_tpus: + _maybe_warn_misconfigured_tpu_chips(resources) + + num_configured_accelerators = sum( + [non_zero_gpus, non_zero_tpus, non_zero_neuron_cores] + ) + if num_configured_accelerators > 1: + hardware_requested = [] + if non_zero_gpus: + hardware_requested.append("GPU") + if non_zero_neuron_cores: + hardware_requested.append("neuron_cores") + if non_zero_tpus: + hardware_requested.append("TPU") + hardware_str = ",".join(hardware_requested) + raise ValueError( + "Only one of 'num_gpus', 'neuron_cores/accelerator_type:aws-neuron-core' " + "and 'TPU/accelerator_type:TPU-V*' can be set. " + f"Detected {num_configured_accelerators} " + f"options were configured: {hardware_str}." 
+ ) _common_options = { @@ -320,7 +383,7 @@ def validate_task_options(options: Dict[str, Any], in_options: bool): if in_options and "max_calls" in options: raise ValueError("Setting 'max_calls' is not supported in '.options()'.") _check_deprecate_placement_group(options) - _validate_neuron_core_accelerator(options) + _validate_accelerators(options) def validate_actor_options(options: Dict[str, Any], in_options: bool): @@ -365,7 +428,7 @@ def validate_actor_options(options: Dict[str, Any], in_options: bool): ) _check_deprecate_placement_group(options) - _validate_neuron_core_accelerator(options) + _validate_accelerators(options) def update_options( diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 4cf5b01a0338..000ba730898e 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -90,8 +90,8 @@ def resolved(self): def to_resource_dict(self): """Returns a dict suitable to pass to raylet initialization. - This renames num_cpus / num_gpus to "CPU" / "GPU", translates memory - from bytes into 100MB memory units, and checks types. + This renames num_cpus / num_gpus to "CPU" / "GPU", + translates memory from bytes into 100MB memory units, and checks types. 
""" assert self.resolved() @@ -275,7 +275,12 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): ) spec = ResourceSpec( - num_cpus, num_gpus, memory, object_store_memory, resources, redis_max_memory + num_cpus, + num_gpus, + memory, + object_store_memory, + resources, + redis_max_memory, ) assert spec.resolved() return spec diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 976007b3f006..8abdd7e3f2a4 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -278,20 +278,23 @@ def compute_driver_id_from_job(job_id): def get_gpu_and_accelerator_runtime_ids() -> Mapping[str, Optional[List[str]]]: """ - Get the device IDs of GPUs (CUDA), accelerators(NeuronCore) using - (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES) environment variables. + Get the device IDs of GPUs (CUDA), accelerators(NeuronCore and TPUs) + using (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS) + environment variables. Returns: A dictionary with keys: - ray_constants.GPU: The list of device IDs of GPUs. - ray_constants.NEURON_CORES: The list of device IDs of accelerators. + - ray_constants.TPU: The list of device IDs of TPUs. If either of the environment variables is not set, returns None for corresponding key. """ return { ray_constants.GPU: get_cuda_visible_devices(), ray_constants.NEURON_CORES: get_aws_neuron_core_visible_ids(), + ray_constants.TPU: get_tpu_visible_chips(), } @@ -319,11 +322,25 @@ def get_aws_neuron_core_visible_ids() -> Optional[List[str]]: return _get_visible_ids(env_var=ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR) +def get_tpu_visible_chips() -> Optional[List[str]]: + """ + Get the device IDs using TPU_VISIBLE_CHIPS environment variable. + + Returns: + devices (List[str]): If environment variable is set, returns a + list of strings representing the IDs of the visible devices. + If it is not set or is set to NoDevFiles, returns empty list. 
+ + """ + return _get_visible_ids(env_var=ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR) + + def _get_visible_ids(env_var: str) -> Optional[List[str]]: """Get the device IDs from defined environment variable. Args: env_var: Environment variable (e.g., CUDA_VISIBLE_DEVICES, - NEURON_RT_VISIBLE_CORES) to set based on the accelerator runtime. + NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS) to set based + on the accelerator runtime. Returns: devices (List[str]): If environment variable is set, returns a @@ -333,6 +350,7 @@ def _get_visible_ids(env_var: str) -> Optional[List[str]]: if env_var not in ( ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR, ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, + ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR, ): raise ValueError(f"Invalid environment variable {env_var} to get visible IDs.") visible_ids_str = os.environ.get(env_var, None) @@ -352,6 +370,7 @@ def _get_visible_ids(env_var: str) -> Optional[List[str]]: last_set_gpu_ids = None last_set_neuron_core_ids = None +last_set_tpu_chips = None def set_omp_num_threads_if_unset() -> bool: @@ -394,6 +413,20 @@ def set_omp_num_threads_if_unset() -> bool: return True +def set_gpu_and_accelerator_runtime_ids() -> None: + """Set (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS ,...) + environment variables based on the accelerator runtime. + + Raises: + ValueError: If the environment variable is set to a different + environment variable. + """ + ids = ray.get_runtime_context().get_resource_ids() + set_cuda_visible_devices(ids[ray_constants.GPU]) + set_aws_neuron_core_visible_ids(ids[ray_constants.NEURON_CORES]) + set_tpu_visible_ids_and_bounds(ids[ray_constants.TPU]) + + def set_cuda_visible_devices(gpu_ids: List[str]): """Set the CUDA_VISIBLE_DEVICES environment variable. @@ -409,19 +442,6 @@ def set_cuda_visible_devices(gpu_ids: List[str]): last_set_gpu_ids = gpu_ids -def set_gpu_and_accelerator_runtime_ids() -> None: - """Set (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, ..) 
environment variables - based on the accelerator runtime. - - Raises: - ValueError: If the environment variable is set to a different - environment variable. - """ - ids = ray.get_runtime_context().get_resource_ids() - set_cuda_visible_devices(ids[ray_constants.GPU]) - set_aws_neuron_core_visible_ids(ids[ray_constants.NEURON_CORES]) - - def set_aws_neuron_core_visible_ids(neuron_core_ids: List[str]) -> None: """Set the NEURON_RT_VISIBLE_CORES environment variable based on given neuron_core_ids. @@ -438,10 +458,51 @@ def set_aws_neuron_core_visible_ids(neuron_core_ids: List[str]) -> None: last_set_neuron_core_ids = neuron_core_ids +def set_tpu_visible_ids_and_bounds(tpu_chips: List[str]) -> None: + """Set TPU environment variables based on the provided tpu_chips. + + To access a subset of the TPU visible chips, we must use a combination of + environment variables that tell the compiler (via ML framework) the: + - Visible chips + - The physical bounds of chips per host + - The host bounds within the context of a TPU pod. + + See: https://github.com/google/jax/issues/14977 for an example/more details. + + Args: + tpu_chips (List[str]): List of strings representing TPU chip IDs. 
+ """ + if os.environ.get(ray_constants.NOSET_TPU_VISIBLE_CHIPS_ENV_VAR): + return + global last_set_tpu_chips + if last_set_tpu_chips == tpu_chips: + return # optimization: already set + if len(tpu_chips) == ray_constants.RAY_TPU_NUM_CHIPS_PER_HOST: + # Let the ML framework use the defaults + return + _set_visible_ids(tpu_chips, ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR) + num_chips = len(tpu_chips) + if num_chips == 1: + os.environ[ + ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_ENV_VAR + ] = ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_1_CHIP_CONFIG + os.environ[ + ray_constants.TPU_HOST_BOUNDS_ENV_VAR + ] = ray_constants.TPU_SINGLE_HOST_BOUNDS + elif num_chips == 2: + os.environ[ + ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_ENV_VAR + ] = ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_2_CHIP_CONFIG + os.environ[ + ray_constants.TPU_HOST_BOUNDS_ENV_VAR + ] = ray_constants.TPU_SINGLE_HOST_BOUNDS + last_set_tpu_chips = tpu_chips + + def _set_visible_ids(visible_ids: List[str], env_var: str): - """Set the environment variable (e.g., CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES) - passed based on accelerator runtime and will raise an error if the function uses - different environment variable. + """Set the environment variable (e.g., CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, + TPU_VISIBLE_CHIPS) passed based on accelerator runtime and will raise an error if + the function uses different environment variable. Args: visible_ids (List[str]): List of strings representing GPU IDs or NeuronCore IDs. 
@@ -451,6 +512,7 @@ def _set_visible_ids(visible_ids: List[str], env_var: str): if env_var not in ( ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR, ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, + ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR, ): raise ValueError(f"Invalid environment variable {env_var} to set visible IDs.") os.environ[env_var] = ",".join([str(i) for i in visible_ids]) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index ebe72fe3d2ca..831653596799 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -426,7 +426,8 @@ def __init__(self): self.mode = None self.actors = {} # When the worker is constructed. Record the original value of the - # (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, ..) environment variables. + # (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS, ..) + # environment variables. self.original_gpu_and_accelerator_runtime_ids = ( ray._private.utils.get_gpu_and_accelerator_runtime_ids() ) @@ -861,9 +862,9 @@ def get_resource_ids_for_resource( assigned_ids.add(resource_id) # If the user had already set the environment variables - # (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, ..) then respect that - # in the sense that only IDs that appear in (CUDA_VISIBLE_DEVICES, - # NEURON_RT_VISIBLE_CORES, ..) should be returned. + # (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS, ..) then + # respect that in the sense that only IDs that appear in (CUDA_VISIBLE_DEVICES, + # NEURON_RT_VISIBLE_CORES, TPU_VISIBLE_CHIPS, ..) should be returned. 
if ( self.original_gpu_and_accelerator_runtime_ids.get(resource_name, None) is not None diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index a0faaac40514..5f1dd37415c7 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1837,7 +1837,7 @@ cdef execute_task_with_cancellation_handler( task_name = name.decode("utf-8") title = f"ray::{task_name}" - # Automatically restrict the GPUs (CUDA), neuron_core accelerator + # Automatically restrict the GPUs (CUDA), neuron_core, TPU accelerator # runtime_ids to restrict availability to this task. ray._private.utils.set_gpu_and_accelerator_runtime_ids() diff --git a/python/ray/autoscaler/_private/gcp/config.py b/python/ray/autoscaler/_private/gcp/config.py index 4f186960294b..ebd56118806b 100644 --- a/python/ray/autoscaler/_private/gcp/config.py +++ b/python/ray/autoscaler/_private/gcp/config.py @@ -13,7 +13,7 @@ from google.oauth2.credentials import Credentials as OAuthCredentials from googleapiclient import discovery, errors -from ray._private import ray_constants +from ray._private import accelerator, ray_constants from ray.autoscaler._private.gcp.node import MAX_POLLS, POLL_INTERVAL, GCPNodeType from ray.autoscaler._private.util import check_legacy_fields @@ -65,12 +65,7 @@ def _validate_tpu_config(node: dict): ) if "acceleratorType" in node: accelerator_type = node["acceleratorType"] - expected_pattern = re.compile(r"^v\d+[a-zA-Z]*-\d+$") - if not expected_pattern.match(accelerator_type): - raise ValueError( - "`acceleratorType` should match v(generation)-(cores/chips)" - f"Got {accelerator_type}." 
- ) + accelerator.assert_tpu_accelerator_type(accelerator_type) else: # "acceleratorConfig" in node accelerator_config = node["acceleratorConfig"] if "type" not in accelerator_config or "topology" not in accelerator_config: diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index f6522c0b440f..8f728182270d 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -266,6 +266,8 @@ def resources_avail_summary(self) -> str: out = "{} CPUs".format(int(total_resources.get("CPU", 0))) if "GPU" in total_resources: out += ", {} GPUs".format(int(total_resources["GPU"])) + if "TPU" in total_resources: + out += ", {} TPUs".format(int(total_resources["TPU"])) return out def summary(self): diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index d01193bf5b81..01456667ddb0 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -443,7 +443,7 @@ def emit_metrics(self, load_metrics_summary, autoscaler_summary, node_types): if autoscaler_summary is None: return None - for resource_name in ["CPU", "GPU"]: + for resource_name in ["CPU", "GPU", "TPU"]: _, total = load_metrics_summary.usage.get(resource_name, (0, 0)) pending = autoscaler_summary.pending_resources.get(resource_name, 0) self.prom_metrics.cluster_resources.labels( @@ -493,14 +493,14 @@ def emit_metrics(self, load_metrics_summary, autoscaler_summary, node_types): def update_event_summary(self): """Report the current size of the cluster. - To avoid log spam, only cluster size changes (CPU or GPU count change) + To avoid log spam, only cluster size changes (CPU, GPU or TPU count change) are reported to the event summarizer. The event summarizer will report only the latest cluster size per batch. 
""" avail_resources = self.load_metrics.resources_avail_summary() if not self.readonly_config and avail_resources != self.last_avail_resources: self.event_summarizer.add( - "Resized to {}.", # e.g., Resized to 100 CPUs, 4 GPUs. + "Resized to {}.", # e.g., Resized to 100 CPUs, 4 GPUs, 4 TPUs. quantity=avail_resources, aggregate=lambda old, new: new, ) diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py index 5d212398efed..003411d2245b 100644 --- a/python/ray/runtime_context.py +++ b/python/ray/runtime_context.py @@ -394,16 +394,17 @@ def _get_actor_call_stats(self): def get_resource_ids(self) -> Dict[str, List[str]]: """ - Get the current worker's GPU and accelerator ids. + Get the current worker's GPU, accelerator and TPU ids. Returns: - A dictionary keyed by the resource name. The values are list - of ids `{'GPU': ['0', '1'], 'neuron_cores': ['0', '1']}`. + A dictionary keyed by the resource name. The values are lists of ids. + Example: + {'GPU': ['0', '1'], 'neuron_cores': ['0', '1'], 'TPU': ['0', '1']}. 
""" worker = self.worker worker.check_connected() ids_dict: Dict[str, List[str]] = {} - for name in [ray_constants.GPU, ray_constants.NEURON_CORES]: + for name in [ray_constants.GPU, ray_constants.NEURON_CORES, ray_constants.TPU]: resource_ids = worker.get_resource_ids_for_resource( name, f"^{name}_group_[0-9A-Za-z]+$" ) diff --git a/python/ray/tests/test_accelerator.py b/python/ray/tests/test_accelerator.py index 33a5f4f4d2ce..51ca5a7ff987 100644 --- a/python/ray/tests/test_accelerator.py +++ b/python/ray/tests/test_accelerator.py @@ -1,9 +1,17 @@ import mock import pytest +import os + +import logging +from unittest.mock import patch +import requests + import ray._private.accelerator as accelerator import ray._private.utils as utils import ray._private.ray_constants as ray_constants +from ray._private.ray_option_utils import _validate_accelerators +from ray.util.accelerators.accelerators import AWS_NEURON_CORE def test_configured_aws_neuron_core(): @@ -13,9 +21,7 @@ def test_configured_aws_neuron_core(): assert resources.get(ray_constants.NEURON_CORES) == 4 -@mock.patch( - "ray._private.utils.get_aws_neuron_core_visible_ids", return_value=[0, 1, 2] -) +@patch("ray._private.utils.get_aws_neuron_core_visible_ids", return_value=[0, 1, 2]) def test_aws_neuron_core_with_more_user_configured(mock_get_nc_ids): resources = {"CPU": 1, "neuron_cores": 4} with pytest.raises(ValueError): @@ -23,7 +29,7 @@ def test_aws_neuron_core_with_more_user_configured(mock_get_nc_ids): assert mock_get_nc_ids.called -@mock.patch("ray._private.accelerator._autodetect_aws_neuron_cores", return_value=2) +@patch("ray._private.accelerator._autodetect_aws_neuron_cores", return_value=2) def test_auto_detect_aws_neuron_core(mock_autodetect_aws_neuron_cores): resources = {"CPU": 1} accelerator.update_resources_with_accelerator_type(resources) @@ -32,10 +38,8 @@ def test_auto_detect_aws_neuron_core(mock_autodetect_aws_neuron_cores): assert resources.get(ray_constants.NEURON_CORES) == 2 -@mock.patch( 
- "ray._private.utils.get_aws_neuron_core_visible_ids", return_value=[0, 1, 2] -) -@mock.patch("ray._private.accelerator._autodetect_aws_neuron_cores", return_value=4) +@patch("ray._private.utils.get_aws_neuron_core_visible_ids", return_value=[0, 1, 2]) +@patch("ray._private.accelerator._autodetect_aws_neuron_cores", return_value=4) def test_auto_detect_nc_with_more_user_configured( mock_get_nc_ids, mock_autodetect_aws_neuron_cores ): @@ -47,7 +51,7 @@ def test_auto_detect_nc_with_more_user_configured( assert resources.get(ray_constants.NEURON_CORES) == 3 -@mock.patch("subprocess.run") +@patch("subprocess.run") def test_get_neuron_core_count_single_device(mock_subprocess): mock_subprocess.return_value.returncode = 0 mock_subprocess.return_value.stdout = ( @@ -59,7 +63,7 @@ def test_get_neuron_core_count_single_device(mock_subprocess): assert mock_subprocess.called -@mock.patch("subprocess.run") +@patch("subprocess.run") def test_get_neuron_core_count_multiple_devices(mock_subprocess): mock_subprocess.return_value.returncode = 0 mock_subprocess.return_value.stdout = ( @@ -73,7 +77,7 @@ def test_get_neuron_core_count_multiple_devices(mock_subprocess): assert mock_subprocess.called -@mock.patch("subprocess.run") +@patch("subprocess.run") def test_get_neuron_core_count_failure_with_error(mock_subprocess): mock_subprocess.return_value.returncode = 1 mock_subprocess.return_value.stderr = b"AccessDenied" @@ -81,7 +85,7 @@ def test_get_neuron_core_count_failure_with_error(mock_subprocess): assert mock_subprocess.called -@mock.patch("subprocess.run") +@patch("subprocess.run") def test_get_neuron_core_count_failure_with_empty_results(mock_subprocess): mock_subprocess.return_value.returncode = 0 mock_subprocess.return_value.stdout = b"[{}]" @@ -89,9 +93,189 @@ def test_get_neuron_core_count_failure_with_empty_results(mock_subprocess): assert mock_subprocess.called +@patch("glob.glob") +def test_autodetect_num_tpus_accel(mock_glob): + mock_glob.return_value = [ + 
"/dev/accel0", + "/dev/accel1", + "/dev/accel2", + "/dev/accel3", + ] + assert accelerator._autodetect_num_tpus() == 4 + + +@patch("glob.glob") +@patch("os.listdir") +def test_autodetect_num_tpus_vfio(mock_list, mock_glob): + mock_glob.return_value = [] + mock_list.return_value = [f"{i}" for i in range(4)] + assert accelerator._autodetect_num_tpus() == 4 + + +@patch("glob.glob") +@patch("os.listdir") +def test_autodetect_num_tpus_without_devices(mock_list, mock_glob): + mock_list.side_effect = FileNotFoundError + mock_glob.return_value = [] + assert accelerator._autodetect_num_tpus() == 0 + + +@pytest.mark.parametrize( + "accelerator_type_version_tuple", + [ + ("gce", "v2-8", "TPU-V2"), + ("gce", "v2-32", "TPU-V2"), + ("gce", "v3-8", "TPU-V3"), + ("gce", "v3-128", "TPU-V3"), + ("gce", "v4-8", "TPU-V4"), + ("gce", "v4-2048", "TPU-V4"), + ("gke", "v2-8", "TPU-V2"), + ("gke", "v2-32", "TPU-V2"), + ("gke", "v3-8", "TPU-V3"), + ("gke", "v3-128", "TPU-V3"), + ("gke", "v4-8", "TPU-V4"), + ("gke", "v4-2048", "TPU-V4"), + ], +) +@patch("requests.get") +@patch("os.getenv") +def test_autodetect_tpu_version(mock_os, mock_request, accelerator_type_version_tuple): + gce_or_gke, accelerator_type, expected_version = accelerator_type_version_tuple + if gce_or_gke == "gce": + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.text = accelerator_type + mock_request.return_value = mock_response + mock_os.return_value = None + else: + mock_os.return_value = accelerator_type + assert accelerator._autodetect_tpu_version() == expected_version + + +@pytest.mark.parametrize( + "test_case", + [ + ("gce", "not-a-valid-version"), + ("gce", "vNOTVALID-8"), + ("gce", "230498230948230948"), + ("gke", "not-a-valid-version"), + ("gke", "vNOTVALID-8"), + ("gke", "230498230948230948"), + ], +) +@patch("requests.get") +@patch("os.getenv") +def test_autodetect_invalid_type(mock_os, mock_request, test_case): + gce_or_gke, accelerator_type = test_case + if gce_or_gke == 
"gce": + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.text = accelerator_type + mock_request.return_value = mock_response + mock_os.return_value = None + else: + mock_os.return_value = accelerator_type + with pytest.raises(ValueError): + accelerator._autodetect_tpu_version() + + +def test_autodetect_tpu_fails_gracefully(): + with patch("requests.get") as mock_get: + mock_get.side_effect = requests.exceptions.RequestException + tpu_result = accelerator._autodetect_tpu_version() + assert tpu_result is None + + +@pytest.mark.parametrize( + "test_config", + [ + (1, 0, 0, False, False), + (0, 1, 0, False, False), + (0, 0, 1, False, False), + (0, 0, 0, True, False), + (1, 1, 0, False, True), + (0, 1, 1, False, True), + (0, 1, 0, True, True), + (1, 0, 0, True, True), + ], +) +def test_validate_accelerator_options(test_config): + num_gpus, num_tpus, num_neuron_cores, use_neuron_acc, expect_error = test_config + options = { + "num_gpus": num_gpus, + "resources": {}, + } + + if use_neuron_acc: + options["accelerator_type"] = AWS_NEURON_CORE + if num_neuron_cores > 0: + options["resources"]["neuron_cores"] = num_neuron_cores + if num_tpus > 0: + options["resources"]["TPU"] = num_tpus + + if expect_error: + with pytest.raises(ValueError): + _validate_accelerators(options) + else: + # Should run without raising an error + _validate_accelerators(options) + + +@pytest.mark.parametrize( + "tpu_chips", + [ + ["1"], + ["1", "2"], + ["1", "2", "3", "4"], + ], +) +def test_set_tpu_visible_ids_and_bounds(tpu_chips): + with patch.dict("os.environ", {}, clear=True): + utils.set_tpu_visible_ids_and_bounds(tpu_chips=tpu_chips) + if len(tpu_chips) == 1: + assert ( + os.environ[ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_ENV_VAR] + == ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_1_CHIP_CONFIG + ) + assert ( + os.environ[ray_constants.TPU_HOST_BOUNDS_ENV_VAR] + == ray_constants.TPU_SINGLE_HOST_BOUNDS + ) + assert os.environ[ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR] 
== ",".join( + tpu_chips + ) + elif len(tpu_chips) == 2: + assert ( + os.environ[ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_ENV_VAR] + == ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_2_CHIP_CONFIG + ) + assert ( + os.environ[ray_constants.TPU_HOST_BOUNDS_ENV_VAR] + == ray_constants.TPU_SINGLE_HOST_BOUNDS + ) + assert os.environ[ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR] == ",".join( + tpu_chips + ) + else: # len(tpu_chips) == 4 + # Check that nothing is set, let the ML framework use the defaults. + assert ( + os.environ.get(ray_constants.TPU_CHIPS_PER_HOST_BOUNDS_ENV_VAR, None) + is None + ) + assert os.environ.get(ray_constants.TPU_HOST_BOUNDS_ENV_VAR, None) is None + assert os.environ.get(ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR, None) is None + + +@pytest.mark.parametrize("num_tpus", [3, 8, 10, 0.3, 0.2]) +def test_invalid_tpu_chip_configuration_warning(propagate_logs, caplog, num_tpus): + options = {"resources": {"TPU": num_tpus}} + with caplog.at_level(logging.WARNING, logger="ray._private.ray_option_utils"): + _validate_accelerators(options) + assert "not a supported chip configuration" in caplog.text + + if __name__ == "__main__": import sys - import os if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/tests/test_autoscaler_fake_multinode.py b/python/ray/tests/test_autoscaler_fake_multinode.py index 7fb8199a0588..9f8ee37c1e0c 100644 --- a/python/ray/tests/test_autoscaler_fake_multinode.py +++ b/python/ray/tests/test_autoscaler_fake_multinode.py @@ -30,6 +30,16 @@ def test_fake_autoscaler_basic_e2e(shutdown_only): "min_workers": 0, "max_workers": 2, }, + "tpu_node": { + "resources": { + "CPU": 2, + "TPU": 4, + "object_store_memory": 1024 * 1024 * 1024, + }, + "node_config": {}, + "min_workers": 0, + "max_workers": 2, + }, }, ) @@ -47,8 +57,14 @@ def f(): def g(): print("cpu ok") + # Triggers the addition of a TPU node.
+ @ray.remote(resources={"TPU": 4}) + def h(): + print("tpu ok") + ray.get(f.remote()) ray.get(g.remote()) + ray.get(h.remote()) ray.shutdown() finally: cluster.shutdown() diff --git a/python/ray/util/accelerators/__init__.py b/python/ray/util/accelerators/__init__.py index c30f5936e0df..49d846aede52 100644 --- a/python/ray/util/accelerators/__init__.py +++ b/python/ray/util/accelerators/__init__.py @@ -7,6 +7,9 @@ NVIDIA_TESLA_A100, NVIDIA_TESLA_A10G, AWS_NEURON_CORE, + GOOGLE_TPU_V2, + GOOGLE_TPU_V3, + GOOGLE_TPU_V4, ) __all__ = [ @@ -18,4 +21,7 @@ "NVIDIA_TESLA_A100", "NVIDIA_TESLA_A10G", "AWS_NEURON_CORE", + "GOOGLE_TPU_V2", + "GOOGLE_TPU_V3", + "GOOGLE_TPU_V4", ] diff --git a/python/ray/util/accelerators/accelerators.py b/python/ray/util/accelerators/accelerators.py index 291b2222b157..b9be5ff06a18 100644 --- a/python/ray/util/accelerators/accelerators.py +++ b/python/ray/util/accelerators/accelerators.py @@ -6,3 +6,6 @@ NVIDIA_TESLA_A100 = "A100" NVIDIA_TESLA_A10G = "A10G" AWS_NEURON_CORE = "aws-neuron-core" +GOOGLE_TPU_V2 = "TPU-V2" +GOOGLE_TPU_V3 = "TPU-V3" +GOOGLE_TPU_V4 = "TPU-V4" diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 899aede1f10c..eed3dc963319 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -691,9 +691,11 @@ RAY_CONFIG(uint32_t, RAY_CONFIG(std::string, predefined_unit_instance_resources, "GPU") /// The scheduler will treat these custom resource types as unit_instance. -/// Default custom_unit_instance_resources is "neuron_cores". -/// When set it to "neuron_cores,FPGA", we will also treat FPGA as unit_instance. -RAY_CONFIG(std::string, custom_unit_instance_resources, "neuron_cores") +/// This allows the scheduler to provide chip IDs for custom resources like +/// "neuron_cores", "TPUs" and "FPGAs". +/// Default custom_unit_instance_resources is "neuron_cores,TPU". +/// When set to "neuron_cores,TPU,FPGA", we will also treat FPGA as unit_instance.
+RAY_CONFIG(std::string, custom_unit_instance_resources, "neuron_cores,TPU") // Maximum size of the batches when broadcasting resources to raylet. RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512) diff --git a/src/ray/common/test/scheduling_ids_test.cc b/src/ray/common/test/scheduling_ids_test.cc index d85aa3b08bf7..f06a5cd10544 100644 --- a/src/ray/common/test/scheduling_ids_test.cc +++ b/src/ray/common/test/scheduling_ids_test.cc @@ -55,13 +55,14 @@ TEST_F(SchedulingIDsTest, UnitInstanceResourceTest) { R"( { "predefined_unit_instance_resources": "CPU,GPU", - "custom_unit_instance_resources": "neuron_cores,custom1" + "custom_unit_instance_resources": "neuron_cores,TPU,custom1" } )"); ASSERT_TRUE(ResourceID::CPU().IsUnitInstanceResource()); ASSERT_TRUE(ResourceID::GPU().IsUnitInstanceResource()); ASSERT_TRUE(ResourceID("custom1").IsUnitInstanceResource()); ASSERT_TRUE(ResourceID("neuron_cores").IsUnitInstanceResource()); + ASSERT_TRUE(ResourceID("TPU").IsUnitInstanceResource()); ASSERT_FALSE(ResourceID::Memory().IsUnitInstanceResource()); ASSERT_FALSE(ResourceID("custom2").IsUnitInstanceResource());