Skip to content

Enabling INTEL XPU #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: releases/2.4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def env_integer(key, default):


def env_bool(key, default):
print(f"going to create env var name = {key} with value = {default}")
if key in os.environ:
return (
True
Expand Down Expand Up @@ -226,7 +227,7 @@ def env_set_by_user(key):
LOG_MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_LOG_MONITOR}.log"

# Enable log deduplication.
RAY_DEDUP_LOGS = env_bool("RAY_DEDUP_LOGS", True)
RAY_DEDUP_LOGS = env_bool("RAY_DEDUP_LOGS", False)

# How many seconds of messages to buffer for log deduplication.
RAY_DEDUP_LOGS_AGG_WINDOW_S = env_integer("RAY_DEDUP_LOGS_AGG_WINDOW_S", 5)
Expand Down Expand Up @@ -378,6 +379,13 @@ def env_set_by_user(key):
LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]

NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"
NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES"
RAY_DEVICE_XPU_SELECTOR_ENV_VAR = "ONEAPI_DEVICE_SELECTOR"
RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero"
RAY_DEVICE_XPU_DEVICE_TYPE = "gpu"

RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"}
RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA"

# Default max_retries option in @ray.remote for non-actor
# tasks.
Expand Down
104 changes: 80 additions & 24 deletions python/ray/_private/resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import ray
import ray._private.ray_constants as ray_constants

try:
import dpctl
except ImportError:
pass

try:
import GPUtil
except ImportError:
Expand Down Expand Up @@ -157,35 +162,22 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
# ray._private.state.current_node_id().
resources[NODE_ID_PREFIX + node_ip_address] = 1.0

# get cpu num
num_cpus = self.num_cpus
print(f"going to get_num_cpus, num_cpus = {num_cpus}")
if num_cpus is None:
num_cpus = ray._private.utils.get_num_cpus()

num_gpus = self.num_gpus
gpu_ids = ray._private.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
raise ValueError(
"Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
)
if num_gpus is None:
# Try to automatically detect the number of GPUs.
num_gpus = _autodetect_num_gpus()
# Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
if gpu_ids is not None:
num_gpus = min(num_gpus, len(gpu_ids))

try:
if importlib.util.find_spec("GPUtil") is not None:
gpu_types = _get_gpu_types_gputil()
else:
info_string = _get_gpu_info_string()
gpu_types = _constraints_from_gpu_info(info_string)
# get accelerator info
if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA": # get cuda device num
num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
resources.update(gpu_types)
except Exception:
logger.exception("Could not parse gpu information.")
elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU": # get xpu device num
# here we take xpu as gpu, so no need to develop core's scheduling policy
# If we don't want to take xpu as gpu, ray core need to develop new scheduling policy
num_gpus, gpu_types = _get_xpu_info(self.num_gpus)
resources.update(gpu_types)
print(f"after get accelerator info, num_gpus = {num_gpus}, gpu_types = {gpu_types}")

# Choose a default object store size.
system_memory = ray._private.utils.get_system_memory()
Expand Down Expand Up @@ -265,6 +257,70 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
return spec


def _get_cuda_info(num_gpus):
    """Attempt to resolve the number and type of CUDA GPUs on this node.

    Notice:
        If no GPU ids are specified in CUDA_VISIBLE_DEVICES and num_gpus
        is defined in a task or actor, this function will return the
        input num_gpus, not 0.

    Args:
        num_gpus: Requested GPU count, or None to auto-detect.

    Returns:
        Tuple of (num_gpus, gpu_types) where gpu_types is a resource
        constraint mapping (empty dict when the type cannot be parsed).

    Raises:
        ValueError: If num_gpus exceeds the count allowed by
            CUDA_VISIBLE_DEVICES.
    """
    gpu_ids = ray._private.utils.get_cuda_visible_devices()
    # Check that the number of GPUs that the raylet wants doesn't
    # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
    if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
        raise ValueError(
            "Attempting to start raylet with {} GPUs, "
            "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
        )
    if num_gpus is None:
        # Try to automatically detect the number of GPUs.
        num_gpus = _autodetect_num_gpus()
        # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
        if gpu_ids is not None:
            num_gpus = min(num_gpus, len(gpu_ids))

    # Default to an empty dict (not "") so callers can safely call
    # resources.update(gpu_types) on the failure path.
    gpu_types = {}
    try:
        if importlib.util.find_spec("GPUtil") is not None:
            gpu_types = _get_gpu_types_gputil()
        else:
            info_string = _get_gpu_info_string()
            gpu_types = _constraints_from_gpu_info(info_string)
    except Exception:
        logger.exception("Could not parse gpu information.")

    return num_gpus, gpu_types


