Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SVM Memory Pool #587

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/runtime_memory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ Transfers

.. autofunction:: enqueue_copy(queue, dest, src, **kwargs)

.. autofunction:: enqueue_fill(queue, dest, pattern, size, *, offset=0, wait_for=None)

Mapping Memory into Host Address Space
--------------------------------------

Expand Down
46 changes: 46 additions & 0 deletions examples/demo_array_svm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pyopencl as cl
import pyopencl.array as cl_array
from pyopencl.tools import SVMAllocator
import numpy as np
import numpy.linalg as la

# Problem size and two random single-precision host input vectors.
n = 500000
a = np.random.rand(n).astype(np.float32)
b = np.random.rand(n).astype(np.float32)


# Create an OpenCL context and a command queue on it.
ctx = cl.create_some_context()
queue = cl.CommandQueue(ctx)

# Allocator handed to pyopencl.array below: SVMAllocator with READ_WRITE
# flags, bound to `queue`.
alloc = SVMAllocator(ctx, cl.svm_mem_flags.READ_WRITE, queue=queue)

# Transfer the host inputs to the device through the allocator and print
# the arrays' backing `.data` objects for inspection.
a_dev = cl_array.to_device(queue, a, allocator=alloc)
print("A_DEV", a_dev.data)
b_dev = cl_array.to_device(queue, b, allocator=alloc)
# Destination array created via empty_like(a_dev).
dest_dev = cl_array.empty_like(a_dev)
print("DEST", dest_dev.data)

# Build a kernel that writes the element-wise sum a[gid] + b[gid] to c[gid].
prg = cl.Program(ctx, """
__kernel void sum(__global const float *a,
__global const float *b, __global float *c)
{
int gid = get_global_id(0);
c[gid] = a[gid] + b[gid];
}
""").build()

# Launch with global size a.shape and the local size left as None, passing
# the arrays' `.data` objects as kernel arguments.
knl = prg.sum # Use this Kernel object for repeated calls
knl(queue, a.shape, None, a_dev.data, b_dev.data, dest_dev.data)

# Compare the kernel result against pyopencl.array's own a_dev+b_dev.
# PROBLEM: numpy frees the temporary out of (a_dev+b_dev) before
# we're done with it
diff = dest_dev - (a_dev+b_dev)

# Diagnostics: fetch the difference to the host and print it, the compared
# arrays, and its norm.
# NOTE(review): `if 0:` looks like a manual off-switch for (part of) this
# output -- confirm which statements it is meant to cover.
if 0:
diff = diff.get()
np.set_printoptions(linewidth=400)
print(dest_dev)
print((a_dev+b_dev).get())
print(diff)
print(la.norm(diff))
print("A_DEV", a_dev.data.mem.__array_interface__)
94 changes: 54 additions & 40 deletions pyopencl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
"""

from sys import intern
from warnings import warn
from typing import Union, Any

from pyopencl.version import VERSION, VERSION_STATUS, VERSION_TEXT # noqa

Expand All @@ -43,7 +45,6 @@
import os
from os.path import dirname, join, realpath
if realpath(join(os.getcwd(), "pyopencl")) == realpath(dirname(__file__)):
from warnings import warn
warn("It looks like you are importing PyOpenCL from "
"its source directory. This likely won't work.")
raise
Expand Down Expand Up @@ -199,11 +200,9 @@

if get_cl_header_version() >= (2, 0):
from pyopencl._cl import ( # noqa
SVMAllocation,
SVMPointer,
SVM,

# FIXME
#enqueue_svm_migratemem,
SVMAllocation,
)

if _cl.have_gl():
Expand Down Expand Up @@ -267,7 +266,6 @@ class CommandQueueUsedAfterExit(UserWarning):

def compiler_output(text):
import os
from warnings import warn
if int(os.environ.get("PYOPENCL_COMPILER_OUTPUT", "0")):
warn(text, CompilerWarning)
else:
Expand Down Expand Up @@ -389,7 +387,6 @@ def enable_debugging(platform_or_context):
import os
os.environ["CPU_MAX_COMPUTE_UNITS"] = "1"
else:
from warnings import warn
warn("do not know how to enable debugging on '%s'"
% platform.name)

Expand Down Expand Up @@ -428,7 +425,6 @@ def _get_prg(self):
return self._prg
else:
# "no program" can only happen in from-source case.
from warnings import warn
warn("Pre-build attribute access defeats compiler caching.",
stacklevel=3)

Expand Down Expand Up @@ -662,7 +658,6 @@ def device_hashable_model_and_version_identifier(self):
return ("v1", self.vendor, self.vendor_id, self.name, self.version)

def device_persistent_unique_id(self):
from warnings import warn
warn("Device.persistent_unique_id is deprecated. "
"Use Device.hashable_model_and_version_identifier instead.",
DeprecationWarning, stacklevel=2)
Expand All @@ -684,7 +679,6 @@ def device_persistent_unique_id(self):

def context_init(self, devices, properties, dev_type, cache_dir=None):
if cache_dir is not None:
from warnings import warn
warn("The 'cache_dir' argument to the Context constructor "
"is deprecated and no longer has an effect. "
"It was removed because it only applied to the wrapper "
Expand Down Expand Up @@ -970,7 +964,6 @@ def image_init(self, context, flags, format, shape=None, pitches=None,

if hostbuf is not None and not \
(flags & (mem_flags.USE_HOST_PTR | mem_flags.COPY_HOST_PTR)):
from warnings import warn
warn("'hostbuf' was passed, but no memory flags to make use of it.")

if hostbuf is None and pitches is not None:
Expand Down Expand Up @@ -1043,7 +1036,6 @@ def image_init(self, context, flags, format, shape=None, pitches=None,

class _ImageInfoGetter:
def __init__(self, event):
from warnings import warn
warn("Image.image.attr is deprecated and will go away in 2021. "
"Use Image.attr directly, instead.")

Expand Down Expand Up @@ -1150,24 +1142,19 @@ def memory_map_exit(self, exc_type, exc_val, exc_tb):
"""

