[core][compiled graphs] Support experimental_compile(_default_communicator=comm) #50023

Merged · 33 commits · Feb 5, 2025
Changes from 20 commits
409 changes: 248 additions & 161 deletions python/ray/dag/compiled_dag_node.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions python/ray/dag/dag_node.py
@@ -228,6 +228,7 @@ def experimental_compile(
enable_asyncio: bool = False,
_max_inflight_executions: Optional[int] = None,
_overlap_gpu_communication: Optional[bool] = None,
_default_communicator: Optional[Union[Communicator, str]] = "create",
) -> "ray.dag.CompiledDAG":
"""Compile an accelerated execution path for this DAG.

@@ -250,6 +251,17 @@ def experimental_compile(
communication and computation can be overlapped, which can improve
the performance of the DAG execution. If None, the default value
will be used.
_default_communicator: The default communicator to use to transfer
tensors. For p2p operations, this is the default communicator to use
for nodes annotated with `with_tensor_transport()` and when shared memory
is not the desired option (e.g., when transport="nccl", or when
transport="auto" for communication between two different GPUs).
If it is "create", a default communicator is created when needed.
If None, an error will be thrown. All other values are invalid.
For collective operations, this is the default communicator to use
when a custom communicator is not specified. If it is "create", a communicator
is created for each collective operation and initialized on the involved actors,
or an already created communicator is reused if the set of actors is the same.

Returns:
A compiled DAG.
@@ -278,6 +290,7 @@ def experimental_compile(
enable_asyncio,
_max_inflight_executions,
_overlap_gpu_communication,
_default_communicator,
)

def execute(
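A minimal usage sketch of the new parameter, following the docstring added above. The actor class and its methods are illustrative assumptions; only `experimental_compile`, `with_tensor_transport`, and `_default_communicator` come from this PR:

```python
import ray
import torch
from ray.dag import InputNode


# Hypothetical actor for illustration only; not part of this PR.
@ray.remote(num_gpus=1)
class GPUWorker:
    def compute(self, value: int) -> torch.Tensor:
        return torch.ones(10, device="cuda") * value

    def consume(self, tensor: torch.Tensor) -> int:
        return int(tensor.sum().item())


sender = GPUWorker.remote()
receiver = GPUWorker.remote()

with InputNode() as inp:
    dag = sender.compute.bind(inp)
    # "auto" uses the default communicator for GPU-to-GPU transfers,
    # where shared memory is not the desired option.
    dag = dag.with_tensor_transport(transport="auto")
    dag = receiver.consume.bind(dag)

# "create" (the default) creates a default communicator only when one is needed.
compiled_dag = dag.experimental_compile(_default_communicator="create")
print(ray.get(compiled_dag.execute(2)))  # 20

# An existing Communicator instance can be passed instead to reuse it:
#   compiled_dag = dag.experimental_compile(_default_communicator=my_comm)
# Passing None raises an error, per the docstring above.
```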
21 changes: 11 additions & 10 deletions python/ray/dag/tests/experimental/test_collective_dag.py
@@ -159,7 +159,6 @@ def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch):
monkeypatch,
dag,
{(frozenset(workers), None)},
(frozenset(workers), None),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -177,7 +176,6 @@ def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch):
monkeypatch,
dag,
{(frozenset(workers), None)},
(frozenset(workers), None),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -186,9 +184,10 @@ def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch):
@pytest.mark.parametrize(
"ray_start_regular", [{"num_cpus": 4, "num_gpus": 4}], indirect=True
)
def test_custom_comm_deduplicate(ray_start_regular, monkeypatch):
def test_custom_comm(ray_start_regular, monkeypatch):
"""
Test a custom GPU communicator is reused when possible.
Test a custom GPU communicator is used when specified and a default
communicator is used otherwise.
"""
actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1)

@@ -208,8 +207,10 @@ def test_custom_comm_deduplicate(ray_start_regular, monkeypatch):
compiled_dag, mock_nccl_group_set = check_nccl_group_init(
monkeypatch,
dag,
{(frozenset(workers), comm)},
(frozenset(workers), comm),
{
(frozenset(workers), comm),
(frozenset(workers), None),
},
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -225,8 +226,10 @@ def test_custom_comm_deduplicate(ray_start_regular, monkeypatch):
compiled_dag, mock_nccl_group_set = check_nccl_group_init(
monkeypatch,
dag,
{(frozenset(workers), comm)},
(frozenset(workers), comm),
{
(frozenset(workers), comm),
(frozenset(workers), None),
},
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -259,7 +262,6 @@ def test_custom_comm_init_teardown(ray_start_regular, monkeypatch):
monkeypatch,
dag,
{(frozenset(workers), comm)},
(frozenset(workers), comm),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -285,7 +287,6 @@ def test_custom_comm_init_teardown(ray_start_regular, monkeypatch):
(frozenset(workers), comm_2),
(frozenset(workers), comm_3),
},
(frozenset(workers), comm_3),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
143 changes: 3 additions & 140 deletions python/ray/dag/tests/experimental/test_torch_tensor_dag.py
@@ -240,7 +240,7 @@ def test_torch_tensor_nccl(
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile(
_overlap_gpu_communication=overlap_gpu_communication
_overlap_gpu_communication=overlap_gpu_communication,
)

# Test that we can pass different shapes and data.
@@ -351,7 +351,7 @@ def test_torch_tensor_nccl_overlap_timed(ray_start_regular, overlap_gpu_communic

# Test normal execution.
compiled_dag = dag.experimental_compile(
_overlap_gpu_communication=overlap_gpu_communication
_overlap_gpu_communication=overlap_gpu_communication,
)

start = time.monotonic()
@@ -529,143 +529,6 @@ def get_transport_name(self) -> str:
assert result == (i, shape, dtype)


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_custom_comm_invalid(ray_start_regular):
if not USE_GPU:
pytest.skip("NCCL tests require GPUs")

assert (
sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1
), "This test requires at least 2 GPUs"

actor_cls = TorchTensorWorker.options(num_cpus=0, num_gpus=1)

actor1 = actor_cls.remote()
actor2 = actor_cls.remote()

class MockNcclGroup(Communicator):
"""
A mock NCCL group for testing. Send and recv are not implemented.
"""

import cupy as cp

def __init__(self, world_size, actor_handles):
self._world_size = world_size
self._actor_handles = actor_handles
self._rank = None

def initialize(self, rank: int) -> None:
expected_rank = self.get_rank(ray.get_runtime_context().current_actor)
assert (
rank == expected_rank
), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}"
self._rank = rank
self._device = torch_utils.get_devices()[0]

def get_rank(self, actor: ray.actor.ActorHandle) -> int:
actor_ids = [a._ray_actor_id for a in self._actor_handles]
try:
rank = actor_ids.index(actor._ray_actor_id)
except ValueError:
raise ValueError("Actor is not in the NCCL group.")
return rank

def get_world_size(self) -> int:
return self._world_size

def get_self_rank(self) -> Optional[int]:
return self._rank

def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
return self._actor_handles

def send(self, value: "torch.Tensor", peer_rank: int) -> None:
return None

def recv(
self,
shape: Tuple[int],
dtype: "torch.dtype",
peer_rank: int,
allocator: Optional[TorchTensorAllocator] = None,
) -> "torch.Tensor":
return None

def allreduce(
self,
send_buf: "torch.Tensor",
recv_buf: "torch.Tensor",
op: ReduceOp,
) -> None:
raise NotImplementedError

@property
def recv_stream(self) -> Optional["cp.cuda.ExternalStream"]:
return None

@property
def send_stream(self) -> Optional["cp.cuda.ExternalStream"]:
return None

def destroy(self) -> None:
pass

def get_transport_name(self) -> str:
return "nccl"

nccl_group = MockNcclGroup(2, [actor1, actor2])

# Mixed usage of NCCL groups should throw an error
# Case 1: custom NCCL group first, then default NCCL group
with InputNode() as inp:
dag = actor1.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport=nccl_group)
dag = actor2.recv.options(num_returns=3).bind(dag)
dag = actor2.send.bind(*dag)
dag = dag.with_tensor_transport(transport="nccl")
dag = actor1.recv.bind(dag)
with pytest.raises(
ValueError,
match=r"Compiled Graphs do not support mixed usage of type hints.*",
):
dag.experimental_compile()

# Case 2: default NCCL group first, then custom NCCL group
with InputNode() as inp:
dag = actor1.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport="nccl")
dag = actor2.recv.options(num_returns=3).bind(dag)
dag = actor2.send.bind(*dag)
dag = dag.with_tensor_transport(transport=nccl_group)
dag = actor1.recv.bind(dag)
with pytest.raises(
ValueError,
match=r"Compiled Graphs do not support mixed usage of type hints.*",
):
dag.experimental_compile()

nccl_group2 = MockNcclGroup(2, [actor1, actor2])

# Using two different custom NCCL groups are currently not supported
with InputNode() as inp:
dag = actor1.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport=nccl_group)
dag = actor2.recv.options(num_returns=3).bind(dag)
dag = actor2.send.bind(*dag)
dag = dag.with_tensor_transport(transport=nccl_group2)
dag = actor1.recv.bind(dag)
with pytest.raises(
ValueError,
match=(
"Compiled Graphs currently only support "
"a single custom NCCL group, but multiple "
"have been specified."
),
):
dag.experimental_compile()


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_custom_comm_inited(ray_start_regular):
if not USE_GPU:
@@ -937,7 +800,7 @@ def test_torch_tensor_exceptions(
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile(
_overlap_gpu_communication=overlap_gpu_communication
_overlap_gpu_communication=overlap_gpu_communication,
)

shape = (10,)
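The removal of `test_torch_tensor_custom_comm_invalid` above fits the new behavior checked in `test_custom_comm`: an edge annotated with a custom communicator uses that communicator, and the remaining NCCL edges fall back to the default communicator instead of raising a mixed-usage error. A hedged sketch reusing the structure and names of the removed test; the expected behavior is inferred from this PR's test changes, not asserted by the diff itself:

```python
# Assumes the same actor1/actor2 and MockNcclGroup as in the removed test above.
nccl_group = MockNcclGroup(2, [actor1, actor2])

with InputNode() as inp:
    dag = actor1.send.bind(inp.shape, inp.dtype, inp.value)
    dag = dag.with_tensor_transport(transport=nccl_group)  # custom communicator edge
    dag = actor2.recv.options(num_returns=3).bind(dag)
    dag = actor2.send.bind(*dag)
    dag = dag.with_tensor_transport(transport="nccl")      # default communicator edge
    dag = actor1.recv.bind(dag)

# Previously this raised "Compiled Graphs do not support mixed usage of type hints";
# with a default communicator available it is expected to compile, initializing one
# communicator for the custom edge and a default one for the "nccl" edge.
compiled_dag = dag.experimental_compile()
```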
4 changes: 2 additions & 2 deletions python/ray/experimental/channel/common.py
@@ -109,8 +109,8 @@ def get_custom_communicator(self) -> Optional[Communicator]:
"""
Return the custom NCCL group if one is specified.
"""
if self._contains_type is not None:
return self._contains_type.get_custom_nccl_group()
if hasattr(self, "contains_type") and self.contains_type is not None:
return self.contains_type.get_custom_nccl_group()
return None

def set_communicator_id(self, group_id: str) -> None:
4 changes: 2 additions & 2 deletions python/ray/experimental/channel/torch_tensor_nccl_channel.py
@@ -396,7 +396,7 @@ def __init__(
ctx = ChannelContext.get_current()
assert isinstance(
typ.communicator_id, str
), "NCCL group ID ({nccl_group_id}) must be a str."
), f"NCCL group ID ({typ.communicator_id}) must be a str."
self._typ = typ

assert self._typ.communicator_id is not None, "No NCCL group specified."
@@ -732,7 +732,7 @@ def _init_communicator(
custom_communicator: A custom NCCL group to initialize.
use_communication_streams: Whether to use dedicated send and recv
streams for communication. If True, communication and computation
can be overlapped to improve perfomrance.
can be overlapped to improve performance.
"""
ctx = ChannelContext.get_current()

40 changes: 5 additions & 35 deletions python/ray/experimental/collective/conftest.py
@@ -1,4 +1,3 @@
import copy
import uuid
from typing import Dict, FrozenSet, List, Optional, Set, Tuple

@@ -138,31 +137,6 @@ def mock_destroy_nccl_group(self, group_id: str) -> None:
ctx.communicators[group_id].destroy()
del ctx.communicators[group_id]

def check_init(
self,
compiled_dag: "ray.dag.CompiledDAG",
actors_and_custom_comms: Set[
Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]]
],
p2p_actors_and_custom_comm: Optional[
Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]]
],
) -> None:
assert len(self.ids_to_actors_and_custom_comms) == len(actors_and_custom_comms)
assert (
set(self.ids_to_actors_and_custom_comms.values()) == actors_and_custom_comms
)

nccl_group_id_p2p = compiled_dag.communicator_id_p2p
if p2p_actors_and_custom_comm is None:
assert nccl_group_id_p2p is None
else:
assert nccl_group_id_p2p
assert (
self.ids_to_actors_and_custom_comms[nccl_group_id_p2p]
== p2p_actors_and_custom_comm
)

def check_teardown(self, nccl_group_ids: List[str]) -> None:
ctx = ChannelContext.get_current()
for nccl_group_id in nccl_group_ids:
@@ -214,9 +188,6 @@ def check_nccl_group_init(
actors_and_custom_comms: Set[
Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]]
],
p2p_actors_and_custom_comm: Optional[
Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]]
] = None,
) -> "ray.dag.CompiledDAG":
mock_nccl_group_set = MockNcclGroupSet()
monkeypatch.setattr(
@@ -229,10 +200,9 @@ def check_nccl_group_init(
)

compiled_dag = dag.experimental_compile()
mock_nccl_group_set.check_init(
compiled_dag,
actors_and_custom_comms,
p2p_actors_and_custom_comm,
assert (
set(mock_nccl_group_set.ids_to_actors_and_custom_comms.values())
== actors_and_custom_comms
)

return compiled_dag, mock_nccl_group_set
@@ -248,6 +218,6 @@ def check_nccl_group_teardown(
mock_nccl_group_set.mock_destroy_nccl_group,
)

nccl_group_ids = copy.deepcopy(compiled_dag.communicator_ids)
created_communicator_ids = compiled_dag._actors_to_created_communicator_id.values()
compiled_dag.teardown()
mock_nccl_group_set.check_teardown(nccl_group_ids)
mock_nccl_group_set.check_teardown(created_communicator_ids)
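For reference, this is how the simplified helpers are invoked from `test_collective_dag.py` above: the caller passes only the expected set of `(actors, custom communicator)` pairs, and teardown is checked against the communicators the compiled DAG actually created.

```python
# Usage pattern taken from test_custom_comm above; `dag`, `workers`, and `comm`
# are the test's DAG, actor handles, and custom communicator.
compiled_dag, mock_nccl_group_set = check_nccl_group_init(
    monkeypatch,
    dag,
    {
        (frozenset(workers), comm),  # custom communicator, used where annotated
        (frozenset(workers), None),  # default communicator created by the compiler
    },
)
check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
```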
4 changes: 2 additions & 2 deletions python/ray/util/collective/collective.py
@@ -558,8 +558,8 @@ def send_multigpu(
):
"""Send a tensor to a remote GPU synchronously.

The function asssume each process owns >1 GPUs, and the sender
process and receiver process has equal nubmer of GPUs.
The function assumes each process owns >1 GPUs, and the sender
process and receiver process has equal number of GPUs.

Args:
tensor: the tensor to send, located on a GPU.
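To accompany the corrected docstring, a minimal usage sketch of `send_multigpu` and `recv_multigpu`. The argument order after `tensor` (destination/source rank, GPU index, group name) and the `init_collective_group` setup call are assumptions, since the full signatures are not shown in this diff:

```python
import ray
import torch
import ray.util.collective as col


@ray.remote(num_gpus=2)
class MultiGPUWorker:
    def setup(self, world_size: int, rank: int, group_name: str) -> None:
        # Assumed setup: each process joins the NCCL collective group once.
        col.init_collective_group(world_size, rank, backend="nccl", group_name=group_name)

    def send(self, group_name: str) -> None:
        tensor = torch.ones(4, device="cuda:0")
        # Send from this process's first GPU to GPU index 0 of the process at rank 1.
        col.send_multigpu(tensor, 1, 0, group_name)

    def recv(self, group_name: str) -> float:
        buf = torch.zeros(4, device="cuda:0")
        col.recv_multigpu(buf, 0, 0, group_name)
        return buf.sum().item()


workers = [MultiGPUWorker.remote() for _ in range(2)]
ray.get([w.setup.remote(2, i, "grp") for i, w in enumerate(workers)])
recv_ref = workers[1].recv.remote("grp")
send_ref = workers[0].send.remote("grp")
ray.get([send_ref, recv_ref])  # the received tensor sums to 4.0
```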