def _get_xpu_info(num_xpus):
    """Attempt to resolve the number of Intel XPUs, treated as GPUs.

    Args:
        num_xpus: Requested XPU count, or None to auto-detect via dpctl.

    Returns:
        Tuple of (num_xpus, xpu_types) where num_xpus is the number of
        XPUs detected by dpctl for the configured backend/device type,
        and xpu_types is a resource constraint mapping.

    Raises:
        ValueError: If num_xpus exceeds the count allowed by
            XPU_VISIBLE_DEVICES.
    """
    xpu_ids = ray._private.utils.get_xpu_visible_devices()
    # Mirror the CUDA check: the requested count must not exceed the
    # devices made visible through XPU_VISIBLE_DEVICES.
    if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids):
        raise ValueError(
            "Attempting to start raylet with {} XPUs, "
            "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids)
        )
    if num_xpus is None:
        # NOTE(review): dpctl is imported at module level inside a
        # try/except ImportError that passes, so this raises NameError
        # if dpctl is absent — confirm dpctl is a hard requirement when
        # the XPU accelerator is selected.
        num_xpus = len(
            dpctl.get_devices(
                backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
                device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE,
            )
        )
        # Don't use more XPUs than allowed by XPU_VISIBLE_DEVICES.
        if xpu_ids is not None:
            num_xpus = min(num_xpus, len(xpu_ids))
    xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1}
    return num_xpus, xpu_types


def _autodetect_num_gpus():
"""Attempt to detect the number of GPUs on this machine.

Expand Down
52 changes: 51 additions & 1 deletion python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def get_cuda_visible_devices():
If it is not set or is set to NoDevFiles, returns empty list.
"""
gpu_ids_str = os.environ.get("CUDA_VISIBLE_DEVICES", None)
print(f"in get_cuda_visible_devices, env var = {gpu_ids_str}")

if gpu_ids_str is None:
return None
Expand All @@ -326,6 +327,31 @@ def get_cuda_visible_devices():
last_set_gpu_ids = None


def get_xpu_visible_devices():
    """Get the device IDs in the XPU_VISIBLE_DEVICES environment variable.

    Returns:
        devices (Optional[List[str]]): None if XPU_VISIBLE_DEVICES is
        unset; an empty list if it is set to the empty string; otherwise
        the list of strings representing the IDs of the visible XPUs.
    """
    xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None)

    if xpu_ids_str is None:
        return None

    if xpu_ids_str == "":
        return []

    # str.split already returns a list; no extra list() wrapper needed.
    return xpu_ids_str.split(",")


def get_gpu_visible_devices():
    """Return the visible device IDs for the currently selected accelerator.

    Dispatches to the CUDA or XPU implementation depending on
    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR; returns None for any
    other value.
    """
    accelerator = ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR
    if accelerator == "CUDA":
        return get_cuda_visible_devices()
    if accelerator == "XPU":
        return get_xpu_visible_devices()


def set_omp_num_threads_if_unset() -> bool:
"""Set the OMP_NUM_THREADS to default to num cpus assigned to the worker

Expand Down Expand Up @@ -379,11 +405,35 @@ def set_cuda_visible_devices(gpu_ids):
global last_set_gpu_ids
if last_set_gpu_ids == gpu_ids:
return # optimization: already set

ids_str = ",".join([str(i) for i in gpu_ids])
print(f"in set_cuda_visible_devices, env var = {ids_str}")
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids])
last_set_gpu_ids = gpu_ids


def set_xpu_visible_devices(xpu_ids):
    """Set the XPU device-visibility environment variables.

    Sets both XPU_VISIBLE_DEVICES and ONEAPI_DEVICE_SELECTOR so that the
    SYCL runtime only exposes the given devices to this process.

    Args:
        xpu_ids (List[str]): List of strings representing XPU IDs.
    """
    # Respect the opt-out env var, mirroring set_cuda_visible_devices.
    if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR):
        return

    ids_str = ",".join(str(i) for i in xpu_ids)
    os.environ["XPU_VISIBLE_DEVICES"] = ids_str
    # ONEAPI_DEVICE_SELECTOR takes the form "<backend>:<id>,<id>,...".
    os.environ["ONEAPI_DEVICE_SELECTOR"] = (
        ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str
    )


def set_gpu_visible_devices(gpu_ids):
    """Set device-visibility env vars for the currently selected accelerator.

    Dispatches to the CUDA or XPU implementation depending on
    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR; a no-op for any other
    value.
    """
    accelerator = ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR
    if accelerator == "CUDA":
        set_cuda_visible_devices(gpu_ids)
    if accelerator == "XPU":
        set_xpu_visible_devices(gpu_ids)


def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
"""Determine a task's resource requirements.

