[Train] Add accelerator ids to workers and share neuron_cores by default #39091

Merged (18 commits) on Sep 29, 2023
Changes from 8 commits
83 changes: 71 additions & 12 deletions python/ray/train/_internal/backend_executor.py
@@ -4,6 +4,7 @@
from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar, Any

import ray
import ray._private.ray_constants as ray_constants
from ray.data import Dataset
from ray._private.ray_constants import env_integer
from ray.air.config import CheckpointConfig
@@ -27,6 +28,8 @@
TRAIN_ENABLE_WORKER_SPREAD_ENV,
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV,
DISABLE_LAZY_CHECKPOINTING_ENV,
ENABLE_SHARE_ACCELERATOR_DEVICES_ENV,
SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR,
)
from ray.util.placement_group import get_current_placement_group, remove_placement_group

@@ -153,6 +156,13 @@ def start(

if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled:
self._share_cuda_visible_devices()
elif self._additional_resources_per_worker:
for (
accelerator,
env_var,
) in SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR.items():
if self._share_accelerator_devices_enabled(accelerator):
self._share_resource_ids(accelerator, env_var)
Contributor:
One thing I am thinking about is unifying the logic between GPUs and the other resources, which might be simpler since that's how it's set up in the WorkerGroup now, but this does not need to be done in this PR.

Contributor Author:
I intentionally kept it separate for now.
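
For illustration only, a rough sketch of what that unification might eventually look like (hypothetical; not part of this PR). It assumes a map that also contains GPU, plus the _share_resource_ids and _share_accelerator_devices_enabled helpers added in this diff:

from typing import Callable, Dict

# Hypothetical sketch of the unified path the reviewer describes; the map
# entries and helper names mirror this PR, but GPU is not in the map there.
def share_all_devices(
    devices_to_env_var: Dict[str, str],
    share_enabled: Callable[[str], bool],
    share_resource_ids: Callable[[str, str], None],
) -> None:
    # One code path for GPUs and the other accelerators.
    for accelerator, env_var in devices_to_env_var.items():
        if share_enabled(accelerator):
            share_resource_ids(accelerator, env_var)

# Example wiring (assumed, against a BackendExecutor instance `executor`):
# share_all_devices(
#     {"GPU": "CUDA_VISIBLE_DEVICES", "neuron_cores": "NEURON_RT_VISIBLE_CORES"},
#     executor._share_accelerator_devices_enabled,
#     executor._share_resource_ids,
# )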

self._backend.on_start(self.worker_group, self._backend_config)
except RayActorError as exc:
logger.exception(str(exc))
@@ -245,32 +255,81 @@ def _share_cuda_visible_devices(self):
- Worker2: "0,1"

"""
self._share_resource_ids(
ray_constants.GPU, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR
)

node_ids_and_gpu_ids = [
(w.metadata.node_id, w.metadata.gpu_ids) for w in self.worker_group.workers
]
def _share_resource_ids(self, accelerator: str, env_var: str):
"""Sets the given env_var on all workers.

For each worker, the env_var will be set to the union of the
accelerator ids of all workers on that worker's node.

This allows workers on the same node to communicate with one
another.

Example:

Setup:
- Node1:
- Worker1: {0, 1}
- Worker2: {2, 3}
- Node2:
- Worker3: {0, 1}

NEURON_RT_VISIBLE_CORES/TPU_VISIBLE_CHIPS/...:
- Worker1: "0,1,2,3"
- Worker2: "0,1,2,3"
- Worker3: "0,1"

Args:
accelerator: The name of the accelerator.
env_var: The name of the environment variable to set.
"""
node_ids_and_resource_ids = [
(
w.metadata.node_id,
w.metadata.resource_ids[accelerator],
)
for w in self.worker_group.workers
]
node_id_to_worker_id = defaultdict(set)
node_id_to_gpu_ids = defaultdict(set)
node_id_to_resource_ids = defaultdict(set)

for worker_id, (node_id, gpu_ids) in enumerate(node_ids_and_gpu_ids):
for worker_id, (node_id, resource_id) in enumerate(node_ids_and_resource_ids):
node_id_to_worker_id[node_id].add(worker_id)
node_id_to_gpu_ids[node_id].update(gpu_ids)
node_id_to_resource_ids[node_id].update(resource_id)

futures = []
for node_id, gpu_ids in node_id_to_gpu_ids.items():
gpu_ids = sorted(gpu_ids)
all_gpu_ids = ",".join(gpu_ids)
for node_id, resource_runtime_ids in node_id_to_resource_ids.items():
resource_runtime_ids = sorted(resource_runtime_ids)
all_resource_runtime_ids = ",".join(resource_runtime_ids)

def set_gpu_ids():
os.environ["CUDA_VISIBLE_DEVICES"] = all_gpu_ids
def set_resource_runtime_ids():
os.environ[env_var] = all_resource_runtime_ids

for worker_id in node_id_to_worker_id[node_id]:
futures.append(
self.worker_group.execute_single_async(worker_id, set_gpu_ids)
self.worker_group.execute_single_async(
worker_id, set_resource_runtime_ids
)
)
ray.get(futures)
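
As a standalone sketch of the per-node grouping above (using the hypothetical placements from the docstring example; not code from this PR):

from collections import defaultdict

# (node_id, accelerator_ids) per worker, mirroring the docstring example.
node_ids_and_resource_ids = [
    ("node1", ["0", "1"]),  # Worker1
    ("node1", ["2", "3"]),  # Worker2
    ("node2", ["0", "1"]),  # Worker3
]

node_id_to_worker_id = defaultdict(set)
node_id_to_resource_ids = defaultdict(set)
for worker_id, (node_id, ids) in enumerate(node_ids_and_resource_ids):
    node_id_to_worker_id[node_id].add(worker_id)
    node_id_to_resource_ids[node_id].update(ids)

# Every worker on a node gets the comma-joined union of that node's IDs.
for node_id, ids in node_id_to_resource_ids.items():
    visible = ",".join(sorted(ids))
    for worker_id in sorted(node_id_to_worker_id[node_id]):
        print(f"Worker{worker_id + 1} on {node_id}: {visible}")
# Worker1 on node1: 0,1,2,3
# Worker2 on node1: 0,1,2,3
# Worker3 on node2: 0,1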

def _share_accelerator_devices_enabled(self, accelerator: str):
"""Whether to share NEURON_RT_VISIBLE_CORES/TPU_VISIBLE_CHIPS/..
on all workers. This is enabled by default if neuron_cores/TPU/.. are
requested for workers. User can disable it by configuring the
TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES to "0"
"""
return bool(
env_integer(
ENABLE_SHARE_ACCELERATOR_DEVICES_ENV,
self._additional_resources_per_worker.get(accelerator, None)
is not None,
Contributor:
super nit: Let's make this a variable so it's easier to parse through the branching logic.

has_accelerator_requested = self._additional_resources_per_worker.get(accelerator) is not None

)
)
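
For reference, a minimal sketch of how this toggle is used (mirroring the env-var handling in the tests further below; the surrounding training setup is assumed):

import os

# Disable sharing of accelerator visible cores/devices across workers
# before constructing/starting the BackendExecutor.
os.environ["TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES"] = "0"

# Remove the override to restore the default, which enables sharing
# whenever neuron_cores/TPU/... are requested per worker.
os.environ.pop("TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES", None)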

def _create_rank_world_size_mappings(self) -> List[Dict]:
"""Create rank and world size mappings for workers.
There are three maps returned:
7 changes: 3 additions & 4 deletions python/ray/train/_internal/worker_group.py
@@ -44,14 +44,14 @@ class WorkerMetadata:
node_id: ID of the node this worker is on.
node_ip: IP address of the node this worker is on.
hostname: Hostname that this worker is on.
gpu_ids: List of CUDA IDs available to this worker.
resource_ids: Map of resource name to IDs (GPU IDs, accelerator IDs such as AWS NeuronCore, ...).
pid: Process ID of this worker.
"""

node_id: str
node_ip: str
hostname: str
gpu_ids: Optional[List[str]]
resource_ids: Dict[str, List[str]]
pid: int


@@ -86,14 +86,13 @@ def construct_metadata() -> WorkerMetadata:
node_id = ray.get_runtime_context().get_node_id()
node_ip = ray.util.get_node_ip_address()
hostname = socket.gethostname()
gpu_ids = [str(gpu_id) for gpu_id in ray.get_gpu_ids()]
pid = os.getpid()

return WorkerMetadata(
node_id=node_id,
node_ip=node_ip,
hostname=hostname,
gpu_ids=gpu_ids,
resource_ids=ray.get_runtime_context().get_resource_ids(),
Contributor:
nit: For consistency with others, define this above in line 89.

pid=pid,
)
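
For illustration, a hypothetical WorkerMetadata constructed with the new resource_ids field (all values below are made up; the shape matches the mocked metadata in the tests):

from ray.train._internal.worker_group import WorkerMetadata

example_metadata = WorkerMetadata(
    node_id="node-1",
    node_ip="10.0.0.1",
    hostname="host-1",
    # Map of resource name to the IDs assigned to this worker.
    resource_ids={"GPU": ["0"], "neuron_cores": ["0", "1"]},
    pid=12345,
)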

14 changes: 14 additions & 0 deletions python/ray/train/constants.py
@@ -1,6 +1,8 @@
import os
from pathlib import Path

import ray._private.ray_constants as ray_constants

from ray.air.constants import ( # noqa: F401
EVALUATION_DATASET_KEY,
MODEL_KEY,
@@ -55,6 +57,13 @@ def _get_defaults_results_dir() -> str:
# Deprecated configs can use this value to detect if the user has set it.
_DEPRECATED_VALUE = "DEPRECATED"

# Map of supported accelerators (whose IDs are available in worker
# metadata as resource_ids) to their visibility environment variables.
SUPPORTED_ACCELERATOR_DEVICES_TO_ENV_VAR = {
ray_constants.NEURON_CORES: ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR,
ray_constants.TPU: ray_constants.TPU_VISIBLE_CHIPS_ENV_VAR,
}

# ==================================================
# Environment Variables
# ==================================================
@@ -67,6 +76,10 @@ def _get_defaults_results_dir() -> str:
# Backend.share_cuda_visible_devices. 1 for True, 0 for False.
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV = "TRAIN_ENABLE_SHARE_CUDA_VISIBLE_DEVICES"

# Integer value which if set to 0 disables sharing of the accelerator
# visible cores/devices across workers. 1 for True (default), 0 for False.
ENABLE_SHARE_ACCELERATOR_DEVICES_ENV = "TRAIN_ENABLE_SHARE_ACCELERATOR_DEVICES"

# Integer value which indicates the number of seconds to wait when creating
# the worker placement group before timing out.
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV = "TRAIN_PLACEMENT_GROUP_TIMEOUT_S"
@@ -85,6 +98,7 @@ def _get_defaults_results_dir() -> str:
TRAIN_ENV_VARS = {
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV,
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
ENABLE_SHARE_ACCELERATOR_DEVICES_ENV,
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV,
TRAIN_ENABLE_WORKER_SPREAD_ENV,
RAY_AIR_NEW_PERSISTENCE_MODE,
14 changes: 14 additions & 0 deletions python/ray/train/tests/conftest.py
@@ -55,6 +55,20 @@ def ray_2_node_2_gpu():
cluster.shutdown()


@pytest.fixture
def ray_2_node_2_neuron_cores():
cluster = Cluster()
for _ in range(2):
cluster.add_node(num_cpus=4, resources={"neuron_cores": 2})

ray.init(address=cluster.address)

yield

ray.shutdown()
cluster.shutdown()


@pytest.fixture
def ray_start_2_cpus():
address_info = ray.init(num_cpus=2)
86 changes: 78 additions & 8 deletions python/ray/train/tests/test_backend.py
@@ -7,11 +7,12 @@
import time

import ray
import ray._private.ray_constants as ray_constants
from ray import train
from ray.air._internal.util import StartTraceback

# Trigger pytest hook to automatically zip test cluster logs to archive dir on failure
from ray.tests.conftest import pytest_runtest_makereport # noqa
# from ray.tests.conftest import pytest_runtest_makereport # noqa
from ray.train._internal.backend_executor import (
BackendExecutor,
InactiveWorkerGroupError,
Expand All @@ -25,6 +26,7 @@
from ray.train.constants import (
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
TRAIN_ENABLE_WORKER_SPREAD_ENV,
ENABLE_SHARE_ACCELERATOR_DEVICES_ENV,
)
from ray.train.tensorflow import TensorflowConfig
from ray.train.torch import TorchConfig
@@ -97,7 +99,7 @@ def mock_add_workers(self, num_workers):
node_id=0,
node_ip=str(i % 2),
hostname=0,
gpu_ids=[0],
resource_ids={"GPU": ["0"]},
pid=0,
)
worker.metadata = metadata
@@ -307,12 +309,6 @@ def check_process_group():
def test_cuda_visible_devices(ray_2_node_2_gpu, worker_results):
config = TestConfig()

if worker_results[0] != len(worker_results[1]):
raise ValueError(
"Invalid test parameter. Length of expected result should "
"match number of workers."
)

def get_resources():
cuda_visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
# Sort the cuda visible devices to have exact match with expected result.
@@ -419,6 +415,80 @@ def get_resources():
assert results == expected_results


@pytest.mark.parametrize(
"worker_results",
[
(1, [[0]]),
(2, [[0, 1]] * 2),
(3, [[0]] + [[0, 1]] * 2),
(4, [[0, 1]] * 4),
],
)
def test_neuron_core_accelerator_ids(ray_2_node_2_neuron_cores, worker_results):
config = TestConfig()

def get_resources():
neuron_resource_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR]
# Sort the runtime ids to have exact match with expected result.
sorted_devices = [
int(device) for device in sorted(neuron_resource_ids.split(","))
]
return sorted_devices

num_workers, expected_results = worker_results
# sharing enabled by default
os.environ.pop(ENABLE_SHARE_ACCELERATOR_DEVICES_ENV, None)
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
results = e.finish_training()
results.sort()
assert results == expected_results


@pytest.mark.parametrize(
"worker_results",
[
(1, [[0]]),
(2, [[0]] + [[1]]),
(3, [[0]] * 2 + [[1]]),
(4, [[0]] * 2 + [[1]] * 2),
],
)
def test_neuron_core_accelerator_ids_sharing_disabled(
ray_2_node_2_neuron_cores, worker_results
):
config = TestConfig()

def get_resources():
neuron_resource_ids = os.environ[ray_constants.NEURON_RT_VISIBLE_CORES_ENV_VAR]
# Sort the runtime ids to have exact match with expected result.
sorted_devices = [
int(device) for device in sorted(neuron_resource_ids.split(","))
]
return sorted_devices

num_workers, expected_results = worker_results

os.environ[ENABLE_SHARE_ACCELERATOR_DEVICES_ENV] = "0"
e = BackendExecutor(
config,
num_workers=num_workers,
num_cpus_per_worker=0,
additional_resources_per_worker={"neuron_cores": 1},
)
e.start()
_start_training(e, get_resources)
results = e.finish_training()
results.sort()
assert results == expected_results


def get_node_id_set():
node_id_set = set()
for actor_info in ray._private.state.actors().values():