Commit ee0a358

update

Signed-off-by: Wu, Gangsheng <[email protected]>
harborn committed Jun 20, 2023
1 parent 4f2662e commit ee0a358

Showing 8 changed files with 96 additions and 65 deletions.
2 changes: 1 addition & 1 deletion python/ray/_private/ray_constants.py
@@ -389,7 +389,7 @@ def env_set_by_user(key):
RAY_DEVICE_XPU_DEVICE_TYPE = "gpu"

RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"}
RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA"
RAY_ACCELERATOR_DEFAULT = "CUDA"

RAY_WORKER_NICENESS = "RAY_worker_niceness"

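The renamed constant is only a default: the active accelerator is resolved at call time from the RAY_ACCELERATOR environment variable (see get_current_accelerator in utils.py below). The following is a minimal sketch of that resolution, assuming only the constants shown above; the validation against RAY_DEVICE_SUPPORT_TYPES is illustrative and not part of this change.

import os

# Values mirrored from ray_constants.py as shown in this diff.
RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"}
RAY_ACCELERATOR_DEFAULT = "CUDA"

def resolve_accelerator() -> str:
    # RAY_ACCELERATOR overrides the default; otherwise fall back to CUDA.
    accelerator = os.environ.get("RAY_ACCELERATOR", RAY_ACCELERATOR_DEFAULT)
    # Illustrative guard only; the commit itself does not validate the value.
    if accelerator not in RAY_DEVICE_SUPPORT_TYPES:
        raise ValueError(f"Unsupported accelerator type: {accelerator!r}")
    return accelerator

os.environ["RAY_ACCELERATOR"] = "XPU"
assert resolve_accelerator() == "XPU"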
16 changes: 12 additions & 4 deletions python/ray/_private/resource_spec.py
@@ -168,16 +168,17 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
if is_head:
resources[HEAD_NODE_RESOURCE_NAME] = 1.0

# get cpu num
# Get cpu num
num_cpus = self.num_cpus
if num_cpus is None:
num_cpus = ray._private.utils.get_num_cpus()

# get accelerate device info
if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": # get cuda device num
# Get accelerator device info
accelerator = ray._private.utils.get_current_accelerator()
if accelerator == "CUDA": # get cuda device num
num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
resources.update(gpu_types)
elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": # get xpu device num
elif accelerator == "XPU": # get xpu device num
# Here we treat XPUs as GPUs, so there is no need to change Ray core's scheduling policy.
# If we did not treat XPUs as GPUs, Ray core would need a new scheduling policy.
num_gpus, gpu_types = _get_xpu_info(self.num_gpus)
@@ -301,6 +302,13 @@ def _get_cuda_info(num_gpus):

def _get_xpu_info(num_xpus):
"""Attempt to process the number of XPUs as GPUs
Here we use `dpctl` to detect XPU devices:
enumerate all devices via the API dpctl.get_devices.
Note that the ONEAPI_DEVICE_SELECTOR environment variable should be unset,
or dpctl.get_devices will return only the device set filtered by ONEAPI_DEVICE_SELECTOR.
Another way to enumerate XPU devices is the C++ API, which may be adopted later.
Returns:
The number of XPUs detected by dpctl with the specific backend and device type.
"""
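The body of _get_xpu_info is collapsed in this view. Below is a minimal sketch of the enumeration the docstring describes, assuming dpctl is installed; the "level_zero" backend string stands in for ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE, whose value is not shown in this diff, and the returned {"GPU": ...} label illustrates the "take xpu as gpu" approach rather than the exact return shape of the real function.

def _get_xpu_info_sketch(num_xpus=None):
    # Illustrative only; mirrors the docstring above, not the collapsed body.
    try:
        import dpctl
    except ImportError:
        # Without dpctl no XPU devices can be detected.
        return 0, {}

    # ONEAPI_DEVICE_SELECTOR must be unset here, otherwise dpctl.get_devices()
    # returns only the subset that the selector filters in.
    devices = dpctl.get_devices(backend="level_zero", device_type="gpu")
    detected = len(devices)
    num_xpus = detected if num_xpus is None else num_xpus
    # XPUs are reported as GPUs, so Ray core's scheduling needs no changes.
    return num_xpus, {"GPU": num_xpus}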
36 changes: 20 additions & 16 deletions python/ray/_private/utils.py
@@ -315,11 +315,14 @@ def get_xpu_visible_devices():
return list(xpu_ids_str.split(","))


def get_current_accelerator():
return os.environ.get("RAY_ACCELERATOR", ray_constants.RAY_ACCELERATOR_DEFAULT)


def get_gpu_visible_devices():
if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
return get_cuda_visible_devices()
elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
if get_current_accelerator() == "XPU":
return get_xpu_visible_devices()
return get_cuda_visible_devices()


def set_omp_num_threads_if_unset() -> bool:
@@ -362,43 +365,44 @@ def set_omp_num_threads_if_unset() -> bool:
return True


def set_cuda_visible_devices(gpu_ids):
def set_cuda_visible_devices(dev_ids):
"""Set the CUDA_VISIBLE_DEVICES environment variable.
Args:
gpu_ids (List[str]): List of strings representing GPU IDs.
dev_ids (List[str]): List of strings representing GPU IDs.
"""

if os.environ.get(ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR):
return

global last_set_gpu_ids
if last_set_gpu_ids == gpu_ids:
if last_set_gpu_ids == dev_ids:
return # optimization: already set

os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids])
last_set_gpu_ids = gpu_ids
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in dev_ids])
last_set_gpu_ids = dev_ids


def set_xpu_visible_devices(xpu_ids):
def set_xpu_visible_devices(dev_ids):
"""Set the ONEAPI_DEVICE_SELECTOR environment variable.
Args:
xpu_ids (List[str]): List of strings representing GPU IDs
dev_ids (List[str]): List of strings representing XPU IDs.
"""

if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR):
return

ids_str = ",".join([str(i) for i in xpu_ids])
ids_str = ",".join([str(i) for i in dev_ids])
os.environ["XPU_VISIBLE_DEVICES"] = ids_str
os.environ["ONEAPI_DEVICE_SELECTOR"] = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str


def set_gpu_visible_devices(gpu_ids):
if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
set_cuda_visible_devices(gpu_ids)
elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
set_xpu_visible_devices(gpu_ids)
def set_gpu_visible_devices(device_ids):
accelerator = get_current_accelerator()
if accelerator == "XPU":
return set_xpu_visible_devices(device_ids)
elif accelerator == "CUDA":
return set_cuda_visible_devices(device_ids)


def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
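Taken together, get_current_accelerator and the set_*_visible_devices helpers form a small dispatch: RAY_ACCELERATOR selects the backend, and the XPU path writes both XPU_VISIBLE_DEVICES and ONEAPI_DEVICE_SELECTOR as "<backend>:<ids>". A self-contained sketch of that behaviour follows, with "level_zero" standing in for ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE (not shown in this diff).

import os

def set_visible_devices_sketch(dev_ids, xpu_backend="level_zero"):
    # Mirrors the dispatch in utils.py: pick the accelerator from the
    # environment, then set the matching visibility variables.
    accelerator = os.environ.get("RAY_ACCELERATOR", "CUDA")
    ids_str = ",".join(str(i) for i in dev_ids)
    if accelerator == "XPU":
        os.environ["XPU_VISIBLE_DEVICES"] = ids_str
        os.environ["ONEAPI_DEVICE_SELECTOR"] = f"{xpu_backend}:{ids_str}"
    else:
        os.environ["CUDA_VISIBLE_DEVICES"] = ids_str

# Example: pin an XPU worker to devices 1 and 3.
os.environ["RAY_ACCELERATOR"] = "XPU"
set_visible_devices_sketch([1, 3])
assert os.environ["ONEAPI_DEVICE_SELECTOR"] == "level_zero:1,3"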
62 changes: 32 additions & 30 deletions python/ray/_private/worker.py
@@ -423,8 +423,8 @@ def __init__(self):
self.mode = None
self.actors = {}
# When the worker is constructed.
# Record the original value of the CUDA_VISIBLE_DEVICES environment variable.
# Record the original value of the XPU_VISIBLE_DEVICES environment variable.
# Record the original value of the CUDA_VISIBLE_DEVICES environment variable,
# or the value of the XPU_VISIBLE_DEVICES environment variable.
self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices()
# A dictionary that maps from driver id to SerializationContext
# TODO: clean up the SerializationContext once the job finished.
@@ -884,8 +884,12 @@ def get_cuda_ids():

def get_xpu_ids():
""" Get the IDs of the XPUs that are available to the worker.
If the XPU_VISIBLE_DEVICES environment variable was set when the worker
started up,
started up, then the IDs returned by this method will be a subset of the
IDs in XPU_VISIBLE_DEVICES. If not, the IDs will fall in the range
[0, NUM_GPUS - 1], where NUM_GPUS is the number of XPU devices (reported as GPUs) that the node has.
Returns:
A list of XPU IDs
"""
@@ -899,44 +903,42 @@ def get_xpu_ids():
"called from the driver. This is because Ray does not manage "
"XPU allocations to the driver process."
)
# Here we use `dpctl` to detect XPU device:
# Enumrate all device by API dpctl.get_devices
# Notice that ONEAPI_DEVICE_SELECTOR environment variable should be unset
# Or dpctl.get_devices will only return filtered device set by ONEAPI_DEVICE_SELECTOR
# Another method to enumrate XPU device is to use C++ API, maybe can upgrade laster
has_dpctl = True
try:
import dpctl
except ImportError:
has_dpctl = False

if not has_dpctl:
return []
# Get all resources from global core worker
all_resource_ids = global_worker.core_worker.resource_ids()
assigned_ids = set()
for resource, assignment in all_resource_ids.items():
# Handle both normal and placement group GPU resources.
# Note: We should only get the GPU ids from the placement
# group resource that does not contain the bundle index!
import re

xpu_devices = dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE)
xpu_ava_ids = set()
xpu_dev_prefix = f"{ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE}:{ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE}"
for xpu_dev in xpu_devices:
xpu_id = int(xpu_dev.filter_string.split(xpu_dev_prefix)[1])
xpu_ava_ids.add(xpu_id)
if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource):
for resource_id, _ in assignment:
assigned_ids.add(resource_id)

xpu_ids = []
assigned_ids = list(assigned_ids)
# If the user had already set XPU_VISIBLE_DEVICES, then respect that (in
# the sense that only device IDs that appear in XPU_VISIBLE_DEVICES should be
# returned).
if global_worker.original_gpu_ids is not None:
xpu_ids = [
global_worker.original_gpu_ids[xpu_id] for xpu_id in xpu_ava_ids
assigned_ids = [
global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids
]

return xpu_ids
return assigned_ids


@PublicAPI
@client_mode_hook
def get_gpu_ids():
if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
return get_cuda_ids()
elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
return get_xpu_ids()
accelerator = ray._private.utils.get_current_accelerator()
ids = []
if accelerator == "CUDA":
ids = get_cuda_ids()
elif accelerator == "XPU":
ids = get_xpu_ids()
return ids


@Deprecated(
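The new get_xpu_ids no longer enumerates hardware itself: it reads the worker's assigned GPU resources from the core worker (including placement-group resources without a bundle index) and, when a visible-device list was recorded at startup, maps the relative slot indices back onto that list. A small standalone sketch of that mapping step, with hypothetical inputs:

import re

def map_assigned_device_ids(resource_ids, original_visible_ids=None):
    # resource_ids: {resource_name: [(relative_id, fraction), ...]}, the shape
    # returned by core_worker.resource_ids(); original_visible_ids: the parsed
    # XPU_VISIBLE_DEVICES (or CUDA_VISIBLE_DEVICES) list recorded at startup.
    assigned = set()
    for resource, assignment in resource_ids.items():
        # Plain "GPU" plus placement-group resources without a bundle index.
        if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource):
            for relative_id, _fraction in assignment:
                assigned.add(relative_id)
    assigned = list(assigned)
    if original_visible_ids is not None:
        # Relative IDs index into the originally visible devices.
        assigned = [original_visible_ids[i] for i in assigned]
    return assigned

# Worker started with XPU_VISIBLE_DEVICES="1,3,5" and was assigned slot 2.
assert map_assigned_device_ids({"GPU": [(2, 1.0)]}, ["1", "3", "5"]) == ["5"]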
14 changes: 7 additions & 7 deletions python/ray/tests/test_actor_resources.py
@@ -84,7 +84,7 @@ def echo(self, value):
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_actor_gpus(ray_start_cluster, ACCELERATOR_TYPE):
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
cluster = ray_start_cluster
num_nodes = 3
num_gpus_per_raylet = 4
@@ -129,7 +129,7 @@ def get_location_and_ids(self):

@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_actor_multiple_gpus(ray_start_cluster, ACCELERATOR_TYPE):
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
cluster = ray_start_cluster
num_nodes = 3
num_gpus_per_raylet = 5
@@ -209,7 +209,7 @@ def get_location_and_ids(self):
@pytest.mark.skipif(sys.platform == "win32", reason="Very flaky.")
@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_actor_different_numbers_of_gpus(ray_start_cluster, ACCELERATOR_TYPE):
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
# Test that we can create actors on two nodes that have different
# numbers of GPUs.
cluster = ray_start_cluster
@@ -253,7 +253,7 @@ def get_location_and_ids(self):

@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster, ACCELERATOR_TYPE):
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
cluster = ray_start_cluster
num_nodes = 3
num_gpus_per_raylet = 2
@@ -332,7 +332,7 @@ def get_location_and_ids(self):

@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster, ACCELERATOR_TYPE):
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
cluster = ray_start_cluster
num_nodes = 3
num_gpus_per_raylet = 2
@@ -453,7 +453,7 @@ def locations_to_intervals_for_many_tasks():

@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_actors_and_tasks_with_gpus_version_two(shutdown_only, ACCELERATOR_TYPE):
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
# Create tasks and actors that both use GPUs and make sure that they
# are given different GPUs
num_gpus = 4
@@ -632,7 +632,7 @@ def get_location(self):

@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_creating_more_actors_than_resources(shutdown_only, ACCELERATOR_TYPE):
ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1})

@ray.remote(num_gpus=1)
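These tests select the backend by writing os.environ["RAY_ACCELERATOR"] directly, which persists for the rest of the test session. A possible alternative, not part of this change, is pytest's monkeypatch fixture, which undoes the assignment at teardown; a minimal self-contained illustration:

import os
import pytest

@pytest.mark.parametrize("accelerator_type", ["CUDA", "XPU"])
def test_accelerator_env_is_scoped(monkeypatch, accelerator_type):
    # monkeypatch.setenv restores RAY_ACCELERATOR after each test, so the
    # selection cannot leak into tests that run later in the same process.
    monkeypatch.setenv("RAY_ACCELERATOR", accelerator_type)
    assert os.environ["RAY_ACCELERATOR"] == accelerator_type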
2 changes: 1 addition & 1 deletion python/ray/tests/test_advanced_2.py
@@ -139,7 +139,7 @@ def method(self):

@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_fractional_resources(shutdown_only, ACCELERATOR_TYPE):
ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1})

@ray.remote(num_gpus=0.5)
27 changes: 22 additions & 5 deletions python/ray/tests/test_advanced_6.py
@@ -37,9 +37,26 @@ def save_gpu_ids_shutdown_only():
del os.environ["CUDA_VISIBLE_DEVICES"]


@pytest.fixture
def save_xpu_ids_shutdown_only():
# Record the current value of this environment variable so that we can
# reset it after the test.
original_xpu_ids = os.environ.get("XPU_VISIBLE_DEVICES", None)

yield None

# The code after the yield will run as teardown code.
ray.shutdown()
# Reset the environment variable.
if original_xpu_ids is not None:
os.environ["XPU_VISIBLE_DEVICES"] = original_xpu_ids
else:
del os.environ["XPU_VISIBLE_DEVICES"]


@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows")
def test_specific_gpus(save_gpu_ids_shutdown_only):
ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA"
os.environ["RAY_ACCELERATOR"] = "CUDA"
allowed_gpu_ids = [4, 5, 6]
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids])
ray.init(num_gpus=3)
@@ -62,21 +79,21 @@ def g():


@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows")
def test_specific_xpus(save_gpu_ids_shutdown_only):
ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "XPU"
def test_specific_xpus(save_xpu_ids_shutdown_only):
os.environ["RAY_ACCELERATOR"] = "XPU"
allowed_xpu_ids = [1, 3, 5]
os.environ["XPU_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_xpu_ids])
ray.init(num_gpus=3)

@ray.remote(num_gpus=1)
def f():
xpu_ids = ray.get_xpu_ids()
xpu_ids = ray.get_gpu_ids()
assert len(xpu_ids) == 1
assert int(xpu_ids[0]) in allowed_xpu_ids

@ray.remote(num_gpus=2)
def g():
xpu_ids = ray.get_xpu_ids()
xpu_ids = ray.get_gpu_ids()
assert len(xpu_ids) == 2
assert int(xpu_ids[0]) in allowed_xpu_ids
assert int(xpu_ids[1]) in allowed_xpu_ids
2 changes: 1 addition & 1 deletion python/ray/tests/test_basic.py
@@ -232,7 +232,7 @@ def g():

@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
def test_submit_api(shutdown_only, ACCELERATOR_TYPE):
ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
os.environ["RAY_ACCELERATOR"] = ACCELERATOR_TYPE
ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})

@ray.remote