if get_cl_header_version() >= (2, 0):
svmallocation_old_init = SVMAllocation.__init__

def svmallocation_init(self, ctx, size, alignment, flags, _interface=None):
"""
:arg ctx: a :class:`Context`
:arg flags: some of :class:`svm_mem_flags`.
"""
svmallocation_old_init(self, ctx, size, alignment, flags)

# mem_flags.READ_ONLY applies to kernels, not the host
read_write = True
_interface["data"] = (
int(self._ptr_as_int()), not read_write)

self.__array_interface__ = _interface

if get_cl_header_version() >= (2, 0):
SVMAllocation.__init__ = svmallocation_init
class _ArrayInterfaceSVMAllocation(SVMAllocation):
def __init__(self, ctx, size, alignment, flags, _interface=None,
queue=None):
"""
:arg ctx: a :class:`Context`
:arg flags: some of :class:`svm_mem_flags`.
"""
super().__init__(ctx, size, alignment, flags, queue)

# mem_flags.READ_ONLY applies to kernels, not the host
read_write = True
_interface["data"] = (
int(self._ptr_as_int()), not read_write)

# }}}

Expand Down Expand Up @@ -1778,12 +1765,14 @@ def enqueue_copy(queue, dest, src, **kwargs):
else:
raise ValueError("invalid dest mem object type")

elif get_cl_header_version() >= (2, 0) and isinstance(dest, SVM):
elif get_cl_header_version() >= (2, 0) and isinstance(dest, SVMPointer):
# to SVM
if not isinstance(src, SVM):
if not isinstance(src, SVMPointer):
src = SVM(src)

is_blocking = kwargs.pop("is_blocking", True)
assert kwargs.pop("src_offset", 0) == 0
assert kwargs.pop("dest_offset", 0) == 0
return _cl._enqueue_svm_memcpy(queue, is_blocking, dest, src, **kwargs)

else:
Expand All @@ -1809,7 +1798,7 @@ def enqueue_copy(queue, dest, src, **kwargs):
queue, src, origin, region, dest, **kwargs)
else:
raise ValueError("invalid src mem object type")
elif isinstance(src, SVM):
elif isinstance(src, SVMPointer):
# from svm
# dest is not a SVM instance, otherwise we'd be in the branch above
is_blocking = kwargs.pop("is_blocking", True)
Expand All @@ -1822,6 +1811,26 @@ def enqueue_copy(queue, dest, src, **kwargs):
# }}}


# {{{ enqueue_fill

