From ce27e2573bfdb8197954d54222403ff6bed201f7 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:23:11 -0500 Subject: [PATCH 01/10] abstract functionality of pyop2 types to be overloaded by backends --- pyop2/compilation.py | 2 +- pyop2/global_kernel.py | 56 ++++++------------------------- pyop2/parloop.py | 63 ++++++++++++---------------------- pyop2/types/dat.py | 76 ++++++++++++++++++++++-------------------- pyop2/types/dataset.py | 6 ++++ pyop2/types/glob.py | 42 +++++++++-------------- pyop2/types/map.py | 3 +- pyop2/types/set.py | 20 ++++++----- 8 files changed, 108 insertions(+), 160 deletions(-) diff --git a/pyop2/compilation.py b/pyop2/compilation.py index f4a1af36a..387d2eaf2 100644 --- a/pyop2/compilation.py +++ b/pyop2/compilation.py @@ -565,7 +565,7 @@ def load(jitmodule, extension, fn_name, cppargs=(), ldargs=(), :kwarg comm: Optional communicator to compile the code on (only rank 0 compiles code) (defaults to pyop2.mpi.COMM_WORLD). """ - from pyop2.global_kernel import GlobalKernel + from pyop2.global_kernel import AbstractGlobalKernel as GlobalKernel if isinstance(jitmodule, str): class StrCode(object): diff --git a/pyop2/global_kernel.py b/pyop2/global_kernel.py index 536d717e9..f80c38838 100644 --- a/pyop2/global_kernel.py +++ b/pyop2/global_kernel.py @@ -1,8 +1,8 @@ import collections.abc import ctypes +import abc from dataclasses import dataclass import itertools -import os from typing import Optional, Tuple import loopy as lp @@ -10,7 +10,7 @@ import pytools from petsc4py import PETSc -from pyop2 import compilation, mpi +from pyop2 import mpi from pyop2.caching import Cached from pyop2.configuration import configuration from pyop2.datatypes import IntType, as_ctypes @@ -247,7 +247,7 @@ def pack(self): return MatPack -class GlobalKernel(Cached): +class AbstractGlobalKernel(Cached, abc.ABC): """Class representing the generated code for the global computation. :param local_kernel: :class:`pyop2.LocalKernel` instance representing the @@ -383,48 +383,6 @@ def builder(self): builder.add_argument(arg) return builder - @cached_property - def code_to_compile(self): - """Return the C/C++ source code as a string.""" - from pyop2.codegen.rep2loopy import generate - - wrapper = generate(self.builder) - code = lp.generate_code_v2(wrapper) - - if self.local_kernel.cpp: - from loopy.codegen.result import process_preambles - preamble = "".join(process_preambles(getattr(code, "device_preambles", []))) - device_code = "\n\n".join(str(dp.ast) for dp in code.device_programs) - return preamble + "\nextern \"C\" {\n" + device_code + "\n}\n" - return code.device_code() - - @PETSc.Log.EventDecorator() - @mpi.collective - def compile(self, comm): - """Compile the kernel. - - :arg comm: The communicator the compilation is collective over. - :returns: A ctypes function pointer for the compiled function. 
- """ - extension = "cpp" if self.local_kernel.cpp else "c" - cppargs = ( - tuple("-I%s/include" % d for d in get_petsc_dir()) - + tuple("-I%s" % d for d in self.local_kernel.include_dirs) - + ("-I%s" % os.path.abspath(os.path.dirname(__file__)),) - ) - ldargs = ( - tuple("-L%s/lib" % d for d in get_petsc_dir()) - + tuple("-Wl,-rpath,%s/lib" % d for d in get_petsc_dir()) - + ("-lpetsc", "-lm") - + tuple(self.local_kernel.ldargs) - ) - - return compilation.load(self, extension, self.name, - cppargs=cppargs, - ldargs=ldargs, - restype=ctypes.c_int, - comm=comm) - @cached_property def argtypes(self): """Return the ctypes datatypes of the compiled function.""" @@ -445,3 +403,11 @@ def num_flops(self, iterset): elif region not in {IterationRegion.TOP, IterationRegion.BOTTOM}: size = layers - 1 return size * self.local_kernel.num_flops + + @abc.abstractproperty + def code_to_compile(self): + """Return the C/C++ source code as a string.""" + + @abc.abstractmethod + def compile(self): + pass diff --git a/pyop2/parloop.py b/pyop2/parloop.py index c70f4c9fb..6dab30f16 100644 --- a/pyop2/parloop.py +++ b/pyop2/parloop.py @@ -13,7 +13,7 @@ from pyop2.datatypes import as_numpy_dtype from pyop2.exceptions import KernelTypeError, MapValueError, SetTypeError from pyop2.global_kernel import (GlobalKernelArg, DatKernelArg, MixedDatKernelArg, - MatKernelArg, MixedMatKernelArg, PassthroughKernelArg, GlobalKernel) + MatKernelArg, MixedMatKernelArg, PassthroughKernelArg) from pyop2.local_kernel import LocalKernel, CStringLocalKernel, LoopyLocalKernel from pyop2.types import (Access, Global, AbstractDat, Dat, DatView, MixedDat, Mat, Set, MixedSet, ExtrudedSet, Subset, Map, ComposedMap, MixedMap) @@ -164,7 +164,7 @@ def maps(self): return () -class Parloop: +class AbstractParloop: """A parallel loop invocation. :arg global_knl: The :class:`GlobalKernel` to be executed. @@ -274,7 +274,7 @@ def increment_dat_version(self): def zero_global_increments(self): """Zero any global increments every time the loop is executed.""" for g in self.reduced_globals.keys(): - g._data[...] = 0 + g.data[...] 
= 0 def replace_lgmaps(self): """Swap out any lgmaps for any :class:`MatParloopArg` instances @@ -406,38 +406,13 @@ def _l2g_idxs(self): seen.add(pl_arg.data) return tuple(indices) - @PETSc.Log.EventDecorator("ParLoopRednBegin") - @mpi.collective + @abc.abstractmethod def reduction_begin(self): """Begin reductions.""" - requests = [] - for idx in self._reduction_idxs: - glob = self.arguments[idx].data - mpi_op = {Access.INC: mpi.MPI.SUM, - Access.MIN: mpi.MPI.MIN, - Access.MAX: mpi.MPI.MAX}.get(self.accesses[idx]) - - if mpi.MPI.VERSION >= 3: - requests.append(self.comm.Iallreduce(glob._data, glob._buf, op=mpi_op)) - else: - self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) - return tuple(requests) - - @PETSc.Log.EventDecorator("ParLoopRednEnd") - @mpi.collective + + @abc.abstractmethod def reduction_end(self, requests): """Finish reductions.""" - if mpi.MPI.VERSION >= 3: - for idx, req in zip(self._reduction_idxs, requests): - req.Wait() - glob = self.arguments[idx].data - glob._data[:] = glob._buf - else: - assert len(requests) == 0 - - for idx in self._reduction_idxs: - glob = self.arguments[idx].data - glob._data[:] = glob._buf @cached_property def _reduction_idxs(self): @@ -449,7 +424,7 @@ def _reduction_idxs(self): def finalize_global_increments(self): """Finalise global increments.""" for tmp, glob in self.reduced_globals.items(): - glob.data._data += tmp._data + glob.data.data[:] += tmp.data_ro @mpi.collective def update_arg_data_state(self): @@ -519,11 +494,12 @@ def prepare_reduced_globals(self, arguments, global_knl): :class:`Global` in parallel produces the right result. The same is not needed for MAX and MIN because they commute with the reduction. """ + from pyop2.op2 import compute_backend arguments = list(arguments) reduced_globals = {} for i, (lk_arg, gk_arg, pl_arg) in enumerate(self.zip_arguments(global_knl, arguments)): if isinstance(gk_arg, GlobalKernelArg) and lk_arg.access == Access.INC: - tmp = Global(gk_arg.dim, data=np.zeros_like(pl_arg.data.data_ro), dtype=lk_arg.dtype, comm=self.comm) + tmp = compute_backend.Global(gk_arg.dim, data=np.zeros_like(pl_arg.data.data_ro), dtype=lk_arg.dtype, comm=self.comm) reduced_globals[tmp] = pl_arg arguments[i] = GlobalParloopArg(tmp) @@ -721,6 +697,7 @@ def LegacyParloop(local_knl, iterset, *args, **kwargs): raise SetTypeError("Iteration set is of the wrong type") # finish building the local kernel + from pyop2.op2 import compute_backend local_knl.accesses = tuple(a.access for a in args) if isinstance(local_knl, CStringLocalKernel): local_knl.dtypes = tuple(a.dtype for a in args) @@ -730,15 +707,15 @@ def LegacyParloop(local_knl, iterset, *args, **kwargs): extruded_periodic = iterset._extruded_periodic constant_layers = extruded and iterset.constant_layers subset = isinstance(iterset, Subset) - global_knl = GlobalKernel(local_knl, global_knl_args, - extruded=extruded, - extruded_periodic=extruded_periodic, - constant_layers=constant_layers, - subset=subset, - **kwargs) + global_knl = compute_backend.GlobalKernel(local_knl, global_knl_args, + extruded=extruded, + extruded_periodic=extruded_periodic, + constant_layers=constant_layers, + subset=subset, + **kwargs) parloop_args = tuple(a.parloop_arg for a in args) - return Parloop(global_knl, iterset, parloop_args) + return compute_backend.Parloop(global_knl, iterset, parloop_args) def par_loop(*args, **kwargs): @@ -752,8 +729,10 @@ def parloop(knl, *args, **kwargs): For a description of the possible arguments to this function see :class:`Parloop` and :func:`LegacyParloop`. 
""" - if isinstance(knl, GlobalKernel): - Parloop(knl, *args, **kwargs)() + from pyop2.global_kernel import AbstractGlobalKernel + if isinstance(knl, AbstractGlobalKernel): + from pyop2.op2 import compute_backend + compute_backend.Parloop(knl, *args, **kwargs)() elif isinstance(knl, LocalKernel): LegacyParloop(knl, *args, **kwargs)() else: diff --git a/pyop2/types/dat.py b/pyop2/types/dat.py index fb877c1a8..4c8d877ac 100644 --- a/pyop2/types/dat.py +++ b/pyop2/types/dat.py @@ -1,4 +1,3 @@ -import abc import contextlib import ctypes import itertools @@ -16,13 +15,14 @@ mpi, utils ) +from pyop2.offload_utils import OffloadMixin from pyop2.types.access import Access -from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet +from pyop2.types.dataset import DataSet, GlobalDataSet from pyop2.types.data_carrier import DataCarrier, EmptyDataMixin, VecAccessMixin from pyop2.types.set import ExtrudedSet, GlobalSet, Set -class AbstractDat(DataCarrier, EmptyDataMixin, abc.ABC): +class AbstractDat(DataCarrier, EmptyDataMixin): """OP2 vector data. A :class:`Dat` holds values on every element of a :class:`DataSet`.o @@ -305,19 +305,7 @@ def copy(self, other, subset=None): :arg other: The destination :class:`Dat` :arg subset: A :class:`Subset` of elements to copy (optional)""" - if other is self: - return - if subset is None: - # If the current halo is valid we can also copy these values across. - if self.halo_valid: - other._data[:] = self._data - other.halo_valid = True - else: - other.data[:] = self.data_ro - elif subset.superset != self.dataset.set: - raise ex.MapValueError("The subset and dataset are incompatible") - else: - other.data[subset.owned_indices] = self.data_ro[subset.owned_indices] + raise NotImplementedError("Backend-specific subclass must implement it.") def __iter__(self): """Yield self when iterated over.""" @@ -373,12 +361,11 @@ def _op_kernel(self, op, globalp, dtype): return self._op_kernel_cache.setdefault(key, Kernel(knl, name)) def _op(self, other, op): - from pyop2.types.glob import Global + from pyop2.op2 import compute_backend from pyop2.parloop import parloop - - ret = Dat(self.dataset, None, self.dtype) + ret = compute_backend.Dat(self.dataset, None, self.dtype) if np.isscalar(other): - other = Global(1, data=other, comm=self.comm) + other = compute_backend.Global(1, data=other, comm=self.comm) globalp = True else: self._check_shape(other) @@ -422,15 +409,16 @@ def _iop_kernel(self, op, globalp, other_is_self, dtype): return self._iop_kernel_cache.setdefault(key, Kernel(knl, name)) def _iop(self, other, op): + from pyop2.op2 import compute_backend from pyop2.parloop import parloop - from pyop2.types.glob import Global, Constant + from pyop2.types.glob import Constant globalp = False if np.isscalar(other): - other = Global(1, data=other, comm=self.comm) + other = compute_backend.Global(1, data=other, comm=self.comm) globalp = True elif isinstance(other, Constant): - other = Global(other, comm=self.comm) + other = compute_backend.Global(other, comm=self.comm) globalp = True elif other is not self: self._check_shape(other) @@ -474,10 +462,10 @@ def inner(self, other): """ from pyop2.parloop import parloop - from pyop2.types.glob import Global + from pyop2.op2 import compute_backend self._check_shape(other) - ret = Global(1, data=0, dtype=self.dtype, comm=self.comm) + ret = compute_backend.Global(1, data=0, dtype=self.dtype, comm=self.comm) parloop(self._inner_kernel(other.dtype), self.dataset.set, self(Access.READ), other(Access.READ), ret(Access.INC)) return 
ret.data_ro[0] @@ -493,7 +481,8 @@ def norm(self): return sqrt(self.inner(self).real) def __pos__(self): - pos = Dat(self) + from pyop2.op2 import compute_backend + pos = compute_backend.Dat(self) return pos def __add__(self, other): @@ -526,8 +515,9 @@ def _neg_kernel(self): def __neg__(self): from pyop2.parloop import parloop + from pyop2.op2 import compute_backend - neg = Dat(self.dataset, dtype=self.dtype) + neg = compute_backend.Dat(self.dataset, dtype=self.dtype) parloop(self._neg_kernel, self.dataset.set, neg(Access.WRITE), self(Access.READ)) return neg @@ -754,7 +744,11 @@ def data_wo_with_halos(self): return self._parent.data_wo_with_halos[self._idx] -class Dat(AbstractDat, VecAccessMixin): +class Dat(AbstractDat, VecAccessMixin, OffloadMixin): + + @utils.cached_property + def can_be_represented_as_petscvec(self): + return ((self.dtype == PETSc.ScalarType) and self.cdim > 0) def __init__(self, *args, **kwargs): AbstractDat.__init__(self, *args, **kwargs) @@ -765,14 +759,17 @@ def __init__(self, *args, **kwargs): @utils.cached_property def _vec(self): assert self.dtype == PETSc.ScalarType, \ - "Can't create Vec with type %s, must be %s" % (self.dtype, PETSc.ScalarType) + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) # Can't duplicate layout_vec of dataset, because we then # carry around extra unnecessary data. # But use getSizes to save an Allreduce in computing the # global size. size = self.dataset.layout_vec.getSizes() data = self._data[:size[0]] - return PETSc.Vec().createWithArray(data, size=size, bsize=self.cdim, comm=self.comm) + vec = PETSc.Vec().createWithArray(data, size=size, + bsize=self.cdim, comm=self.comm) + return vec @contextlib.contextmanager def vec_context(self, access): @@ -801,12 +798,13 @@ class MixedDat(AbstractDat, VecAccessMixin): def __init__(self, mdset_or_dats): from pyop2.types.glob import Global + from pyop2.op2 import compute_backend def what(x): if isinstance(x, (Global, GlobalDataSet, GlobalSet)): - return Global + return compute_backend.Global elif isinstance(x, (Dat, DataSet, Set)): - return Dat + return compute_backend.Dat else: raise ex.DataSetTypeError("Huh?!") if isinstance(mdset_or_dats, MixedDat): @@ -863,7 +861,8 @@ def split(self): @utils.cached_property def dataset(self): r""":class:`MixedDataSet`\s this :class:`MixedDat` is defined on.""" - return MixedDataSet(tuple(s.dataset for s in self._dats)) + from pyop2.op2 import compute_backend + return compute_backend.MixedDataSet(tuple(s.dataset for s in self._dats)) @utils.cached_property def _data(self): @@ -1023,6 +1022,7 @@ def inner(self, other): return ret def _op(self, other, op): + from pyop2.op2 import compute_backend ret = [] if np.isscalar(other): for s in self: @@ -1031,7 +1031,7 @@ def _op(self, other, op): self._check_shape(other) for s, o in zip(self, other): ret.append(op(s, o)) - return MixedDat(ret) + return compute_backend.MixedDat(ret) def _iop(self, other, op): if np.isscalar(other): @@ -1044,16 +1044,20 @@ def _iop(self, other, op): return self def __pos__(self): + from pyop2.op2 import compute_backend + ret = [] for s in self: ret.append(s.__pos__()) - return MixedDat(ret) + return compute_backend.MixedDat(ret) def __neg__(self): + from pyop2.op2 import compute_backend + ret = [] for s in self: ret.append(s.__neg__()) - return MixedDat(ret) + return compute_backend.MixedDat(ret) def __add__(self, other): """Pointwise addition of fields.""" diff --git a/pyop2/types/dataset.py b/pyop2/types/dataset.py index 3b4f4bfd8..6c5e5a38a 100644 --- 
a/pyop2/types/dataset.py +++ b/pyop2/types/dataset.py @@ -182,9 +182,11 @@ def local_ises(self): @utils.cached_property def layout_vec(self): """A PETSc Vec compatible with the dof layout of this DataSet.""" + from pyop2.op2 import compute_backend vec = PETSc.Vec().create(comm=self.comm) size = ((self.size - self.set.constrained_size) * self.cdim, None) vec.setSizes(size, bsize=self.cdim) + vec.setType(compute_backend.PETScVecType) vec.setUp() return vec @@ -288,9 +290,11 @@ def local_ises(self): @utils.cached_property def layout_vec(self): """A PETSc Vec compatible with the dof layout of this DataSet.""" + from pyop2.op2 import compute_backend vec = PETSc.Vec().create(comm=self.comm) size = (self.size * self.cdim, None) vec.setSizes(size, bsize=self.cdim) + vec.setType(compute_backend.PETScVecType) vec.setUp() return vec @@ -436,10 +440,12 @@ def __repr__(self): @utils.cached_property def layout_vec(self): """A PETSc Vec compatible with the dof layout of this MixedDataSet.""" + from pyop2.op2 import compute_backend vec = PETSc.Vec().create(comm=self.comm) # Compute local and global size from sizes of layout vecs lsize, gsize = map(sum, zip(*(d.layout_vec.sizes for d in self))) vec.setSizes((lsize, gsize), bsize=1) + vec.setType(compute_backend.PETScVecType) vec.setUp() return vec diff --git a/pyop2/types/glob.py b/pyop2/types/glob.py index d8ed99134..55d12b6fc 100644 --- a/pyop2/types/glob.py +++ b/pyop2/types/glob.py @@ -11,8 +11,8 @@ mpi, utils ) +from pyop2.offload_utils import OffloadMixin from pyop2.types.access import Access -from pyop2.types.dataset import GlobalDataSet from pyop2.types.data_carrier import DataCarrier, EmptyDataMixin, VecAccessMixin @@ -205,7 +205,7 @@ def inner(self, other): # must have comm, can be modified in parloop (implies a reduction) -class Global(SetFreeDataCarrier, VecAccessMixin): +class Global(SetFreeDataCarrier, VecAccessMixin, OffloadMixin): """OP2 global value. When a ``Global`` is passed to a :func:`pyop2.op2.par_loop`, the access @@ -251,10 +251,10 @@ def __init__(self, dim, data=None, dtype=None, name=None, comm=None): def __str__(self): return "OP2 Global Argument: %s with dim %s and value %s" \ - % (self._name, self._dim, self._data) + % (self._name, self._dim, self.data_ro) def __repr__(self): - return "Global(%r, %r, %r, %r)" % (self._dim, self._data, + return "Global(%r, %r, %r, %r)" % (self._dim, self.data_ro, self._data.dtype, self._name) @utils.validate_in(('access', _modes, ex.ModeValueError)) @@ -275,7 +275,8 @@ def __neg__(self): @utils.cached_property def dataset(self): - return GlobalDataSet(self) + from pyop2.op2 import compute_backend + return compute_backend.GlobalDataSet(self) @mpi.collective def duplicate(self): @@ -338,37 +339,24 @@ def unfreeze_halo(self): @utils.cached_property def _vec(self): - assert self.dtype == PETSc.ScalarType, \ - "Can't create Vec with type %s, must be %s" % (self.dtype, PETSc.ScalarType) - # Can't duplicate layout_vec of dataset, because we then - # carry around extra unnecessary data. - # But use getSizes to save an Allreduce in computing the - # global size. 
- data = self._data - size = self.dataset.layout_vec.getSizes() - if self.comm.rank == 0: - return PETSc.Vec().createWithArray(data, size=size, - bsize=self.cdim, - comm=self.comm) - else: - return PETSc.Vec().createWithArray(np.empty(0, dtype=self.dtype), - size=size, - bsize=self.cdim, - comm=self.comm) + raise NotImplementedError("Backend-specific subclass must implement it.") @contextlib.contextmanager def vec_context(self, access): """A context manager for a :class:`PETSc.Vec` from a :class:`Global`. :param access: Access descriptor: READ, WRITE, or RW.""" - yield self._vec - if access is not Access.READ: - data = self._data - self.comm.Bcast(data, 0) + raise NotImplementedError() + + def ensure_availability_on_host(self): + raise NotImplementedError() + + def ensure_availability_on_device(self): + raise NotImplementedError() # has no comm, can only be READ -class Constant(SetFreeDataCarrier): +class Constant(SetFreeDataCarrier, OffloadMixin): """OP2 constant value. When a ``Constant`` is passed to a :func:`pyop2.op2.par_loop`, the access diff --git a/pyop2/types/map.py b/pyop2/types/map.py index 81e386546..1e4794996 100644 --- a/pyop2/types/map.py +++ b/pyop2/types/map.py @@ -11,10 +11,11 @@ utils ) from pyop2 import mpi +from pyop2.offload_utils import OffloadMixin from pyop2.types.set import GlobalSet, MixedSet, Set -class Map: +class Map(OffloadMixin): """OP2 map, a relation between two :class:`Set` objects. diff --git a/pyop2/types/set.py b/pyop2/types/set.py index f10c93404..1565f5ec7 100644 --- a/pyop2/types/set.py +++ b/pyop2/types/set.py @@ -11,6 +11,7 @@ mpi, utils ) +from pyop2.offload_utils import OffloadMixin class Set: @@ -176,7 +177,9 @@ def __call__(self, *indices): indices = indices[0] if np.isscalar(indices): indices = [indices] - return Subset(self, indices) + + from pyop2.op2 import compute_backend + return compute_backend.Subset(self, indices) def __contains__(self, dset): """Indicate whether a given DataSet is compatible with this Set.""" @@ -188,8 +191,8 @@ def __contains__(self, dset): def __pow__(self, e): """Derive a :class:`DataSet` with dimension ``e``""" - from pyop2.types import DataSet - return DataSet(self, dim=e) + from pyop2.op2 import compute_backend + return compute_backend.DataSet(self, dim=e) @utils.cached_property def layers(self): @@ -302,7 +305,7 @@ def __hash__(self): return hash(type(self)) -class ExtrudedSet(Set): +class ExtrudedSet(Set, OffloadMixin): """OP2 ExtrudedSet. @@ -391,7 +394,7 @@ def layers_array(self): return self._layers -class Subset(ExtrudedSet): +class Subset(ExtrudedSet, OffloadMixin): """OP2 subset. 
@@ -465,7 +468,8 @@ def __call__(self, *indices): indices = indices[0] if np.isscalar(indices): indices = [indices] - return Subset(self, indices) + from pyop2.op2 import compute_backend + return compute_backend.Subset(self, indices) @utils.cached_property def superset(self): @@ -649,8 +653,8 @@ def __len__(self): def __pow__(self, e): """Derive a :class:`MixedDataSet` with dimensions ``e``""" - from pyop2.types import MixedDataSet - return MixedDataSet(self._sets, e) + from pyop2.op2 import compute_backend + return compute_backend.MixedDataSet(self._sets, e) def __str__(self): return "OP2 MixedSet composed of Sets: %s" % (self._sets,) From 4129f3733aa389e91cee94a0a1de0b7bfc6aeef3 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:25:16 -0500 Subject: [PATCH 02/10] generalize loopy codegen to allow OpenCL/CUDA targets --- pyop2/codegen/rep2loopy.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/pyop2/codegen/rep2loopy.py b/pyop2/codegen/rep2loopy.py index f85041107..b9d4d5c6d 100644 --- a/pyop2/codegen/rep2loopy.py +++ b/pyop2/codegen/rep2loopy.py @@ -400,13 +400,12 @@ def bounds(exprs): return dict((op, (names[op], dep)) for op, dep in deps.items()) -def generate(builder, wrapper_name=None): +def generate(builder, wrapper_name=None, include_math=True, include_petsc=True, include_complex=True): # Reset all terminal counters to avoid generated code becoming different across ranks Argument._count = defaultdict(partial(itertools.count)) Index._count = itertools.count() Materialise._count = itertools.count() RuntimeIndex._count = itertools.count() - if builder.layer_index is not None: outer_inames = frozenset([builder._loop_index.name, builder.layer_index.name]) @@ -553,7 +552,16 @@ def renamer(expr): # register kernel kernel = builder.kernel headers = set(kernel.headers) - headers = headers | set(["#include ", "#include ", "#include "]) + + if include_math: + headers.add("#include ") + + if include_petsc: + headers.add("#include ") + + if include_complex: + headers.add("#include ") + if PETSc.Log.isActive(): headers = headers | set(["#include "]) preamble = "\n".join(sorted(headers)) @@ -622,15 +630,22 @@ def statement_assign(expr, context): if isinstance(lvalue, Indexed): context.index_ordering.append(tuple(i.name for i in lvalue.index_ordering())) lvalue, rvalue = tuple(expression(c, context.parameters) for c in expr.children) - within_inames = context.within_inames[expr] + if isinstance(expr.label, (PreUnpackInst, UnpackInst)): + tag = "scatter" + elif isinstance(expr.label, PackInst): + tag = "gather" + else: + raise NotImplementedError() + within_inames = context.within_inames[expr] id, depends_on = context.instruction_dependencies[expr] predicates = frozenset(context.conditions) return loopy.Assignment(lvalue, rvalue, within_inames=within_inames, within_inames_is_final=True, predicates=predicates, id=id, - depends_on=depends_on, depends_on_is_final=True) + depends_on=depends_on, depends_on_is_final=True, + tags=frozenset([tag])) @statement.register(FunctionCall) From e4bc3eb40f897ed6714acf3352838a6bd504e661 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:26:17 -0500 Subject: [PATCH 03/10] defines an AbstractComputeBackend type --- pyop2/backends/__init__.py | 44 ++++++++++++++++++++++++++++++++++++++ setup.py | 2 +- 2 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 pyop2/backends/__init__.py diff --git a/pyop2/backends/__init__.py b/pyop2/backends/__init__.py new file mode 
100644 index 000000000..f5da7cd55 --- /dev/null +++ b/pyop2/backends/__init__.py @@ -0,0 +1,44 @@ +class _not_implemented: # noqa + """Not Implemented""" + + +class AbstractComputeBackend: + """ + Abstract class to record all the backend specific implementation of + :mod:`pyop2`'s data structures. + """ + GlobalKernel = _not_implemented + Parloop = _not_implemented + Set = _not_implemented + ExtrudedSet = _not_implemented + MixedSet = _not_implemented + Subset = _not_implemented + DataSet = _not_implemented + MixedDataSet = _not_implemented + Map = _not_implemented + MixedMap = _not_implemented + Dat = _not_implemented + MixedDat = _not_implemented + DatView = _not_implemented + Mat = _not_implemented + Global = _not_implemented + Constant = _not_implemented + GlobalDataSet = _not_implemented + PETScVecType = _not_implemented + + def __getattribute__(self, key): + val = super().__getattribute__(key) + if val is _not_implemented: + raise NotImplementedError(f"'{val}' is not implemented for backend" + f" '{type(self).__name__}'.") + return val + + def turn_on_offloading(self): + raise NotImplementedError() + + def turn_off_offloading(self): + raise NotImplementedError() + + @property + def cache_key(self): + raise NotImplementedError() diff --git a/setup.py b/setup.py index 06c03e152..0ecf6ba8d 100644 --- a/setup.py +++ b/setup.py @@ -139,7 +139,7 @@ def run(self): 'Programming Language :: Python :: 3.6', ], install_requires=install_requires + test_requires, - packages=['pyop2', 'pyop2.codegen', 'pyop2.types'], + packages=['pyop2', 'pyop2.backends', 'pyop2.codegen', 'pyop2.types'], package_data={ 'pyop2': ['assets/*', '*.h', '*.pxd', '*.pyx', 'codegen/c/*.c']}, scripts=glob('scripts/*'), From edc4124b75d7fd999f96796b2509e1c66622552f Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:28:10 -0500 Subject: [PATCH 04/10] implements CPU backend --- pyop2/backends/cpu.py | 279 ++++++++++++++++++++++++++++++++++++++++++ pyop2/op2.py | 12 +- 2 files changed, 288 insertions(+), 3 deletions(-) create mode 100644 pyop2/backends/cpu.py diff --git a/pyop2/backends/cpu.py b/pyop2/backends/cpu.py new file mode 100644 index 000000000..a718a3650 --- /dev/null +++ b/pyop2/backends/cpu.py @@ -0,0 +1,279 @@ +from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView +from pyop2.types.set import Set, ExtrudedSet, Subset, MixedSet +from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet +from pyop2.types.map import Map, MixedMap +from pyop2.parloop import AbstractParloop +from pyop2.global_kernel import AbstractGlobalKernel +from pyop2.types.access import READ, INC, MIN, MAX +from pyop2.types.mat import Mat +from pyop2.types.glob import Global as BaseGlobal, Constant as BaseConstant +from pyop2.backends import AbstractComputeBackend +from petsc4py import PETSc +from pyop2 import ( + compilation, + mpi, + utils +) + +import ctypes +import os +import loopy as lp +from contextlib import contextmanager +import numpy as np + + +class Dat(BaseDat): + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. 
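+        # Note (descriptive comment, not in the original patch): getSizes()
+        # returns the (local, global) size pair, so the Vec is backed only by
+        # the owned (non-halo) entries of self._data.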
+ size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + vec = PETSc.Vec().createWithArray(data, size=size, + bsize=self.cdim, comm=self.comm) + return vec + + @contextmanager + def vec_context(self, access): + # PETSc Vecs have a state counter and cache norm computations + # to return immediately if the state counter is unchanged. + # Since we've updated the data behind their back, we need to + # change that state counter. + self._vec.stateIncrease() + yield self._vec + if access is not READ: + self.halo_valid = False + + def ensure_availability_on_device(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + def ensure_availability_on_host(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + @mpi.collective + def copy(self, other, subset=None): + if other is self: + return + if subset is None: + # If the current halo is valid we can also copy these values across. + if self.halo_valid: + other._data[:] = self.data_ro + other.halo_valid = True + else: + other.data[:] = self.data_ro + elif subset.superset != self.dataset.set: + raise ex.MapValueError("The subset and dataset are incompatible") + else: + other.data[subset.owned_indices] = self.data_ro[subset.owned_indices] + + +class Global(BaseGlobal): + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + data = self._data + size = self.dataset.layout_vec.getSizes() + if self.comm.rank == 0: + return PETSc.Vec().createWithArray(data, size=size, + bsize=self.cdim, + comm=self.comm) + else: + return PETSc.Vec().createWithArray(np.empty(0, dtype=self.dtype), + size=size, + bsize=self.cdim, + comm=self.comm) + + @contextmanager + def vec_context(self, access): + """A context manager for a :class:`PETSc.Vec` from a :class:`Global`. + + :param access: Access descriptor: READ, WRITE, or RW.""" + yield self._vec + if access is not READ: + data = self._data + self.comm.Bcast(data, 0) + + def ensure_availability_on_device(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + def ensure_availability_on_host(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + +class Constant(BaseConstant): + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + data = self._data + size = self.dataset.layout_vec.getSizes() + if self.comm.rank == 0: + return PETSc.Vec().createWithArray(data, size=size, + bsize=self.cdim, + comm=self.comm) + else: + return PETSc.Vec().createWithArray(np.empty(0, dtype=self.dtype), + size=size, + bsize=self.cdim, + comm=self.comm) + + @contextmanager + def vec_context(self, access): + """A context manager for a :class:`PETSc.Vec` from a :class:`Global`. 
+ + :param access: Access descriptor: READ, WRITE, or RW.""" + yield self._vec + if access is not READ: + data = self._data + self.comm.Bcast(data, 0) + + def ensure_availability_on_device(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + def ensure_availability_on_host(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + +class GlobalKernel(AbstractGlobalKernel): + + @utils.cached_property + def code_to_compile(self): + """Return the C/C++ source code as a string.""" + from pyop2.codegen.rep2loopy import generate + + wrapper = generate(self.builder) + code = lp.generate_code_v2(wrapper) + + if self.local_kernel.cpp: + from loopy.codegen.result import process_preambles + preamble = "".join( + process_preambles(getattr(code, "device_preambles", []))) + device_code = "\n\n".join(str(dp.ast) for dp in code.device_programs) + return preamble + '\nextern "C" {\n' + device_code + "\n}\n" + return code.device_code() + + @PETSc.Log.EventDecorator() + @mpi.collective + def compile(self, comm): + """Compile the kernel. + + :arg comm: The communicator the compilation is collective over. + :returns: A ctypes function pointer for the compiled function. + """ + extension = "cpp" if self.local_kernel.cpp else "c" + cppargs = ( + tuple("-I%s/include" % d for d in utils.get_petsc_dir()) + + tuple("-I%s" % d for d in self.local_kernel.include_dirs) + + ("-I%s" % os.path.abspath(os.path.dirname(__file__)),) + ) + ldargs = ( + tuple("-L%s/lib" % d for d in utils.get_petsc_dir()) + + tuple("-Wl,-rpath,%s/lib" % d for d in utils.get_petsc_dir()) + + ("-lpetsc", "-lm") + + tuple(self.local_kernel.ldargs) + ) + + return compilation.load(self, extension, self.name, + cppargs=cppargs, + ldargs=ldargs, + restype=ctypes.c_int, + comm=comm) + + +class Parloop(AbstractParloop): + @PETSc.Log.EventDecorator("ParLoopRednBegin") + @mpi.collective + def reduction_begin(self): + """Begin reductions.""" + requests = [] + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + mpi_op = {INC: mpi.MPI.SUM, + MIN: mpi.MPI.MIN, + MAX: mpi.MPI.MAX}.get(self.accesses[idx]) + + if mpi.MPI.VERSION >= 3: + requests.append(self.comm.Iallreduce(glob._data, + glob._buf, + op=mpi_op)) + else: + self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) + return tuple(requests) + + @PETSc.Log.EventDecorator("ParLoopRednEnd") + @mpi.collective + def reduction_end(self, requests): + """Finish reductions.""" + if mpi.MPI.VERSION >= 3: + mpi.MPI.Request.Waitall(requests) + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + else: + assert len(requests) == 0 + + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + + +class CPUBackend(AbstractComputeBackend): + GlobalKernel = GlobalKernel + Parloop = Parloop + Set = Set + ExtrudedSet = ExtrudedSet + MixedSet = MixedSet + Subset = Subset + DataSet = DataSet + MixedDataSet = MixedDataSet + Map = Map + MixedMap = MixedMap + Dat = Dat + MixedDat = MixedDat + DatView = DatView + Mat = Mat + Global = Global + Constant = Constant + GlobalDataSet = GlobalDataSet + PETScVecType = PETSc.Vec.Type.STANDARD + + def turn_on_offloading(self): + pass + + def turn_off_offloading(self): + pass + + @property + def cache_key(self): + return (type(self),) + + +cpu_backend = CPUBackend() diff --git a/pyop2/op2.py b/pyop2/op2.py index 85788eafa..de7c5a0d2 100644 
--- a/pyop2/op2.py +++ b/pyop2/op2.py @@ -51,12 +51,13 @@ from pyop2.local_kernel import CStringLocalKernel, LoopyLocalKernel, Kernel # noqa: F401 from pyop2.global_kernel import (GlobalKernelArg, DatKernelArg, MixedDatKernelArg, # noqa: F401 - MatKernelArg, MixedMatKernelArg, MapKernelArg, GlobalKernel) + MatKernelArg, MixedMatKernelArg, MapKernelArg) from pyop2.parloop import (GlobalParloopArg, DatParloopArg, MixedDatParloopArg, # noqa: F401 - MatParloopArg, MixedMatParloopArg, PassthroughArg, Parloop, parloop, par_loop) + MatParloopArg, MixedMatParloopArg, PassthroughArg, AbstractParloop, parloop, par_loop) from pyop2.parloop import (GlobalLegacyArg, DatLegacyArg, MixedDatLegacyArg, # noqa: F401 MatLegacyArg, MixedMatLegacyArg, LegacyParloop, ParLoop) +from pyop2.backends.cpu import cpu_backend __all__ = ['configuration', 'READ', 'WRITE', 'RW', 'INC', 'MIN', 'MAX', 'ON_BOTTOM', 'ON_TOP', 'ON_INTERIOR_FACETS', 'ALL', @@ -64,7 +65,7 @@ 'set_log_level', 'MPI', 'init', 'exit', 'Kernel', 'Set', 'ExtrudedSet', 'MixedSet', 'Subset', 'DataSet', 'GlobalDataSet', 'MixedDataSet', 'Halo', 'Dat', 'MixedDat', 'Mat', 'Global', 'Map', 'MixedMap', - 'Sparsity', 'parloop', 'Parloop', 'ParLoop', 'par_loop', + 'Sparsity', 'parloop', 'AbstractParloop', 'ParLoop', 'par_loop', 'DatView', 'PermutedMap', 'ComposedMap'] @@ -120,3 +121,8 @@ def exit(): configuration.reset() global _initialised _initialised = False + + +compute_backend = cpu_backend + +GlobalKernel = cpu_backend.GlobalKernel From 03db3a525b311058e353688445c20901a1784b4c Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:27:16 -0500 Subject: [PATCH 05/10] defines offloading helper types --- pyop2/offload_utils.py | 71 ++++++++++++++++++++++++++++++++++++++++++ pyop2/op2.py | 4 ++- 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 pyop2/offload_utils.py diff --git a/pyop2/offload_utils.py b/pyop2/offload_utils.py new file mode 100644 index 000000000..a60ee5c47 --- /dev/null +++ b/pyop2/offload_utils.py @@ -0,0 +1,71 @@ +from enum import IntEnum +from contextlib import contextmanager + + +class DataAvailability(IntEnum): + """ + Indicates whether the device or host contains valid data. + """ + AVAILABLE_ON_HOST_ONLY = 1 + AVAILABLE_ON_DEVICE_ONLY = 2 + AVAILABLE_ON_BOTH = 3 + + +class OffloadMixin: + def get_availability(self): + raise NotImplementedError + + def ensure_availability_on_host(self): + raise NotImplementedError + + def ensure_availaibility_on_device(self): + raise NotImplementedError + + def is_available_on_host(self): + return bool(self.get_availability() & AVAILABLE_ON_HOST_ONLY) + + def is_available_on_device(self): + return bool(self.get_availability() & AVAILABLE_ON_DEVICE_ONLY) + + +AVAILABLE_ON_HOST_ONLY = DataAvailability.AVAILABLE_ON_HOST_ONLY +AVAILABLE_ON_DEVICE_ONLY = DataAvailability.AVAILABLE_ON_DEVICE_ONLY +AVAILABLE_ON_BOTH = DataAvailability.AVAILABLE_ON_BOTH + + +def set_offloading_backend(backend): + """ + Sets a backend for offloading operations to. + + By default the operations are executed on the host, to mark operations for + offloading wrap them within a :func:`~pyop2.offload_utils.offloading` + context. + + :arg backend: An instance of :class:`pyop2.backends.AbstractComputeBackend`. + + .. warning:: + + * Must be called before any instance of PyOP2 type is allocated. + * Calling :func:`set_offloading_bacckend` different values of *backend* + over the course of the program is an undefined behavior. (i.e. 
+ preferably avoided) + """ + from pyop2 import op2 + from pyop2.backends import AbstractComputeBackend + assert isinstance(backend, AbstractComputeBackend) + op2.compute_backend = backend + + +@contextmanager +def offloading(): + """ + Operations (such as manipulating a :class:`~pyop2.types.Dat`, calling a + :class:`~pyop2.global_kernel.GlobalKernel`, etc) within the offloading + region will be executed on backend as selected via + :func:`set_offloading_backend`. + """ + from pyop2 import op2 + op2.compute_backend.turn_on_offloading() + yield + op2.compute_backend.turn_off_offloading() + return diff --git a/pyop2/op2.py b/pyop2/op2.py index de7c5a0d2..f868c5c91 100644 --- a/pyop2/op2.py +++ b/pyop2/op2.py @@ -58,6 +58,7 @@ MatLegacyArg, MixedMatLegacyArg, LegacyParloop, ParLoop) from pyop2.backends.cpu import cpu_backend +from pyop2.offload_utils import set_offloading_backend, offloading __all__ = ['configuration', 'READ', 'WRITE', 'RW', 'INC', 'MIN', 'MAX', 'ON_BOTTOM', 'ON_TOP', 'ON_INTERIOR_FACETS', 'ALL', @@ -66,7 +67,8 @@ 'MixedSet', 'Subset', 'DataSet', 'GlobalDataSet', 'MixedDataSet', 'Halo', 'Dat', 'MixedDat', 'Mat', 'Global', 'Map', 'MixedMap', 'Sparsity', 'parloop', 'AbstractParloop', 'ParLoop', 'par_loop', - 'DatView', 'PermutedMap', 'ComposedMap'] + 'DatView', 'PermutedMap', 'ComposedMap', 'set_offloading_backend', + 'offloading'] _initialised = False From 8903a46ced4db10961197499eb77ebfb61271c37 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:27:41 -0500 Subject: [PATCH 06/10] implement GPU codegen helpers --- pyop2/compilation.py | 116 ++++++++++++++++++++++++++++++---- pyop2/configuration.py | 10 ++- pyop2/transforms/__init__.py | 0 pyop2/transforms/gpu_utils.py | 94 +++++++++++++++++++++++++++ pyop2/transforms/snpt.py | 50 +++++++++++++++ setup.py | 2 +- 6 files changed, 257 insertions(+), 15 deletions(-) create mode 100644 pyop2/transforms/__init__.py create mode 100644 pyop2/transforms/gpu_utils.py create mode 100644 pyop2/transforms/snpt.py diff --git a/pyop2/compilation.py b/pyop2/compilation.py index 387d2eaf2..28118c517 100644 --- a/pyop2/compilation.py +++ b/pyop2/compilation.py @@ -154,6 +154,29 @@ def sniff_compiler(exe, comm=mpi.COMM_WORLD): return comm.bcast(compiler, 0) +def _check_src_hashes(comm, global_kernel): + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + dirpart, basename = basename[:2], basename[2:] + cachedir = configuration["cache_dir"] + cachedir = os.path.join(cachedir, dirpart) + + if configuration["check_src_hashes"] or configuration["debug"]: + matching = comm.allreduce(basename, op=_check_op) + if matching != basename: + # Dump all src code to disk for debugging + output = os.path.join(cachedir, "mismatching-kernels") + srcfile = os.path.join(output, "src-rank%d.c" % comm.rank) + if comm.rank == 0: + os.makedirs(output, exist_ok=True) + comm.barrier() + with open(srcfile, "w") as f: + f.write(global_kernel.code_to_compile) + comm.barrier() + raise CompilationError("Generated code differs across ranks" + f" (see output in {output})") + + class Compiler(ABC): """A compiler for shared libraries. @@ -324,19 +347,8 @@ def get_so(self, jitmodule, extension): # atomically (avoiding races). 
tmpname = os.path.join(cachedir, "%s_p%d.so.tmp" % (basename, pid)) - if configuration['check_src_hashes'] or configuration['debug']: - matching = self.comm.allreduce(basename, op=_check_op) - if matching != basename: - # Dump all src code to disk for debugging - output = os.path.join(configuration["cache_dir"], "mismatching-kernels") - srcfile = os.path.join(output, "src-rank%d.c" % self.comm.rank) - if self.comm.rank == 0: - os.makedirs(output, exist_ok=True) - self.comm.barrier() - with open(srcfile, "w") as f: - f.write(jitmodule.code_to_compile) - self.comm.barrier() - raise CompilationError("Generated code differs across ranks (see output in %s)" % output) + _check_src_hashes(self.comm, jitmodule) + try: # Are we in the cache? return ctypes.CDLL(soname) @@ -652,3 +664,81 @@ def clear_cache(prompt=False): shutil.rmtree(cachedir, ignore_errors=True) else: print("Not removing cached libraries") + + +def _get_code_to_compile(comm, global_kernel): + # Determine cache key + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + cachedir = configuration["cache_dir"] + dirpart, basename = basename[:2], basename[2:] + cachedir = os.path.join(cachedir, dirpart) + cname = os.path.join(cachedir, f"{basename}_code.cu") + + _check_src_hashes(comm, global_kernel) + + if os.path.isfile(cname): + # Are we in the cache? + with open(cname, "r") as f: + code_to_compile = f.read() + else: + # No, let"s go ahead and build + if comm.rank == 0: + # No need to do this on all ranks + os.makedirs(cachedir, exist_ok=True) + with progress(INFO, "Compiling wrapper"): + # make sure that compiles successfully before writing to file + code_to_compile = global_kernel.code_to_compile + with open(cname, "w") as f: + f.write(code_to_compile) + comm.barrier() + + return code_to_compile + + +@mpi.collective +def get_prepared_cuda_function(comm, global_kernel): + from pycuda.compiler import SourceModule + + # Determine cache key + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + cachedir = configuration["cache_dir"] + dirpart, basename = basename[:2], basename[2:] + cachedir = os.path.join(cachedir, dirpart) + + nvcc_opts = ["-use_fast_math", "-w"] + + code_to_compile = _get_code_to_compile(comm, global_kernel) + source_module = SourceModule(code_to_compile, options=nvcc_opts, + cache_dir=cachedir) + + cu_func = source_module.get_function(global_kernel.name) + + type_map = {ctypes.c_void_p: "P", ctypes.c_int: "i"} + argtypes = "".join(type_map[t] for t in global_kernel.argtypes) + cu_func.prepare(argtypes) + + return cu_func + + +@mpi.collective +def get_opencl_kernel(comm, global_kernel): + import pyopencl as cl + from pyop2.backends.opencl import opencl_backend + cl_ctx = opencl_backend.context + + # Determine cache key + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + cachedir = configuration["cache_dir"] + dirpart, basename = basename[:2], basename[2:] + cachedir = os.path.join(cachedir, dirpart) + + code_to_compile = _get_code_to_compile(comm, global_kernel) + + prg = cl.Program(cl_ctx, code_to_compile).build(options=[], + cache_dir=cachedir) + + cl_knl = cl.Kernel(prg, global_kernel.name) + return cl_knl diff --git a/pyop2/configuration.py b/pyop2/configuration.py index 29717718c..a98cad124 100644 --- a/pyop2/configuration.py +++ b/pyop2/configuration.py @@ -74,6 +74,12 @@ class Configuration(dict): cdim > 1 be built as block sparsities, or dof sparsities. 
The former saves memory but changes which preconditioners are available for the resulting matrices. (Default yes) + :param gpu_strategy: A :class:str` indicating the transformation strategy + that must be applied to a :class:`pyop2.global_kernel.GlobalKernel` + when offloading to a GPGPU. Can be one of: + - ``"snpt"``: Single-"N" Per Thread. In the transform strategy, the + work of each element of the iteration set over which a global kernel + operates is assigned to a work-item (i.e. a CUDA thread) """ # name, env variable, type, default, write once cache_dir = os.path.join(gettempdir(), "pyop2-cache-uid%s" % os.getuid()) @@ -113,7 +119,9 @@ class Configuration(dict): "matnest": ("PYOP2_MATNEST", bool, True), "block_sparsity": - ("PYOP2_BLOCK_SPARSITY", bool, True) + ("PYOP2_BLOCK_SPARSITY", bool, True), + "gpu_strategy": + ("PYOP2_GPU_STRATEGY", str, "snpt"), } """Default values for PyOP2 configuration parameters""" diff --git a/pyop2/transforms/__init__.py b/pyop2/transforms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pyop2/transforms/gpu_utils.py b/pyop2/transforms/gpu_utils.py new file mode 100644 index 000000000..28cd558e1 --- /dev/null +++ b/pyop2/transforms/gpu_utils.py @@ -0,0 +1,94 @@ +import loopy as lp +from pyop2.configuration import configuration + + +def get_loopy_target(target): + if target == "opencl": + return lp.PyOpenCLTarget() + elif target == "cuda": + return lp.CudaTarget() + else: + raise NotImplementedError() + + +def preprocess_t_unit_for_gpu(t_unit): + + # {{{ inline all kernels in t_unit + + kernels_to_inline = { + name for name, clbl in t_unit.callables_table.items() + if isinstance(clbl, lp.CallableKernel)} + + for knl_name in kernels_to_inline: + t_unit = lp.inline_callable_kernel(t_unit, knl_name) + + # }}} + + kernel = t_unit.default_entrypoint + + # changing the address space of temps + def _change_aspace_tvs(tv): + if tv.read_only: + assert tv.initializer is not None + return tv.copy(address_space=lp.AddressSpace.GLOBAL) + else: + return tv.copy(address_space=lp.AddressSpace.PRIVATE) + + new_tvs = {tv_name: _change_aspace_tvs(tv) for tv_name, tv in + kernel.temporary_variables.items()} + kernel = kernel.copy(temporary_variables=new_tvs) + + def insn_needs_atomic(insn): + # updates to global variables are atomic + import pymbolic + if isinstance(insn, lp.Assignment): + if isinstance(insn.assignee, pymbolic.primitives.Subscript): + assignee_name = insn.assignee.aggregate.name + else: + assert isinstance(insn.assignee, pymbolic.primitives.Variable) + assignee_name = insn.assignee.name + + if assignee_name in kernel.arg_dict: + return assignee_name in insn.read_dependency_names() + return False + + new_insns = [] + args_marked_for_atomic = set() + for insn in kernel.instructions: + if insn_needs_atomic(insn): + atomicity = (lp.AtomicUpdate(insn.assignee.aggregate.name), ) + insn = insn.copy(atomicity=atomicity) + args_marked_for_atomic |= set([insn.assignee.aggregate.name]) + + new_insns.append(insn) + + # label args as atomic + new_args = [] + for arg in kernel.args: + if arg.name in args_marked_for_atomic: + new_args.append(arg.copy(for_atomic=True)) + else: + new_args.append(arg) + + kernel = kernel.copy(instructions=new_insns, args=new_args) + + return t_unit.with_kernel(kernel) + + +def apply_gpu_transforms(t_unit, target): + t_unit = t_unit.copy(target=get_loopy_target(target)) + t_unit = preprocess_t_unit_for_gpu(t_unit) + kernel = t_unit.default_entrypoint + transform_strategy = configuration["gpu_strategy"] + + kernel = 
lp.assume(kernel, "end > start") + + if transform_strategy == "snpt": + from pyop2.transforms.snpt import split_n_across_workgroups + kernel, args_to_make_global = split_n_across_workgroups(kernel, 32) + else: + raise NotImplementedError(f"'{transform_strategy}' transform strategy.") + + t_unit = t_unit.with_kernel(kernel) + + return t_unit, args_to_make_global diff --git a/pyop2/transforms/snpt.py b/pyop2/transforms/snpt.py new file mode 100644 index 000000000..23c145f1c --- /dev/null +++ b/pyop2/transforms/snpt.py @@ -0,0 +1,50 @@ +import loopy as lp + + +def _make_tv_array_arg(tv): + assert tv.address_space != lp.AddressSpace.PRIVATE + arg = lp.ArrayArg(name=tv.name, + dtype=tv.dtype, + shape=tv.shape, + dim_tags=tv.dim_tags, + offset=tv.offset, + dim_names=tv.dim_names, + order=tv.order, + alignment=tv.alignment, + address_space=tv.address_space, + is_output=not tv.read_only, + is_input=tv.read_only) + return arg + + +def split_n_across_workgroups(kernel, workgroup_size): + """ + Returns a transformed version of *kernel* with the workload in the loop + with induction variable 'n' distributed across work-groups of size + *workgroup_size* and each work-item in the work-group performing the work + of a single iteration of 'n'. + """ + + kernel = lp.assume(kernel, "start < end") + kernel = lp.split_iname(kernel, "n", workgroup_size, + outer_tag="g.0", inner_tag="l.0") + + # {{{ making consts as globals: necessary to make the strategy emit valid + # kernels for all forms + + old_temps = kernel.temporary_variables.copy() + args_to_make_global = [tv.initializer.flatten() + for tv in old_temps.values() + if tv.initializer is not None] + + new_temps = {tv.name: tv + for tv in old_temps.values() + if tv.initializer is None} + kernel = kernel.copy(args=kernel.args+[_make_tv_array_arg(tv) + for tv in old_temps.values() + if tv.initializer is not None], + temporary_variables=new_temps) + + # }}} + + return kernel, args_to_make_global diff --git a/setup.py b/setup.py index 0ecf6ba8d..1d16cef86 100644 --- a/setup.py +++ b/setup.py @@ -139,7 +139,7 @@ def run(self): 'Programming Language :: Python :: 3.6', ], install_requires=install_requires + test_requires, - packages=['pyop2', 'pyop2.backends', 'pyop2.codegen', 'pyop2.types'], + packages=['pyop2', 'pyop2.backends', 'pyop2.codegen', 'pyop2.types', 'pyop2.transforms'], package_data={ 'pyop2': ['assets/*', '*.h', '*.pxd', '*.pyx', 'codegen/c/*.c']}, scripts=glob('scripts/*'), From 2189c394ae55c478d2f4b0023afebf4401ea2802 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:30:39 -0500 Subject: [PATCH 07/10] Implements CUDA backend --- pyop2/backends/cuda.py | 760 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 760 insertions(+) create mode 100644 pyop2/backends/cuda.py diff --git a/pyop2/backends/cuda.py b/pyop2/backends/cuda.py new file mode 100644 index 000000000..fa8e52ea9 --- /dev/null +++ b/pyop2/backends/cuda.py @@ -0,0 +1,760 @@ +"""OP2 CUDA backend.""" + +import os + +from hashlib import md5 +from contextlib import contextmanager + +from pyop2 import compilation, mpi, utils +from pyop2.offload_utils import (AVAILABLE_ON_HOST_ONLY, + AVAILABLE_ON_DEVICE_ONLY, + AVAILABLE_ON_BOTH, + DataAvailability) +from pyop2.configuration import configuration +from pyop2.types.set import (MixedSet, Subset as BaseSubset, + ExtrudedSet as BaseExtrudedSet, + Set) +from pyop2.types.map import Map as BaseMap, MixedMap +from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView +from pyop2.types.dataset import DataSet, 
GlobalDataSet, MixedDataSet +from pyop2.types.mat import Mat +from pyop2.types.glob import Global as BaseGlobal, Constant as BaseConstant +from pyop2.types.access import RW, READ, MIN, MAX, INC +from pyop2.parloop import AbstractParloop +from pyop2.global_kernel import AbstractGlobalKernel +from pyop2.backends import AbstractComputeBackend, cpu as cpu_backend +from pyop2.exceptions import MapValueError +from petsc4py import PETSc + +import numpy +import pycuda.driver as cuda +import pycuda.gpuarray as cuda_np +import loopy as lp +from pytools import memoize_method +from dataclasses import dataclass +from typing import Tuple +import ctypes + + +class Map(BaseMap): + + def __init__(self, *args, **kwargs): + super(Map, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _cuda_values(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self._values) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self._cuda_values + + def ensure_availability_on_host(self): + # Map once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._cuda_values.gpudata,) + else: + return super(Map, self)._kernel_args_ + + +class ExtrudedSet(BaseExtrudedSet): + """ + ExtrudedSet for CUDA. + """ + + def __init__(self, *args, **kwargs): + super(ExtrudedSet, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def cuda_layers_array(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self.layers_array) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self.cuda_layers_array + + def ensure_availability_on_host(self): + # ExtrudedSet once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self.cuda_layers_array,) + else: + return super(ExtrudedSet, self)._kernel_args_ + + +class Subset(BaseSubset): + """ + Subset for CUDA. + """ + def __init__(self, *args, **kwargs): + super(Subset, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + def get_availability(self): + return self._availability_flag + + @utils.cached_property + def _cuda_indices(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self._indices) + + def ensure_availability_on_device(self): + self._cuda_indices + + def ensure_availability_on_host(self): + # Subset once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._cuda_indices,) + else: + return super(Subset, self)._kernel_args_ + + +class Dat(BaseDat): + """ + Dat for CUDA. + """ + def __init__(self, *args, **kwargs): + super(Dat, self).__init__(*args, **kwargs) + # _availability_flag: only used when Dat cannot be represented as a + # petscvec; when Dat can be represented as a petscvec the availability + # flag is directly read from the petsc vec. 
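+        # Note (descriptive comment, not in the original patch): a freshly
+        # constructed Dat only holds valid data on the host; the device copy
+        # (_cuda_data) is allocated lazily on first access.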
+ self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _cuda_data(self): + """ + Only used when the Dat's data cannot be represented as a petsc Vec. + """ + self._availability_flag = AVAILABLE_ON_BOTH + if self.can_be_represented_as_petscvec: + with self.vec as petscvec: + return cuda_np.GPUArray(shape=self._data.shape, + dtype=self._data.dtype, + gpudata=petscvec.getCUDAHandle("r"), + strides=self._data.strides) + else: + return cuda_np.to_gpu(self._data) + + def zero(self, subset=None): + if subset is None: + self.data[:] = 0*self.data + self.halo_valid = True + else: + raise NotImplementedError + + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + return PETSc.Vec().createCUDAWithArrays(data, size=size, + bsize=self.cdim, + comm=self.comm) + + def get_availability(self): + if self.can_be_represented_as_petscvec: + return DataAvailability(self._vec.getOffloadMask()) + else: + return self._availability_flag + + def ensure_availability_on_device(self): + if self.can_be_represented_as_petscvec: + if not cuda_backend.offloading: + raise NotImplementedError("PETSc limitation: can ensure availability" + " on GPU only within an offloading" + " context.") + # perform a host->device transfer if needed + self._vec.getCUDAHandle("r") + else: + if not self.is_available_on_device(): + self._cuda_data.set(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if self.can_be_represented_as_petscvec: + # perform a device->host transfer if needed + self._vec.getArray(readonly=True) + else: + if not self.is_available_on_host(): + self._cuda_data.get(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + @contextmanager + def vec_context(self, access): + r"""A context manager for a :class:`PETSc.Vec` from a :class:`Dat`. + + :param access: Access descriptor: READ, WRITE, or RW.""" + # PETSc Vecs have a state counter and cache norm computations + # to return immediately if the state counter is unchanged. + # Since we've updated the data behind their back, we need to + # change that state counter. 
+ self._vec.stateIncrease() + + if cuda_backend.offloading: + self.ensure_availability_on_device() + self._vec.bindToCPU(False) + else: + self.ensure_availability_on_host() + self._vec.bindToCPU(True) + + yield self._vec + + if access is not READ: + self.halo_valid = False + + @property + def _kernel_args_(self): + if self.can_be_represented_as_petscvec: + if cuda_backend.offloading: + self.ensure_availability_on_device() + # tell petsc that we have updated the data in the CL buffer + with self.vec as v: + v.restoreCUDAHandle(v.getCUDAHandle()) + return (self._cuda_data,) + else: + self.ensure_availability_on_host() + # tell petsc that we have updated the data on the host + with self.vec as v: + v.stateIncrease() + return (self._data.ctypes.data, ) + else: + if cuda_backend.offloading: + self.ensure_availability_on_device() + + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._cuda_data, ) + else: + self.ensure_availability_on_host() + + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return (self._data.ctypes.data, ) + + @mpi.collective + @property + def data(self): + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + self.halo_valid = False + + if cuda_backend.offloading: + + self.ensure_availability_on_device() + + # {{{ marking data on the host as invalid + + if self.can_be_represented_as_petscvec: + # report to petsc that we are updating values on the device + with self.vec as petscvec: + petscvec.restoreCUDAHandle(petscvec.getCUDAHandle("w")) + else: + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + + # }}} + v = self._cuda_data[:self.dataset.size] + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=True) + + # {{{ marking data on the device as invalid + + if self.can_be_represented_as_petscvec: + # report to petsc that we are altering data on the CPU + with self.vec as petscvec: + petscvec.stateIncrease() + else: + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + # }}} + + return v + + @property + @mpi.collective + def data_with_halos(self): + self.global_to_local_begin(RW) + self.global_to_local_end(RW) + return self.data + + @property + @mpi.collective + def data_ro(self): + + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + + if cuda_backend.offloading: + self.ensure_availability_on_device() + v = self._cuda_data[:self.dataset.size].view() + # FIXME: PyCUDA doesn't support 'read-only' arrays yet + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=False) + + return v + + @property + @mpi.collective + def data_ro_with_halos(self): + self.global_to_local_begin(READ) + self.global_to_local_end(READ) + + if cuda_backend.offloading: + self.ensure_availability_on_device() + v = self._cuda_data.view() + # FIXME: PyCUDA doesn't support 'read-only' arrays yet + else: + self.ensure_availability_on_host() + v = self._data.view() + v.setflags(write=False) + + return v + + @mpi.collective + def copy(self, other, subset=None): + if other is self: + return + if subset is None: + # If the current halo is valid we can also copy these values across. 
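+            # We also mirror the offload state (PETSc offload mask or our
+            # availability flag) onto the target so the copy does not force
+            # an immediate host<->device transfer.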
+ if self.halo_valid: + if cuda_backend.offloading: + other._cuda_data[:] = self.data_ro + else: + other._data[:] = self.data_ro + + if other.can_be_represented_as_petscvec: + other._vec.setOffloadMask(self._vec.getOffloadMask()) + else: + other._availability_flag = self._availability_flag + other.halo_valid = True + else: + other.data[:] = self.data_ro + elif subset.superset != self.dataset.set: + raise MapValueError("The subset and dataset are incompatible") + else: + other.data[subset.owned_indices] = self.data_ro[subset.owned_indices] + + +class Global(BaseGlobal): + """ + Global for CUDA. + """ + + def __init__(self, *args, **kwargs): + super(Global, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _cuda_data(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self._data) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + if not self.is_available_on_device(): + self._cuda_data.set(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if not self.is_available_on_host(): + self._cuda_data.get(ary=self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._cuda_data,) + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return super(Global, self)._kernel_args_ + + @mpi.collective + @property + def data(self): + + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if cuda_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return self._cuda_data + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return self._data + + @property + def data_ro(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if cuda_backend.offloading: + self.ensure_availability_on_device() + view = self._cuda_data.view() + # FIXME: PyCUDA doesn't support read-only arrays yet + return view + else: + self.ensure_availability_on_host() + view = self._data.view() + view.setflags(write=False) + return view + + @utils.cached_property + def _vec(self): + + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + if self.comm.rank == 0: + return PETSc.Vec().createCUDAWithArrays(data, size=size, + bsize=self.cdim, + comm=self.comm) + else: + return PETSc.Vec().createCUDAWithArrays(cuda_np.empty(0, dtype=self.dtype), + size=size, + bsize=self.cdim, + comm=self.comm) + + +class Constant(BaseConstant): + """ + Constant for CUDA. 
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._cuda_data = cuda_np.to_gpu(self._data) + + def get_availability(self): + return AVAILABLE_ON_BOTH + + def ensure_availability_on_device(self): + pass + + def ensure_availability_on_host(self): + pass + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + return (self._cuda_data,) + else: + return super()._kernel_args_ + + @mpi.collective + @property + def data(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if cuda_backend.offloading: + return self._cuda_data + else: + return self._data + + @property + def data_ro(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Constant!") + + if cuda_backend.offloading: + view = self._cuda_data.view() + # FIXME: PyCUDA doesn't support read-only arrays yet + return view + else: + view = self._data.view() + view.setflags(write=False) + return view + + @utils.cached_property + def _vec(self): + raise NotImplementedError + + +@dataclass(frozen=True) +class CUFunctionWithExtraArgs: + """ + A partial :class:`pycuda.driver.Function` with bound arguments *args_to_append* + that are appended to the arguments passed in :meth:`__call__`. + """ + cu_func: cuda.Function + args_to_append: Tuple[cuda_np.GPUArray, ...] + + def __call__(self, grid, block, *args): + if all(grid + block): + self.cu_func.prepared_call(grid, block, + *[arg.gpudata + if isinstance(arg, cuda_np.GPUArray) + else arg + for arg in args], + *[arg_to_append.gpudata + for arg_to_append in self.args_to_append]) + + +class GlobalKernel(AbstractGlobalKernel): + @classmethod + def _cache_key(cls, *args, **kwargs): + key = super()._cache_key(*args, **kwargs) + key = key + (configuration["gpu_strategy"], "cuda") + return key + + @utils.cached_property + def encoded_cache_key(self): + return md5(str(self.cache_key[1:]).encode()).hexdigest() + + @utils.cached_property + def computational_grid_expr_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_grid_params.py") + + @utils.cached_property + def extra_args_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_extra_args.npz") + + @memoize_method + def get_grid_size(self, start, end): + """ + Returns a :class:`tuple` of the form ``(grid, block)``, where *grid* is + the 2-:class:`tuple` corresponding to the CUDA computational grid size + and ``block`` is the thread block size. Refer to + `CUDA docs `_ + for a detailed explanation of these terms. + """ # noqa: E501 + fpath = self.computational_grid_expr_cache_file_path + + with open(fpath, "r") as f: + globals_dict = {} + exec(f.read(), globals_dict) + get_grid_sizes = globals_dict["get_grid_sizes"] + + grid, block = get_grid_sizes(start=start, end=end) + + # TODO: PyCUDA doesn't allow numpy int's for grid dimensions. Remove + # these casts after PyCUDA has been patched. + return (tuple(int(dim) for dim in grid), + tuple(int(dim) for dim in block)) + + @memoize_method + def get_extra_args(self): + """ + Returns device buffers corresponding to array literals baked into + :attr:`local_kernel`. 
+ """ + fpath = self.extra_args_cache_file_path + npzfile = numpy.load(fpath) + assert npzfile["ids"].ndim == 1 + assert len(npzfile.files) == len(npzfile["ids"]) + 1 + extra_args_np = [npzfile[arg_id] + for arg_id in npzfile["ids"]] + + return tuple([cuda_np.to_gpu(arg_np) for arg_np in extra_args_np]) + + @utils.cached_property + def argtypes(self): + result = super().argtypes + return result + (ctypes.c_voidp,) * len(self.get_extra_args()) + + @utils.cached_property + def code_to_compile(self): + from pyop2.codegen.rep2loopy import generate + from pyop2.transforms.gpu_utils import apply_gpu_transforms + from pymbolic.interop.ast import to_evaluatable_python_function + + t_unit = generate(self.builder, + include_math=False, + include_petsc=False, + include_complex=False) + + # Make temporary variables with initializers kernel's arguments. + t_unit, extra_args = apply_gpu_transforms(t_unit, "cuda") + + ary_ids = [f"_op2_arg_{i}" + for i in range(len(extra_args))] + + numpy.savez(self.extra_args_cache_file_path, + ids=numpy.array(ary_ids), + **{ary_id: extra_arg + for ary_id, extra_arg in zip(ary_ids, extra_args)}) + + # {{{ save python code to get grid sizes + + with open(self.computational_grid_expr_cache_file_path, "w") as f: + glens, llens = (t_unit + .default_entrypoint + .get_grid_size_upper_bounds_as_exprs(t_unit + .callables_table)) + glens = glens + (1,) * (2 - len(glens)) # cuda expects 2d grid shape + llens = llens + (1,) * (3 - len(llens)) # cuda expects 3d block shape + f.write(to_evaluatable_python_function((glens, llens), "get_grid_sizes")) + + # }}} + + code = lp.generate_code_v2(t_unit).device_code() + + return code + + @mpi.collective + def __call__(self, comm, *args): + key = id(comm) + try: + func = self._func_cache[key] + except KeyError: + func = self.compile(comm) + self._func_cache[key] = func + + grid, block = self.get_grid_size(args[0], args[1]) + func(grid, block, *args) + + @mpi.collective + def compile(self, comm): + cu_func = compilation.get_prepared_cuda_function(comm, + self) + return CUFunctionWithExtraArgs(cu_func, self.get_extra_args()) + + +class Parloop(AbstractParloop): + @PETSc.Log.EventDecorator("ParLoopRednBegin") + @mpi.collective + def reduction_begin(self): + """Begin reductions.""" + requests = [] + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + mpi_op = {INC: mpi.MPI.SUM, + MIN: mpi.MPI.MIN, + MAX: mpi.MPI.MAX}.get(self.accesses[idx]) + + if mpi.MPI.VERSION >= 3: + # FIXME: Get rid of this D2H for CUDA-Aware MPI. + glob.ensure_availability_on_host() + requests.append(self.comm.Iallreduce(glob._data, + glob._buf, + op=mpi_op)) + else: + # FIXME: Get rid of this D2H for CUDA-Aware MPI. + glob.ensure_availability_on_host() + self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) + return tuple(requests) + + @PETSc.Log.EventDecorator("ParLoopRednEnd") + @mpi.collective + def reduction_end(self, requests): + """Finish reductions.""" + if mpi.MPI.VERSION >= 3: + mpi.MPI.Request.Waitall(requests) + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + # FIXME: Get rid of this H2D for CUDA-Aware MPI. + glob.ensure_availability_on_device() + else: + assert len(requests) == 0 + + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + # FIXME: Get rid of this H2D for CUDA-Aware MPI. 
+ glob.ensure_availability_on_device() + + +class CUDABackend(AbstractComputeBackend): + Parloop_offloading = Parloop + Parloop_no_offloading = cpu_backend.Parloop + GlobalKernel_offloading = GlobalKernel + GlobalKernel_no_offloading = cpu_backend.GlobalKernel + + Parloop = cpu_backend.Parloop + GlobalKernel = cpu_backend.GlobalKernel + Set = Set + ExtrudedSet = ExtrudedSet + MixedSet = MixedSet + Subset = Subset + DataSet = DataSet + MixedDataSet = MixedDataSet + Map = Map + MixedMap = MixedMap + Dat = Dat + MixedDat = MixedDat + DatView = DatView + Mat = Mat + Global = Global + Constant = Constant + GlobalDataSet = GlobalDataSet + PETScVecType = PETSc.Vec.Type.CUDA + + def __init__(self): + self.offloading = False + + def turn_on_offloading(self): + self.offloading = True + self.Parloop = self.Parloop_offloading + self.GlobalKernel = self.GlobalKernel_offloading + + def turn_off_offloading(self): + self.offloading = False + self.Parloop = self.Parloop_no_offloading + self.GlobalKernel = self.GlobalKernel_no_offloading + + @property + def cache_key(self): + return (type(self), self.offloading) + + +cuda_backend = CUDABackend() From e46b15594f683e890e6de3d3a8d6940a66f2c9a8 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:30:48 -0500 Subject: [PATCH 08/10] Implements OpenCL backend --- pyop2/backends/opencl.py | 778 +++++++++++++++++++++++++++++++++++++++ pyop2/offload_utils.py | 2 + 2 files changed, 780 insertions(+) create mode 100644 pyop2/backends/opencl.py diff --git a/pyop2/backends/opencl.py b/pyop2/backends/opencl.py new file mode 100644 index 000000000..3bbf22aea --- /dev/null +++ b/pyop2/backends/opencl.py @@ -0,0 +1,778 @@ +"""OP2 OpenCL backend.""" + +import os +from hashlib import md5 +from contextlib import contextmanager + +from pyop2 import compilation, mpi, utils +from pyop2.offload_utils import (AVAILABLE_ON_HOST_ONLY, + AVAILABLE_ON_DEVICE_ONLY, + AVAILABLE_ON_BOTH, + DataAvailability) +from pyop2.configuration import configuration +from pyop2.types.set import (MixedSet, Subset as BaseSubset, + ExtrudedSet as BaseExtrudedSet, + Set) +from pyop2.types.map import Map as BaseMap, MixedMap +from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView +from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet +from pyop2.types.mat import Mat +from pyop2.types.glob import Global as BaseGlobal, Constant as BaseConstant +from pyop2.types.access import RW, READ, MIN, MAX, INC +from pyop2.parloop import AbstractParloop +from pyop2.global_kernel import AbstractGlobalKernel +from pyop2.backends import AbstractComputeBackend, cpu as cpu_backend +from pyop2.exceptions import MapValueError +from petsc4py import PETSc + +import numpy +import loopy as lp +from pytools import memoize_method +from dataclasses import dataclass +from typing import Tuple +import ctypes + +import pyopencl as cl +import pyopencl.array as cla + + +def read_only_clarray_setitem(self, *args, **kwargs): + # emulates np.ndarray.setitem for numpy arrays with read-only flags + raise ValueError("assignment destination is read-only") + + +class Map(BaseMap): + + def __init__(self, *args, **kwargs): + super(Map, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _opencl_values(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self._values) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self._opencl_values + + 
def ensure_availability_on_host(self): + # Map once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._opencl_values.data,) + else: + return super(Map, self)._kernel_args_ + + +class ExtrudedSet(BaseExtrudedSet): + """ + ExtrudedSet for OpenCL. + """ + + def __init__(self, *args, **kwargs): + super(ExtrudedSet, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def opencl_layers_array(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self.layers_array) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self.opencl_layers_array + + def ensure_availability_on_host(self): + # ExtrudedSet once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self.opencl_layers_array.data,) + else: + return super(ExtrudedSet, self)._kernel_args_ + + +class Subset(BaseSubset): + """ + Subset for OpenCL. + """ + def __init__(self, *args, **kwargs): + super(Subset, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + def get_availability(self): + return self._availability_flag + + @utils.cached_property + def _opencl_indices(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self._indices) + + def ensure_availability_on_device(self): + self._opencl_indices + + def ensure_availability_on_host(self): + # Subset once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._opencl_indices.data,) + else: + return super(Subset, self)._kernel_args_ + + +class Dat(BaseDat): + """ + Dat for OpenCL. + """ + def __init__(self, *args, **kwargs): + super(Dat, self).__init__(*args, **kwargs) + # _availability_flag: only used when Dat cannot be represented as a + # petscvec; when Dat can be represented as a petscvec the availability + # flag is directly read from the petsc vec. + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _opencl_data(self): + """ + Only used when the Dat's data cannot be represented as a petsc Vec. + """ + self._availability_flag = AVAILABLE_ON_BOTH + if self.can_be_represented_as_petscvec: + with self.vec as petscvec: + return cla.Array(cq=opencl_backend.queue, + shape=self._data.shape, + dtype=self._data.dtype, + data=cl.Buffer.from_int_ptr( + petscvec.getCLMemHandle("r"), + retain=False), + strides=self._data.strides) + else: + return cla.to_device(opencl_backend.queue, self._data) + + def zero(self, subset=None): + if subset is None: + self.data[:] = 0 + self.halo_valid = True + else: + raise NotImplementedError + + def copy(self, other, subset=None): + raise NotImplementedError + + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. 
+ # But use getSizes to save an Allreduce in computing the + # global size. + size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + return PETSc.Vec().createViennaCLWithArrays(data, size=size, + bsize=self.cdim, + comm=self.comm) + + def get_availability(self): + if self.can_be_represented_as_petscvec: + return DataAvailability(self._vec.getOffloadMask()) + else: + return self._availability_flag + + def ensure_availability_on_device(self): + if self.can_be_represented_as_petscvec: + if not opencl_backend.offloading: + raise NotImplementedError("PETSc limitation: can ensure availability" + " on GPU only within an offloading" + " context.") + # perform a host->device transfer if needed + self._vec.getCLMemHandle("r") + else: + if not self.is_available_on_device(): + self._opencl_data.set(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if self.can_be_represented_as_petscvec: + # perform a device->host transfer if needed + self._vec.getArray(readonly=True) + else: + if not self.is_available_on_host(): + self._opencl_data.get(opencl_backend.queue, self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + @contextmanager + def vec_context(self, access): + r"""A context manager for a :class:`PETSc.Vec` from a :class:`Dat`. + + :param access: Access descriptor: READ, WRITE, or RW.""" + # PETSc Vecs have a state counter and cache norm computations + # to return immediately if the state counter is unchanged. + # Since we've updated the data behind their back, we need to + # change that state counter. + self._vec.stateIncrease() + + if opencl_backend.offloading: + self.ensure_availability_on_device() + self._vec.bindToCPU(False) + else: + self.ensure_availability_on_host() + self._vec.bindToCPU(True) + + yield self._vec + + if access is not READ: + self.halo_valid = False + + @property + def _kernel_args_(self): + if self.can_be_represented_as_petscvec: + if opencl_backend.offloading: + self.ensure_availability_on_device() + # tell petsc that we have updated the data in the CL buffer + with self.vec as v: + v.getCLMemHandle() + v.restoreCLMemHandle() + + return (self._opencl_data.data,) + else: + self.ensure_availability_on_host() + # tell petsc that we have updated the data on the host + with self.vec as v: + v.stateIncrease() + return (self._data.ctypes.data, ) + else: + if opencl_backend.offloading: + self.ensure_availability_on_device() + + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._opencl_data.data, ) + else: + self.ensure_availability_on_host() + + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return (self._data.ctypes.data, ) + + @mpi.collective + @property + def data(self): + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + + self.halo_valid = False + + if opencl_backend.offloading: + + self.ensure_availability_on_device() + + # {{{ marking data on the host as invalid + + if self.can_be_represented_as_petscvec: + # report to petsc that we are updating values on the device + with self.vec as petscvec: + petscvec.getCLMemHandle("w") + petscvec.restoreCLMemHandle() + else: + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + + # }}} + v = self._opencl_data[:self.dataset.size] + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=True) + + # {{{ marking data on the device as invalid + + if self.can_be_represented_as_petscvec: + # 
report to petsc that we are altering data on the CPU + with self.vec as petscvec: + petscvec.stateIncrease() + else: + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + # }}} + + return v + + @property + @mpi.collective + def data_with_halos(self): + self.global_to_local_begin(RW) + self.global_to_local_end(RW) + return self.data + + @property + @mpi.collective + def data_ro(self): + + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + + self.halo_valid = False + + if opencl_backend.offloading: + self.ensure_availability_on_device() + v = self._opencl_data[:self.dataset.size].view() + v.setitem = read_only_clarray_setitem + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=False) + + return v + + @property + @mpi.collective + def data_ro_with_halos(self): + self.global_to_local_begin(READ) + self.global_to_local_end(READ) + + if opencl_backend.offloading: + self.ensure_availability_on_device() + v = self._opencl_data.view() + v.setitem = read_only_clarray_setitem + else: + self.ensure_availability_on_host() + v = self._data.view() + v.setflags(write=False) + + return v + + @mpi.collective + def copy(self, other, subset=None): + if other is self: + return + if subset is None: + # If the current halo is valid we can also copy these values across. + if self.halo_valid: + if opencl_backend.offloading: + other._opencl_data[:] = self.data_ro + else: + other._data[:] = self.data_ro + + if other.can_be_represented_as_petscvec: + other._vec.setOffloadMask(self._vec.getOffloadMask()) + else: + other._availability_flag = self._availability_flag + other.halo_valid = True + else: + other.data[:] = self.data_ro + elif subset.superset != self.dataset.set: + raise MapValueError("The subset and dataset are incompatible") + else: + other.data[subset.owned_indices] = self.data_ro[subset.owned_indices] + + +class Global(BaseGlobal): + """ + Global for OpenCL. 
+ """ + + def __init__(self, *args, **kwargs): + super(Global, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _opencl_data(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self._data) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + if not self.is_available_on_device(): + self._opencl_data.set(ary=self._data, + queue=opencl_backend.queue) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if not self.is_available_on_host(): + self._opencl_data.get(ary=self._data, + queue=opencl_backend.queue) + self._availability_flag = AVAILABLE_ON_BOTH + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._opencl_data.data,) + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return super(Global, self)._kernel_args_ + + @mpi.collective + @property + def data(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if opencl_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return self._opencl_data + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return self._data + + @property + def data_ro(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if opencl_backend.offloading: + self.ensure_availability_on_device() + view = self._opencl_data.view() + view.setitem = read_only_clarray_setitem + return view + else: + self.ensure_availability_on_host() + view = self._data.view() + view.setflags(write=False) + return view + + @utils.cached_property + def _vec(self): + + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + if self.comm.rank == 0: + return PETSc.Vec().createViennaCLWithArrays(data, size=size, + bsize=self.cdim, + comm=self.comm) + else: + return PETSc.Vec().createViennaCLWithArrays(cla.empty(opencl_backend.queue, shape=0, dtype=self.dtype), + size=size, + bsize=self.cdim, + comm=self.comm) + + +class Constant(BaseConstant): + """ + Constant for OpneCL. 
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._opencl_data = cla.to_device(opencl_backend.queue, self._data) + + def get_availability(self): + return AVAILABLE_ON_BOTH + + def ensure_availability_on_device(self): + pass + + def ensure_availability_on_host(self): + pass + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + return (self._opencl_data.data,) + else: + return super()._kernel_args_ + + @mpi.collective + @property + def data(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if opencl_backend.offloading: + return self._opencl_data + else: + return self._data + + @property + def data_ro(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Constant!") + + if opencl_backend.offloading: + view = self._opencl_data.view() + # FIXME: PyOpenCL doesn't support read-only arrays yet + return view + else: + view = self._data.view() + view.setflags(write=False) + return view + + @utils.cached_property + def _vec(self): + raise NotImplementedError + + +@dataclass(frozen=True) +class CLKernelWithExtraArgs: + cl_kernel: cl.Kernel + args_to_append: Tuple[cla.Array, ...] + + def __call__(self, grid, block, start, end, *args): + if all(grid + block): + self.cl_kernel(opencl_backend.queue, + grid, block, + numpy.int32(start), + numpy.int32(end), + *args, + *[arg_to_append.data + for arg_to_append in self.args_to_append], + g_times_l=True) + + +class GlobalKernel(AbstractGlobalKernel): + @classmethod + def _cache_key(cls, *args, **kwargs): + key = super()._cache_key(*args, **kwargs) + key = key + (configuration["gpu_strategy"], "opencl") + return key + + @utils.cached_property + def encoded_cache_key(self): + return md5(str(self.cache_key[1:]).encode()).hexdigest() + + @utils.cached_property + def computational_grid_expr_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_grid_params.py") + + @utils.cached_property + def extra_args_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_extra_args.npz") + + @memoize_method + def get_grid_size(self, start, end): + fpath = self.computational_grid_expr_cache_file_path + + with open(fpath, "r") as f: + globals_dict = {} + exec(f.read(), globals_dict) + get_grid_sizes = globals_dict["get_grid_sizes"] + + return get_grid_sizes(start=start, end=end) + + @memoize_method + def get_extra_args(self): + """ + Returns device buffers corresponding to array literals baked into + :attr:`local_kernel`. 
+ """ + fpath = self.extra_args_cache_file_path + npzfile = numpy.load(fpath) + assert npzfile["ids"].ndim == 1 + assert len(npzfile.files) == len(npzfile["ids"]) + 1 + extra_args_np = [npzfile[arg_id] + for arg_id in npzfile["ids"]] + return tuple(cla.to_device(opencl_backend.queue, arg) + for arg in extra_args_np) + + @utils.cached_property + def argtypes(self): + result = super().argtypes + return result + (ctypes.c_voidp,) * len(self.get_extra_args()) + + @utils.cached_property + def code_to_compile(self): + from pyop2.codegen.rep2loopy import generate + from pyop2.transforms.gpu_utils import apply_gpu_transforms + from pymbolic.interop.ast import to_evaluatable_python_function + + t_unit = generate(self.builder, + include_math=False, + include_petsc=False, + include_complex=False) + + # Make temporary variables with initializers kernel's arguments. + t_unit, extra_args = apply_gpu_transforms(t_unit, "opencl") + + ary_ids = [f"_op2_arg_{i}" + for i in range(len(extra_args))] + + numpy.savez(self.extra_args_cache_file_path, + ids=numpy.array(ary_ids), + **{ary_id: extra_arg + for ary_id, extra_arg in zip(ary_ids, extra_args)}) + + # {{{ save python code to get grid sizes + + with open(self.computational_grid_expr_cache_file_path, "w") as f: + glens, llens = (t_unit + .default_entrypoint + .get_grid_size_upper_bounds_as_exprs(t_unit + .callables_table)) + f.write(to_evaluatable_python_function((glens, llens), "get_grid_sizes")) + + code = lp.generate_code_v2(t_unit).device_code() + + # }}} + + return code + + @mpi.collective + def __call__(self, comm, *args): + key = id(comm) + try: + func = self._func_cache[key] + except KeyError: + func = self.compile(comm) + self._func_cache[key] = func + + grid, block = self.get_grid_size(args[0], args[1]) + func(grid, block, *args) + + @mpi.collective + def compile(self, comm): + cl_knl = compilation.get_opencl_kernel(comm, + self) + return CLKernelWithExtraArgs(cl_knl, self.get_extra_args()) + + +class Parloop(AbstractParloop): + @PETSc.Log.EventDecorator("ParLoopRednBegin") + @mpi.collective + def reduction_begin(self): + """Begin reductions.""" + requests = [] + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + mpi_op = {INC: mpi.MPI.SUM, + MIN: mpi.MPI.MIN, + MAX: mpi.MPI.MAX}.get(self.accesses[idx]) + + if mpi.MPI.VERSION >= 3: + glob.ensure_availability_on_host() + requests.append(self.comm.Iallreduce(glob._data, + glob._buf, + op=mpi_op)) + else: + glob.ensure_availability_on_host() + self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) + return tuple(requests) + + @PETSc.Log.EventDecorator("ParLoopRednEnd") + @mpi.collective + def reduction_end(self, requests): + """Finish reductions.""" + if mpi.MPI.VERSION >= 3: + mpi.MPI.Request.Waitall(requests) + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + glob.ensure_availability_on_device() + else: + assert len(requests) == 0 + + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + glob.ensure_availability_on_device() + + +class OpenCLBackend(AbstractComputeBackend): + Parloop_offloading = Parloop + Parloop_no_offloading = cpu_backend.Parloop + GlobalKernel_offloading = GlobalKernel + GlobalKernel_no_offloading = cpu_backend.GlobalKernel + + Parloop = cpu_backend.Parloop + GlobalKernel = cpu_backend.GlobalKernel + Set = Set + ExtrudedSet = ExtrudedSet + MixedSet = MixedSet + Subset = 
Subset + DataSet = DataSet + MixedDataSet = MixedDataSet + Map = Map + MixedMap = MixedMap + Dat = Dat + MixedDat = MixedDat + DatView = DatView + Mat = Mat + Global = Global + Constant = Constant + GlobalDataSet = GlobalDataSet + PETScVecType = PETSc.Vec.Type.VIENNACL + + def __init__(self): + self.offloading = False + + @utils.cached_property + def context(self): + # create a dummy vector and extract its underlying context + x = PETSc.Vec().create(PETSc.COMM_WORLD) + x.setType("viennacl") + x.setSizes(size=1) + ctx_ptr = x.getCLContextHandle() + return cl.Context.from_int_ptr(ctx_ptr, retain=False) + + @utils.cached_property + def queue(self): + # TODO: Instruct the user to pass + # -viennacl_backend opencl + # -viennacl_opencl_device_type gpu + # create a dummy vector and extract its associated command queue + x = PETSc.Vec().create(PETSc.COMM_WORLD) + x.setType("viennacl") + x.setSizes(size=1) + queue_ptr = x.getCLQueueHandle() + return cl.CommandQueue.from_int_ptr(queue_ptr, retain=False) + + def turn_on_offloading(self): + self.offloading = True + self.Parloop = self.Parloop_offloading + self.GlobalKernel = self.GlobalKernel_offloading + + def turn_off_offloading(self): + self.offloading = False + self.Parloop = self.Parloop_no_offloading + self.GlobalKernel = self.GlobalKernel_no_offloading + + @property + def cache_key(self): + return (type(self), self.offloading) + + +opencl_backend = OpenCLBackend() diff --git a/pyop2/offload_utils.py b/pyop2/offload_utils.py index a60ee5c47..7ad1f1c16 100644 --- a/pyop2/offload_utils.py +++ b/pyop2/offload_utils.py @@ -22,9 +22,11 @@ def ensure_availaibility_on_device(self): raise NotImplementedError def is_available_on_host(self): + # bitwise op to detect both AVAILABLE_ON_HOST and AVAILABLE_ON_BOTH return bool(self.get_availability() & AVAILABLE_ON_HOST_ONLY) def is_available_on_device(self): + # bitwise op to detect both AVAILABLE_ON_DEVICE and AVAILABLE_ON_BOTH return bool(self.get_availability() & AVAILABLE_ON_DEVICE_ONLY) From e4030034d525c5e171168d2d6b2d293b0c443bd5 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Wed, 13 Jul 2022 13:45:44 -0500 Subject: [PATCH 09/10] adds a CI job: test_opencl --- .github/workflows/ci.yml | 66 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 788186ac9..076110cf5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,3 +104,69 @@ jobs: jekyll: false env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + test_opencl: + runs-on: ubuntu-latest + env: + CC: mpicc + PETSC_DIR: ${{ github.workspace }}/petsc + PETSC_ARCH: default + PETSC_CONFIGURE_OPTIONS: --with-debugging=1 --with-shared-libraries=1 --with-c2html=0 --with-fortran-bindings=0 --with-opencl=1 --download-viennacl --with-viennacl=1 + + steps: + - name: Install system dependencies + shell: bash + run: | + sudo apt update + sudo apt install build-essential mpich libmpich-dev \ + libblas-dev liblapack-dev gfortran + sudo apt install ocl-icd-opencl-dev pocl-opencl-icd + + - name: Set correct Python version + uses: actions/setup-python@v2 + with: + python-version: '3.10' + + - name: Clone PETSc + uses: actions/checkout@v2 + with: + repository: firedrakeproject/petsc + path: ${{ env.PETSC_DIR }} + + - name: Build and install PETSc + shell: bash + working-directory: ${{ env.PETSC_DIR }} + run: | + ./configure ${PETSC_CONFIGURE_OPTIONS} + make + + - name: Build and install petsc4py + shell: bash + working-directory: ${{ env.PETSC_DIR 
}}/src/binding/petsc4py + run: | + python -m pip install --upgrade cython numpy + python -m pip install --no-deps . + + - name: Checkout PyOP2 + uses: actions/checkout@v2 + with: + path: PyOP2 + + - name: Install PyOP2 + shell: bash + working-directory: PyOP2 + run: | + python -m pip install pip==20.2 # pip 20.2 needed for loopy install to work. + + # xargs is used to force installation of requirements in the order we specified. + xargs -l1 python -m pip install < requirements-ext.txt + xargs -l1 python -m pip install < requirements-git.txt + python -m pip install pulp + python -m pip install -U flake8 + python -m pip install pyopencl + python -m pip install . + + - name: Run tests + shell: bash + working-directory: PyOP2 + run: pytest test -v --tb=native From e105f5ec1de5894f6d8468a2e14c26626a981735 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Wed, 13 Jul 2022 14:08:36 -0500 Subject: [PATCH 10/10] adds test_opencl_offloading --- test/unit/test_opencl_offloading.py | 119 ++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 test/unit/test_opencl_offloading.py diff --git a/test/unit/test_opencl_offloading.py b/test/unit/test_opencl_offloading.py new file mode 100644 index 000000000..b39f11a1c --- /dev/null +++ b/test/unit/test_opencl_offloading.py @@ -0,0 +1,119 @@ +import pytest +pytest.importorskip("pyopencl") + +import sys +import petsc4py +petsc4py.init(sys.argv + + "-viennacl_backend opencl".split() + + "-viennacl_opencl_device_type cpu".split()) +from pyop2 import op2 +import pyopencl.array as cla +import numpy as np +import loopy as lp +lp.set_caching_enabled(False) + + +def allclose(a, b, rtol=1e-05, atol=1e-08): + """ + Prefer this routine over np.allclose(...) to allow pycuda/pyopencl arrays + """ + return bool(abs(a - b) < (atol + rtol * abs(b))) + + +def pytest_generate_tests(metafunc): + if "backend" in metafunc.fixturenames: + from pyop2.backends.opencl import opencl_backend + metafunc.parametrize("backend", [opencl_backend]) + + +def test_new_backend_raises_not_implemented_error(): + from pyop2.backends import AbstractComputeBackend + unimplemented_backend = AbstractComputeBackend() + + attrs = ["GlobalKernel", "Parloop", "Set", "ExtrudedSet", "MixedSet", + "Subset", "DataSet", "MixedDataSet", "Map", "MixedMap", "Dat", + "MixedDat", "DatView", "Mat", "Global", "GlobalDataSet", + "PETScVecType"] + + for attr in attrs: + with pytest.raises(NotImplementedError): + getattr(unimplemented_backend, attr) + + +def test_dat_with_petscvec_representation(backend): + op2.set_offloading_backend(backend) + + nelems = 9 + data = np.random.rand(nelems) + set_ = op2.compute_backend.Set(nelems) + dset = op2.compute_backend.DataSet(set_, 1) + dat = op2.compute_backend.Dat(dset, data.copy()) + + assert isinstance(dat.data_ro, np.ndarray) + dat.data[:] *= 3 + + with op2.offloading(): + assert isinstance(dat.data_ro, cla.Array) + dat.data[:] *= 2 + + assert isinstance(dat.data_ro, np.ndarray) + np.testing.assert_allclose(dat.data_ro, 6*data) + + +def test_dat_not_as_petscvec(backend): + op2.set_offloading_backend(backend) + + nelems = 9 + data = np.random.randint(low=-10, high=10, + size=nelems, + dtype=np.int64) + set_ = op2.compute_backend.Set(nelems) + dset = op2.compute_backend.DataSet(set_, 1) + dat = op2.compute_backend.Dat(dset, data.copy()) + + assert isinstance(dat.data_ro, np.ndarray) + dat.data[:] *= 3 + + with op2.offloading(): + assert isinstance(dat.data_ro, cla.Array) + dat.data[:] *= 2 + + assert isinstance(dat.data_ro, np.ndarray) + 
np.testing.assert_allclose(dat.data_ro, 6*data) + + +def test_global_reductions(backend): + op2.set_offloading_backend(backend) + + sum_knl = lp.make_function( + "{ : }", + """ + g[0] = g[0] + x[0] + """, + [lp.GlobalArg("g,x", + dtype="float64", + shape=lp.auto, + )], + name="sum_knl", + target=lp.CWithGNULibcTarget(), + lang_version=(2018, 2)) + + rng = np.random.default_rng() + nelems = 4_000 + data_to_sum = rng.random(nelems) + + with op2.offloading(): + + set_ = op2.compute_backend.Set(4_000) + dset = op2.compute_backend.DataSet(set_, 1) + + dat = op2.compute_backend.Dat(dset, data_to_sum.copy()) + glob = op2.compute_backend.Global(1, 0, np.float64, "g") + + op2.parloop(op2.Kernel(sum_knl, "sum_knl"), + set_, + glob(op2.INC), + dat(op2.READ)) + + assert isinstance(glob.data_ro, cla.Array) + assert allclose(glob.data_ro[0], data_to_sum.sum())
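
For reviewers who want to exercise the series by hand, the new unit tests double as a usage recipe: pick a backend once, build Sets/DataSets/Dats through op2.compute_backend, and wrap device execution in the offloading context manager. The sketch below is distilled from test_opencl_offloading.py and is only illustrative; it assumes pyopencl is installed and PETSc was configured with ViennaCL/OpenCL support as in the CI job added earlier in the series.

    import sys
    import numpy as np
    import petsc4py
    # Same ViennaCL options the test file passes, so that PETSc creates
    # OpenCL-backed Vecs.
    petsc4py.init(sys.argv
                  + "-viennacl_backend opencl".split()
                  + "-viennacl_opencl_device_type cpu".split())

    from pyop2 import op2
    from pyop2.backends.opencl import opencl_backend

    op2.set_offloading_backend(opencl_backend)

    # Containers must be built through the active compute backend so the
    # OpenCL-aware Set/DataSet/Dat subclasses are used.
    s = op2.compute_backend.Set(10)
    d = op2.compute_backend.Dat(op2.compute_backend.DataSet(s, 1),
                                np.arange(10, dtype=np.float64))

    d.data[:] *= 2       # host execution; data_ro is a read-only numpy view

    with op2.offloading():
        d.data[:] *= 3   # device execution; data_ro is a pyopencl.array.Array

    print(d.data_ro)     # back on the host: 6 * the original values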