From 94ed600f22437d9821ea07b6aca3a1d0202f0210 Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Tue, 29 Aug 2023 17:50:02 -0700 Subject: [PATCH 01/11] [Train] Add accelerator ids to worker metadata, share neuron_cores by default Signed-off-by: maheedhar reddy chappidi --- .../ray/train/_internal/backend_executor.py | 91 ++++++++++++++++--- python/ray/train/_internal/worker_group.py | 33 ++++++- python/ray/train/constants.py | 4 + python/ray/train/tests/conftest.py | 14 +++ python/ray/train/tests/test_backend.py | 84 +++++++++++++++-- python/ray/train/tests/test_worker_group.py | 57 +++++++++++- 6 files changed, 260 insertions(+), 23 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index 287e80cf954a8..aacd1cc5f4ba8 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -4,6 +4,7 @@ from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar, Any import ray +import ray._private.ray_constants as ray_constants from ray.data import Dataset from ray._private.ray_constants import env_integer from ray.air.config import CheckpointConfig @@ -27,6 +28,7 @@ TRAIN_ENABLE_WORKER_SPREAD_ENV, TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, DISABLE_LAZY_CHECKPOINTING_ENV, + ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, ) from ray.util.placement_group import get_current_placement_group, remove_placement_group @@ -153,6 +155,9 @@ def start( if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: self._share_cuda_visible_devices() + elif self._additional_resources_per_worker: + if self._share_neuron_core_ids_enabled(): + self._share_neuron_core_ids() self._backend.on_start(self.worker_group, self._backend_config) except RayActorError as exc: logger.exception(str(exc)) @@ -245,32 +250,96 @@ def _share_cuda_visible_devices(self): - Worker2: "0,1" """ - node_ids_and_gpu_ids = [ - (w.metadata.node_id, w.metadata.gpu_ids) for w in self.worker_group.workers + (w.metadata.node_id, w.metadata.gpu_and_accelerator_ids[ray_constants.GPU]) + for w in self.worker_group.workers + ] + self._share_runtime_ids( + node_ids_and_runtime_ids=node_ids_and_gpu_ids, + env_var=ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR, + ) + + def _share_neuron_core_ids(self): + """Sets NEURON_RT_VISIBLE_CORES on all workers. + + For each worker, NEURON_RT_VISIBLE_CORES will be set to the + NEURON_CORE IDs visible to all workers on that worker's node. + + This allows the workers on the same node to communicate + with one another. + + Example: + + Setup: + - Node1: + - Worker1: {0, 1} + - Worker2: {2, 3} + - Node2: + - Worker3: {0, 1} + + NEURON_RT_VISIBLE_CORES: + - Worker1: "0,1,2,3" + - Worker2: "0,1,2,3" + - Worker2: "0,1" + """ + node_ids_and_neuron_core_ids = [ + ( + w.metadata.node_id, + w.metadata.gpu_and_accelerator_ids[ray_constants.NEURON_CORES], + ) + for w in self.worker_group.workers ] + self._share_runtime_ids( + node_ids_and_runtime_ids=node_ids_and_neuron_core_ids, + env_var=ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, + ) + def _share_runtime_ids( + self, node_ids_and_runtime_ids: List[Tuple[str, List[str]]], env_var: str + ): + """Sets the given env_var on all workers. + Args: + node_ids_and_runtime_ids: A list of tuples of node_id and + list of runtime_ids. + env_var: The name of the environment variable to set. 
+ """ node_id_to_worker_id = defaultdict(set) - node_id_to_gpu_ids = defaultdict(set) + node_id_to_runtime_ids = defaultdict(set) - for worker_id, (node_id, gpu_ids) in enumerate(node_ids_and_gpu_ids): + for worker_id, (node_id, runtime_id) in enumerate(node_ids_and_runtime_ids): node_id_to_worker_id[node_id].add(worker_id) - node_id_to_gpu_ids[node_id].update(gpu_ids) + node_id_to_runtime_ids[node_id].update(runtime_id) futures = [] - for node_id, gpu_ids in node_id_to_gpu_ids.items(): - gpu_ids = sorted(gpu_ids) - all_gpu_ids = ",".join(gpu_ids) + for node_id, runtime_ids in node_id_to_runtime_ids.items(): + runtime_ids = sorted(runtime_ids) + all_runtime_ids = ",".join(runtime_ids) - def set_gpu_ids(): - os.environ["CUDA_VISIBLE_DEVICES"] = all_gpu_ids + def set_runtime_ids(): + os.environ[env_var] = all_runtime_ids for worker_id in node_id_to_worker_id[node_id]: futures.append( - self.worker_group.execute_single_async(worker_id, set_gpu_ids) + self.worker_group.execute_single_async(worker_id, set_runtime_ids) ) ray.get(futures) + def _share_neuron_core_ids_enabled(self): + """Whether to share NEURON_RT_VISIBLE_CORES on all workers. + This is enabled by default if neuron_cores are requested for + workers. User can disable it by configuring the + TRAIN_ENABLE_SHARE_NEURON_RT_VISIBLE_CORES to "0" + """ + return bool( + env_integer( + ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, + self._additional_resources_per_worker.get( + ray_constants.NEURON_CORES, None + ) + is not None, + ) + ) + def _create_rank_world_size_mappings(self) -> List[Dict]: """Create rank and world size mappings for workers. There are three maps returned: diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index 59ba775b5630d..5ea553d655588 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -6,6 +6,7 @@ from typing import Callable, List, TypeVar, Optional, Dict, Type, Tuple, Union import ray +import ray._private.ray_constants as ray_constants from ray.actor import ActorHandle from ray.air._internal.util import skip_exceptions, exception_cause from ray.types import ObjectRef @@ -44,14 +45,14 @@ class WorkerMetadata: node_id: ID of the node this worker is on. node_ip: IP address of the node this worker is on. hostname: Hostname that this worker is on. - gpu_ids: List of CUDA IDs available to this worker. + gpu_and_accelerator_ids: Map of GPU IDs, accelerator IDs (AWS NeuronCore, ..). pid: Process ID of this worker. """ node_id: str node_ip: str hostname: str - gpu_ids: Optional[List[str]] + gpu_and_accelerator_ids: Dict[str, List[str]] pid: int @@ -86,18 +87,42 @@ def construct_metadata() -> WorkerMetadata: node_id = ray.get_runtime_context().get_node_id() node_ip = ray.util.get_node_ip_address() hostname = socket.gethostname() - gpu_ids = [str(gpu_id) for gpu_id in ray.get_gpu_ids()] pid = os.getpid() return WorkerMetadata( node_id=node_id, node_ip=node_ip, hostname=hostname, - gpu_ids=gpu_ids, + gpu_and_accelerator_ids=_get_gpu_and_accelerator_ids(), pid=pid, ) +def _get_gpu_and_accelerator_ids() -> Dict[str, List[str]]: + """Get GPU and accelerator IDs from runtime context for given actor/worker. + + Returns: + A dictionary mapping resource IDs to a list of resource IDs. 
+ For example, + { + "GPU": ["0", "1"], + "neuron_cores": ["0", "1"] + } + """ + gpu_and_accelerator_ids = defaultdict(list) + + resource_ids = ray.get_runtime_context().get_resource_ids() + gpu_ids = resource_ids[ray_constants.GPU] + neuron_core_ids = resource_ids[ray_constants.NEURON_CORES] + + gpu_and_accelerator_ids[ray_constants.GPU].extend(gpu_ids) + gpu_and_accelerator_ids[ray_constants.NEURON_CORES].extend(neuron_core_ids) + + if len(gpu_ids) > 0 and len(neuron_core_ids) > 0: + raise RuntimeError("Cannot support GPU and Neuron Core IDs on same Worker.") + return gpu_and_accelerator_ids + + class WorkerGroup: """Group of Ray Actors that can execute arbitrary functions. diff --git a/python/ray/train/constants.py b/python/ray/train/constants.py index 9a9fe5c5be83d..1871a0ece3630 100644 --- a/python/ray/train/constants.py +++ b/python/ray/train/constants.py @@ -65,6 +65,10 @@ def _get_defaults_results_dir() -> str: # Backend.share_cuda_visible_devices. 1 for True, 0 for False. ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV = "TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES" +# Integer value which if set will not share the RT visible cores across workers. +# 1 for True (default), 0 for False. +ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV = "TRAIN_ENABLE_SHARE_NEURON_RT_VISIBLE_CORES" + # Integer value which indicates the number of seconds to wait when creating # the worker placement group before timing out. TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV = "TRAIN_PLACEMENT_GROUP_TIMEOUT_S" diff --git a/python/ray/train/tests/conftest.py b/python/ray/train/tests/conftest.py index 5c0338ea3a510..302306f4fc6d9 100644 --- a/python/ray/train/tests/conftest.py +++ b/python/ray/train/tests/conftest.py @@ -55,6 +55,20 @@ def ray_2_node_2_gpu(): cluster.shutdown() +@pytest.fixture +def ray_2_node_2_neuron_cores(): + cluster = Cluster() + for _ in range(2): + cluster.add_node(num_cpus=4, resources={"neuron_cores": 2}) + + ray.init(address=cluster.address) + + yield + + ray.shutdown() + cluster.shutdown() + + @pytest.fixture def ray_start_2_cpus(): address_info = ray.init(num_cpus=2) diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py index c3a44cddedb30..f55d253af1d7f 100644 --- a/python/ray/train/tests/test_backend.py +++ b/python/ray/train/tests/test_backend.py @@ -7,6 +7,7 @@ import time import ray +import ray._private.ray_constants as ray_constants from ray import train from ray.air._internal.util import StartTraceback @@ -25,6 +26,7 @@ from ray.train.constants import ( ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, TRAIN_ENABLE_WORKER_SPREAD_ENV, + ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, ) from ray.train.tensorflow import TensorflowConfig from ray.train.torch import TorchConfig @@ -97,7 +99,7 @@ def mock_add_workers(self, num_workers): node_id=0, node_ip=str(i % 2), hostname=0, - gpu_ids=[0], + gpu_and_accelerator_ids={"GPU": ["0"]}, pid=0, ) worker.metadata = metadata @@ -307,12 +309,6 @@ def check_process_group(): def test_cuda_visible_devices(ray_2_node_2_gpu, worker_results): config = TestConfig() - if worker_results[0] != len(worker_results[1]): - raise ValueError( - "Invalid test parameter. Length of expected result should " - "match number of workers." - ) - def get_resources(): cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"] # Sort the cuda visible devices to have exact match with expected result. 
@@ -419,6 +415,80 @@ def get_resources(): assert results == expected_results +@pytest.mark.parametrize( + "worker_results", + [ + (1, [[0]]), + (2, [[0, 1]] * 2), + (3, [[0]] + [[0, 1]] * 2), + (4, [[0, 1]] * 4), + ], +) +def test_neuron_core_accelerator_ids(ray_2_node_2_neuron_cores, worker_results): + config = TestConfig() + + def get_resources(): + neuron_runtime_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR] + # Sort the runtime ids to have exact match with expected result. + sorted_devices = [ + int(device) for device in sorted(neuron_runtime_ids.split(",")) + ] + return sorted_devices + + num_workers, expected_results = worker_results + # sharing enabled by default + os.environ.pop(ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, None) + e = BackendExecutor( + config, + num_workers=num_workers, + num_cpus_per_worker=0, + additional_resources_per_worker={"neuron_cores": 1}, + ) + e.start() + _start_training(e, get_resources) + results = e.finish_training() + results.sort() + assert results == expected_results + + +@pytest.mark.parametrize( + "worker_results", + [ + (1, [[0]]), + (2, [[0]] + [[1]]), + (3, [[0]] * 2 + [[1]]), + (4, [[0]] * 2 + [[1]] * 2), + ], +) +def test_neuron_core_accelerator_ids_sharing_disabled( + ray_2_node_2_neuron_cores, worker_results +): + config = TestConfig() + + def get_resources(): + neuron_runtime_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR] + # Sort the runtime ids to have exact match with expected result. + sorted_devices = [ + int(device) for device in sorted(neuron_runtime_ids.split(",")) + ] + return sorted_devices + + num_workers, expected_results = worker_results + + os.environ[ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV] = "0" + e = BackendExecutor( + config, + num_workers=num_workers, + num_cpus_per_worker=0, + additional_resources_per_worker={"neuron_cores": 1}, + ) + e.start() + _start_training(e, get_resources) + results = e.finish_training() + results.sort() + assert results == expected_results + + def get_node_id_set(): node_id_set = set() for actor_info in ray._private.state.actors().values(): diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index 37ae87e72f1de..a3341750567bd 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -4,6 +4,7 @@ import ray from ray.train._internal.worker_group import WorkerGroup, Worker, WorkerMetadata +import ray._private.ray_constants as ray_constants @pytest.fixture @@ -14,6 +15,22 @@ def ray_start_2_cpus(): ray.shutdown() +@pytest.fixture +def ray_start_2_cpus_and_gpus(): + address_info = ray.init(num_cpus=2, num_gpus=2) + yield address_info + # The code after the yield will run as teardown code. + ray.shutdown() + + +@pytest.fixture +def ray_start_2_cpus_and_neuron_core_accelerator(): + address_info = ray.init(num_cpus=2, resources={ray_constants.NEURON_CORES: 2}) + yield address_info + # The code after the yield will run as teardown code. 
+ ray.shutdown() + + def test_worker_creation(ray_start_2_cpus): assert ray.available_resources()["CPU"] == 2 wg = WorkerGroup(num_workers=2) @@ -59,6 +76,44 @@ def test_worker_restart(ray_start_2_cpus): wg.execute(lambda: 1) +def test_worker_with_gpu_ids(ray_start_2_cpus_and_gpus): + num_gpus = 2 + wg = WorkerGroup(num_workers=2, num_gpus_per_worker=1) + assert len(wg.workers) == 2 + time.sleep(1) + assert ray_constants.GPU not in ray.available_resources() + wg.execute(lambda: 1) + assert len(wg.workers) == 2 + for w in wg.workers: + gpu_and_accelerator_ids = w.metadata.gpu_and_accelerator_ids + assert len(gpu_and_accelerator_ids) == 2 + gpu_ids = gpu_and_accelerator_ids[ray_constants.GPU] + for gpu_id in gpu_ids: + assert gpu_id in [str(i) for i in range(num_gpus)] + assert len(gpu_and_accelerator_ids[ray_constants.NEURON_CORES]) == 0 + + +def test_worker_with_neuron_core_accelerator_ids( + ray_start_2_cpus_and_neuron_core_accelerator, +): + num_nc = 2 + wg = WorkerGroup( + num_workers=2, additional_resources_per_worker={ray_constants.NEURON_CORES: 1} + ) + assert len(wg.workers) == 2 + time.sleep(1) + assert ray_constants.NEURON_CORES not in ray.available_resources() + wg.execute(lambda: 1) + assert len(wg.workers) == 2 + for w in wg.workers: + gpu_and_accelerator_ids = w.metadata.gpu_and_accelerator_ids + assert len(gpu_and_accelerator_ids) == 2 + assert len(gpu_and_accelerator_ids[ray_constants.GPU]) == 0 + neuron_core_ids = gpu_and_accelerator_ids[ray_constants.NEURON_CORES] + for neuron_core_id in neuron_core_ids: + assert neuron_core_id in [str(i) for i in range(num_nc)] + + def test_execute_async(ray_start_2_cpus): wg = WorkerGroup(num_workers=2) futures = wg.execute_async(lambda: 1) @@ -91,7 +146,7 @@ def create_worker_group(ips): node_id="dummy", node_ip=ip, hostname="dummy", - gpu_ids=None, + gpu_and_accelerator_ids=None, pid=0, ), ) From 8d168f855c5f56935fe725aed51abdc28210d578 Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Tue, 29 Aug 2023 18:02:07 -0700 Subject: [PATCH 02/11] Add env to list Signed-off-by: maheedhar reddy chappidi --- python/ray/train/constants.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/train/constants.py b/python/ray/train/constants.py index 1871a0ece3630..55344774dc417 100644 --- a/python/ray/train/constants.py +++ b/python/ray/train/constants.py @@ -83,6 +83,7 @@ def _get_defaults_results_dir() -> str: TRAIN_ENV_VARS = { ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, + ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, TRAIN_ENABLE_WORKER_SPREAD_ENV, RAY_AIR_NEW_PERSISTENCE_MODE, From 04ba327866af858ec8199ccf11c488a10023e4a5 Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Mon, 11 Sep 2023 11:05:29 -0700 Subject: [PATCH 03/11] Refactor and make sharing generic for resource_ids Signed-off-by: maheedhar reddy chappidi --- .../ray/train/_internal/backend_executor.py | 89 ++++++++----------- python/ray/train/_internal/worker_group.py | 32 +------ python/ray/train/constants.py | 17 +++- python/ray/train/tests/test_backend.py | 16 ++-- python/ray/train/tests/test_worker_group.py | 6 +- 5 files changed, 65 insertions(+), 95 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index 3ae2eb687b159..c2c25f1a427a6 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -28,7 +28,8 @@ TRAIN_ENABLE_WORKER_SPREAD_ENV, 
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, DISABLE_LAZY_CHECKPOINTING_ENV, - ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, + ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, + SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR, ) from ray.util.placement_group import get_current_placement_group, remove_placement_group @@ -156,8 +157,9 @@ def start( if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: self._share_cuda_visible_devices() elif self._additional_resources_per_worker: - if self._share_neuron_core_ids_enabled(): - self._share_neuron_core_ids() + for accelerator, env_var in SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR: + if self._share_accelerator_devices_enabled(accelerator): + self._share_resource_ids(accelerator, env_var) self._backend.on_start(self.worker_group, self._backend_config) except RayActorError as exc: logger.exception(str(exc)) @@ -250,23 +252,18 @@ def _share_cuda_visible_devices(self): - Worker2: "0,1" """ - node_ids_and_gpu_ids = [ - (w.metadata.node_id, w.metadata.gpu_and_accelerator_ids[ray_constants.GPU]) - for w in self.worker_group.workers - ] - self._share_runtime_ids( - node_ids_and_runtime_ids=node_ids_and_gpu_ids, - env_var=ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR, + self._share_resource_ids( + ray_constants.GPU, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR ) - def _share_neuron_core_ids(self): - """Sets NEURON_RT_VISIBLE_CORES on all workers. + def _share_resource_ids(self, accelerator: str, env_var: str): + """Sets the given env_var on all workers. - For each worker, NEURON_RT_VISIBLE_CORES will be set to the - NEURON_CORE IDs visible to all workers on that worker's node. + For each worker, desired will be set to the accelerator ids + and visible to all workers on that worker's node. - This allows the workers on the same node to communicate - with one another. + This allows workers on the same node to communicate with one + another. Example: @@ -277,65 +274,55 @@ def _share_neuron_core_ids(self): - Node2: - Worker3: {0, 1} - NEURON_RT_VISIBLE_CORES: + NEURON_RT_VISIBLE_CORES/TPU_VISIBLE_CHIPS/...: - Worker1: "0,1,2,3" - Worker2: "0,1,2,3" - Worker2: "0,1" + + Args: + accelerator: The name of the accelerator. + env_var: The name of the environment variable to set. """ - node_ids_and_neuron_core_ids = [ + node_ids_and_resource_ids = [ ( w.metadata.node_id, - w.metadata.gpu_and_accelerator_ids[ray_constants.NEURON_CORES], + w.metadata.resource_ids[accelerator], ) for w in self.worker_group.workers ] - self._share_runtime_ids( - node_ids_and_runtime_ids=node_ids_and_neuron_core_ids, - env_var=ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, - ) - - def _share_runtime_ids( - self, node_ids_and_runtime_ids: List[Tuple[str, List[str]]], env_var: str - ): - """Sets the given env_var on all workers. - Args: - node_ids_and_runtime_ids: A list of tuples of node_id and - list of runtime_ids. - env_var: The name of the environment variable to set. 
- """ node_id_to_worker_id = defaultdict(set) - node_id_to_runtime_ids = defaultdict(set) + node_id_to_resource_ids = defaultdict(set) - for worker_id, (node_id, runtime_id) in enumerate(node_ids_and_runtime_ids): + for worker_id, (node_id, resource_id) in enumerate(node_ids_and_resource_ids): node_id_to_worker_id[node_id].add(worker_id) - node_id_to_runtime_ids[node_id].update(runtime_id) + node_id_to_resource_ids[node_id].update(resource_id) futures = [] - for node_id, runtime_ids in node_id_to_runtime_ids.items(): - runtime_ids = sorted(runtime_ids) - all_runtime_ids = ",".join(runtime_ids) + for node_id, resource_runtime_ids in node_id_to_resource_ids.items(): + resource_runtime_ids = sorted(resource_runtime_ids) + all_resource_runtime_ids = ",".join(resource_runtime_ids) - def set_runtime_ids(): - os.environ[env_var] = all_runtime_ids + def set_resource_runtime_ids(): + os.environ[env_var] = all_resource_runtime_ids for worker_id in node_id_to_worker_id[node_id]: futures.append( - self.worker_group.execute_single_async(worker_id, set_runtime_ids) + self.worker_group.execute_single_async( + worker_id, set_resource_runtime_ids + ) ) ray.get(futures) - def _share_neuron_core_ids_enabled(self): - """Whether to share NEURON_RT_VISIBLE_CORES on all workers. - This is enabled by default if neuron_cores are requested for - workers. User can disable it by configuring the - TRAIN_ENABLE_SHARE_NEURON_RT_VISIBLE_CORES to "0" + def _share_accelerator_devices_enabled(self, accelerator: str): + """Whether to share NEURON_RT_VISIBLE_CORES/TPU_VISIBLE_CHIPS/.. + on all workers. This is enabled by default if neuron_cores/TPU/.. are + requested for workers. User can disable it by configuring the + TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES to "0" """ return bool( env_integer( - ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, - self._additional_resources_per_worker.get( - ray_constants.NEURON_CORES, None - ) + ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, + self._additional_resources_per_worker.get(accelerator, None) is not None, ) ) diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index 5ea553d655588..4b6b66fcf2abf 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -6,7 +6,6 @@ from typing import Callable, List, TypeVar, Optional, Dict, Type, Tuple, Union import ray -import ray._private.ray_constants as ray_constants from ray.actor import ActorHandle from ray.air._internal.util import skip_exceptions, exception_cause from ray.types import ObjectRef @@ -45,14 +44,14 @@ class WorkerMetadata: node_id: ID of the node this worker is on. node_ip: IP address of the node this worker is on. hostname: Hostname that this worker is on. - gpu_and_accelerator_ids: Map of GPU IDs, accelerator IDs (AWS NeuronCore, ..). + resource_ids: Map of GPU IDs, accelerator IDs (AWS NeuronCore, ..). pid: Process ID of this worker. """ node_id: str node_ip: str hostname: str - gpu_and_accelerator_ids: Dict[str, List[str]] + resource_ids: Dict[str, List[str]] pid: int @@ -93,36 +92,11 @@ def construct_metadata() -> WorkerMetadata: node_id=node_id, node_ip=node_ip, hostname=hostname, - gpu_and_accelerator_ids=_get_gpu_and_accelerator_ids(), + resource_ids=ray.get_runtime_context().get_resource_ids(), pid=pid, ) -def _get_gpu_and_accelerator_ids() -> Dict[str, List[str]]: - """Get GPU and accelerator IDs from runtime context for given actor/worker. - - Returns: - A dictionary mapping resource IDs to a list of resource IDs. 
- For example, - { - "GPU": ["0", "1"], - "neuron_cores": ["0", "1"] - } - """ - gpu_and_accelerator_ids = defaultdict(list) - - resource_ids = ray.get_runtime_context().get_resource_ids() - gpu_ids = resource_ids[ray_constants.GPU] - neuron_core_ids = resource_ids[ray_constants.NEURON_CORES] - - gpu_and_accelerator_ids[ray_constants.GPU].extend(gpu_ids) - gpu_and_accelerator_ids[ray_constants.NEURON_CORES].extend(neuron_core_ids) - - if len(gpu_ids) > 0 and len(neuron_core_ids) > 0: - raise RuntimeError("Cannot support GPU and Neuron Core IDs on same Worker.") - return gpu_and_accelerator_ids - - class WorkerGroup: """Group of Ray Actors that can execute arbitrary functions. diff --git a/python/ray/train/constants.py b/python/ray/train/constants.py index fdbc4130b4264..051f5b046a8d0 100644 --- a/python/ray/train/constants.py +++ b/python/ray/train/constants.py @@ -1,6 +1,8 @@ import os from pathlib import Path +import ray._private.ray_constants as ray_constants + from ray.air.constants import ( # noqa: F401 EVALUATION_DATASET_KEY, MODEL_KEY, @@ -55,6 +57,13 @@ def _get_defaults_results_dir() -> str: # Deprecated configs can use this value to detect if the user has set it. _DEPRECATED_VALUE = "DEPRECATED" +# Map of supported accelerators to the environment variable that +# are available in worker metadata as resource_ids. +SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR = { + ray_constants.NEURON_CORES: ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, + ray_constants.TPU: ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR, +} + # ================================================== # Environment Variables # ================================================== @@ -67,9 +76,9 @@ def _get_defaults_results_dir() -> str: # Backend.share_cuda_visible_devices. 1 for True, 0 for False. ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV = "TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES" -# Integer value which if set will not share the RT visible cores across workers. -# 1 for True (default), 0 for False. -ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV = "TRAIN_ENABLE_SHARE_NEURON_RT_VISIBLE_CORES" +# Integer value which if set will not share the accelerator visible cores/devices +# across workers. 1 for True (default), 0 for False. +ENABLE_SHARE_ACCELERATOR_DEVICES_ENV = "TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES" # Integer value which indicates the number of seconds to wait when creating # the worker placement group before timing out. 
@@ -89,7 +98,7 @@ def _get_defaults_results_dir() -> str: TRAIN_ENV_VARS = { ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, - ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, + ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, TRAIN_ENABLE_WORKER_SPREAD_ENV, RAY_AIR_NEW_PERSISTENCE_MODE, diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py index f55d253af1d7f..437f0480979da 100644 --- a/python/ray/train/tests/test_backend.py +++ b/python/ray/train/tests/test_backend.py @@ -26,7 +26,7 @@ from ray.train.constants import ( ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, TRAIN_ENABLE_WORKER_SPREAD_ENV, - ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, + ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, ) from ray.train.tensorflow import TensorflowConfig from ray.train.torch import TorchConfig @@ -99,7 +99,7 @@ def mock_add_workers(self, num_workers): node_id=0, node_ip=str(i % 2), hostname=0, - gpu_and_accelerator_ids={"GPU": ["0"]}, + resource_ids={"GPU": ["0"]}, pid=0, ) worker.metadata = metadata @@ -428,16 +428,16 @@ def test_neuron_core_accelerator_ids(ray_2_node_2_neuron_cores, worker_results): config = TestConfig() def get_resources(): - neuron_runtime_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR] + neuron_resource_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR] # Sort the runtime ids to have exact match with expected result. sorted_devices = [ - int(device) for device in sorted(neuron_runtime_ids.split(",")) + int(device) for device in sorted(neuron_resource_ids.split(",")) ] return sorted_devices num_workers, expected_results = worker_results # sharing enabled by default - os.environ.pop(ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV, None) + os.environ.pop(ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, None) e = BackendExecutor( config, num_workers=num_workers, @@ -466,16 +466,16 @@ def test_neuron_core_accelerator_ids_sharing_disabled( config = TestConfig() def get_resources(): - neuron_runtime_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR] + neuron_resource_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR] # Sort the runtime ids to have exact match with expected result. 
sorted_devices = [ - int(device) for device in sorted(neuron_runtime_ids.split(",")) + int(device) for device in sorted(neuron_resource_ids.split(",")) ] return sorted_devices num_workers, expected_results = worker_results - os.environ[ENABLE_SHARE_NEURON_RT_VISIBLE_CORES_ENV] = "0" + os.environ[ENABLE_SHARE_ACCELERATOR_DEVICES_ENV] = "0" e = BackendExecutor( config, num_workers=num_workers, diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index a3341750567bd..d9c96c2391e29 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -85,7 +85,7 @@ def test_worker_with_gpu_ids(ray_start_2_cpus_and_gpus): wg.execute(lambda: 1) assert len(wg.workers) == 2 for w in wg.workers: - gpu_and_accelerator_ids = w.metadata.gpu_and_accelerator_ids + gpu_and_accelerator_ids = w.metadata.resource_ids assert len(gpu_and_accelerator_ids) == 2 gpu_ids = gpu_and_accelerator_ids[ray_constants.GPU] for gpu_id in gpu_ids: @@ -106,7 +106,7 @@ def test_worker_with_neuron_core_accelerator_ids( wg.execute(lambda: 1) assert len(wg.workers) == 2 for w in wg.workers: - gpu_and_accelerator_ids = w.metadata.gpu_and_accelerator_ids + gpu_and_accelerator_ids = w.metadata.resource_ids assert len(gpu_and_accelerator_ids) == 2 assert len(gpu_and_accelerator_ids[ray_constants.GPU]) == 0 neuron_core_ids = gpu_and_accelerator_ids[ray_constants.NEURON_CORES] @@ -146,7 +146,7 @@ def create_worker_group(ips): node_id="dummy", node_ip=ip, hostname="dummy", - gpu_and_accelerator_ids=None, + resource_ids=None, pid=0, ), ) From f76d4a55bc3a58e0b88dc01c3eae16a05ffcd1bf Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Mon, 11 Sep 2023 13:23:46 -0700 Subject: [PATCH 04/11] Bug-fix, missing dict items Signed-off-by: maheedhar reddy chappidi --- python/ray/train/_internal/backend_executor.py | 2 +- python/ray/train/tests/test_backend.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index c2c25f1a427a6..aab05a2e658f1 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -157,7 +157,7 @@ def start( if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: self._share_cuda_visible_devices() elif self._additional_resources_per_worker: - for accelerator, env_var in SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR: + for accelerator, env_var in SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR.items(): if self._share_accelerator_devices_enabled(accelerator): self._share_resource_ids(accelerator, env_var) self._backend.on_start(self.worker_group, self._backend_config) diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py index 437f0480979da..d48b7503c5ce6 100644 --- a/python/ray/train/tests/test_backend.py +++ b/python/ray/train/tests/test_backend.py @@ -12,7 +12,7 @@ from ray.air._internal.util import StartTraceback # Trigger pytest hook to automatically zip test cluster logs to archive dir on failure -from ray.tests.conftest import pytest_runtest_makereport # noqa +# from ray.tests.conftest import pytest_runtest_makereport # noqa from ray.train._internal.backend_executor import ( BackendExecutor, InactiveWorkerGroupError, From 8acae7a6e10b16b8b33ce264a3c53631bb7baa7c Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Mon, 11 Sep 2023 14:07:50 -0700 Subject: [PATCH 05/11] Bug-fix, missing dict items - lint 
Signed-off-by: maheedhar reddy chappidi --- python/ray/train/_internal/backend_executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index aab05a2e658f1..c0b6debf7c57f 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -157,7 +157,10 @@ def start( if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: self._share_cuda_visible_devices() elif self._additional_resources_per_worker: - for accelerator, env_var in SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR.items(): + for ( + accelerator, + env_var, + ) in SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR.items(): if self._share_accelerator_devices_enabled(accelerator): self._share_resource_ids(accelerator, env_var) self._backend.on_start(self.worker_group, self._backend_config) From 3264e012280efaa9b0744ef7861ad3beb0e935b9 Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Wed, 13 Sep 2023 15:01:39 -0700 Subject: [PATCH 06/11] Refactor changes Signed-off-by: maheedhar reddy chappidi --- .../ray/train/_internal/backend_executor.py | 69 ++++++++++--------- python/ray/train/_internal/worker_group.py | 6 +- python/ray/train/constants.py | 24 ++++--- python/ray/train/tests/test_backend.py | 8 +-- python/ray/train/tests/test_worker_group.py | 6 +- 5 files changed, 63 insertions(+), 50 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index c0b6debf7c57f..395dbf5a57844 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -28,8 +28,7 @@ TRAIN_ENABLE_WORKER_SPREAD_ENV, TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, DISABLE_LAZY_CHECKPOINTING_ENV, - ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, - SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR, + SUPPORTED_ACCELERATOR_DEVICES_TO_CONFIG, ) from ray.util.placement_group import get_current_placement_group, remove_placement_group @@ -159,10 +158,16 @@ def start( elif self._additional_resources_per_worker: for ( accelerator, - env_var, - ) in SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR.items(): - if self._share_accelerator_devices_enabled(accelerator): - self._share_resource_ids(accelerator, env_var) + accelerator_config, + ) in SUPPORTED_ACCELERATOR_DEVICES_TO_CONFIG.items(): + enable_sharing_env = accelerator_config[0] + accelerator_runtime_env_var = accelerator_config[1] + if self._share_accelerator_devices_enabled( + accelerator, enable_sharing_env + ): + self._share_resource_ids( + accelerator, accelerator_runtime_env_var + ) self._backend.on_start(self.worker_group, self._backend_config) except RayActorError as exc: logger.exception(str(exc)) @@ -262,11 +267,9 @@ def _share_cuda_visible_devices(self): def _share_resource_ids(self, accelerator: str, env_var: str): """Sets the given env_var on all workers. - For each worker, desired will be set to the accelerator ids - and visible to all workers on that worker's node. - - This allows workers on the same node to communicate with one - another. + For each worker, the cores/devices are visible to all the + workers on that worker's node.This allows workers on the + same node to communicate with one another. 
Example: @@ -296,39 +299,41 @@ def _share_resource_ids(self, accelerator: str, env_var: str): node_id_to_worker_id = defaultdict(set) node_id_to_resource_ids = defaultdict(set) - for worker_id, (node_id, resource_id) in enumerate(node_ids_and_resource_ids): + for worker_id, (node_id, resource_ids) in enumerate(node_ids_and_resource_ids): node_id_to_worker_id[node_id].add(worker_id) - node_id_to_resource_ids[node_id].update(resource_id) + node_id_to_resource_ids[node_id].update(resource_ids) futures = [] - for node_id, resource_runtime_ids in node_id_to_resource_ids.items(): - resource_runtime_ids = sorted(resource_runtime_ids) - all_resource_runtime_ids = ",".join(resource_runtime_ids) + for node_id, resource_ids in node_id_to_resource_ids.items(): + resource_ids = sorted(resource_ids) + all_resource_ids = ",".join(resource_ids) - def set_resource_runtime_ids(): - os.environ[env_var] = all_resource_runtime_ids + def set_resource_ids(): + os.environ[env_var] = all_resource_ids for worker_id in node_id_to_worker_id[node_id]: futures.append( - self.worker_group.execute_single_async( - worker_id, set_resource_runtime_ids - ) + self.worker_group.execute_single_async(worker_id, set_resource_ids) ) ray.get(futures) - def _share_accelerator_devices_enabled(self, accelerator: str): - """Whether to share NEURON_RT_VISIBLE_CORES/TPU_VISIBLE_CHIPS/.. - on all workers. This is enabled by default if neuron_cores/TPU/.. are - requested for workers. User can disable it by configuring the - TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES to "0" + def _share_accelerator_devices_enabled( + self, accelerator: str, enable_sharing_env: str + ): + """Whether to share cores/devices on all workers + based on enable_sharing_env. + For example, user can disable it by configuring the + TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES to "0". + + Args: + accelerator: The name of the accelerator. + enable_sharing_env: The name of the environment variable + to check. """ - return bool( - env_integer( - ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, - self._additional_resources_per_worker.get(accelerator, None) - is not None, - ) + has_accelerator_requested = ( + self._additional_resources_per_worker.get(accelerator, None) is not None ) + return bool(env_integer(enable_sharing_env, has_accelerator_requested)) def _create_rank_world_size_mappings(self) -> List[Dict]: """Create rank and world size mappings for workers. diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index 4b6b66fcf2abf..229f5a474daaa 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -44,7 +44,8 @@ class WorkerMetadata: node_id: ID of the node this worker is on. node_ip: IP address of the node this worker is on. hostname: Hostname that this worker is on. - resource_ids: Map of GPU IDs, accelerator IDs (AWS NeuronCore, ..). + resource_ids: Map of accelerator resources + ("GPU", "neuron_cores", ..) to their IDs. pid: Process ID of this worker. 
""" @@ -86,13 +87,14 @@ def construct_metadata() -> WorkerMetadata: node_id = ray.get_runtime_context().get_node_id() node_ip = ray.util.get_node_ip_address() hostname = socket.gethostname() + resource_ids = ray.get_runtime_context().get_resource_ids() pid = os.getpid() return WorkerMetadata( node_id=node_id, node_ip=node_ip, hostname=hostname, - resource_ids=ray.get_runtime_context().get_resource_ids(), + resource_ids=resource_ids, pid=pid, ) diff --git a/python/ray/train/constants.py b/python/ray/train/constants.py index 051f5b046a8d0..b94c4c505cd8a 100644 --- a/python/ray/train/constants.py +++ b/python/ray/train/constants.py @@ -57,13 +57,6 @@ def _get_defaults_results_dir() -> str: # Deprecated configs can use this value to detect if the user has set it. _DEPRECATED_VALUE = "DEPRECATED" -# Map of supported accelerators to the environment variable that -# are available in worker metadata as resource_ids. -SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR = { - ray_constants.NEURON_CORES: ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, - ray_constants.TPU: ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR, -} - # ================================================== # Environment Variables # ================================================== @@ -76,9 +69,11 @@ def _get_defaults_results_dir() -> str: # Backend.share_cuda_visible_devices. 1 for True, 0 for False. ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV = "TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES" -# Integer value which if set will not share the accelerator visible cores/devices +# Integer value which if set will not share neuron-core accelerator visible cores # across workers. 1 for True (default), 0 for False. -ENABLE_SHARE_ACCELERATOR_DEVICES_ENV = "TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES" +ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV = ( + "TRAIN_ENABLE_SHARE_NEURON_CORES_ACCELERATOR" +) # Integer value which indicates the number of seconds to wait when creating # the worker placement group before timing out. @@ -98,7 +93,7 @@ def _get_defaults_results_dir() -> str: TRAIN_ENV_VARS = { ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, - ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, + ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, TRAIN_ENABLE_WORKER_SPREAD_ENV, RAY_AIR_NEW_PERSISTENCE_MODE, @@ -113,3 +108,12 @@ def _get_defaults_results_dir() -> str: # Key for AIR Checkpoint world rank in TrainingResult metadata CHECKPOINT_RANK_KEY = "checkpoint_rank" + +# Map of supported accelerators to the environment variable configuration +# 1/sharing cores/devices across workers 2/runtime environment variable to configure. 
+SUPPORTED_ACCELERATOR_DEVICES_TO_CONFIG = { + ray_constants.NEURON_CORES: ( + ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, + ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, + ) +} diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py index d48b7503c5ce6..eafbccab34698 100644 --- a/python/ray/train/tests/test_backend.py +++ b/python/ray/train/tests/test_backend.py @@ -12,7 +12,7 @@ from ray.air._internal.util import StartTraceback # Trigger pytest hook to automatically zip test cluster logs to archive dir on failure -# from ray.tests.conftest import pytest_runtest_makereport # noqa +from ray.tests.conftest import pytest_runtest_makereport # noqa from ray.train._internal.backend_executor import ( BackendExecutor, InactiveWorkerGroupError, @@ -26,7 +26,7 @@ from ray.train.constants import ( ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV, TRAIN_ENABLE_WORKER_SPREAD_ENV, - ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, + ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, ) from ray.train.tensorflow import TensorflowConfig from ray.train.torch import TorchConfig @@ -437,7 +437,7 @@ def get_resources(): num_workers, expected_results = worker_results # sharing enabled by default - os.environ.pop(ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, None) + os.environ.pop(ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, None) e = BackendExecutor( config, num_workers=num_workers, @@ -475,7 +475,7 @@ def get_resources(): num_workers, expected_results = worker_results - os.environ[ENABLE_SHARE_ACCELERATOR_DEVICES_ENV] = "0" + os.environ[ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV] = "0" e = BackendExecutor( config, num_workers=num_workers, diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index d9c96c2391e29..718f34507ceb3 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -86,7 +86,8 @@ def test_worker_with_gpu_ids(ray_start_2_cpus_and_gpus): assert len(wg.workers) == 2 for w in wg.workers: gpu_and_accelerator_ids = w.metadata.resource_ids - assert len(gpu_and_accelerator_ids) == 2 + # 2_CPUs, 1_GPU + assert len(gpu_and_accelerator_ids) == 3 gpu_ids = gpu_and_accelerator_ids[ray_constants.GPU] for gpu_id in gpu_ids: assert gpu_id in [str(i) for i in range(num_gpus)] @@ -107,7 +108,8 @@ def test_worker_with_neuron_core_accelerator_ids( assert len(wg.workers) == 2 for w in wg.workers: gpu_and_accelerator_ids = w.metadata.resource_ids - assert len(gpu_and_accelerator_ids) == 2 + # 2_CPUs, 1_neuron_core + assert len(gpu_and_accelerator_ids) == 3 assert len(gpu_and_accelerator_ids[ray_constants.GPU]) == 0 neuron_core_ids = gpu_and_accelerator_ids[ray_constants.NEURON_CORES] for neuron_core_id in neuron_core_ids: From 735731539a13830a38657256f4038b6ac003dfa2 Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Tue, 26 Sep 2023 17:57:37 -0700 Subject: [PATCH 07/11] Refactor changes with resource_config Signed-off-by: maheedhar reddy chappidi --- .../ray/train/_internal/backend_executor.py | 70 +++++++++++++------ 1 file changed, 47 insertions(+), 23 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index 395dbf5a57844..0c96111aa18b5 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -1,6 +1,7 @@ import logging import os from collections import defaultdict +from dataclasses import dataclass from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar, Any 
import ray @@ -28,7 +29,7 @@ TRAIN_ENABLE_WORKER_SPREAD_ENV, TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV, DISABLE_LAZY_CHECKPOINTING_ENV, - SUPPORTED_ACCELERATOR_DEVICES_TO_CONFIG, + ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, ) from ray.util.placement_group import get_current_placement_group, remove_placement_group @@ -45,6 +46,25 @@ class TrainingWorkerError(Exception): """Raised if a worker fails during training.""" +@dataclass +class ResourceConfig: + """ + Resource configuration for resource_ids to share between workers. + + Args: + resource_name: The name of the resource to configure + (Example: "neuron_cores" or "gpu"). + resource_enable_sharing_env_var: The environment variable to + check if the resource should be shared. + share_resource_ids_env_var: The environment variable to configure for + sharing the resources with other workers. + """ + + resource_name: str + resource_enable_sharing_env_var: str + share_resource_ids_env_var: str + + class BackendExecutor: """Main execution class for training backends. @@ -103,6 +123,13 @@ def __init__( self._checkpoint_upload_from_workers = ( checkpoint_config and checkpoint_config._checkpoint_upload_from_workers ) + self._resource_configs = [ + ResourceConfig( + ray_constants.NEURON_CORES, + ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, + ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, + ) + ] def start( self, @@ -156,17 +183,14 @@ def start( if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: self._share_cuda_visible_devices() elif self._additional_resources_per_worker: - for ( - accelerator, - accelerator_config, - ) in SUPPORTED_ACCELERATOR_DEVICES_TO_CONFIG.items(): - enable_sharing_env = accelerator_config[0] - accelerator_runtime_env_var = accelerator_config[1] - if self._share_accelerator_devices_enabled( - accelerator, enable_sharing_env + for resource_config in self._resource_configs: + if self._is_share_resources_enabled( + resource_config.resource_name, + resource_config.resource_enable_sharing_env_var, ): self._share_resource_ids( - accelerator, accelerator_runtime_env_var + resource_config.resource_name, + resource_config.share_resource_ids_env_var, ) self._backend.on_start(self.worker_group, self._backend_config) except RayActorError as exc: @@ -264,7 +288,7 @@ def _share_cuda_visible_devices(self): ray_constants.GPU, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR ) - def _share_resource_ids(self, accelerator: str, env_var: str): + def _share_resource_ids(self, resource: str, env_var: str): """Sets the given env_var on all workers. For each worker, the cores/devices are visible to all the @@ -286,13 +310,13 @@ def _share_resource_ids(self, accelerator: str, env_var: str): - Worker2: "0,1" Args: - accelerator: The name of the accelerator. + resource: The name of the resource/accelerator. env_var: The name of the environment variable to set. """ node_ids_and_resource_ids = [ ( w.metadata.node_id, - w.metadata.resource_ids[accelerator], + w.metadata.resource_ids[resource], ) for w in self.worker_group.workers ] @@ -317,23 +341,23 @@ def set_resource_ids(): ) ray.get(futures) - def _share_accelerator_devices_enabled( - self, accelerator: str, enable_sharing_env: str - ): - """Whether to share cores/devices on all workers + def _is_share_resources_enabled(self, resource_name: str, enable_sharing_env: str): + """Whether to share resource IDs on all workers based on enable_sharing_env. - For example, user can disable it by configuring the - TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES to "0". 
+ + This will return true if resources are requested, or + `enable_sharing_env` is set to 1. + Also, user can disable by configuring the `enable_sharing_env` to "0". Args: - accelerator: The name of the accelerator. + resource_name: The name of the resource/accelerator. enable_sharing_env: The name of the environment variable to check. """ - has_accelerator_requested = ( - self._additional_resources_per_worker.get(accelerator, None) is not None + has_resource_requested = ( + self._additional_resources_per_worker.get(resource_name, None) is not None ) - return bool(env_integer(enable_sharing_env, has_accelerator_requested)) + return bool(env_integer(enable_sharing_env, has_resource_requested)) def _create_rank_world_size_mappings(self) -> List[Dict]: """Create rank and world size mappings for workers. From 719d4558194147eb2598b9c7f328fc5415312e08 Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Tue, 26 Sep 2023 21:43:25 -0700 Subject: [PATCH 08/11] Fix UT Signed-off-by: maheedhar reddy chappidi --- python/ray/train/tests/test_worker_group.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index 718f34507ceb3..1c5445977953b 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -85,13 +85,11 @@ def test_worker_with_gpu_ids(ray_start_2_cpus_and_gpus): wg.execute(lambda: 1) assert len(wg.workers) == 2 for w in wg.workers: - gpu_and_accelerator_ids = w.metadata.resource_ids - # 2_CPUs, 1_GPU - assert len(gpu_and_accelerator_ids) == 3 - gpu_ids = gpu_and_accelerator_ids[ray_constants.GPU] + resource_ids = w.metadata.resource_ids + gpu_ids = resource_ids[ray_constants.GPU] for gpu_id in gpu_ids: assert gpu_id in [str(i) for i in range(num_gpus)] - assert len(gpu_and_accelerator_ids[ray_constants.NEURON_CORES]) == 0 + assert len(resource_ids[ray_constants.NEURON_CORES]) == 0 def test_worker_with_neuron_core_accelerator_ids( @@ -107,11 +105,9 @@ def test_worker_with_neuron_core_accelerator_ids( wg.execute(lambda: 1) assert len(wg.workers) == 2 for w in wg.workers: - gpu_and_accelerator_ids = w.metadata.resource_ids - # 2_CPUs, 1_neuron_core - assert len(gpu_and_accelerator_ids) == 3 - assert len(gpu_and_accelerator_ids[ray_constants.GPU]) == 0 - neuron_core_ids = gpu_and_accelerator_ids[ray_constants.NEURON_CORES] + resource_ids = w.metadata.resource_ids + assert len(resource_ids[ray_constants.GPU]) == 0 + neuron_core_ids = resource_ids[ray_constants.NEURON_CORES] for neuron_core_id in neuron_core_ids: assert neuron_core_id in [str(i) for i in range(num_nc)] From 755dc2e62aed0826da84033b292116f266089eea Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Wed, 27 Sep 2023 15:58:06 -0700 Subject: [PATCH 09/11] Remove unused code Signed-off-by: maheedhar reddy chappidi --- python/ray/train/constants.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/python/ray/train/constants.py b/python/ray/train/constants.py index b94c4c505cd8a..f9eedec1cbd83 100644 --- a/python/ray/train/constants.py +++ b/python/ray/train/constants.py @@ -1,8 +1,6 @@ import os from pathlib import Path -import ray._private.ray_constants as ray_constants - from ray.air.constants import ( # noqa: F401 EVALUATION_DATASET_KEY, MODEL_KEY, @@ -108,12 +106,3 @@ def _get_defaults_results_dir() -> str: # Key for AIR Checkpoint world rank in TrainingResult metadata CHECKPOINT_RANK_KEY = "checkpoint_rank" - -# Map of supported 
accelerators to the environment variable configuration -# 1/sharing cores/devices across workers 2/runtime environment variable to configure. -SUPPORTED_ACCELERATOR_DEVICES_TO_CONFIG = { - ray_constants.NEURON_CORES: ( - ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV, - ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR, - ) -} From 0f812fcdbc4d9503634b1422d8411003ef9639ea Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Thu, 28 Sep 2023 10:49:37 -0700 Subject: [PATCH 10/11] Bug fix on env Signed-off-by: maheedhar reddy chappidi --- python/ray/train/_internal/backend_executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index 0c96111aa18b5..80371ded0cab3 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -357,7 +357,10 @@ def _is_share_resources_enabled(self, resource_name: str, enable_sharing_env: st has_resource_requested = ( self._additional_resources_per_worker.get(resource_name, None) is not None ) - return bool(env_integer(enable_sharing_env, has_resource_requested)) + return ( + bool(env_integer(enable_sharing_env, has_resource_requested)) + and has_resource_requested + ) def _create_rank_world_size_mappings(self) -> List[Dict]: """Create rank and world size mappings for workers. From 114699a68435ed072137a78b17fcd5bdd873c934 Mon Sep 17 00:00:00 2001 From: maheedhar reddy chappidi Date: Thu, 28 Sep 2023 15:34:25 -0700 Subject: [PATCH 11/11] Fix on resources value Signed-off-by: maheedhar reddy chappidi --- python/ray/train/_internal/backend_executor.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index 80371ded0cab3..eb5344f417c2d 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -345,8 +345,7 @@ def _is_share_resources_enabled(self, resource_name: str, enable_sharing_env: st """Whether to share resource IDs on all workers based on enable_sharing_env. - This will return true if resources are requested, or - `enable_sharing_env` is set to 1. + This will return true if resources are requested and greater than 0. Also, user can disable by configuring the `enable_sharing_env` to "0". Args: @@ -355,11 +354,10 @@ def _is_share_resources_enabled(self, resource_name: str, enable_sharing_env: st to check. """ has_resource_requested = ( - self._additional_resources_per_worker.get(resource_name, None) is not None + self._additional_resources_per_worker.get(resource_name, 0) > 0 ) - return ( - bool(env_integer(enable_sharing_env, has_resource_requested)) - and has_resource_requested + return has_resource_requested and ray_constants.env_bool( + enable_sharing_env, True ) def _create_rank_world_size_mappings(self) -> List[Dict]:
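
A minimal end-to-end sketch of how the behavior added in this series is expected to be used, assuming the standard Ray Train 2.x `ScalingConfig`/`TorchTrainer` API and a node that actually exposes `neuron_cores` resources. Apart from the names defined in these patches ("neuron_cores", NEURON_RT_VISIBLE_CORES, TRAIN_ENABLE_SHARE_NEURON_CORES_ACCELERATOR), the code below is illustrative only and not part of the diff.

    import os

    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    # Optional: set to "0" before training starts to opt out of the sharing
    # added in this series and keep each worker pinned to only the NeuronCores
    # that Ray assigned to it (see _is_share_resources_enabled above).
    # os.environ["TRAIN_ENABLE_SHARE_NEURON_CORES_ACCELERATOR"] = "0"


    def train_loop_per_worker():
        # With sharing enabled (the default whenever neuron_cores are
        # requested), every worker on a node sees the union of the cores
        # assigned to all workers on that node, e.g. "0,1" for two
        # single-core workers scheduled on the same node.
        print(os.environ.get("NEURON_RT_VISIBLE_CORES"))


    trainer = TorchTrainer(
        train_loop_per_worker,
        scaling_config=ScalingConfig(
            num_workers=2,
            resources_per_worker={"neuron_cores": 1},
        ),
    )
    trainer.fit()

The flag is consulted once in `BackendExecutor.start` (via `_is_share_resources_enabled`), so it has to be visible in the environment of the process that starts the worker group rather than set after training has begun.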