From 7344f54c74f2222413e1fd684ad3321756531310 Mon Sep 17 00:00:00 2001
From: "Wu, Gangsheng"
Date: Mon, 12 Jun 2023 22:44:27 +0800
Subject: [PATCH 1/3] support intel xpu

---
 python/ray/_private/ray_constants.py     |  10 ++-
 python/ray/_private/resource_spec.py     | 104 +++++++++++++++++------
 python/ray/_private/utils.py             |  38 ++++++++-
 python/ray/_private/worker.py            |  49 +++++++++++
 python/ray/_raylet.pyx                   |   7 +-
 python/ray/tests/test_actor_resources.py |  30 +++++--
 python/ray/tests/test_advanced_2.py      |   4 +-
 python/ray/tests/test_advanced_6.py      |  23 +++++
 python/ray/tests/test_basic.py           |   4 +-
 9 files changed, 232 insertions(+), 37 deletions(-)

diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py
index d3e21046a693..8c79e340c758 100644
--- a/python/ray/_private/ray_constants.py
+++ b/python/ray/_private/ray_constants.py
@@ -23,6 +23,7 @@ def env_integer(key, default):


 def env_bool(key, default):
+    print(f"going to create env var name = {key} with value = {default}")
     if key in os.environ:
         return (
             True
@@ -226,7 +227,7 @@ def env_set_by_user(key):
 LOG_MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_LOG_MONITOR}.log"

 # Enable log deduplication.
-RAY_DEDUP_LOGS = env_bool("RAY_DEDUP_LOGS", True)
+RAY_DEDUP_LOGS = env_bool("RAY_DEDUP_LOGS", False)

 # How many seconds of messages to buffer for log deduplication.
 RAY_DEDUP_LOGS_AGG_WINDOW_S = env_integer("RAY_DEDUP_LOGS_AGG_WINDOW_S", 5)
@@ -378,6 +379,13 @@ def env_set_by_user(key):
 LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]

 NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"
+NOSET_XPU_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_XPU_VISIBLE_DEVICES"
+RAY_DEVICE_XPU_SELECTOR_ENV_VAR = "ONEAPI_DEVICE_SELECTOR"
+RAY_DEVICE_XPU_BACKEND_TYPE = "level_zero"
+RAY_DEVICE_XPU_DEVICE_TYPE = "gpu"
+
+RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"}
+RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA"

 # Default max_retries option in @ray.remote for non-actor
 # tasks.
diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py
index 21163decf1f8..ca3388042dcd 100644
--- a/python/ray/_private/resource_spec.py
+++ b/python/ray/_private/resource_spec.py
@@ -10,6 +10,11 @@
 import ray
 import ray._private.ray_constants as ray_constants

+try:
+    import dpctl
+except ImportError:
+    pass
+
 try:
     import GPUtil
 except ImportError:
@@ -157,35 +162,22 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
             # ray._private.state.current_node_id().
             resources[NODE_ID_PREFIX + node_ip_address] = 1.0

+        # get cpu num
         num_cpus = self.num_cpus
+        print(f"going to get_num_cpus, num_cpus = {num_cpus}")
         if num_cpus is None:
             num_cpus = ray._private.utils.get_num_cpus()

-        num_gpus = self.num_gpus
-        gpu_ids = ray._private.utils.get_cuda_visible_devices()
-        # Check that the number of GPUs that the raylet wants doesn't
-        # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
-        if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
-            raise ValueError(
-                "Attempting to start raylet with {} GPUs, "
-                "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
-            )
-        if num_gpus is None:
-            # Try to automatically detect the number of GPUs.
-            num_gpus = _autodetect_num_gpus()
-            # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
-            if gpu_ids is not None:
-                num_gpus = min(num_gpus, len(gpu_ids))
-
-        try:
-            if importlib.util.find_spec("GPUtil") is not None:
-                gpu_types = _get_gpu_types_gputil()
-            else:
-                info_string = _get_gpu_info_string()
-                gpu_types = _constraints_from_gpu_info(info_string)
+        # get accelerator info
+        if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":  # get cuda device num
+            num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
             resources.update(gpu_types)
-        except Exception:
-            logger.exception("Could not parse gpu information.")
+        elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":  # get xpu device num
+            # Here XPUs are treated as GPUs, so the existing GPU scheduling policy can be reused.
+            # Exposing XPUs as a separate resource would require new scheduling support in Ray core.
+            num_gpus, gpu_types = _get_xpu_info(self.num_gpus)
+            resources.update(gpu_types)
+        print(f"after get accelerator info, num_gpus = {num_gpus}, gpu_types = {gpu_types}")

         # Choose a default object store size.
         system_memory = ray._private.utils.get_system_memory()
@@ -265,6 +257,70 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
         return spec


+def _get_cuda_info(num_gpus):
+    """Attempt to detect the number and type of GPUs.
+
+    Notice:
+        If no GPU ids are specified in CUDA_VISIBLE_DEVICES and num_gpus is
+        defined in the task or actor, this function returns the input
+        num_gpus, not 0.
+
+    Returns:
+        (num_gpus, gpu_types)
+    """
+    gpu_ids = ray._private.utils.get_cuda_visible_devices()
+    print(f"in _get_cuda_info, num_gpus = {num_gpus}, gpu_ids = {gpu_ids}")
+    # Check that the number of GPUs that the raylet wants doesn't
+    # exceed the amount allowed by CUDA_VISIBLE_DEVICES.
+    if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
+        raise ValueError(
+            "Attempting to start raylet with {} GPUs, "
+            "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
+        )
+    if num_gpus is None:
+        # Try to automatically detect the number of GPUs.
+        num_gpus = _autodetect_num_gpus()
+        # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
+        if gpu_ids is not None:
+            num_gpus = min(num_gpus, len(gpu_ids))
+
+    gpu_types = ""
+    try:
+        if importlib.util.find_spec("GPUtil") is not None:
+            print(f"in _get_cuda_info, found GPUtil lib")
+            gpu_types = _get_gpu_types_gputil()
+        else:
+            info_string = _get_gpu_info_string()
+            gpu_types = _constraints_from_gpu_info(info_string)
+            print(f"in _get_cuda_info, info_string = {info_string}, gpu_types = {gpu_types}")
+    except Exception:
+        logger.exception("Could not parse gpu information.")
+
+    return num_gpus, gpu_types
+
+
+def _get_xpu_info(num_xpus):
+    """Attempt to process the number of XPUs as GPUs.
+    Returns:
+        The number of XPUs detected by dpctl for the configured backend and device type.
+    """
+    print(f"in _get_xpu_info, input num_xpus = {num_xpus}")
+    xpu_ids = ray._private.utils.get_xpu_visible_devices()
+    print(f"in _get_xpu_info, xpu_ids = {xpu_ids}")
+    if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids):
+        raise ValueError(
+            "Attempting to start raylet with {} XPUs, "
+            "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids)
+        )
+    if num_xpus is None:
+        num_xpus = len(dpctl.get_devices(backend=RAY_DEVICE_XPU_BACKEND_TYPE,
+                                         device_type=RAY_DEVICE_XPU_DEVICE_TYPE))
+    if xpu_ids is not None:
+        num_xpus = min(num_xpus, len(xpu_ids))
+    xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1}
+    return num_xpus, xpu_types
+
+
 def _autodetect_num_gpus():
     """Attempt to detect the number of GPUs on this machine.

diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index 8f72bd4335e8..518f356f3354 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -309,6 +309,7 @@ def get_cuda_visible_devices():
         If it is not set or is set to NoDevFiles, returns empty list.
     """
     gpu_ids_str = os.environ.get("CUDA_VISIBLE_DEVICES", None)
+    print(f"in get_cuda_visible_devices, env var = {gpu_ids_str}")

     if gpu_ids_str is None:
         return None
@@ -326,6 +327,24 @@ def get_cuda_visible_devices():
 last_set_gpu_ids = None


+def get_xpu_visible_devices():
+    """Get the device IDs in the XPU_VISIBLE_DEVICES environment variable.
+    Returns:
+        devices (List[str]): If XPU_VISIBLE_DEVICES is set, return a
+            list of strings representing the IDs of the visible XPUs.
+            Returns None if it is unset and [] if it is set to an empty string.
+    """
+    xpu_ids_str = os.environ.get("XPU_VISIBLE_DEVICES", None)
+    print(f"in get_xpu_visible_devices, env var = {xpu_ids_str}")
+    if xpu_ids_str is None:
+        return None
+
+    if xpu_ids_str == "":
+        return []
+
+    return list(xpu_ids_str.split(","))
+
+
 def set_omp_num_threads_if_unset() -> bool:
     """Set the OMP_NUM_THREADS to default to num cpus assigned to the worker

@@ -379,11 +398,28 @@ def set_cuda_visible_devices(gpu_ids):
     global last_set_gpu_ids
     if last_set_gpu_ids == gpu_ids:
         return  # optimization: already set
-
+    ids_str = ",".join([str(i) for i in gpu_ids])
+    print(f"in set_cuda_visible_devices, env var = {ids_str}")
     os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids])
     last_set_gpu_ids = gpu_ids


+def set_xpu_visible_devices(xpu_ids):
+    """Set the XPU_VISIBLE_DEVICES and ONEAPI_DEVICE_SELECTOR environment variables.
+    Args:
+        xpu_ids (List[str]): List of strings representing XPU IDs.
+    """
+
+    if os.environ.get(ray_constants.NOSET_XPU_VISIBLE_DEVICES_ENV_VAR):
+        print(f"no set xpu visible devices env var")
+        return
+
+    ids_str = ",".join([str(i) for i in xpu_ids])
+    print(f"in set_xpu_visible_devices, env var = {ids_str}")
+    os.environ["XPU_VISIBLE_DEVICES"] = ids_str
+    os.environ["ONEAPI_DEVICE_SELECTOR"] = RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str
+
+
 def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
     """Determine a task's resource requirements.

diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index 161dafa34fba..0ce6460c3d18 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -434,6 +434,8 @@ def __init__(self):
         # When the worker is constructed. Record the original value of the
         # CUDA_VISIBLE_DEVICES environment variable.
         self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices()
+        # Record the original value of the XPU_VISIBLE_DEVICES environment variable.
+        self.original_xpu_ids = ray._private.utils.get_xpu_visible_devices()
         # A dictionary that maps from driver id to SerializationContext
         # TODO: clean up the SerializationContext once the job finished.
         self.serialization_context_map = {}
@@ -900,6 +902,13 @@ def print_logs(self):
 @PublicAPI
 @client_mode_hook(auto_init=True)
 def get_gpu_ids():
+    if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
+        return get_cuda_ids()
+    elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
+        return get_xpu_ids()
+
+
+def get_cuda_ids():
     """Get the IDs of the GPUs that are available to the worker.

     If the CUDA_VISIBLE_DEVICES environment variable was set when the worker
@@ -950,6 +959,46 @@ def get_gpu_ids():
     return assigned_ids


+def get_xpu_ids():
+    """Get the IDs of the XPUs that are available to the worker.
+    If the XPU_VISIBLE_DEVICES environment variable was set when the worker
+    started up, the returned IDs will be a subset of the IDs in that variable.
+    Returns:
+        A list of XPU IDs.
+    """
+    worker = global_worker
+    worker.check_connected()
+
+    if worker.mode != WORKER_MODE:
+        if log_once("worker_get_gpu_ids_empty_from_driver"):
+            logger.warning(
+                "`ray.get_xpu_ids()` will always return the empty list when "
+                "called from the driver. This is because Ray does not manage "
+                "XPU allocations to the driver process."
+            )
+    # Here we use `dpctl` to detect XPU devices:
+    # enumerate all devices via the API dpctl.get_devices.
+    # Notice that the ONEAPI_DEVICE_SELECTOR environment variable should be unset,
+    # or dpctl.get_devices will only return the devices selected by ONEAPI_DEVICE_SELECTOR.
+    # Another way to enumerate XPU devices is the C++ API, which could be adopted later.
+
+    xpu_devices = dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
+                                    device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE)
+    xpu_ava_ids = set()
+    xpu_dev_prefix = f"{ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE}:{ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE}"
+    for xpu_dev in xpu_devices:
+        xpu_id = int(xpu_dev.filter_string.split(xpu_dev_prefix)[1])
+        xpu_ava_ids.add(xpu_id)
+
+    xpu_ids = []
+    if global_worker.original_xpu_ids is not None:
+        xpu_ids = [
+            global_worker.original_xpu_ids[xpu_id] for xpu_id in xpu_ava_ids
+        ]
+
+    return xpu_ids
+
+
 @Deprecated(
     message="Use ray.get_runtime_context().get_assigned_resources() instead.",
     warning=True,
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index c34179589cec..bb96033107a5 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -1044,8 +1044,11 @@ cdef execute_task_with_cancellation_handler(
         task_name = name.decode("utf-8")
         title = f"ray::{task_name}"

-        # Automatically restrict the GPUs available to this task.
-        ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())
+        # Automatically restrict the CUDA devices available to this task.
+        ray._private.utils.set_cuda_visible_devices(ray.get_cuda_ids())
+
+        # Automatically restrict the XPU devices available to this task.
+        ray._private.utils.set_xpu_visible_devices(ray.get_xpu_ids())

         # Automatically configure OMP_NUM_THREADS to the assigned CPU number.
         # It will be unset after the task execution if it was overwridden here.
diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py
index 6aa6f01f4fec..3a30e5de5c8f 100644
--- a/python/ray/tests/test_actor_resources.py
+++ b/python/ray/tests/test_actor_resources.py
@@ -8,6 +8,8 @@
 import ray
 import ray.cluster_utils

+import ray._private.ray_constants as ray_constants
+
 try:
     import pytest_timeout
 except ImportError:
@@ -80,7 +82,9 @@ def echo(self, value):


 @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
-def test_actor_gpus(ray_start_cluster):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_actor_gpus(ray_start_cluster, ACCELERATOR_TYPE):
+    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     cluster = ray_start_cluster
     num_nodes = 3
     num_gpus_per_raylet = 4
@@ -123,7 +127,9 @@ def get_location_and_ids(self):
     assert ready_ids == []


-def test_actor_multiple_gpus(ray_start_cluster):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_actor_multiple_gpus(ray_start_cluster, ACCELERATOR_TYPE):
+    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     cluster = ray_start_cluster
     num_nodes = 3
     num_gpus_per_raylet = 5
@@ -201,7 +207,9 @@ def get_location_and_ids(self):


 @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky.")
-def test_actor_different_numbers_of_gpus(ray_start_cluster):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_actor_different_numbers_of_gpus(ray_start_cluster, ACCELERATOR_TYPE):
+    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     # Test that we can create actors on two nodes that have different
     # numbers of GPUs.
     cluster = ray_start_cluster
@@ -243,7 +251,9 @@ def get_location_and_ids(self):
     assert ready_ids == []


-def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster, ACCELERATOR_TYPE):
+    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     cluster = ray_start_cluster
     num_nodes = 3
     num_gpus_per_raylet = 2
@@ -320,7 +330,9 @@ def get_location_and_ids(self):
     assert ready_ids == []


-def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_actors_and_tasks_with_gpus(enable_syncer_test, ray_start_cluster, ACCELERATOR_TYPE):
+    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     cluster = ray_start_cluster
     num_nodes = 3
     num_gpus_per_raylet = 2
@@ -439,7 +451,9 @@ def locations_to_intervals_for_many_tasks():
     assert len(ready_ids) == 0


-def test_actors_and_tasks_with_gpus_version_two(shutdown_only):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_actors_and_tasks_with_gpus_version_two(shutdown_only, ACCELERATOR_TYPE):
+    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     # Create tasks and actors that both use GPUs and make sure that they
     # are given different GPUs
     num_gpus = 4
@@ -616,7 +630,9 @@ def get_location(self):
     assert location == custom_resource2_node.unique_id


-def test_creating_more_actors_than_resources(shutdown_only):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_creating_more_actors_than_resources(shutdown_only, ACCELERATOR_TYPE):
+    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1})

     @ray.remote(num_gpus=1)
diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py
index f1900ec46e16..b7f11a3a09d5 100644
--- a/python/ray/tests/test_advanced_2.py
+++ b/python/ray/tests/test_advanced_2.py
@@ -137,7 +137,9 @@ def method(self):
     assert valid_node.unique_id == ray.get(a.method.remote())


-def test_fractional_resources(shutdown_only):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_fractional_resources(shutdown_only, ACCELERATOR_TYPE):
+    ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1})

     @ray.remote(num_gpus=0.5)
diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py
index 3e916f7eebcb..5daac77ec864 100644
--- a/python/ray/tests/test_advanced_6.py
+++ b/python/ray/tests/test_advanced_6.py
@@ -60,6 +60,29 @@ def g():
     ray.get([g.remote() for _ in range(100)])


+@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows")
+def test_specific_xpus(save_gpu_ids_shutdown_only):
+    allowed_xpu_ids = [1, 3, 5]
+    os.environ["XPU_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_xpu_ids])
+    ray.init(num_gpus=3)
+
+    @ray.remote(num_gpus=1)
+    def f():
+        xpu_ids = ray.get_xpu_ids()
+        assert len(xpu_ids) == 1
+        assert int(xpu_ids[0]) in allowed_xpu_ids
+
+    @ray.remote(num_gpus=2)
+    def g():
+        xpu_ids = ray.get_xpu_ids()
+        assert len(xpu_ids) == 2
+        assert int(xpu_ids[0]) in allowed_xpu_ids
+        assert int(xpu_ids[1]) in allowed_xpu_ids
+
+    ray.get([f.remote() for _ in range(100)])
+    ray.get([g.remote() for _ in range(100)])
+
+
 def test_local_mode_gpus(save_gpu_ids_shutdown_only):
     allowed_gpu_ids = [4, 5, 6, 7, 8]
     os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids])
diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index b60464f72792..68c73be14076 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -230,7 +230,9 @@ def g():
     assert ray.get(f.options(num_cpus=4).remote()) == "1"


-def test_submit_api(shutdown_only):
+@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"])
+def test_submit_api(shutdown_only, ACCELERATOR_TYPE):
+    ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = ACCELERATOR_TYPE
     ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})

     @ray.remote

From 43b9dda28e82a4d51156192ccd37686e6bf3d9de Mon Sep 17 00:00:00 2001
From: "Wu, Gangsheng"
Date: Mon, 12 Jun 2023 23:19:46 +0800
Subject: [PATCH 2/3] update

---
 python/ray/_private/resource_spec.py |  4 ++--
 python/ray/_private/utils.py         |  2 +-
 python/ray/_private/worker.py        | 21 +++++++++++----------
 python/ray/_raylet.pyx               |  4 ++--
 4 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py
index ca3388042dcd..e11cd05634aa 100644
--- a/python/ray/_private/resource_spec.py
+++ b/python/ray/_private/resource_spec.py
@@ -313,8 +313,8 @@ def _get_xpu_info(num_xpus):
             "but XPU_VISIBLE_DEVICES contains {}.".format(num_xpus, xpu_ids)
         )
     if num_xpus is None:
-        num_xpus = len(dpctl.get_devices(backend=RAY_DEVICE_XPU_BACKEND_TYPE,
-                                         device_type=RAY_DEVICE_XPU_DEVICE_TYPE))
+        num_xpus = len(dpctl.get_devices(backend=ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE,
+                                         device_type=ray_constants.RAY_DEVICE_XPU_DEVICE_TYPE))
     if xpu_ids is not None:
         num_xpus = min(num_xpus, len(xpu_ids))
     xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1}
diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index 518f356f3354..c2c8762f0963 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -417,7 +417,7 @@ def set_xpu_visible_devices(xpu_ids):
     ids_str = ",".join([str(i) for i in xpu_ids])
     print(f"in set_xpu_visible_devices, env var = {ids_str}")
     os.environ["XPU_VISIBLE_DEVICES"] = ids_str
-    os.environ["ONEAPI_DEVICE_SELECTOR"] = RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str
+    os.environ["ONEAPI_DEVICE_SELECTOR"] = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str


 def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
     """Determine a task's resource requirements.
diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index 0ce6460c3d18..cf7c113da18c 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -13,6 +13,7 @@
 import traceback
 import urllib
 import warnings
+import dpctl
 from abc import ABCMeta, abstractmethod
 from collections.abc import Mapping
 from contextlib import contextmanager
@@ -899,15 +900,6 @@ def print_logs(self):
             subscriber.close()


-@PublicAPI
-@client_mode_hook(auto_init=True)
-def get_gpu_ids():
-    if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
-        return get_cuda_ids()
-    elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
-        return get_xpu_ids()
-
-
 def get_cuda_ids():
     """Get the IDs of the GPUs that are available to the worker.

@@ -925,7 +917,7 @@ def get_cuda_ids():
     if worker.mode != WORKER_MODE:
         if log_once("worker_get_gpu_ids_empty_from_driver"):
             logger.warning(
-                "`ray.get_gpu_ids()` will always return the empty list when "
+                "`ray.get_cuda_ids()` will always return the empty list when "
                 "called from the driver. This is because Ray does not manage "
                 "GPU allocations to the driver process."
             )
@@ -999,6 +991,15 @@ def get_xpu_ids():
     return xpu_ids


+@PublicAPI
+@client_mode_hook(auto_init=True)
+def get_gpu_ids():
+    if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
+        return get_cuda_ids()
+    elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
+        return get_xpu_ids()
+
+
 @Deprecated(
     message="Use ray.get_runtime_context().get_assigned_resources() instead.",
     warning=True,
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index bb96033107a5..158f68601b78 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -1045,10 +1045,10 @@ cdef execute_task_with_cancellation_handler(
         title = f"ray::{task_name}"

         # Automatically restrict the CUDA devices available to this task.
-        ray._private.utils.set_cuda_visible_devices(ray.get_cuda_ids())
+        ray._private.utils.set_cuda_visible_devices(ray._private.worker.get_cuda_ids())

         # Automatically restrict the XPU devices available to this task.
-        ray._private.utils.set_xpu_visible_devices(ray.get_xpu_ids())
+        ray._private.utils.set_xpu_visible_devices(ray._private.worker.get_xpu_ids())

         # Automatically configure OMP_NUM_THREADS to the assigned CPU number.
         # It will be unset after the task execution if it was overwridden here.

From 15bf6bd3cc27b016760701c584d4e93e4d9328ed Mon Sep 17 00:00:00 2001
From: "Wu, Gangsheng"
Date: Tue, 13 Jun 2023 12:58:45 +0800
Subject: [PATCH 3/3] update

---
 python/ray/_private/utils.py        | 14 ++++++++++++++
 python/ray/_private/worker.py       |  7 +++----
 python/ray/_raylet.pyx              |  7 ++-----
 python/ray/tests/test_advanced_6.py |  2 ++
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index c2c8762f0963..3e5d157497a9 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -345,6 +345,13 @@ def get_xpu_visible_devices():
     return list(xpu_ids_str.split(","))


+def get_gpu_visible_devices():
+    if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
+        return get_cuda_visible_devices()
+    elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
+        return get_xpu_visible_devices()
+
+
 def set_omp_num_threads_if_unset() -> bool:
     """Set the OMP_NUM_THREADS to default to num cpus assigned to the worker

@@ -420,6 +427,13 @@ def set_xpu_visible_devices(xpu_ids):
     os.environ["ONEAPI_DEVICE_SELECTOR"] = ray_constants.RAY_DEVICE_XPU_BACKEND_TYPE + ":" + ids_str


+def set_gpu_visible_devices(gpu_ids):
+    if ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "CUDA":
+        set_cuda_visible_devices(gpu_ids)
+    elif ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR == "XPU":
+        set_xpu_visible_devices(gpu_ids)
+
+
 def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
     """Determine a task's resource requirements.

diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index cf7c113da18c..506387381c95 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -434,9 +434,8 @@ def __init__(self):
         self.actors = {}
         # When the worker is constructed. Record the original value of the
         # CUDA_VISIBLE_DEVICES environment variable.
-        self.original_gpu_ids = ray._private.utils.get_cuda_visible_devices()
         # Record the original value of the XPU_VISIBLE_DEVICES environment variable.
-        self.original_xpu_ids = ray._private.utils.get_xpu_visible_devices()
+        self.original_gpu_ids = ray._private.utils.get_gpu_visible_devices()
         # A dictionary that maps from driver id to SerializationContext
         # TODO: clean up the SerializationContext once the job finished.
         self.serialization_context_map = {}
@@ -983,9 +982,9 @@ def get_xpu_ids():
         xpu_ava_ids.add(xpu_id)

     xpu_ids = []
-    if global_worker.original_xpu_ids is not None:
+    if global_worker.original_gpu_ids is not None:
         xpu_ids = [
-            global_worker.original_xpu_ids[xpu_id] for xpu_id in xpu_ava_ids
+            global_worker.original_gpu_ids[xpu_id] for xpu_id in xpu_ava_ids
         ]

     return xpu_ids
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index 158f68601b78..e32b9787cade 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -1044,11 +1044,8 @@ cdef execute_task_with_cancellation_handler(
         task_name = name.decode("utf-8")
         title = f"ray::{task_name}"

-        # Automatically restrict the CUDA devices available to this task.
-        ray._private.utils.set_cuda_visible_devices(ray._private.worker.get_cuda_ids())
-
-        # Automatically restrict the XPU devices available to this task.
-        ray._private.utils.set_xpu_visible_devices(ray._private.worker.get_xpu_ids())
+        # Automatically restrict the GPUs available to this task.
+        ray._private.utils.set_gpu_visible_devices(ray.get_gpu_ids())

         # Automatically configure OMP_NUM_THREADS to the assigned CPU number.
         # It will be unset after the task execution if it was overwridden here.
diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py
index 5daac77ec864..82d044f585a2 100644
--- a/python/ray/tests/test_advanced_6.py
+++ b/python/ray/tests/test_advanced_6.py
@@ -39,6 +39,7 @@ def save_gpu_ids_shutdown_only():

 @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows")
 def test_specific_gpus(save_gpu_ids_shutdown_only):
+    ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "CUDA"
     allowed_gpu_ids = [4, 5, 6]
     os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids])
     ray.init(num_gpus=3)
@@ -62,6 +63,7 @@ def g():

 @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows")
 def test_specific_xpus(save_gpu_ids_shutdown_only):
+    ray._private.ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "XPU"
    allowed_xpu_ids = [1, 3, 5]
     os.environ["XPU_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_xpu_ids])
     ray.init(num_gpus=3)
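Illustrative usage (editor's sketch, not part of the patch series): with these three patches applied, XPU devices are surfaced through the existing GPU resource path, so a task requests them with num_gpus and reads its assignment through ray.get_gpu_ids(). The snippet below mirrors the pattern used in the new tests; it assumes RAY_DEVICE_CURRENT_ACCELERATOR has been switched to "XPU" in the relevant processes and that dpctl plus a Level Zero GPU runtime are installed. It only exercises APIs introduced above.

    import ray
    import ray._private.ray_constants as ray_constants

    # Assumption: select the XPU code path added by this series
    # (the constant defaults to "CUDA").
    ray_constants.RAY_DEVICE_CURRENT_ACCELERATOR = "XPU"

    ray.init(num_gpus=2)  # "GPUs" here are XPUs detected via dpctl

    @ray.remote(num_gpus=1)
    def which_xpu():
        # get_gpu_ids() dispatches to get_xpu_ids(); the worker also exports
        # XPU_VISIBLE_DEVICES and ONEAPI_DEVICE_SELECTOR=level_zero:<ids>
        # for this task via set_xpu_visible_devices().
        return ray.get_gpu_ids()

    print(ray.get(which_xpu.remote()))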