From ee0a3582d26549df27bc879dd9c2cf31bff3934d Mon Sep 17 00:00:00 2001 From: "Wu, Gangsheng" Date: Mon, 19 Jun 2023 17:34:34 +0800 Subject: [PATCH] update Signed-off-by: Wu, Gangsheng --- python/ray/_private/ray_constants.py | 2 +- python/ray/_private/resource_spec.py | 16 ++++-- python/ray/_private/utils.py | 36 ++++++++------ python/ray/_private/worker.py | 62 ++++++++++++------------ python/ray/tests/test_actor_resources.py | 14 +++--- python/ray/tests/test_advanced_2.py | 2 +- python/ray/tests/test_advanced_6.py | 27 +++++++++-- python/ray/tests/test_basic.py | 2 +- 8 files changed, 96 insertions(+), 65 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index d8ff6123837e6..d5a2120e865dc 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -389,7 +389,7 @@ def env_set_by_user(key): RAY_DEVICE_XPU_DEVICE_TYPE = "gpu" RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"} -RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA" +RAY_ACCELERATOR_DEFAULT = "CUDA" RAY_WORKER_NICENESS = "RAY_worker_niceness" diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 25cf6fd8a25b6..912aba1411b05 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -168,16 +168,17 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if is_head: resources[HEAD_NODE_RESOURCE_NAME] = 1.0 - # get cpu num + # Get cpu num num_cpus = self.num_cpus if num_cpus is None: num_cpus = ray._private.utils.get_num_cpus() - # get accelerate device info - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": # get cuda device num + # Get accelerate device info + accelerator = ray._private.utils.get_current_accelerator() + if accelerator == "CUDA": # get cuda device num num_gpus, gpu_types = _get_cuda_info(self.num_gpus) resources.update(gpu_types) - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": # get xpu device 
num + elif accelerator == "XPU": # get xpu device num # here we take xpu as gpu, so no need to develop core's scheduling policy # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy num_gpus, gpu_types = _get_xpu_info(self.num_gpus) @@ -301,6 +302,13 @@ def _get_cuda_info(num_gpus): def _get_xpu_info(num_xpus): """Attempt to process the number of XPUs as GPUs + + Here we use `dpctl` to detect XPU device: + Enumerate all device by API dpctl.get_devices + Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset + Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR + Another method to enumerate XPU device is to use C++ API, maybe can upgrade later + Returns: The number of XPUs that detected by dpctl with specific backend and device type """ diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index ede243f22e88a..ad56814fa1648 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -315,11 +315,14 @@ def get_xpu_visible_devices(): return list(xpu_ids_str.split(",")) +def get_current_accelerator(): + return os.environ.get("RAY_ACCELERATOR", ray_constants.RAY_ACCELERATOR_DEFAULT) + + def get_gpu_visible_devices(): - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": - return get_cuda_visible_devices() - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": + if get_current_accelerator() == "XPU": return get_xpu_visible_devices() + return get_cuda_visible_devices() def set_omp_num_threads_if_unset() -> bool: @@ -362,43 +365,44 @@ def set_omp_num_threads_if_unset() -> bool: return True -def set_cuda_visible_devices(gpu_ids): +def set_cuda_visible_devices(dev_ids): """Set the CUDA_VISIBLE_DEVICES environment variable. Args: - gpu_ids (List[str]): List of strings representing GPU IDs. + dev_ids (List[str]): List of strings representing GPU IDs. 
""" if os.environ.get(ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR): return global last_set_gpu_ids - if last_set_gpu_ids == gpu_ids: + if last_set_gpu_ids == dev_ids: return # optimization: already set - os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids]) - last_set_gpu_ids = gpu_ids + os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in dev_ids]) + last_set_gpu_ids = dev_ids -def set_xpu_visible_devices(xpu_ids): +def set_xpu_visible_devices(dev_ids): """Set the ONEAPI_DEVICE_SELECTOR environment variable. Args: - xpu_ids (List[str]): List of strings representing GPU IDs + dev_ids (List[str]): List of strings representing GPU IDs """ if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR): return - ids_str = ",".join([str(i) for i in xpu_ids]) + ids_str = ",".join([str(i) for i in dev_ids]) os.environ["XPU_VISIBLE_DEVICES"] = ids_str os.environ["ONEAPI_DEVICE_SELECTOR"] = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str -def set_gpu_visible_devices(gpu_ids): - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": - set_cuda_visible_devices(gpu_ids) - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": - set_xpu_visible_devices(gpu_ids) +def set_gpu_visible_devices(device_ids): + accelerator = get_current_accelerator() + if accelerator == "XPU": + return set_xpu_visible_devices(device_ids) + elif accelerator == "CUDA": + return set_cuda_visible_devices(device_ids) def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 352d2fce0f089..0bdecc34b4a3f 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -423,8 +423,8 @@ def __init__(self): self.mode = None self.actors = {} # When the worker is constructed. - # Record the original value of the CUDA_VISIBLE_DEVICES environment variable. - # Record the original value of the XPU_VISIBLE_DEVICES environment variable. 
+ # Record the original value of the CUDA_VISIBLE_DEVICES environment variable, + # or value of the XPU_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices() # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. @@ -884,8 +884,12 @@ def get_cuda_ids(): def get_xpu_ids(): """ Get the IDs of the XPUs that are available to the worker. + If the XPU_VISIBLE_DEVICES environment variable was set when the worker - started up, + started up, then the IDs returned by this method will be a subset of the + IDs in XPU_VISIBLE_DEVICES. If not, the IDs will fall in the range + [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. + Returns: A list of XPU IDs """ @@ -899,44 +903,42 @@ def get_xpu_ids(): "called from the driver. This is because Ray does not manage " "XPU allocations to the driver process." ) - # Here we use `dpctl` to detect XPU device: - # Enumrate all device by API dpctl.get_devices - # Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset - # Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR - # Another method to enumrate XPU device is to use C++ API, maybe can upgrade laster - has_dpctl = True - try: - import dpctl - except ImportError: - has_dpctl = False - if not has_dpctl: - return [] + # Get all resources from global core worker + all_resource_ids = global_worker.core_worker.resource_ids() + assigned_ids = set() + for resource, assignment in all_resource_ids.items(): + # Handle both normal and placement group GPU resources. + # Note: We should only get the GPU ids from the placement + # group resource that does not contain the bundle index! 
+ import re - xpu_devices = dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE, - device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE) - xpu_ava_ids = set() - xpu_dev_prefix = f"{ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE}:{ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE}" - for xpu_dev in xpu_devices: - xpu_id = int(xpu_dev.filter_string.split(xpu_dev_prefix)[1]) - xpu_ava_ids.add(xpu_id) + if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource): + for resource_id, _ in assignment: + assigned_ids.add(resource_id) - xpu_ids = [] + assigned_ids = list(assigned_ids) + # If the user had already set XPU_VISIBLE_DEVICES, then respect that (in + # the sense that only GPU IDs that appear in XPU_VISIBLE_DEVICES should be + # returned). if global_worker.original_gpu_ids is not None: - xpu_ids = [ - global_worker.original_gpu_ids[xpu_id] for xpu_id in xpu_ava_ids + assigned_ids = [ + global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids ] - return xpu_ids + return assigned_ids @PublicAPI @client_mode_hook def get_gpu_ids(): - if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": - return get_cuda_ids() - elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": - return get_xpu_ids() + accelerator = ray._private.utils.get_current_accelerator() + ids = [] + if accelerator == "CUDA": + ids = get_cuda_ids() + elif accelerator == "XPU": + ids = get_xpu_ids() + return ids @Deprecated( diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 3a30e5de5c8f5..ab5e4341db9dd 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -84,7 +84,7 @@ def echo(self, value): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_gpus(ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + 
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 4 @@ -129,7 +129,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_multiple_gpus(ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 5 @@ -209,7 +209,7 @@ def get_location_and_ids(self): @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky.") @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_different_numbers_of_gpus(ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE # Test that we can create actors on two nodes that have different # numbers of GPUs. cluster = ray_start_cluster @@ -253,7 +253,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -332,7 +332,7 @@ def get_location_and_ids(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -453,7 +453,7 @@ def locations_to_intervals_for_many_tasks(): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_actors_and_tasks_with_gpus_version_two(shutdown_only, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR 
= ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs num_gpus = 4 @@ -632,7 +632,7 @@ def get_location(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_creating_more_actors_than_resources(shutdown_only, ACCELERATOR_TYPE): - ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1}) @ray.remote(num_gpus=1) diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 42234e35ce130..00aa5bc675bb2 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -139,7 +139,7 @@ def method(self): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_fractional_resources(shutdown_only, ACCELERATOR_TYPE): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1}) @ray.remote(num_gpus=0.5) diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index 82d044f585a20..27b0c56816727 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -37,9 +37,26 @@ def save_gpu_ids_shutdown_only(): del os.environ["CUDA_VISIBLE_DEVICES"] +@pytest.fixture +def save_xpu_ids_shutdown_only(): + # Record the current value of this environment variable so that we can + # reset it after the test. + original_xpu_ids = os.environ.get("XPU_VISIBLE_DEVICES", None) + + yield None + + # The code after the yield will run as teardown code. + ray.shutdown() + # Reset the environment variable. 
+ if original_xpu_ids is not None: + os.environ["XPU_VISIBLE_DEVICES"] = original_xpu_ids + else: + del os.environ["XPU_VISIBLE_DEVICES"] + + @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") def test_specific_gpus(save_gpu_ids_shutdown_only): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA" + os.environ["RAY_ACCELERATOR"] = "CUDA" allowed_gpu_ids = [4, 5, 6] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) ray.init(num_gpus=3) @@ -62,21 +79,21 @@ def g(): @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") -def test_specific_xpus(save_gpu_ids_shutdown_only): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "XPU" +def test_specific_xpus(save_xpu_ids_shutdown_only): + os.environ["RAY_ACCELERATOR"] = "XPU" allowed_xpu_ids = [1, 3, 5] os.environ["XPU_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_xpu_ids]) ray.init(num_gpus=3) @ray.remote(num_gpus=1) def f(): - xpu_ids = ray.get_xpu_ids() + xpu_ids = ray.get_gpu_ids() assert len(xpu_ids) == 1 assert int(xpu_ids[0]) in allowed_xpu_ids @ray.remote(num_gpus=2) def g(): - xpu_ids = ray.get_xpu_ids() + xpu_ids = ray.get_gpu_ids() assert len(xpu_ids) == 2 assert int(xpu_ids[0]) in allowed_xpu_ids assert int(xpu_ids[1]) in allowed_xpu_ids diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index f3a5cbe6c65f4..a4e289978d35d 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -232,7 +232,7 @@ def g(): @pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) def test_submit_api(shutdown_only, ACCELERATOR_TYPE): - ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE + os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @ray.remote