Skip to content

Commit

Permalink
add _get_queue_for_pickling, outline some pool support
Browse files Browse the repository at this point in the history
  • Loading branch information
matthiasdiener committed Oct 2, 2024
1 parent 60e71ee commit 0ec19de
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 43 deletions.
117 changes: 79 additions & 38 deletions pyopencl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
MemoryObject,
MemoryMap,
Buffer,
PooledBuffer,

_Program,
Kernel,
Expand Down Expand Up @@ -194,7 +195,7 @@
enqueue_migrate_mem_objects, unload_platform_compiler)

if get_cl_header_version() >= (2, 0):
from pyopencl._cl import SVM, SVMAllocation, SVMPointer
from pyopencl._cl import SVM, SVMAllocation, SVMPointer, PooledSVM

if _cl.have_gl():
from pyopencl._cl import ( # noqa: F401
Expand Down Expand Up @@ -2436,21 +2437,28 @@ def queue_for_pickling(queue, alloc=None):
_QUEUE_FOR_PICKLING_TLS.alloc = None


def _getstate_buffer(self):
import pyopencl as cl
state = {}
state["size"] = self.size
state["flags"] = self.flags

def _get_queue_for_pickling(obj):
try:
queue = _QUEUE_FOR_PICKLING_TLS.queue
alloc = _QUEUE_FOR_PICKLING_TLS.alloc
except AttributeError:
queue = None

if queue is None:
raise RuntimeError("CL Buffer instances can only be pickled while "
raise RuntimeError(f"{type(obj).__name__} instances can only be pickled while "
"queue_for_pickling is active.")

return queue, alloc


def _getstate_buffer(self):
import pyopencl as cl
queue, _alloc = _get_queue_for_pickling(self)

state = {}
state["size"] = self.size
state["flags"] = self.flags

a = bytearray(self.size)
cl.enqueue_copy(queue, a, self)

Expand All @@ -2460,42 +2468,57 @@ def _getstate_buffer(self):


def _setstate_buffer(self, state):
try:
queue = _QUEUE_FOR_PICKLING_TLS.queue
except AttributeError:
queue = None

if queue is None:
raise RuntimeError("CL Buffer instances can only be unpickled while "
"queue_for_pickling is active.")
import pyopencl as cl
queue, _alloc = _get_queue_for_pickling(self)

size = state["size"]
flags = state["flags"]

import pyopencl as cl

a = state["_pickle_data"]
Buffer.__init__(self, queue.context, flags | cl.mem_flags.COPY_HOST_PTR, size, a)


Buffer.__getstate__ = _getstate_buffer
Buffer.__setstate__ = _setstate_buffer


def _getstate_pooledbuffer(self):
import pyopencl as cl
queue, _alloc = _get_queue_for_pickling(self)

state = {}
state["size"] = self.size
state["flags"] = self.flags

a = bytearray(self.size)
cl.enqueue_copy(queue, a, self)
state["_pickle_data"] = a

return state


def _setstate_pooledbuffer(self, state):
_queue, _alloc = _get_queue_for_pickling(self)

_size = state["size"]
_flags = state["flags"]

_a = state["_pickle_data"]
# FIXME: Unclear what to do here - PooledBuffer does not have __init__


PooledBuffer.__getstate__ = _getstate_pooledbuffer
PooledBuffer.__setstate__ = _setstate_pooledbuffer


if get_cl_header_version() >= (2, 0):
def _getstate_svm(self):
def _getstate_svmallocation(self):
import pyopencl as cl

state = {}
state["size"] = self.size

try:
queue = _QUEUE_FOR_PICKLING_TLS.queue
except AttributeError:
queue = None

if queue is None:
raise RuntimeError(f"{self.__class__.__name__} instances can only be "
"pickled while queue_for_pickling is active.")
queue, _alloc = _get_queue_for_pickling(self)

a = bytearray(self.size)
cl.enqueue_copy(queue, a, self)
Expand All @@ -2504,17 +2527,10 @@ def _getstate_svm(self):

return state

def _setstate_svm(self, state):
def _setstate_svmallocation(self, state):
import pyopencl as cl

try:
queue = _QUEUE_FOR_PICKLING_TLS.queue
except AttributeError:
queue = None

if queue is None:
raise RuntimeError(f"{self.__class__.__name__} instances can only be "
"unpickled while queue_for_pickling is active.")
queue, _alloc = _get_queue_for_pickling(self)

size = state["size"]

Expand All @@ -2523,8 +2539,33 @@ def _setstate_svm(self, state):
queue=queue)
cl.enqueue_copy(queue, self, a)

SVMAllocation.__getstate__ = _getstate_svm
SVMAllocation.__setstate__ = _setstate_svm
SVMAllocation.__getstate__ = _getstate_svmallocation
SVMAllocation.__setstate__ = _setstate_svmallocation

def _getstate_pooled_svm(self):
import pyopencl as cl

state = {}
state["size"] = self.size

queue, _alloc = _get_queue_for_pickling(self)

a = bytearray(self.size)
cl.enqueue_copy(queue, a, self)

state["_pickle_data"] = a

return state

def _setstate_pooled_svm(self, state):
_queue, _alloc = _get_queue_for_pickling(self)
_size = state["size"]
_data = state["_pickle_data"]

# FIXME: Unclear what to do here - PooledSVM does not have __init__

PooledSVM.__getstate__ = _getstate_pooled_svm
PooledSVM.__setstate__ = _setstate_pooled_svm

# }}}

Expand Down
16 changes: 11 additions & 5 deletions test/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -2404,12 +2404,18 @@ def __init__(self, cq, shape, dtype, tags):
self.tags = tags


def test_array_pickling(ctx_factory):
@pytest.mark.parametrize("use_mempool", [False, True])
def test_array_pickling(ctx_factory, use_mempool):
context = ctx_factory()
queue = cl.CommandQueue(context)

if use_mempool:
alloc = cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue))
else:
alloc = None

a = np.array([1, 2, 3, 4, 5]).astype(np.float32)
a_gpu = cl_array.to_device(queue, a)
a_gpu = cl_array.to_device(queue, a, allocator=alloc)

import pickle
with pytest.raises(RuntimeError):
Expand Down Expand Up @@ -2437,11 +2443,11 @@ def test_array_pickling(ctx_factory):
from pyopencl.characterize import has_coarse_grain_buffer_svm

if has_coarse_grain_buffer_svm(queue.device):
from pyopencl.tools import SVMAllocator
from pyopencl.tools import SVMAllocator, SVMPool

alloc = SVMAllocator(context, alignment=0, queue=queue)
# FIXME: SVMPool is not picklable
# alloc = SVMPool(alloc)
if use_mempool:
alloc = SVMPool(alloc)

a_dev = cl_array.to_device(queue, a, allocator=alloc)

Expand Down

0 comments on commit 0ec19de

Please sign in to comment.