Expand Down
59 changes: 54 additions & 5 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import traceback
import urllib
import warnings
import dpctl
from abc import ABCMeta, abstractmethod
from collections.abc import Mapping
from contextlib import contextmanager
Expand Down Expand Up @@ -433,7 +434,8 @@ def __init__(self):
self.actors = {}
# When the worker is constructed. Record the original value of the
# CUDA_VISIBLE_DEVICES environment variable.
self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices()
# Record the original value of the XPU_VISIBLE_DEVICES environment variable.
self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices()
# A dictionary that maps from driver id to SerializationContext
# TODO: clean up the SerializationContext once the job finished.
self.serialization_context_map = {}
Expand Down Expand Up @@ -897,9 +899,7 @@ def print_logs(self):
subscriber.close()


@PublicAPI
@client_mode_hook(auto_init=True)
def get_gpu_ids():
def get_cuda_ids():
"""Get the IDs of the GPUs that are available to the worker.

If the CUDA_VISIBLE_DEVICES environment variable was set when the worker
Expand All @@ -916,7 +916,7 @@ def get_gpu_ids():
if worker.mode != WORKER_MODE:
if log_once("worker_get_gpu_ids_empty_from_driver"):
logger.warning(
"`ray.get_gpu_ids()` will always return the empty list when "
"`ray.get_cuda_ids()` will always return the empty list when "
"called from the driver. This is because Ray does not manage "
"GPU allocations to the driver process."
)
Expand Down Expand Up @@ -950,6 +950,55 @@ def get_gpu_ids():
return assigned_ids


def get_xpu_ids():
    """Get the IDs of the XPUs that are available to the worker.

    If the XPU_VISIBLE_DEVICES environment variable was set when the
    worker started up, the detected device indices are mapped back to
    the original IDs recorded at startup.

    Returns:
        A list of XPU IDs.
    """
    worker = global_worker
    worker.check_connected()

    if worker.mode != WORKER_MODE:
        if log_once("worker_get_gpu_ids_empty_from_driver"):
            logger.warning(
                "`ray.get_xpu_ids()` will always return the empty list when "
                "called from the driver. This is because Ray does not manage "
                "XPU allocations to the driver process."
            )
    # Here we use `dpctl` to detect XPU devices: enumerate all devices
    # with dpctl.get_devices. Note that the ONEAPI_DEVICE_SELECTOR
    # environment variable should be unset, otherwise dpctl.get_devices
    # only returns the devices filtered by ONEAPI_DEVICE_SELECTOR.
    # Another option is to enumerate via the C++ API; may upgrade later.
    xpu_devices = dpctl.get_devices(
        backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
        device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE,
    )
    # filter_string has the form "<backend>:<device_type>:<index>".
    # Parse the trailing numeric index. (Splitting on the
    # "<backend>:<device_type>" prefix left a leading ":" in the
    # remainder, which made int() raise ValueError.)
    xpu_ava_ids = set()
    for xpu_dev in xpu_devices:
        xpu_id = int(xpu_dev.filter_string.rsplit(":", 1)[1])
        xpu_ava_ids.add(xpu_id)

    xpu_ids = []
    if global_worker.original_gpu_ids is not None:
        # Sort the indices: set iteration order is arbitrary, and the
        # returned IDs should be deterministic.
        xpu_ids = [
            global_worker.original_gpu_ids[xpu_id] for xpu_id in sorted(xpu_ava_ids)
        ]

    return xpu_ids


@PublicAPI
@client_mode_hook(auto_init=True)
def get_gpu_ids():
    """Get the IDs of the accelerators that are available to the worker.

    Dispatches to the CUDA or XPU implementation depending on
    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR.

    Returns:
        A list of accelerator IDs.
    """
    # NOTE(review): implicitly returns None for any accelerator value
    # other than "CUDA" or "XPU" — confirm that is intended.
    if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
        return get_cuda_ids()
    elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
        return get_xpu_ids()


@Deprecated(
message="Use ray.get_runtime_context().get_assigned_resources() instead.",
warning=True,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ cdef execute_task_with_cancellation_handler(
title = f"ray::{task_name}"

# Automatically restrict the GPUs available to this task.
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())
ray._private.utils.set_gpu_visible_devices(ray.get_gpu_ids())

# Automatically configure OMP_NUM_THREADS to the assigned CPU number.
# It will be unset after the task execution if it was overwridden here.
Expand Down
Loading