[Core] Support Intel GPU #36493

Closed · wants to merge 7 commits
8 changes: 8 additions & 0 deletions python/ray/_private/ray_constants.py
@@ -396,6 +396,14 @@ def env_set_by_user(key):
LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]

NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"
NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR"

RAY_ONEAPI_DEVICE_BACKEND_TYPE = "level_zero"
RAY_ONEAPI_DEVICE_TYPE = "gpu"

RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"}
RAY_ACCELERATOR_DEFAULT = "CUDA"

RAY_WORKER_NICENESS = "RAY_worker_niceness"

# Default max_retries option in @ray.remote for non-actor
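A minimal usage sketch of how these constants and the new environment switch are expected to fit together, assuming dpctl and an Intel Level Zero driver are installed; the variable values below are illustrative, not part of this diff.

# Illustrative only: opt a node into the XPU code path. The default
# accelerator remains "CUDA" (RAY_ACCELERATOR_DEFAULT above).
import os

os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU"
# ONEAPI_DEVICE_SELECTOR plays the role CUDA_VISIBLE_DEVICES plays for NVIDIA:
# here it exposes two Level Zero GPUs.
os.environ["ONEAPI_DEVICE_SELECTOR"] = "level_zero:0,1"

import ray

ray.init(num_gpus=2)  # the XPUs are advertised through the generic "GPU" resource
print(ray.cluster_resources())  # should report GPU: 2.0 plus the "xpu" accelerator marker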
100 changes: 76 additions & 24 deletions python/ray/_private/resource_spec.py
@@ -168,35 +168,21 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
if is_head:
resources[HEAD_NODE_RESOURCE_NAME] = 1.0

# Get the number of CPUs.
num_cpus = self.num_cpus
if num_cpus is None:
num_cpus = ray._private.utils.get_num_cpus()

num_gpus = self.num_gpus
gpu_ids = ray._private.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
raise ValueError(
"Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
)
if num_gpus is None:
# Try to automatically detect the number of GPUs.
num_gpus = _autodetect_num_gpus()
# Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
if gpu_ids is not None:
num_gpus = min(num_gpus, len(gpu_ids))

try:
if importlib.util.find_spec("GPUtil") is not None:
gpu_types = _get_gpu_types_gputil()
else:
info_string = _get_gpu_info_string()
gpu_types = _constraints_from_gpu_info(info_string)
# Get accelerator device info.
accelerator = ray._private.utils.get_current_accelerator()
if accelerator == "CUDA":  # Get the number of CUDA devices.
num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
resources.update(gpu_types)
elif accelerator == "XPU":  # Get the number of XPU devices.
# XPUs are treated as GPUs here, so Ray core can reuse the existing GPU
# scheduling policy. Treating XPUs as a separate resource type would
# require a new scheduling policy in Ray core.
num_gpus, gpu_types = _get_xpu_info(self.num_gpus)
resources.update(gpu_types)
except Exception:
logger.exception("Could not parse gpu information.")

# Choose a default object store size.
system_memory = ray._private.utils.get_system_memory()
@@ -276,6 +262,72 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
return spec


def _get_cuda_info(num_gpus):
"""Attemp to process the number and type of GPUs
Notice:
If gpu id not specified in CUDA_VISIBLE_DEVICES,
and num_gpus is defined in task or actor,
this function will return the input num_gpus, not 0

Returns:
(num_gpus, gpu_types)
"""
gpu_ids = ray._private.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
raise ValueError(
"Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
)
if num_gpus is None:
# Try to automatically detect the number of GPUs.
num_gpus = _autodetect_num_gpus()
# Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
if gpu_ids is not None:
num_gpus = min(num_gpus, len(gpu_ids))

gpu_types = ""
try:
if importlib.util.find_spec("GPUtil") is not None:
gpu_types = _get_gpu_types_gputil()
else:
info_string = _get_gpu_info_string()
gpu_types = _constraints_from_gpu_info(info_string)
except Exception:
logger.exception("Could not parse gpu information.")

return num_gpus, gpu_types


def _get_xpu_info(num_xpus):
"""Attempt to process the number of XPUs
Notice:
If xpu id not specified in ONEAPI_DEVICE_SELECTOR,
and num_gpus is defined in task or actor,
this function will return the input num_gpus, not 0

Returns:
(num_xpus, xpu_types)
"""
# get visible xpu ids
xpu_ids = ray._private.utils.get_xpu_visible_devices()
if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids):
raise ValueError(
"Attempting to start raylet with {} XPUs, "
"but ONEAPI_DEVICE_SELECTOR contains {}.".format(num_xpus, xpu_ids)
)
if num_xpus is None:
# Try to automatically detect the number of XPUs.
num_xpus = len(ray._private.utils.get_xpu_all_devices())
# Don't use more XPUs than allowed by ONEAPI_DEVICE_SELECTOR.
if xpu_ids is not None:
num_xpus = min(num_xpus, len(xpu_ids))

xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1}
return num_xpus, xpu_types


def _autodetect_num_gpus():
"""Attempt to detect the number of GPUs on this machine.

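For reference, a hedged sketch of the values `_get_xpu_info` is expected to return on a node whose selector exposes two Level Zero GPUs; the exact resource key depends on `ray_constants.RESOURCE_CONSTRAINT_PREFIX`.

# Illustrative values only; assumes ONEAPI_DEVICE_SELECTOR="level_zero:0,1"
# and that RESOURCE_CONSTRAINT_PREFIX resolves to "accelerator_type:".
num_xpus, xpu_types = _get_xpu_info(None)
print(num_xpus)   # -> 2, bounded by the selector
print(xpu_types)  # -> {"accelerator_type:xpu": 1}, merged into `resources` by resolve()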
107 changes: 102 additions & 5 deletions python/ray/_private/utils.py
@@ -304,6 +304,81 @@ def get_cuda_visible_devices():
last_set_gpu_ids = None


def get_xpu_devices():
"""Get xpu device IDs by calling `dpctl` api
device with specific backend and device_type
Returns:
devices IDs (List[str]): return the list of string representing
the relative IDs filtered by
ONEAPI_DEVICE_SELECTOR, specific backend and device_type
returned visible IDs start with index 0
Example:
if ONEAPI_DEVICE_SELECTOR="level_zero:2,3,4"
the device IDs enumerated will be [0,1,2]
same with CUDA_VISIBLE_DEVICES
"""
backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE
device_type = ray_constants.RAY_ONEAPI_DEVICE_TYPE
xpu_ids = []
try:
import dpctl

for dev in dpctl.get_devices(backend=backend, device_type=device_type):
# device filter_string with format: "backend:device_type:relative_id"
xpu_ids.append(int(dev.filter_string.split(":")[-1]))
except ImportError:
raise ValueError("Could not import dpctl; is dpctl installed?")
return xpu_ids


def get_xpu_visible_devices():
"""Get xpu devices IDs filtered by ONEAPI_DEVICE_SELECTOR environment variable.
Returns:
devices (List[str]): return the list of string representing the relative IDS
filtered by ONEAPI_DEVICE_SELECTOR.
"""
if os.environ.get("ONEAPI_DEVICE_SELECTOR", None) is None:
return None

xpu_ids = get_xpu_devices()

return xpu_ids


def get_xpu_all_devices():
"""Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter,
But all xpu device still filtered by specific backend and device_type
Returns:
devices (List[str]): list of strings representing the numeric index (zero-based),
with sepcific backend and device_type
"""
selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None)
# unset "ONEAPI_DEVICE_SELECTOR"
os.unsetenv("ONEAPI_DEVICE_SELECTOR")

xpu_ids = get_xpu_devices()

# set "ONEAPI_DEVICE_SELECTOR" value back
if selector is not None:
os.environ["ONEAPI_DEVICE_SELECTOR"] = selector

return xpu_ids


def get_current_accelerator():
return os.environ.get(
"RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT
)


def get_gpu_visible_devices():
accelerator = get_current_accelerator()
if accelerator == "XPU":
return get_xpu_visible_devices()
elif accelerator == "CUDA":
return get_cuda_visible_devices()


def set_omp_num_threads_if_unset() -> bool:
"""Set the OMP_NUM_THREADS to default to num cpus assigned to the worker

@@ -344,22 +419,44 @@ def set_omp_num_threads_if_unset() -> bool:
return True


def set_cuda_visible_devices(gpu_ids):
def set_cuda_visible_devices(dev_ids):
"""Set the CUDA_VISIBLE_DEVICES environment variable.

Args:
gpu_ids (List[str]): List of strings representing GPU IDs.
dev_ids (List[str]): List of strings representing GPU IDs.
"""

if os.environ.get(ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR):
return

global last_set_gpu_ids
if last_set_gpu_ids == gpu_ids:
if last_set_gpu_ids == dev_ids:
return # optimization: already set

os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids])
last_set_gpu_ids = gpu_ids
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in dev_ids])
last_set_gpu_ids = dev_ids


def set_xpu_visible_devices(dev_ids):
"""Set the ONEAPI_DEVICE_SELECTOR environment variable.

Args:
dev_ids (List[str]): List of strings representing XPU IDs.
"""

if os.environ.get(ray_constants.NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR):
return

backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE
dev_str = ",".join([str(i) for i in dev_ids])
os.environ["ONEAPI_DEVICE_SELECTOR"] = backend + ":" + dev_str


def set_gpu_visible_devices(device_ids):
accelerator = get_current_accelerator()
if accelerator == "XPU":
return set_xpu_visible_devices(device_ids)
elif accelerator == "CUDA":
return set_cuda_visible_devices(device_ids)


def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
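A small round-trip sketch of the selector string these helpers read and write; the device IDs are hypothetical and assume dpctl is installed, three Level Zero GPUs are present, and the selector takes effect before the SYCL runtime is initialized.

import os

# Hypothetical round trip with three Level Zero GPUs on the node.
set_xpu_visible_devices([0, 2])
print(os.environ["ONEAPI_DEVICE_SELECTOR"])  # -> "level_zero:0,2"

# Per the docstrings above, the two remaining devices are re-enumerated from 0,
# mirroring the CUDA_VISIBLE_DEVICES remapping:
print(get_xpu_visible_devices())             # -> [0, 1]
print(len(get_xpu_all_devices()))            # -> 3 (selector ignored)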
75 changes: 67 additions & 8 deletions python/ray/_private/worker.py
@@ -425,9 +425,10 @@ def __init__(self):
self.node = None
self.mode = None
self.actors = {}
# When the worker is constructed. Record the original value of the
# CUDA_VISIBLE_DEVICES environment variable.
self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices()
# When the worker is constructed, record the original value of the
# CUDA_VISIBLE_DEVICES (or ONEAPI_DEVICE_SELECTOR) environment variable.
self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices()
# A dictionary that maps from driver id to SerializationContext
# TODO: clean up the SerializationContext once the job finished.
self.serialization_context_map = {}
@@ -833,9 +834,7 @@ def print_logs(self):
subscriber.close()


@PublicAPI
@client_mode_hook
def get_gpu_ids():
def get_cuda_ids():
"""Get the IDs of the GPUs that are available to the worker.

If the CUDA_VISIBLE_DEVICES environment variable was set when the worker
@@ -852,7 +851,7 @@ def get_gpu_ids():
if worker.mode != WORKER_MODE:
if log_once("worker_get_gpu_ids_empty_from_driver"):
logger.warning(
"`ray.get_gpu_ids()` will always return the empty list when "
"`ray.get_cuda_ids()` will always return the empty list when "
"called from the driver. This is because Ray does not manage "
"GPU allocations to the driver process."
)
@@ -886,6 +885,67 @@ def get_gpu_ids():
return assigned_ids


def get_xpu_ids():
"""Get the IDs of the XPUs that are available to the worker.

If the ONEAPI_DEVICE_SELECTOR environment variable was set before the worker
started up, then the IDs returned by this method will be a subset of the
IDs in ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the range
[0, NUM_XPUS - 1], where NUM_XPUS is the number of XPUs that the node has
(XPUs are advertised to Ray as GPU resources).

Returns:
A list of XPU IDs
"""
worker = global_worker
worker.check_connected()

if worker.mode != WORKER_MODE:
if log_once("worker_get_xpu_ids_empty_from_driver"):
logger.warning(
"`ray.get_xpu_ids()` will always return the empty list when "
"called from the driver. This is because Ray does not manage "
"XPU allocations to the driver process."
)

# Get all resources from global core worker
all_resource_ids = global_worker.core_worker.resource_ids()
assigned_ids = set()
for resource, assignment in all_resource_ids.items():
# Handle both normal and placement group GPU resources.
# Note: We should only get the GPU ids from the placement
# group resource that does not contain the bundle index!
import re

if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource):
for resource_id, _ in assignment:
assigned_ids.add(resource_id)
assigned_ids = list(assigned_ids)
# If the user had already set ONEAPI_DEVICE_SELECTOR, then respect that (in
# the sense that only XPU IDs that appear in ONEAPI_DEVICE_SELECTOR should be
# returned).
if global_worker.original_gpu_ids is not None:
assigned_ids = [
global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids
]
# Give all XPUs in local_mode.
if global_worker.mode == LOCAL_MODE:
max_gpus = global_worker.node.get_resource_spec().num_gpus
assigned_ids = global_worker.original_gpu_ids[:max_gpus]

return assigned_ids


@PublicAPI
@client_mode_hook
def get_gpu_ids():
"""Get the IDs of the accelerator devices (CUDA GPUs or XPUs) available to the worker.

Dispatches to get_cuda_ids() or get_xpu_ids() based on the configured
accelerator type.
"""
accelerator = ray._private.utils.get_current_accelerator()
if accelerator == "CUDA":
return get_cuda_ids()
elif accelerator == "XPU":
return get_xpu_ids()
return []


@Deprecated(
message="Use ray.get_runtime_context().get_assigned_resources() instead.",
warning=True,
@@ -1475,7 +1535,6 @@ def init(
usage_lib.show_usage_stats_prompt(cli=False)
else:
usage_lib.set_usage_stats_enabled_via_env_var(False)

# Use a random port by not specifying Redis port / GCS server port.
ray_params = ray._private.parameter.RayParams(
node_ip_address=node_ip_address,
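A short end-to-end sketch of the dispatching `ray.get_gpu_ids()` inside a task on an XPU node; the environment setup from the earlier sketch is assumed, and the returned ID is whichever device the raylet assigned.

import ray

ray.init(num_gpus=1)


@ray.remote(num_gpus=1)
def which_device():
    # With RAY_EXPERIMENTAL_ACCELERATOR_TYPE=XPU, this routes to get_xpu_ids();
    # the raylet has already narrowed ONEAPI_DEVICE_SELECTOR for this worker.
    return ray.get_gpu_ids()


print(ray.get(which_device.remote()))  # e.g. [0]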
2 changes: 1 addition & 1 deletion python/ray/_raylet.pyx
@@ -1820,7 +1820,7 @@ cdef execute_task_with_cancellation_handler(
title = f"ray::{task_name}"

# Automatically restrict the GPUs available to this task.
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())
ray._private.utils.set_gpu_visible_devices(ray.get_gpu_ids())

# Automatically configure OMP_NUM_THREADS to the assigned CPU number.
# It will be unset after the task execution if it was overridden here.