Refactor synthetic utilities and add multidevice support #136

Open · wants to merge 3 commits into base: dev
8 changes: 4 additions & 4 deletions src/c/backend/include/device.hpp
@@ -24,13 +24,13 @@ class DeviceRequirement;
 /**
  * @brief Architecture types for devices.
  */
-enum class DeviceType { INVALID = -2, All = -1, CPU = 0, CUDA = 1 };
+enum class DeviceType { INVALID = -2, Any = -1, CPU = 0, GPU = 1 };

 inline const constexpr std::array architecture_types{DeviceType::CPU,
-                                                     DeviceType::CUDA};
+                                                     DeviceType::GPU};
 inline const constexpr int NUM_DEVICE_TYPES = architecture_types.size();
 inline const std::array<std::string, NUM_DEVICE_TYPES> architecture_names{
-    "CPU", "CUDA"};
+    "CPU", "GPU"};

 /// Devices can be distinguished from other devices
 /// by a class type and its index.

@@ -145,7 +145,7 @@ class Device {
 class CUDADevice : public Device {
 public:
   CUDADevice(DevID_t dev_id, size_t mem_sz, size_t num_vcus, void *py_dev)
-      : Device(DeviceType::CUDA, dev_id, mem_sz, num_vcus, py_dev, 3) {}
+      : Device(DeviceType::GPU, dev_id, mem_sz, num_vcus, py_dev, 3) {}

 private:
 };
24 changes: 12 additions & 12 deletions src/c/backend/include/device_manager.hpp
@@ -44,32 +44,32 @@ class DeviceManager {
   }

   template <DeviceType T> int get_num_devices() {
-    if constexpr (T == DeviceType::All) {
+    if constexpr (T == DeviceType::Any) {
       return all_devices_.size();
     } else if constexpr (T == DeviceType::CPU) {
       return arch_devices_[static_cast<int>(DeviceType::CPU)].size();
-    } else if constexpr (T == DeviceType::CUDA) {
-      return arch_devices_[static_cast<int>(DeviceType::CUDA)].size();
+    } else if constexpr (T == DeviceType::GPU) {
+      return arch_devices_[static_cast<int>(DeviceType::GPU)].size();
     }
   }

   int get_num_devices(DeviceType dev_type) {
     switch (dev_type) {
     case DeviceType::CPU:
       return get_num_devices<DeviceType::CPU>();
-    case DeviceType::CUDA:
-      return get_num_devices<DeviceType::CUDA>();
+    case DeviceType::GPU:
+      return get_num_devices<DeviceType::GPU>();
     default:
-      return get_num_devices<DeviceType::All>();
+      return get_num_devices<DeviceType::Any>();
     }
   }

   template <DeviceType T> std::vector<Device *> &get_devices() {
     if constexpr (T == DeviceType::CPU) {
       return arch_devices_[static_cast<int>(DeviceType::CPU)];
-    } else if constexpr (T == DeviceType::CUDA) {
-      return arch_devices_[static_cast<int>(DeviceType::CUDA)];
-    } else if constexpr (T == DeviceType::All) {
+    } else if constexpr (T == DeviceType::GPU) {
+      return arch_devices_[static_cast<int>(DeviceType::GPU)];
+    } else if constexpr (T == DeviceType::Any) {
       return all_devices_;
     }
   }

@@ -87,10 +87,10 @@ class DeviceManager {
     switch (dev_type) {
     case DeviceType::CPU:
       return get_devices<DeviceType::CPU>();
-    case DeviceType::CUDA:
-      return get_devices<DeviceType::CUDA>();
+    case DeviceType::GPU:
+      return get_devices<DeviceType::GPU>();
     default:
-      return get_devices<DeviceType::All>();
+      return get_devices<DeviceType::Any>();
     }
   }
2 changes: 1 addition & 1 deletion src/c/backend/parray_tracker.cpp
@@ -3,7 +3,7 @@
 PArrayTracker::PArrayTracker(DeviceManager *device_manager)
     : device_manager_(device_manager) {
   this->managed_parrays_.resize(
-      device_manager->template get_num_devices<DeviceType::All>());
+      device_manager->template get_num_devices<DeviceType::Any>());
 }

 void PArrayTracker::track_parray(const InnerPArray &parray, DevID_t dev_id) {
15 changes: 8 additions & 7 deletions src/c/backend/policy.cpp
@@ -160,7 +160,7 @@ bool LocalityLoadBalancingMappingPolicy::calc_score_mdevplacement(
   // multiple times. This vector marks an assigned device and filter it
   // out at the next device decision.
   std::vector<bool> is_dev_assigned(
-      this->device_manager_->get_num_devices<DeviceType::All>(), false);
+      this->device_manager_->get_num_devices<DeviceType::Any>(), false);
   // Iterate requirements of the devices specified in multi-device placement.
   // All of the member devices should be available.
   for (DevID_t did = 0; did < placement_reqs_vec.size(); ++did) {

@@ -208,7 +208,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
     std::vector<std::shared_ptr<DeviceRequirement>> *chosen_devices,
     const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
         &parray_list,
-    std::vector<std::shared_ptr<PlacementRequirementBase>> *placement_req_options_vec) {
+    std::vector<std::shared_ptr<PlacementRequirementBase>>
+        *placement_req_options_vec) {
   // A set of chosen devices to a task.
   Score_t best_score{-1};
   // If any device was chosen as a candidate device,

@@ -242,8 +243,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
       std::shared_ptr<DeviceRequirement> dev_req =
          std::dynamic_pointer_cast<DeviceRequirement>(base_req);
       Score_t score{0};
-      bool is_req_available = calc_score_devplacement(
-          task, dev_req, mapper, &score, parray_list[0]);
+      bool is_req_available = calc_score_devplacement(task, dev_req, mapper,
+                                                      &score, parray_list[0]);
       if (!is_req_available) {
         continue;
       }

@@ -263,9 +264,9 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
      // std::cout << "[Mapper] Task name:" << task->get_name() << ", " <<
      // "Checking arch requirement."
      // << "\n";
-      bool is_req_available = calc_score_archplacement(
-          task, arch_req, mapper, chosen_dev_req, &chosen_dev_score,
-          parray_list[0]);
+      bool is_req_available =
+          calc_score_archplacement(task, arch_req, mapper, chosen_dev_req,
+                                   &chosen_dev_score, parray_list[0]);
       if (!is_req_available) {
         continue;
       }
6 changes: 3 additions & 3 deletions src/python/parla/common/array.py
@@ -77,7 +77,7 @@ def copy_from(self, src, target_device_id: int):
        current_context = get_current_context()
        current_device = current_context.devices[0]

-        is_gpu = (current_device.architecture == DeviceType.CUDA)
+        is_gpu = (current_device.architecture == DeviceType.GPU)

        if CUPY_ENABLED and isinstance(src, cupy.ndarray):
            if is_gpu and (src.flags['C_CONTIGUOUS'] or src.flags['F_CONTIGUOUS']):

@@ -121,7 +121,7 @@ def copy_from(self, src, target_device_id: int):
        current_context = get_current_context()
        current_device = current_context.devices[0]

-        is_gpu = (current_device.architecture == DeviceType.CUDA)
+        is_gpu = (current_device.architecture == DeviceType.GPU)

        if isinstance(src, cupy.ndarray) or isinstance(src, numpy.ndarray):

@@ -298,7 +298,7 @@ def clone_here(source, kind=None):
    current_device = current_content.devices[0]

    # FIXME: Make this a property of the device
-    if (current_device.architecture == DeviceType.CUDA) and CUPY_ENABLED:
+    if (current_device.architecture == DeviceType.GPU) and CUPY_ENABLED:
        AType = CupyArray()
    else:
        AType = NumpyArray()
2 changes: 1 addition & 1 deletion src/python/parla/common/globals.py
@@ -83,7 +83,7 @@ class DeviceType(IntEnum):
     INVALID = -2
     ANY = -1
     CPU = 0
-    CUDA = 1
+    GPU = 1


 class AccessMode(IntEnum):
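For reference, a stand-alone sketch of the renamed enum as it reads after this change (mirroring the values shown above, without importing parla): only the member name changes from CUDA to GPU, so code comparing against the raw integer value keeps working.

from enum import IntEnum

# Stand-alone mirror of parla.common.globals.DeviceType after the rename
# (values copied from the diff above; CUDA is now spelled GPU, value 1 unchanged).
class DeviceType(IntEnum):
    INVALID = -2
    ANY = -1
    CPU = 0
    GPU = 1

# Because only the member name changed, integer-based comparisons behave the same.
assert DeviceType.GPU == 1
assert DeviceType.ANY == -1
print(DeviceType.GPU.name)  # -> "GPU"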
8 changes: 0 additions & 8 deletions src/python/parla/cython/device.pxd
@@ -8,14 +8,6 @@ from libcpp.vector cimport vector

 cdef extern from "include/device.hpp" nogil:

-    cdef enum DeviceType:
-        All "DeviceType::All"
-        CPU "DeviceType::CPU"
-        CUDA "DeviceType::CUDA"
-        # TODO(hc): For now, we only support CUDA gpu devices.
-        # Laster, it would be extended to more gpu types
-        # like for AMD
-
     cdef cppclass Device:
         Device(string, int, long, long, void*) except +
         int get_id() except +
28 changes: 14 additions & 14 deletions src/python/parla/cython/device.pyx
@@ -11,7 +11,7 @@ cimport cython

 from parla.common.globals import _Locals as Locals
 from parla.common.globals import cupy, CUPY_ENABLED
-from parla.common.globals import DeviceType as PyDeviceType
+from parla.common.globals import DeviceType
 from parla.common.globals import VCU_BASELINE, get_device_manager

 from abc import ABCMeta, abstractmethod

@@ -81,13 +81,13 @@ class DeviceConfiguration:
     """
     A dataclass to represent a device configuration.
     """
-    type: PyDeviceType
+    type: DeviceType = DeviceType.CPU
     id: int = 0
     memory: long = 0
     vcus: int = 1000

     __annotations__ = {
-        "type": PyDeviceType,
+        "type": DeviceType,
         "id": int,
         "memory": long,
         "vcus": int

@@ -112,9 +112,9 @@ class PyDevice:
     This class is to abstract a single device in Python and manages
     a device context as a task runs in Python.
     """
-    def __init__(self, dev_type: PyDeviceType, dev_type_name, dev_id: int):
+    def __init__(self, dev_type: DeviceType, dev_type_name, dev_id: int):
         self._dev_type = dev_type
-        self._device_name = dev_type_name + ":" + str(dev_id)
+        self._device_name = f"{dev_type_name}[{str(dev_id)}]"
         self._device = self
         self._device_id = dev_id

@@ -203,7 +203,7 @@ class PyDevice:
         return hash(self._device_name)

     def __eq__(self, other) -> bool:
-        if isinstance(other, int) or isinstance(other, PyDeviceType):
+        if isinstance(other, int) or isinstance(other, DeviceType):
             return self.architecture == other
         elif isinstance(other, PyDevice):
             return self._device_name == other._device_name

@@ -233,7 +233,7 @@ class PyCUDADevice(PyDevice):
     """

     def __init__(self, dev_id: int = 0, mem_sz: long = 0, num_vcus: long = 1):
-        super().__init__(DeviceType.CUDA, "CUDA", dev_id)
+        super().__init__(DeviceType.GPU, "GPU", dev_id)
         #TODO(wlr): If we ever support VECs, we might need to move this device initialization
         self._cy_device = CyCUDADevice(dev_id, mem_sz, num_vcus, self)

@@ -322,7 +322,7 @@ class PyArchitecture(metaclass=ABCMeta):
         return self._devices

     def __eq__(self, o: object) -> bool:
-        if isinstance(o, int) or isinstance(o, PyDeviceType):
+        if isinstance(o, int) or isinstance(o, DeviceType):
             return self.id == o
         elif isinstance(o, type(self)):
             return (self.id == o.id)

@@ -333,7 +333,7 @@
         return hash(self._id)

     def __repr__(self):
-        return type(self).__name__
+        return f"{self.name}[-1]"

     def __mul__(self, num_archs: int):
         arch_ps = [self for i in range(0, num_archs)]

@@ -396,7 +396,7 @@ class ImportableArchitecture(PyArchitecture):
         return self._architecture_type

     def __repr__(self):
-        return type(self).__name__
+        return f"{self.name}[-1]"

     def __mul__(self, num_archs: int):
         #architecture = get_device_manager().get_architecture(self._architecture_type)

@@ -414,24 +414,24 @@

 class PyCUDAArchitecture(PyArchitecture):
     def __init__(self):
-        super().__init__("CUDAArch", DeviceType.CUDA)
+        super().__init__("GPU", DeviceType.GPU)

 class ImportableCUDAArchitecture(PyCUDAArchitecture, ImportableArchitecture):
     def __init__(self):
-        ImportableArchitecture.__init__(self, "CUDAArch", DeviceType.CUDA)
+        ImportableArchitecture.__init__(self, "GPU", DeviceType.GPU)


 class PyCPUArchitecture(PyArchitecture):
     def __init__(self):
-        super().__init__("CPUArch", PyDeviceType.CPU)
+        super().__init__("CPU", DeviceType.CPU)

     def add_device(self, device):
         assert isinstance(device, PyCPUDevice)
         self._devices.append(device)

 class ImportableCPUArchitecture(PyCPUArchitecture, ImportableArchitecture):
     def __init__(self):
-        ImportableArchitecture.__init__(self, "CPUArch", DeviceType.CPU)
+        ImportableArchitecture.__init__(self, "CPU", DeviceType.CPU)


 # TODO(hc): use dataclass later.
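A small stand-alone sketch of the naming convention introduced above (the helper below is hypothetical, not part of the patch): PyDevice now formats its name as f"{dev_type_name}[{dev_id}]" and PyArchitecture.__repr__ returns f"{self.name}[-1]", so a concrete device prints as GPU[0] while an architecture-level placement prints as GPU[-1].

# Hypothetical helper illustrating the new naming scheme; not part of the diff.
def device_name(dev_type_name: str, dev_id: int) -> str:
    return f"{dev_type_name}[{dev_id}]"

print(device_name("GPU", 0))  # GPU[0]  (previously rendered in the "CUDA:0" style)
print(device_name("CPU", 0))  # CPU[0]
print("GPU" + "[-1]")         # GPU[-1], the architecture repr; -1 presumably means no specific device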
2 changes: 1 addition & 1 deletion src/python/parla/cython/device_manager.pyx
@@ -153,7 +153,7 @@ class PyDeviceManager:
        #self.register_devices_to_cpp()

        # Initialize Device Hardware Queues
-        self.stream_pool = StreamPool(self.get_devices(DeviceType.CUDA))
+        self.stream_pool = StreamPool(self.get_devices(DeviceType.GPU))

    def __dealloc__(self):
        for arch in self.py_registered_archs:
2 changes: 1 addition & 1 deletion src/python/parla/cython/scheduler.pyx
@@ -140,7 +140,7 @@ class WorkerThread(ControllableThread, SchedulerContext):
                #TODO(wlr): Fix this in device_manager (see todo there)

                if CUPY_ENABLED:
-                    gpu_arch = device_manager.py_registered_archs[DeviceType.CUDA]
+                    gpu_arch = device_manager.py_registered_archs[DeviceType.GPU]
                    ngpus = len(gpu_arch)

                    for index in range(ngpus):
21 changes: 12 additions & 9 deletions src/python/parla/cython/tasks.pyx
@@ -15,7 +15,7 @@ from parla.cython import device

 from parla.common.globals import _Locals as Locals
 from parla.common.globals import get_stream_pool, get_scheduler
-from parla.common.globals import DeviceType as PyDeviceType
+from parla.common.globals import DeviceType
 from parla.common.globals import AccessMode, Storage

 from parla.common.parray.core import PArray

@@ -27,8 +27,6 @@ PyCPUDevice = device.PyCPUDevice
 PyArchitecture = device.PyArchitecture
 PyCUDAArchitecture = device.PyCUDAArchitecture

-DeviceType = PyDeviceType
-
 from abc import abstractmethod, ABCMeta
 from typing import Optional, List, Iterable, Union
 from typing import Awaitable, Collection, Iterable, FrozenSet

@@ -769,7 +767,7 @@ def create_device_env(device):
     if isinstance(device, PyCPUDevice):
         return CPUEnvironment(device), DeviceType.CPU
     elif isinstance(device, PyCUDADevice):
-        return GPUEnvironment(device), DeviceType.CUDA
+        return GPUEnvironment(device), DeviceType.GPU

 def create_env(sources):
     """!

@@ -813,7 +811,7 @@ class TaskEnvironment:
         self.event_dict = {}
         self.event_dict['default'] = None

-        for env in environment_list:
+        for idx, env in enumerate(environment_list):
             for dev in env.devices:
                 self.device_list.append(dev)
                 self.device_dict[dev.architecture].append(dev)

@@ -834,14 +832,14 @@
         """
         Returns the CUDA_VISIBLE_DEVICES ids of the GPU devices in this environment.
         """
-        return [device_env.get_parla_device().id for device_env in self.device_dict[DeviceType.CUDA]]
+        return [device_env.get_parla_device().id for device_env in self.device_dict[DeviceType.GPU]]

     @property
     def gpu_id(self):
         """
         Returns the CUDA_VISIBLE_DEVICES id of the first GPU device in this environment.
         """
-        return self.device_dict[DeviceType.CUDA][0].get_parla_device().id
+        return self.device_dict[DeviceType.GPU][0].get_parla_device().id

     def __repr__(self):
         return f"TaskEnvironment({self.env_list})"

@@ -886,7 +884,7 @@ class TaskEnvironment:
         if envlist is None:
             envlist = self.contexts

-        for env in envlist:
+        for idx, env in enumerate(envlist):
             env.__enter__()
             yield env
             env.__exit__(None, None, None)

@@ -914,7 +912,7 @@ class TaskEnvironment:
         return self.devices[0]

     def get_cupy_devices(self):
-        return [dev.device for dev in self.get_devices(DeviceType.CUDA)]
+        return [dev.device for dev in self.get_devices(DeviceType.GPU)]

     def synchronize(self, events=False, tags=['default'], return_to_pool=True):
         #print(f"Synchronizing {self}..", flush=True)

@@ -1167,6 +1165,11 @@ class TerminalEnvironment(TaskEnvironment):
     def architecture(self):
         return self._arch_type

+    @property
+    def id(self):
+        return self._device.id
+
+
     def __eq__(self, other):
         if isinstance(other, int) or isinstance(other, PyDevice):
             return self._device == other
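A stand-alone sketch of how the GPU-keyed device_dict feeds the gpu_ids/gpu_id properties and the new TerminalEnvironment.id property shown above; the Fake* classes below are stand-ins for illustration only, not the real parla runtime.

from enum import IntEnum
from collections import defaultdict

# Stand-ins for illustration; not the real parla classes.
class DeviceType(IntEnum):
    CPU = 0
    GPU = 1

class FakeParlaDevice:
    def __init__(self, dev_id):
        self.id = dev_id  # CUDA_VISIBLE_DEVICES index of the device

class FakeTerminalEnv:
    """Mimics the slice of TerminalEnvironment used here, including the new id property."""
    def __init__(self, dev_id):
        self._device = FakeParlaDevice(dev_id)
    def get_parla_device(self):
        return self._device
    @property
    def id(self):
        # Mirrors the property added to TerminalEnvironment in this diff.
        return self._device.id

device_dict = defaultdict(list)
device_dict[DeviceType.GPU] = [FakeTerminalEnv(0), FakeTerminalEnv(2)]

# These two expressions copy the bodies of the gpu_ids / gpu_id properties shown above.
gpu_ids = [env.get_parla_device().id for env in device_dict[DeviceType.GPU]]
gpu_id = device_dict[DeviceType.GPU][0].get_parla_device().id
print(gpu_ids, gpu_id)                    # [0, 2] 0
print(device_dict[DeviceType.GPU][0].id)  # 0, via the new id property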