From 4f2662e99187cf7d4f294315f904b0c89ade6134 Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Fri, 16 Jun 2023 16:27:40 +0800 Subject: [PATCH 1/6] [Core] Support intel GPU Signed-off-by: Wu, Gangsheng --- python/ray/_private/ray_constants.py | 9 +++ python/ray/_private/resource_spec.py | 96 ++++++++++++++++++------ python/ray/_private/utils.py | 45 +++++++++++ python/ray/_private/worker.py | 68 +++++++++++++++-- python/ray/_raylet.pyx | 2 +- python/ray/tests/test_actor_resources.py | 30 ++++++-- python/ray/tests/test_advanced_2.py | 4 +- python/ray/tests/test_advanced_6.py | 25 ++++++ python/ray/tests/test_basic.py | 4 +- 9 files changed, 243 insertions(+), 40 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index c274095b2750..d8ff6123837e 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -382,6 +382,15 @@ def env_set_by_user(key): LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"] NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES" +NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES" + +RAY_DEVICE_XPU_SELECTOR_ENV_VAR = "ONEAPI_DEVICE_SELECTOR" +RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero" +RAY_DEVICE_XPU_DEVICE_TYPE = "gpu" + +RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"} +RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA" + RAY_WORKER_NICENESS = "RAY_worker_niceness" # Default max_retries option in @ray.remote for non-actor diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index d916a7db43b6..25cf6fd8a25b 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -168,35 +168,20 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if is_head: resources[HEAD_NODE_RESOURCE_NAME] = 1.0 + # get cpu num num_cpus = self.num_cpus if num_cpus is None: num_cpus = ray._private.utils.get_num_cpus() - num_gpus = self.num_gpus - gpu_ids = ray._private.utils.get_cuda_visible_devices() - # Check that the number of GPUs that the raylet wants doesn't - # exceed the amount allowed by CUDA_VISIBLE_DEVICES. - if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids): - raise ValueError( - "Attempting to start raylet with {} GPUs, " - "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids) - ) - if num_gpus is None: - # Try to automatically detect the number of GPUs. - num_gpus = _autodetect_num_gpus() - # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. - if gpu_ids is not None: - num_gpus = min(num_gpus, len(gpu_ids)) - - try: - if importlib.util.find_spec("GPUtil") is not None: - gpu_types = _get_gpu_types_gputil() - else: - info_string = _get_gpu_info_string() - gpu_types = _constraints_from_gpu_info(info_string) + # get accelerate device info + if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": # get cuda device num + num_gpus, gpu_types = _get_cuda_info(self.num_gpus) + resources.update(gpu_types) + elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": # get xpu device num + # here we take xpu as gpu, so no need to develop core's scheduling policy + # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy + num_gpus, gpu_types = _get_xpu_info(self.num_gpus) resources.update(gpu_types) - except Exception: - logger.exception("Could not parse gpu information.") # Choose a default object store size. 
system_memory = ray._private.utils.get_system_memory() @@ -276,6 +261,69 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): return spec +def _get_cuda_info(num_gpus): + """ Attemp to process the number and type of GPUs + Notice: + If gpu id not specified in CUDA_VISIBLE_DEVICES, + and num_gpus is defined in task or actor, + this function will return the input num_gpus, not 0 + + Returns: + (num_gpus, gpu_types) + """ + gpu_ids = ray._private.utils.get_cuda_visible_devices() + # Check that the number of GPUs that the raylet wants doesn't + # exceed the amount allowed by CUDA_VISIBLE_DEVICES. + if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids): + raise ValueError( + "Attempting to start raylet with {} GPUs, " + "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids) + ) + if num_gpus is None: + # Try to automatically detect the number of GPUs. + num_gpus = _autodetect_num_gpus() + # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. + if gpu_ids is not None: + num_gpus = min(num_gpus, len(gpu_ids)) + + gpu_types = "" + try: + if importlib.util.find_spec("GPUtil") is not None: + gpu_types = _get_gpu_types_gputil() + else: + info_string = _get_gpu_info_string() + gpu_types = _constraints_from_gpu_info(info_string) + except Exception: + logger.exception("Could not parse gpu information.") + + return num_gpus, gpu_types + + +def _get_xpu_info(num_xpus): + """Attempt to process the number of XPUs as GPUs + Returns: + The number of XPUs that detected by dpctl with specific backend and device type + """ + xpu_ids = ray._private.utils.get_xpu_visible_devices() + if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids): + raise ValueError( + "Attempting to start raylet with {} XPUs, " + "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids) + ) + if num_xpus is None: + try: + import dpctl + num_xpus = len(dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE, + device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE)) + except ImportError: + num_xpus = 0 + + if xpu_ids is not None: + num_xpus = min(num_xpus, len(xpu_ids)) + xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1} + return num_xpus, xpu_types + + def _autodetect_num_gpus(): """Attempt to detect the number of GPUs on this machine. diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index ac5d35fd5676..ede243f22e88 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -298,6 +298,30 @@ def get_cuda_visible_devices(): last_set_gpu_ids = None +def get_xpu_visible_devices(): + """Get the devices IDs in the XPU_VISIBLE_DEVICES environment variable. + Returns: + devices (List[str]): If XPU_VISIBLE_DEVICES is set, return a + list of strings representing the IDs of the visible XPUs. + If it is not set or is set, returns empty list. 
+ """ + xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None) + if xpu_ids_str is None: + return None + + if xpu_ids_str == "": + return [] + + return list(xpu_ids_str.split(",")) + + +def get_gpu_visible_devices(): + if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": + return get_cuda_visible_devices() + elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": + return get_xpu_visible_devices() + + def set_omp_num_threads_if_unset() -> bool: """Set the OMP_NUM_THREADS to default to num cpus assigned to the worker @@ -356,6 +380,27 @@ def set_cuda_visible_devices(gpu_ids): last_set_gpu_ids = gpu_ids +def set_xpu_visible_devices(xpu_ids): + """Set the ONEAPI_DEVICE_SELECTOR environment variable. + Args: + xpu_ids (List[str]): List of strings representing GPU IDs + """ + + if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): + return + + ids_str = ",".join([str(i) for i in xpu_ids]) + os.environ["XPU_VISIBLE_DEVICES"] = ids_str + os.environ["ONEAPI_DEVICE_SELECTOR"] = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str + + +def set_gpu_visible_devices(gpu_ids): + if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": + set_cuda_visible_devices(gpu_ids) + elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": + set_xpu_visible_devices(gpu_ids) + + def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: """Determine a task's resource requirements. diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 5b045fe26e80..352d2fce0f08 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -422,9 +422,10 @@ def __init__(self): self.node = None self.mode = None self.actors = {} - # When the worker is constructed. Record the original value of the - # CUDA_VISIBLE_DEVICES environment variable. - self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices() + # When the worker is constructed. + # Record the original value of the CUDA_VISIBLE_DEVICES environment variable. + # Record the original value of the XPU_VISIBLE_DEVICES environment variable. + self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices() # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. self.serialization_context_map = {} @@ -830,9 +831,7 @@ def print_logs(self): subscriber.close() -@PublicAPI -@client_mode_hook -def get_gpu_ids(): +def get_cuda_ids(): """Get the IDs of the GPUs that are available to the worker. If the CUDA_VISIBLE_DEVICES environment variable was set when the worker @@ -883,6 +882,63 @@ def get_gpu_ids(): return assigned_ids +def get_xpu_ids(): + """ Get the IDs of the XPUs that are available to the worker. + If the XPU_VISIBLE_DEVICES environment variable was set when the worker + started up, + Returns: + A list of XPU IDs + """ + worker = global_worker + worker.check_connected() + + if worker.mode != WORKER_MODE: + if log_once("worker_get_gpu_ids_empty_from_driver"): + logger.warning( + "`ray.get_xpu_ids()` will always return the empty list when " + "called from the driver. This is because Ray does not manage " + "XPU allocations to the driver process." 
+ ) + # Here we use `dpctl` to detect XPU device: + # Enumrate all device by API dpctl.get_devices + # Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset + # Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR + # Another method to enumrate XPU device is to use C++ API, maybe can upgrade laster + has_dpctl = True + try: + import dpctl + except ImportError: + has_dpctl = False + + if not has_dpctl: + return [] + + xpu_devices = dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE, + device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE) + xpu_ava_ids = set() + xpu_dev_prefix = f"{ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE}:{ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE}" + for xpu_dev in xpu_devices: + xpu_id = int(xpu_dev.filter_string.split(xpu_dev_prefix)[1]) + xpu_ava_ids.add(xpu_id) + + xpu_ids = [] + if global_worker.original_gpu_ids is not None: + xpu_ids = [ + global_worker.original_gpu_ids[xpu_id] for xpu_id in xpu_ava_ids + ] + + return xpu_ids + + +@PublicAPI +@client_mode_hook +def get_gpu_ids(): + if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": + return get_cuda_ids() + elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": + return get_xpu_ids() + + @Deprecated( message="Use ray.get_runtime_context().get_assigned_resources() instead.", warning=True, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 6031816fc3a7..1f35a8593511 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1693,7 +1693,7 @@ cdef execute_task_with_cancellation_handler( title = f"ray::{task_name}" # Automatically restrict the GPUs available to this task. - ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids()) + ray._private.utils.set_gpu_visible_devices(ray.get_gpu_ids()) # Automatically configure OMP_NUM_THREADS to the assigned CPU number. # It will be unset after the task execution if it was overwridden here. 
diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 6aa6f01f4fec..3a30e5de5c8f 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -8,6 +8,8 @@ import ray import ray.cluster_utils +import ray._private.ray_constants as ray_constants + try: import pytest_timeout except ImportError: @@ -80,7 +82,9 @@ def echo(self, value): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -def test_actor_gpus(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_gpus(ray_start_cluster, ACCELERATOR_TYPE): + ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 4 @@ -123,7 +127,9 @@ def get_location_and_ids(self): assert ready_ids == [] -def test_actor_multiple_gpus(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_multiple_gpus(ray_start_cluster, ACCELERATOR_TYPE): + ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 5 @@ -201,7 +207,9 @@ def get_location_and_ids(self): @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky.") -def test_actor_different_numbers_of_gpus(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_different_numbers_of_gpus(ray_start_cluster, ACCELERATOR_TYPE): + ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE # Test that we can create actors on two nodes that have different # numbers of GPUs. cluster = ray_start_cluster @@ -243,7 +251,9 @@ def get_location_and_ids(self): assert ready_ids == [] -def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster, ACCELERATOR_TYPE): + ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -320,7 +330,9 @@ def get_location_and_ids(self): assert ready_ids == [] -def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster, ACCELERATOR_TYPE): + ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -439,7 +451,9 @@ def locations_to_intervals_for_many_tasks(): assert len(ready_ids) == 0 -def test_actors_and_tasks_with_gpus_version_two(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actors_and_tasks_with_gpus_version_two(shutdown_only, ACCELERATOR_TYPE): + ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs num_gpus = 4 @@ -616,7 +630,9 @@ def get_location(self): assert location == custom_resource2_node.unique_id -def test_creating_more_actors_than_resources(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_creating_more_actors_than_resources(shutdown_only, ACCELERATOR_TYPE): + ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1}) @ray.remote(num_gpus=1) diff --git a/python/ray/tests/test_advanced_2.py 
b/python/ray/tests/test_advanced_2.py index 8bb2838c61e6..42234e35ce13 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -137,7 +137,9 @@ def method(self): assert valid_node.unique_id == ray.get(a.method.remote()) -def test_fractional_resources(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_fractional_resources(shutdown_only, ACCELERATOR_TYPE): + ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1}) @ray.remote(num_gpus=0.5) diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index 3e916f7eebcb..82d044f585a2 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -39,6 +39,7 @@ def save_gpu_ids_shutdown_only(): @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") def test_specific_gpus(save_gpu_ids_shutdown_only): + ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA" allowed_gpu_ids = [4, 5, 6] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) ray.init(num_gpus=3) @@ -60,6 +61,30 @@ def g(): ray.get([g.remote() for _ in range(100)]) +@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") +def test_specific_xpus(save_gpu_ids_shutdown_only): + ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "XPU" + allowed_xpu_ids = [1, 3, 5] + os.environ["XPU_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_xpu_ids]) + ray.init(num_gpus=3) + + @ray.remote(num_gpus=1) + def f(): + xpu_ids = ray.get_xpu_ids() + assert len(xpu_ids) == 1 + assert int(xpu_ids[0]) in allowed_xpu_ids + + @ray.remote(num_gpus=2) + def g(): + xpu_ids = ray.get_xpu_ids() + assert len(xpu_ids) == 2 + assert int(xpu_ids[0]) in allowed_xpu_ids + assert int(xpu_ids[1]) in allowed_xpu_ids + + ray.get([f.remote() for _ in range(100)]) + ray.get([g.remote() for _ in range(100)]) + + def test_local_mode_gpus(save_gpu_ids_shutdown_only): allowed_gpu_ids = [4, 5, 6, 7, 8] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index d7a9a7919e85..f3a5cbe6c65f 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -230,7 +230,9 @@ def g(): assert ray.get(f.options(num_cpus=4).remote()) == "1" -def test_submit_api(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_submit_api(shutdown_only, ACCELERATOR_TYPE): + ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @ray.remote From ee0a3582d26549df27bc879dd9c2cf31bff3934d Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Mon, 19 Jun 2023 17:34:34 +0800 Subject: [PATCH 2/6] update Signed-off-by: Wu, Gangsheng --- python/ray/_private/ray_constants.py | 2 +- python/ray/_private/resource_spec.py | 16 ++++-- python/ray/_private/utils.py | 36 ++++++++------ python/ray/_private/worker.py | 62 ++++++++++++------------ python/ray/tests/test_actor_resources.py | 14 +++--- python/ray/tests/test_advanced_2.py | 2 +- python/ray/tests/test_advanced_6.py | 27 +++++++++-- python/ray/tests/test_basic.py | 2 +- 8 files changed, 96 insertions(+), 65 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index d8ff6123837e..d5a2120e865d 100644 --- 
a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -389,7 +389,7 @@ def env_set_by_user(key): RAY_DEVICE_XPU_DEVICE_TYPE = "gpu" RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"} -RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA" +RAY_ACCELERATOR_DEFAULT = "CUDA" RAY_WORKER_NICENESS = "RAY_worker_niceness" diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 25cf6fd8a25b..912aba1411b0 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -168,16 +168,17 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if is_head: resources[HEAD_NODE_RESOURCE_NAME] = 1.0 - # get cpu num + # Get cpu num num_cpus = self.num_cpus if num_cpus is None: num_cpus = ray._private.utils.get_num_cpus() - # get accelerate device info - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": # get cuda device num + # Get accelerate device info + accelerator = ray._private.utils.get_current_accelerator() + if accelerator == "CUDA": # get cuda device num num_gpus, gpu_types = _get_cuda_info(self.num_gpus) resources.update(gpu_types) - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": # get xpu device num + elif accelerator == "XPU": # get xpu device num # here we take xpu as gpu, so no need to develop core's scheduling policy # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy num_gpus, gpu_types = _get_xpu_info(self.num_gpus) @@ -301,6 +302,13 @@ def _get_cuda_info(num_gpus): def _get_xpu_info(num_xpus): """Attempt to process the number of XPUs as GPUs + + Here we use `dpctl` to detect XPU device: + Enumrate all device by API dpctl.get_devices + Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset + Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR + Another method to enumrate XPU device is to use C++ API, maybe can upgrade later + Returns: The number of XPUs that detected by dpctl with specific backend and device type """ diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index ede243f22e88..ad56814fa164 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -315,11 +315,14 @@ def get_xpu_visible_devices(): return list(xpu_ids_str.split(",")) +def get_current_accelerator(): + return os.environ.get("RAY_ACCELERATOR", ray_constants.RAY_ACCELERATOR_DEFAULT) + + def get_gpu_visible_devices(): - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": - return get_cuda_visible_devices() - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": + if get_current_accelerator() == "XPU": return get_xpu_visible_devices() + return get_cuda_visible_devices() def set_omp_num_threads_if_unset() -> bool: @@ -362,43 +365,44 @@ def set_omp_num_threads_if_unset() -> bool: return True -def set_cuda_visible_devices(gpu_ids): +def set_cuda_visible_devices(dev_ids): """Set the CUDA_VISIBLE_DEVICES environment variable. Args: - gpu_ids (List[str]): List of strings representing GPU IDs. + dev_ids (List[str]): List of strings representing GPU IDs. 
""" if os.environ.get(ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR): return global last_set_gpu_ids - if last_set_gpu_ids == gpu_ids: + if last_set_gpu_ids == dev_ids: return # optimization: already set - os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids]) - last_set_gpu_ids = gpu_ids + os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in dev_ids]) + last_set_gpu_ids = dev_ids -def set_xpu_visible_devices(xpu_ids): +def set_xpu_visible_devices(dev_ids): """Set the ONEAPI_DEVICE_SELECTOR environment variable. Args: - xpu_ids (List[str]): List of strings representing GPU IDs + dev_ids (List[str]): List of strings representing GPU IDs """ if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): return - ids_str = ",".join([str(i) for i in xpu_ids]) + ids_str = ",".join([str(i) for i in dev_ids]) os.environ["XPU_VISIBLE_DEVICES"] = ids_str os.environ["ONEAPI_DEVICE_SELECTOR"] = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str -def set_gpu_visible_devices(gpu_ids): - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": - set_cuda_visible_devices(gpu_ids) - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": - set_xpu_visible_devices(gpu_ids) +def set_gpu_visible_devices(device_ids): + accelerator = get_current_accelerator() + if accelerator == "XPU": + return set_xpu_visible_devices(device_ids) + elif accelerator == "CUDA": + return set_cuda_visible_devices(device_ids) def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 352d2fce0f08..0bdecc34b4a3 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -423,8 +423,8 @@ def __init__(self): self.mode = None self.actors = {} # When the worker is constructed. - # Record the original value of the CUDA_VISIBLE_DEVICES environment variable. - # Record the original value of the XPU_VISIBLE_DEVICES environment variable. + # Record the original value of the CUDA_VISIBLE_DEVICES environment variable, + # or value of the XPU_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices() # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. @@ -884,8 +884,12 @@ def get_cuda_ids(): def get_xpu_ids(): """ Get the IDs of the XPUs that are available to the worker. + If the XPU_VISIBLE_DEVICES environment variable was set when the worker - started up, + started up, then the IDs returned by this method will be a subset of the + IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range + [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. + Returns: A list of XPU IDs """ @@ -899,44 +903,42 @@ def get_xpu_ids(): "called from the driver. This is because Ray does not manage " "XPU allocations to the driver process." 
) - # Here we use `dpctl` to detect XPU device: - # Enumrate all device by API dpctl.get_devices - # Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset - # Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR - # Another method to enumrate XPU device is to use C++ API, maybe can upgrade laster - has_dpctl = True - try: - import dpctl - except ImportError: - has_dpctl = False - if not has_dpctl: - return [] + # Get all resources from global core worker + all_resource_ids = global_worker.core_worker.resource_ids() + assigned_ids = set() + for resource, assignment in all_resource_ids.items(): + # Handle both normal and placement group GPU resources. + # Note: We should only get the GPU ids from the placement + # group resource that does not contain the bundle index! + import re - xpu_devices = dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE, - device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE) - xpu_ava_ids = set() - xpu_dev_prefix = f"{ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE}:{ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE}" - for xpu_dev in xpu_devices: - xpu_id = int(xpu_dev.filter_string.split(xpu_dev_prefix)[1]) - xpu_ava_ids.add(xpu_id) + if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource): + for resource_id, _ in assignment: + assigned_ids.add(resource_id) - xpu_ids = [] + assigned_ids = list(assigned_ids) + # If the user had already set CUDA_VISIBLE_DEVICES, then respect that (in + # the sense that only GPU IDs that appear in CUDA_VISIBLE_DEVICES should be + # returned). if global_worker.original_gpu_ids is not None: - xpu_ids = [ - global_worker.original_gpu_ids[xpu_id] for xpu_id in xpu_ava_ids + assigned_ids = [ + global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids ] - return xpu_ids + return assigned_ids @PublicAPI @client_mode_hook def get_gpu_ids(): - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": - return get_cuda_ids() - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": - return get_xpu_ids() + accelerator = ray._private.utils.get_current_accelerator() + ids = [] + if accelerator == "CUDA": + ids = get_cuda_ids() + elif accelerator == "XPU": + ids = get_xpu_ids() + return ids @Deprecated( diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 3a30e5de5c8f..ab5e4341db9d 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -84,7 +84,7 @@ def echo(self, value): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_gpus(ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 4 @@ -129,7 +129,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_multiple_gpus(ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 5 @@ -209,7 +209,7 @@ def get_location_and_ids(self): @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky.") @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_different_numbers_of_gpus(ray_start_cluster, ACCELERATOR_TYPE): - 
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE # Test that we can create actors on two nodes that have different # numbers of GPUs. cluster = ray_start_cluster @@ -253,7 +253,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -332,7 +332,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -453,7 +453,7 @@ def locations_to_intervals_for_many_tasks(): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actors_and_tasks_with_gpus_version_two(shutdown_only, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs num_gpus = 4 @@ -632,7 +632,7 @@ def get_location(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_creating_more_actors_than_resources(shutdown_only, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1}) @ray.remote(num_gpus=1) diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 42234e35ce13..00aa5bc675bb 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -139,7 +139,7 @@ def method(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_fractional_resources(shutdown_only, ACCELERATOR_TYPE): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1}) @ray.remote(num_gpus=0.5) diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index 82d044f585a2..27b0c5681672 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -37,9 +37,26 @@ def save_gpu_ids_shutdown_only(): del os.environ["CUDA_VISIBLE_DEVICES"] +@pytest.fixture +def save_xpu_ids_shutdown_only(): + # Record the curent value of this environment variable so that we can + # reset it after the test. + original_xpu_ids = os.environ.get("XPU_VISIBLE_DEVICES", None) + + yield None + + # The code after the yield will run as teardown code. + ray.shutdown() + # Reset the environment variable. 
+ if original_xpu_ids is not None: + os.environ["XPU_VISIBLE_DEVICES"] = original_xpu_ids + else: + del os.environ["XPU_VISIBLE_DEVICES"] + + @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") def test_specific_gpus(save_gpu_ids_shutdown_only): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA" + os.environ["RAY_ACCELERATOR"] = "CUDA" allowed_gpu_ids = [4, 5, 6] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) ray.init(num_gpus=3) @@ -62,21 +79,21 @@ def g(): @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") -def test_specific_xpus(save_gpu_ids_shutdown_only): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "XPU" +def test_specific_xpus(save_xpu_ids_shutdown_only): + os.environ["RAY_ACCELERATOR"] = "XPU" allowed_xpu_ids = [1, 3, 5] os.environ["XPU_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_xpu_ids]) ray.init(num_gpus=3) @ray.remote(num_gpus=1) def f(): - xpu_ids = ray.get_xpu_ids() + xpu_ids = ray.get_gpu_ids() assert len(xpu_ids) == 1 assert int(xpu_ids[0]) in allowed_xpu_ids @ray.remote(num_gpus=2) def g(): - xpu_ids = ray.get_xpu_ids() + xpu_ids = ray.get_gpu_ids() assert len(xpu_ids) == 2 assert int(xpu_ids[0]) in allowed_xpu_ids assert int(xpu_ids[1]) in allowed_xpu_ids diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index f3a5cbe6c65f..a4e289978d35 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -232,7 +232,7 @@ def g(): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_submit_api(shutdown_only, ACCELERATOR_TYPE): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @ray.remote From fcfa6ec71459d4a4b92770f480e7f86c4bc35f59 Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Sun, 25 Jun 2023 18:29:46 +0800 Subject: [PATCH 3/6] change method of detecting xpu Signed-off-by: Wu, Gangsheng --- python/ray/_private/resource_spec.py | 41 +++++++++++++++++++++------- python/ray/_private/utils.py | 2 +- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 912aba1411b0..66b43c443e11 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -303,14 +303,15 @@ def _get_cuda_info(num_gpus): def _get_xpu_info(num_xpus): """Attempt to process the number of XPUs as GPUs - Here we use `dpctl` to detect XPU device: - Enumrate all device by API dpctl.get_devices - Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset - Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR - Another method to enumrate XPU device is to use C++ API, maybe can upgrade later - + XPU detect need oneAPI runtime. Here we enumrate XPU device by two method: + 1. by using dpctl python API + 2. 
by calling system commmand 'sycl-ls' + + Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset + Or dpctl return filtered device by ONEAPI_DEVICE_SELECTOR + Returns: - The number of XPUs that detected by dpctl with specific backend and device type + The number of XPUs that detected by dpctl or oneAPI with specific backend and device type """ xpu_ids = ray._private.utils.get_xpu_visible_devices() if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids): @@ -319,15 +320,35 @@ def _get_xpu_info(num_xpus): "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids) ) if num_xpus is None: + has_dpctl = True + backend = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + device_type = ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE try: import dpctl - num_xpus = len(dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE, - device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE)) + num_xpus = len(dpctl.get_devices(backend=backend, device_type=device_type)) except ImportError: - num_xpus = 0 + has_dpctl = False + + if has_dpctl == False: + xpu_env = os.environ.copy() + xpu_env["ONEAPI_DEVICE_SELECTOR"] = f"{backend}:{device_type}" + xpu_res = subprocess.run("sycl-ls", capture_output=True, env=xpu_env) + if xpu_res.returncode == 0: + stdout = res.stdout.decode().strip() + if stdout != "": + num_xpus = len(stdout.split("\n")) + else: + num_xpus = 0 + else: + num_xpus = 0 + raise ValueError( + f"Attempting to enumrate XPU device by 'sycl-ls', " + "but fail to run 'sycl-ls' with error code {xpu_res.returncode}." + ) if xpu_ids is not None: num_xpus = min(num_xpus, len(xpu_ids)) + xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1} return num_xpus, xpu_types diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index ad56814fa164..69a8950bfae6 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -303,7 +303,7 @@ def get_xpu_visible_devices(): Returns: devices (List[str]): If XPU_VISIBLE_DEVICES is set, return a list of strings representing the IDs of the visible XPUs. - If it is not set or is set, returns empty list. + If it is not set, returns empty list. 
""" xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None) if xpu_ids_str is None: From 67dab607a0095ac9d59753fc2184607039ef4d5a Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 06:45:34 +0000 Subject: [PATCH 4/6] only using "ONEAPI_DEVICE_SELECTOR", and update UTs --- python/ray/_private/ray_constants.py | 7 +- python/ray/_private/resource_spec.py | 47 +++--------- python/ray/_private/utils.py | 70 ++++++++++++++---- python/ray/_private/worker.py | 25 ++++--- python/ray/tests/test_actor_resources.py | 14 ++-- python/ray/tests/test_advanced_2.py | 94 +++++++++++++++++++++++- python/ray/tests/test_advanced_6.py | 61 +++++++-------- python/ray/tests/test_basic.py | 20 ++++- python/requirements_xpu.txt | 1 + 9 files changed, 233 insertions(+), 106 deletions(-) create mode 100644 python/requirements_xpu.txt diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index d5a2120e865d..b2230ef39066 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -382,11 +382,10 @@ def env_set_by_user(key): LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"] NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES" -NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES" +NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR" -RAY_DEVICE_XPU_SELECTOR_ENV_VAR = "ONEAPI_DEVICE_SELECTOR" -RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero" -RAY_DEVICE_XPU_DEVICE_TYPE = "gpu" +RAY_ONEAPI_DEVICE_BACKEND_TYPE = "level_zero" +RAY_ONEAPI_DEVICE_TYPE = "gpu" RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"} RAY_ACCELERATOR_DEFAULT = "CUDA" diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 66b43c443e11..05a6c8986424 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -301,51 +301,26 @@ def _get_cuda_info(num_gpus): def _get_xpu_info(num_xpus): - """Attempt to process the number of XPUs as GPUs - - XPU detect need oneAPI runtime. Here we enumrate XPU device by two method: - 1. by using dpctl python API - 2. 
by calling system commmand 'sycl-ls' - - Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset - Or dpctl return filtered device by ONEAPI_DEVICE_SELECTOR + """ Attempt to process the number of XPUs + Notice: + If xpu id not specified in ONEAPI_DEVICE_SELECTOR, + and num_gpus is defined in task or actor, + this function will return the input num_gpus, not 0 Returns: - The number of XPUs that detected by dpctl or oneAPI with specific backend and device type + (num_xpus, xpu_types) """ + # get visible xpu ids xpu_ids = ray._private.utils.get_xpu_visible_devices() if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids): raise ValueError( "Attempting to start raylet with {} XPUs, " - "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids) + "but ONEAPI_DEVICE_SELECTOR contains {}.".format(num_xpus, xpu_ids) ) if num_xpus is None: - has_dpctl = True - backend = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE - device_type = ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE - try: - import dpctl - num_xpus = len(dpctl.get_devices(backend=backend, device_type=device_type)) - except ImportError: - has_dpctl = False - - if has_dpctl == False: - xpu_env = os.environ.copy() - xpu_env["ONEAPI_DEVICE_SELECTOR"] = f"{backend}:{device_type}" - xpu_res = subprocess.run("sycl-ls", capture_output=True, env=xpu_env) - if xpu_res.returncode == 0: - stdout = res.stdout.decode().strip() - if stdout != "": - num_xpus = len(stdout.split("\n")) - else: - num_xpus = 0 - else: - num_xpus = 0 - raise ValueError( - f"Attempting to enumrate XPU device by 'sycl-ls', " - "but fail to run 'sycl-ls' with error code {xpu_res.returncode}." - ) - + # Try to detect all number of XPUs. + num_xpus = len(ray._private.utils.get_xpu_all_devices()) + # Don't use more XPUs than allowed by ONEAPI_DEVICE_SELECTOR. if xpu_ids is not None: num_xpus = min(num_xpus, len(xpu_ids)) diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 69a8950bfae6..38dc8bb63151 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -298,31 +298,71 @@ def get_cuda_visible_devices(): last_set_gpu_ids = None +def get_xpu_devices(): + """ Get xpu device IDs by calling `dpctl` api + device with specific backend and device_type + Returns: + devices IDs (List[str]): return the list of string representing + the relative IDs filtered by + ONEAPI_DEVICE_SELECTOR, specific backend and device_type + """ + backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE + device_type = ray_constants.RAY_ONEAPI_DEVICE_TYPE + xpu_ids = [] + try: + import dpctl + for dev in dpctl.get_devices(backend=backend, device_type=device_type): + # device filter_string with format: "backend:device_type:relative_id" + xpu_ids.append(int(dev.filter_string.split(':')[-1])) + except ImportError: + ValueError("Import dpctl error, maybe dpctl not installed.") + return xpu_ids + + def get_xpu_visible_devices(): - """Get the devices IDs in the XPU_VISIBLE_DEVICES environment variable. + """ Get xpu devices IDs filtered by ONEAPI_DEVICE_SELECTOR environment variable. Returns: - devices (List[str]): If XPU_VISIBLE_DEVICES is set, return a - list of strings representing the IDs of the visible XPUs. - If it is not set, returns empty list. + devices (List[str]): return the list of string representing the relative IDS + filtered by ONEAPI_DEVICE_SELECTOR. 
""" - xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None) - if xpu_ids_str is None: + if os.environ.get("ONEAPI_DEVICE_SELECTOR", None) is None: return None - if xpu_ids_str == "": - return [] + xpu_ids = get_xpu_devices() + + return xpu_ids - return list(xpu_ids_str.split(",")) + +def get_xpu_all_devices(): + """ Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter, + But all xpu device still filtered by specific backend and device_type + Returns: + devices (List[str]): list of strings representing the numeric index (zero-based), + with sepcific backend and device_type + """ + selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) + # unset "ONEAPI_DEVICE_SELECTOR" + os.unsetenv("ONEAPI_DEVICE_SELECTOR") + + xpu_ids = get_xpu_devices() + + # set "ONEAPI_DEVICE_SELECTOR" value back + if selector is not None: + os.environ["ONEAPI_DEVICE_SELECTOR"] = selector + + return xpu_ids def get_current_accelerator(): - return os.environ.get("RAY_ACCELERATOR", ray_constants.RAY_ACCELERATOR_DEFAULT) + return os.environ.get("RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT) def get_gpu_visible_devices(): + accelerator = get_current_accelerator() if get_current_accelerator() == "XPU": return get_xpu_visible_devices() - return get_cuda_visible_devices() + elif accelerator == "CUDA": + return get_cuda_visible_devices() def set_omp_num_threads_if_unset() -> bool: @@ -389,12 +429,12 @@ def set_xpu_visible_devices(dev_ids): dev_ids (List[str]): List of strings representing GPU IDs """ - if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): + if os.environ.get(ray_constants.NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR): return - ids_str = ",".join([str(i) for i in dev_ids]) - os.environ["XPU_VISIBLE_DEVICES"] = ids_str - os.environ["ONEAPI_DEVICE_SELECTOR"] = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str + backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE + dev_str = ",".join([str(i) for i in dev_ids]) + os.environ["ONEAPI_DEVICE_SELECTOR"] = backend + ":" + dev_str def set_gpu_visible_devices(device_ids): diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 0bdecc34b4a3..efdfe78d3cfa 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -424,7 +424,7 @@ def __init__(self): self.actors = {} # When the worker is constructed. # Record the original value of the CUDA_VISIBLE_DEVICES environment variable, - # or value of the XPU_VISIBLE_DEVICES environment variable. + # or value of the ONEAPI_DEVICE_SELECTOR environment variable. self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices() # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. @@ -848,7 +848,7 @@ def get_cuda_ids(): if worker.mode != WORKER_MODE: if log_once("worker_get_gpu_ids_empty_from_driver"): logger.warning( - "`ray.get_gpu_ids()` will always return the empty list when " + "`ray.get_cuda_ids()` will always return the empty list when " "called from the driver. This is because Ray does not manage " "GPU allocations to the driver process." ) @@ -885,9 +885,9 @@ def get_cuda_ids(): def get_xpu_ids(): """ Get the IDs of the XPUs that are available to the worker. - If the XPU_VISIBLE_DEVICES environment variable was set when the worker + If the ONEAPI_DEVICE_SELECTOR environment variable was set before the worker started up, then the IDs returned by this method will be a subset of the - IDs in CUDA_VISIBLE_DEVICES. 
If not, the IDs will fall in the range + IDs in ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the range [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. Returns: @@ -916,15 +916,18 @@ def get_xpu_ids(): if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource): for resource_id, _ in assignment: assigned_ids.add(resource_id) - assigned_ids = list(assigned_ids) - # If the user had already set CUDA_VISIBLE_DEVICES, then respect that (in - # the sense that only GPU IDs that appear in CUDA_VISIBLE_DEVICES should be + # If the user had already set ONEAPI_DEVICE_SELECTOR, then respect that (in + # the sense that only GPU IDs that appear in ONEAPI_DEVICE_SELECTOR should be # returned). if global_worker.original_gpu_ids is not None: assigned_ids = [ global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids ] + # Give all GPUs in local_mode. + if global_worker.mode == LOCAL_MODE: + max_gpus = global_worker.node.get_resource_spec().num_gpus + assigned_ids = global_worker.original_gpu_ids[:max_gpus] return assigned_ids @@ -933,12 +936,11 @@ def get_xpu_ids(): @client_mode_hook def get_gpu_ids(): accelerator = ray._private.utils.get_current_accelerator() - ids = [] if accelerator == "CUDA": - ids = get_cuda_ids() + return get_cuda_ids() elif accelerator == "XPU": - ids = get_xpu_ids() - return ids + return get_xpu_ids() + return [] @Deprecated( @@ -1528,7 +1530,6 @@ def init( usage_lib.show_usage_stats_prompt(cli=False) else: usage_lib.set_usage_stats_enabled_via_env_var(False) - # Use a random port by not specifying Redis port / GCS server port. ray_params = ray._private.parameter.RayParams( node_ip_address=node_ip_address, diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index ab5e4341db9d..a321dae2e928 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -84,7 +84,7 @@ def echo(self, value): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_gpus(ray_start_cluster, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 4 @@ -129,7 +129,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_multiple_gpus(ray_start_cluster, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 5 @@ -209,7 +209,7 @@ def get_location_and_ids(self): @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky.") @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_different_numbers_of_gpus(ray_start_cluster, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE # Test that we can create actors on two nodes that have different # numbers of GPUs. 
cluster = ray_start_cluster @@ -253,7 +253,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -332,7 +332,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -453,7 +453,7 @@ def locations_to_intervals_for_many_tasks(): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actors_and_tasks_with_gpus_version_two(shutdown_only, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs num_gpus = 4 @@ -632,7 +632,7 @@ def get_location(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_creating_more_actors_than_resources(shutdown_only, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1}) @ray.remote(num_gpus=1) diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 00aa5bc675bb..68b512b12c5b 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -9,12 +9,102 @@ import ray import ray.cluster_utils +from ray._private.ray_constants import RAY_ONEAPI_DEVICE_BACKEND_TYPE as XPU_BACKEND from ray._private.test_utils import RayTestTimeoutException, wait_for_condition logger = logging.getLogger(__name__) -def test_gpu_ids(shutdown_only): +def test_xpu_ids(shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU" + + num_gpus = 3 + ray.init(num_cpus=num_gpus, num_gpus=num_gpus) + """ + def get_gpu_ids(num_gpus_per_worker): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == num_gpus_per_worker + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] # noqa + ) + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0)) + f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1)) + f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2)) + f3 = ray.remote(num_gpus=3)(lambda: get_gpu_ids(3)) + + # Wait for all workers to start up. + @ray.remote + def f(): + time.sleep(0.2) + return os.getpid() + + start_time = time.time() + while True: + num_workers_started = len(set(ray.get([f.remote() for _ in range(num_gpus)]))) + if num_workers_started == num_gpus: + break + if time.time() > start_time + 10: + raise RayTestTimeoutException( + "Timed out while waiting for workers to start up." + ) + + list_of_ids = ray.get([f0.remote() for _ in range(10)]) + assert list_of_ids == 10 * [[]] + ray.get([f1.remote() for _ in range(10)]) + ray.get([f2.remote() for _ in range(10)]) + """ + # Test that actors have ONEAPI_DEVICE_SELECTOR set properly. 
+ + @ray.remote + class Actor0: + def __init__(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 0 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] # noqa + ) + # Set self.x to make sure that we got here. + self.x = 1 + + def test(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 0 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + return self.x + + @ray.remote(num_gpus=1) + class Actor1: + def __init__(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + # Set self.x to make sure that we got here. + self.x = 1 + + def test(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + return self.x + + a0 = Actor0.remote() + ray.get(a0.test.remote()) + + a1 = Actor1.remote() + ray.get(a1.test.remote()) + + +def test_cuda_ids(shutdown_only): num_gpus = 3 ray.init(num_cpus=num_gpus, num_gpus=num_gpus) @@ -139,7 +229,7 @@ def method(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_fractional_resources(shutdown_only, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1}) @ray.remote(num_gpus=0.5) diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index 27b0c5681672..481587415ae6 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -11,12 +11,17 @@ import ray import ray.cluster_utils + from ray._private.test_utils import ( run_string_as_driver_nonblocking, wait_for_condition, wait_for_pid_to_exit, ) +from ray._private.ray_constants import RAY_ONEAPI_DEVICE_BACKEND_TYPE as XPU_BACKEND + +from unittest.mock import Mock, MagicMock + logger = logging.getLogger(__name__) @@ -41,24 +46,25 @@ def save_gpu_ids_shutdown_only(): def save_xpu_ids_shutdown_only(): # Record the curent value of this environment variable so that we can # reset it after the test. - original_xpu_ids = os.environ.get("XPU_VISIBLE_DEVICES", None) + selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) yield None # The code after the yield will run as teardown code. ray.shutdown() # Reset the environment variable. 
- if original_xpu_ids is not None: - os.environ["XPU_VISIBLE_DEVICES"] = original_xpu_ids + if selector is not None: + os.environ["ONEAPI_DEVICE_SELECTOR"] = selector else: - del os.environ["XPU_VISIBLE_DEVICES"] + del os.environ["ONEAPI_DEVICE_SELECTOR"] @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") -def test_specific_gpus(save_gpu_ids_shutdown_only): - os.environ["RAY_ACCELERATOR"] = "CUDA" +def test_specific_cudas(save_gpu_ids_shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" allowed_gpu_ids = [4, 5, 6] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) + ray.init(num_gpus=3) @ray.remote(num_gpus=1) @@ -78,36 +84,33 @@ def g(): ray.get([g.remote() for _ in range(100)]) -@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") -def test_specific_xpus(save_xpu_ids_shutdown_only): - os.environ["RAY_ACCELERATOR"] = "XPU" - allowed_xpu_ids = [1, 3, 5] - os.environ["XPU_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_xpu_ids]) - ray.init(num_gpus=3) +def test_local_mode_cudas(save_gpu_ids_shutdown_only): + allowed_gpu_ids = [4, 5, 6, 7, 8] + os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) - @ray.remote(num_gpus=1) - def f(): - xpu_ids = ray.get_gpu_ids() - assert len(xpu_ids) == 1 - assert int(xpu_ids[0]) in allowed_xpu_ids + from importlib import reload - @ray.remote(num_gpus=2) - def g(): - xpu_ids = ray.get_gpu_ids() - assert len(xpu_ids) == 2 - assert int(xpu_ids[0]) in allowed_xpu_ids - assert int(xpu_ids[1]) in allowed_xpu_ids + reload(ray._private.worker) + + ray.init(num_gpus=3, local_mode=True) + + @ray.remote + def f(): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 3 + for gpu in gpu_ids: + assert int(gpu) in allowed_gpu_ids ray.get([f.remote() for _ in range(100)]) - ray.get([g.remote() for _ in range(100)]) -def test_local_mode_gpus(save_gpu_ids_shutdown_only): - allowed_gpu_ids = [4, 5, 6, 7, 8] - os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) +def test_local_mode_xpus(save_xpu_ids_shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU" + allowed_xpu_ids = [0, 1, 2, 3, 4, 5] + os.environ["ONEAPI_DEVICE_SELECTOR"] = XPU_BACKEND + ":" + ",".join([str(i) for i in allowed_xpu_ids]) + ray._private.utils.get_xpu_devices = Mock(return_value=allowed_xpu_ids) from importlib import reload - reload(ray._private.worker) ray.init(num_gpus=3, local_mode=True) @@ -117,7 +120,7 @@ def f(): gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == 3 for gpu in gpu_ids: - assert int(gpu) in allowed_gpu_ids + assert int(gpu) in allowed_xpu_ids ray.get([f.remote() for _ in range(100)]) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index a4e289978d35..7597cdf488bf 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -232,7 +232,7 @@ def g(): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_submit_api(shutdown_only, ACCELERATOR_TYPE): - os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @ray.remote @@ -542,6 +542,24 @@ def check(): ) +def test_disable_xpu_devices(): + script = """ +import ray +ray.init() + +@ray.remote +def check(): + import os + assert "ONEAPI_DEVICE_SELECTOR" not in os.environ + +print("remote", ray.get(check.remote())) +""" + + run_string_as_driver( + script, dict(os.environ, 
**{"RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR": "1"}) + ) + + def test_put_get(shutdown_only): ray.init(num_cpus=0) diff --git a/python/requirements_xpu.txt b/python/requirements_xpu.txt new file mode 100644 index 000000000000..3e7ffe68d330 --- /dev/null +++ b/python/requirements_xpu.txt @@ -0,0 +1 @@ +dpctl From 9aa547d12c555eda6245dc75291695b465189a5d Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 07:56:37 +0000 Subject: [PATCH 5/6] update --- python/ray/_private/utils.py | 5 +++++ python/ray/tests/test_advanced_2.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 38dc8bb63151..c91097fdeaf2 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -305,6 +305,11 @@ def get_xpu_devices(): devices IDs (List[str]): return the list of string representing the relative IDs filtered by ONEAPI_DEVICE_SELECTOR, specific backend and device_type + returned visible IDs start with index 0 + Example: + if ONEAPI_DEVICE_SELECTOR="level_zero:2,3,4" + the device IDs enumerated will be [0,1,2] + same with CUDA_VISIBLE_DEVICES """ backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE device_type = ray_constants.RAY_ONEAPI_DEVICE_TYPE diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 68b512b12c5b..3af90043e23c 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -20,7 +20,7 @@ def test_xpu_ids(shutdown_only): num_gpus = 3 ray.init(num_cpus=num_gpus, num_gpus=num_gpus) - """ + def get_gpu_ids(num_gpus_per_worker): gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == num_gpus_per_worker @@ -56,7 +56,7 @@ def f(): assert list_of_ids == 10 * [[]] ray.get([f1.remote() for _ in range(10)]) ray.get([f2.remote() for _ in range(10)]) - """ + # Test that actors have ONEAPI_DEVICE_SELECTOR set properly. 
@ray.remote

From e2625f3ec2f1a74e8038eb2746b5345ec3d113c1 Mon Sep 17 00:00:00 2001
From: harborn
Date: Thu, 17 Aug 2023 08:14:15 +0000
Subject: [PATCH 6/6] fix format

Signed-off-by: harborn
---
 python/ray/_private/resource_spec.py | 20 ++++++++++----------
 python/ray/_private/utils.py         | 13 ++++++++-----
 python/ray/_private/worker.py        |  2 +-
 python/ray/tests/test_advanced_6.py  |  5 ++++-
 python/ray/tests/test_basic.py       |  3 ++-
 5 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py
index 05a6c8986424..e3667a1a936f 100644
--- a/python/ray/_private/resource_spec.py
+++ b/python/ray/_private/resource_spec.py
@@ -175,10 +175,10 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):

         # Get accelerate device info
         accelerator = ray._private.utils.get_current_accelerator()
-        if accelerator == "CUDA": # get cuda device num
+        if accelerator == "CUDA":  # get cuda device num
             num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
             resources.update(gpu_types)
-        elif accelerator == "XPU": # get xpu device num
+        elif accelerator == "XPU":  # get xpu device num
             # here we take xpu as gpu, so no need to develop core's scheduling policy
             # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy
             num_gpus, gpu_types = _get_xpu_info(self.num_gpus)
@@ -263,7 +263,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):

 def _get_cuda_info(num_gpus):
-    """ Attemp to process the number and type of GPUs
+    """Attempt to process the number and type of GPUs
     Notice:
         If gpu id not specified in CUDA_VISIBLE_DEVICES,
         and num_gpus is defined in task or actor,
@@ -277,9 +277,9 @@ def _get_cuda_info(num_gpus):
     # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
     if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
         raise ValueError(
-            "Attempting to start raylet with {} GPUs, "
-            "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
-            )
+            "Attempting to start raylet with {} GPUs, "
+            "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
+        )
     if num_gpus is None:
         # Try to automatically detect the number of GPUs.
         num_gpus = _autodetect_num_gpus()
@@ -301,7 +301,7 @@ def _get_cuda_info(num_gpus):

 def _get_xpu_info(num_xpus):
-    """ Attempt to process the number of XPUs
+    """Attempt to process the number of XPUs
     Notice:
         If xpu id not specified in ONEAPI_DEVICE_SELECTOR,
         and num_gpus is defined in task or actor,
@@ -314,9 +314,9 @@ def _get_xpu_info(num_xpus):
     xpu_ids = ray._private.utils.get_xpu_visible_devices()
     if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids):
         raise ValueError(
-            "Attempting to start raylet with {} XPUs, "
-            "but ONEAPI_DEVICE_SELECTOR contains {}.".format(num_xpus, xpu_ids)
-            )
+            "Attempting to start raylet with {} XPUs, "
+            "but ONEAPI_DEVICE_SELECTOR contains {}.".format(num_xpus, xpu_ids)
+        )
     if num_xpus is None:
         # Try to detect all number of XPUs.
num_xpus = len(ray._private.utils.get_xpu_all_devices())

diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index 941fab8c99da..53a7c51f1121 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -305,7 +305,7 @@ def get_cuda_visible_devices():

 def get_xpu_devices():
-    """ Get xpu device IDs by calling `dpctl` api
+    """Get XPU device IDs by calling the `dpctl` API
     device with specific backend and device_type
     Returns:
         devices IDs (List[str]): return the list of string representing
@@ -322,16 +322,17 @@ def get_xpu_devices():
     xpu_ids = []
     try:
         import dpctl
+
         for dev in dpctl.get_devices(backend=backend, device_type=device_type):
             # device filter_string with format: "backend:device_type:relative_id"
-            xpu_ids.append(int(dev.filter_string.split(':')[-1]))
+            xpu_ids.append(int(dev.filter_string.split(":")[-1]))
     except ImportError:
-        ValueError("Import dpctl error, maybe dpctl not installed.")
+        logger.warning("Failed to import dpctl; assuming no XPU devices are present.")
     return xpu_ids


 def get_xpu_visible_devices():
-    """ Get xpu devices IDs filtered by ONEAPI_DEVICE_SELECTOR environment variable.
+    """Get XPU device IDs filtered by the ONEAPI_DEVICE_SELECTOR environment variable.
     Returns:
         devices (List[str]): return the list of string representing
         the relative IDS filtered by ONEAPI_DEVICE_SELECTOR.
@@ -345,7 +346,7 @@ def get_xpu_visible_devices():

 def get_xpu_all_devices():
-    """ Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter,
+    """Get all XPU device IDs without the ONEAPI_DEVICE_SELECTOR filter,
     But all xpu device still filtered by specific backend and device_type
     Returns:
         devices (List[str]): list of strings representing the numeric index (zero-based),
@@ -365,7 +366,9 @@ def get_xpu_all_devices():

 def get_current_accelerator():
-    return os.environ.get("RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT)
+    return os.environ.get(
+        "RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT
+    )


 def get_gpu_visible_devices():
diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index 6109e8ce290a..84f0306a1f4a 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -886,7 +886,7 @@ def get_cuda_ids():

 def get_xpu_ids():
-    """ Get the IDs of the XPUs that are available to the worker.
+    """Get the IDs of the XPUs that are available to the worker.
If the ONEAPI_DEVICE_SELECTOR environment variable was set before the worker started up, then the IDs returned by this method will be a subset of the diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index 481587415ae6..baea4fc73fd0 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -107,10 +107,13 @@ def f(): def test_local_mode_xpus(save_xpu_ids_shutdown_only): os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU" allowed_xpu_ids = [0, 1, 2, 3, 4, 5] - os.environ["ONEAPI_DEVICE_SELECTOR"] = XPU_BACKEND + ":" + ",".join([str(i) for i in allowed_xpu_ids]) + os.environ["ONEAPI_DEVICE_SELECTOR"] = ( + XPU_BACKEND + ":" + ",".join([str(i) for i in allowed_xpu_ids]) + ) ray._private.utils.get_xpu_devices = Mock(return_value=allowed_xpu_ids) from importlib import reload + reload(ray._private.worker) ray.init(num_gpus=3, local_mode=True) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index bbb3562cc2bb..32b34523e978 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -556,7 +556,8 @@ def check(): """ run_string_as_driver( - script, dict(os.environ, **{"RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR": "1"}) + script, + dict(os.environ, **{"RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR": "1"}), )
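A minimal usage sketch of the behavior this series introduces, assuming the
patches are applied, dpctl is installed, and the node actually exposes Level
Zero GPUs. XPUs are scheduled through the existing `num_gpus` resource, and
ONEAPI_DEVICE_SELECTOR plays the role that CUDA_VISIBLE_DEVICES plays for
CUDA: workers see the selected devices re-enumerated as relative IDs starting
from 0.

    import os
    import ray

    # Opt in to the experimental XPU path (the default accelerator is CUDA).
    os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU"
    # Expose physical devices 2, 3, 4; workers see relative IDs 0, 1, 2.
    os.environ["ONEAPI_DEVICE_SELECTOR"] = "level_zero:2,3,4"

    ray.init(num_gpus=3)  # XPUs are scheduled through the GPU resource

    @ray.remote(num_gpus=1)
    def use_one_xpu():
        # Relative IDs filtered by ONEAPI_DEVICE_SELECTOR, e.g. ["0"]
        return ray.get_gpu_ids()

    print(ray.get(use_one_xpu.remote()))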