def enqueue_fill(queue: CommandQueue,
        dest: "Union[MemoryObjectHolder, SVMPointer]",
        pattern: Any, size: int, *, offset: int = 0, wait_for=None) -> Event:
    """Fill *dest* with *pattern*, dispatching on the type of *dest*.

    :arg queue: the command queue, forwarded to the selected fill function.
    :arg dest: a :class:`MemoryObjectHolder` (handled by
        :func:`enqueue_fill_buffer`) or an :class:`SVMPointer` (handled by
        :func:`enqueue_svm_memfill`).
    :arg pattern: the fill pattern, forwarded unchanged.
    :arg size: the amount of memory to fill, forwarded as the *size* of
        :func:`enqueue_fill_buffer` or the *byte_count* of
        :func:`enqueue_svm_memfill`.
    :arg offset: offset into *dest*, forwarded to
        :func:`enqueue_fill_buffer`. Must be zero (the default) when *dest*
        is an :class:`SVMPointer`.
    :arg wait_for: forwarded unchanged to the selected fill function.
    :returns: the return value of the selected fill function.
    :raises NotImplementedError: if *dest* is an :class:`SVMPointer` and
        *offset* is nonzero.
    :raises TypeError: if *dest* is of neither supported type.

    .. versionadded:: 2022.2
    """
    # NOTE: the annotation of *dest* is a string on purpose: SVMPointer is
    # only imported when the OpenCL headers are at least version 2.0, so an
    # eagerly evaluated annotation would fail at definition time otherwise.

    if isinstance(dest, MemoryObjectHolder):
        return enqueue_fill_buffer(queue, dest, pattern, offset, size, wait_for)

    # Check the header version before touching SVMPointer (same guard as in
    # enqueue_copy), so that unsupported types reach the TypeError below
    # instead of failing on an undefined name.
    if get_cl_header_version() >= (2, 0) and isinstance(dest, SVMPointer):
        if offset:
            raise NotImplementedError("enqueue_fill with SVM does not yet support "
                    "offsets")
        return enqueue_svm_memfill(queue, dest, pattern, size, wait_for)

    raise TypeError(f"enqueue_fill does not know how to fill '{type(dest)}'")

# }}}


# {{{ image creation

DTYPE_TO_CHANNEL_TYPE = {
Expand Down Expand Up @@ -1927,7 +1936,6 @@ def enqueue_barrier(queue, wait_for=None):

def enqueue_fill_buffer(queue, mem, pattern, offset, size, wait_for=None):
if not (queue._get_cl_version() >= (1, 2) and get_cl_header_version() >= (1, 2)):
from warnings import warn
warn("The context for this queue does not declare OpenCL 1.2 support, so "
"the next thing you might see is a crash")

Expand All @@ -1944,7 +1952,7 @@ def enqueue_fill_buffer(queue, mem, pattern, offset, size, wait_for=None):
def enqueue_svm_memfill(queue, dest, pattern, byte_count=None, wait_for=None):
"""Fill shared virtual memory with a pattern.

:arg dest: a Python buffer object, optionally wrapped in an :class:`SVM` object
:arg dest: a Python buffer object, or any implementation of :class:`SVMPointer`.
:arg pattern: a Python buffer object (e.g. a :class:`numpy.ndarray` with the
fill pattern to be used.
:arg byte_count: The size of the memory to be fill. Defaults to the
Expand All @@ -1955,8 +1963,8 @@ def enqueue_svm_memfill(queue, dest, pattern, byte_count=None, wait_for=None):
.. versionadded:: 2016.2
"""

if not isinstance(dest, SVM):
dest = SVM(dest)
if not isinstance(dest, SVMPointer):
dest = SVMPointer(dest)

return _cl._enqueue_svm_memfill(
queue, dest, pattern, byte_count=None, wait_for=None)
Expand All @@ -1965,7 +1973,7 @@ def enqueue_svm_memfill(queue, dest, pattern, byte_count=None, wait_for=None):
def enqueue_svm_migratemem(queue, svms, flags, wait_for=None):
"""
:arg svms: a collection of Python buffer objects (e.g. :mod:`numpy`
arrays), optionally wrapped in :class:`SVM` objects.
arrays), or any implementation of :class:`SVMPointer`.
:arg flags: a combination of :class:`mem_migration_flags`

|std-enqueue-blurb|
Expand All @@ -1983,7 +1991,7 @@ def enqueue_svm_migratemem(queue, svms, flags, wait_for=None):
wait_for)


def svm_empty(ctx, flags, shape, dtype, order="C", alignment=None):
def svm_empty(ctx, flags, shape, dtype, order="C", alignment=None, queue=None):
"""Allocate an empty :class:`numpy.ndarray` of the given *shape*, *dtype*
and *order*. (See :func:`numpy.empty` for the meaning of these arguments.)
The array will be allocated in shared virtual memory belonging
Expand All @@ -2001,6 +2009,10 @@ def svm_empty(ctx, flags, shape, dtype, order="C", alignment=None):
will likely want to wrap the returned array in an :class:`SVM` tag.

.. versionadded:: 2016.2

.. versionchanged:: 2022.2

*queue* argument added.
"""

dtype = np.dtype(dtype)
Expand Down Expand Up @@ -2047,7 +2059,9 @@ def svm_empty(ctx, flags, shape, dtype, order="C", alignment=None):
if alignment is None:
alignment = itemsize

svm_alloc = SVMAllocation(ctx, nbytes, alignment, flags, _interface=interface)
svm_alloc = _ArrayInterfaceSVMAllocation(
ctx, nbytes, alignment, flags, _interface=interface,
queue=queue)
return np.asarray(svm_alloc)


Expand Down
26 changes: 18 additions & 8 deletions pyopencl/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,9 +720,14 @@ def set(self, ary, queue=None, async_=None, **kwargs):
stacklevel=2)

if self.size:
event1 = cl.enqueue_copy(queue or self.queue, self.base_data, ary,
device_offset=self.offset,
is_blocking=not async_)
if self.offset:
event1 = cl.enqueue_copy(queue or self.queue, self.base_data, ary,
device_offset=self.offset,
is_blocking=not async_)
else:
event1 = cl.enqueue_copy(queue or self.queue, self.base_data, ary,
is_blocking=not async_)

self.add_event(event1)

def _get(self, queue=None, ary=None, async_=None, **kwargs):
Expand Down Expand Up @@ -770,9 +775,14 @@ def _get(self, queue=None, ary=None, async_=None, **kwargs):
"to associate one.")

if self.size:
event1 = cl.enqueue_copy(queue, ary, self.base_data,
device_offset=self.offset,
wait_for=self.events, is_blocking=not async_)
if self.offset:
event1 = cl.enqueue_copy(queue, ary, self.base_data,
device_offset=self.offset,
wait_for=self.events, is_blocking=not async_)
else:
event1 = cl.enqueue_copy(queue, ary, self.base_data,
wait_for=self.events, is_blocking=not async_)

self.add_event(event1)
else:
event1 = None
Expand Down Expand Up @@ -1458,8 +1468,8 @@ def _zero_fill(self, queue=None, wait_for=None):
# https://github.com/inducer/pyopencl/issues/395
if cl_version_gtr_1_2 and not (on_nvidia and self.nbytes >= 2**31):
self.add_event(
cl.enqueue_fill_buffer(queue, self.base_data, np.int8(0),
self.offset, self.nbytes, wait_for=wait_for))
cl.enqueue_fill(queue, self.base_data, np.int8(0),
self.nbytes, offset=self.offset, wait_for=wait_for))
else:
zero = np.zeros((), self.dtype)
self.fill(zero, queue=queue)
Expand Down
24 changes: 23 additions & 1 deletion pyopencl/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,31 @@ def _register_types():

from pyopencl._cl import ( # noqa
PooledBuffer as PooledBuffer,
SVMPooledAllocation as SVMPooledAllocation,
_tools_DeferredAllocator as DeferredAllocator,
_tools_ImmediateAllocator as ImmediateAllocator,
MemoryPool as MemoryPool)
_tools_SVMAllocator as SVMAllocator,
MemoryPool as MemoryPool,
SVMMemoryPool as SVMMemoryPool)

# }}}


# {{{ svm allocator

# # FIXME: Replace me with C++
# class SVMAllocator:
# def __init__(self, ctx, flags, *, alignment=0, queue=None):
# self._context = ctx
# self._flags = flags
# self._alignment = alignment
# self._queue = queue

# def __call__(self, nbytes):
# import pyopencl as cl
# return cl.SVMAllocation(
# self._context, nbytes, self._alignment, self._flags,
# queue=self._queue)

# }}}

Expand Down
Loading