(feat): allow writing from cupy inside dask arrays (#1550)
* (feat): allow gpu writing

* (fix): map to cupy first

* (fix): `IORegistry.get_spec` for `dask`

* (fix): typing of `cupy_dense_dask_array`

* (fix): clean up `typ` in `get_spec`

* (fix): oops

* (fix): don't override `typ`

* (fix): allow `type(elem)` for uninteresting dense arrays

* (Fix): memory class

* (fix): oops!

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* (fix): add token to gpu codecov job

* (fix): remove flag

* (fix): try without workers

* (fix): add helper tests

* (fix): remove unnecessary `dtype`

* (feat): try re-enabling parallel tests

* (fix): no more parallel, actually, for `test-gpu.yml`

* (chore): add release note

* (fix): add dask intersphinx link

* (refactor): use helper functions directly + clean up `test_io_spec_cupy`

* (fix): `csc ` and `csr` in helper

* (fix): add coverage for `as_cupy` for `dask`

* (fix): skip on gpu

* (fix): no `int`

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
ilan-gold and pre-commit-ci[bot] authored Jul 12, 2024
1 parent 4c70aa8 commit 9918044
Showing 10 changed files with 281 additions and 47 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/test-gpu.yml
@@ -66,8 +66,10 @@ jobs:
pip list
- name: Run test
run: pytest -m gpu --cov --cov-report=xml --cov-context=test -n 4
run: pytest -m gpu --cov --cov-report=xml --cov-context=test

- uses: codecov/codecov-action@v3
with:
flags: gpu-tests
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: true
1 change: 1 addition & 0 deletions docs/conf.py
@@ -129,6 +129,7 @@ def setup(app: Sphinx):
sklearn=("https://scikit-learn.org/stable/", None),
zarr=("https://zarr.readthedocs.io/en/stable/", None),
xarray=("https://xarray.pydata.org/en/stable/", None),
dask=("https://docs.dask.org/en/stable/", None),
)
qualname_overrides = {
"h5py._hl.group.Group": "h5py.Group",
1 change: 1 addition & 0 deletions docs/release-notes/0.11.0.md
@@ -8,6 +8,7 @@
* Add `should_remove_unused_categories` option to `anndata.settings` to override current behavior. Default is `True` (i.e., previous behavior). Please refer to the [documentation](https://anndata.readthedocs.io/en/latest/generated/anndata.settings.html) for usage. {pr}`1340` {user}`ilan-gold`
* `scipy.sparse.csr_array` and `scipy.sparse.csc_array` are now supported when constructing `AnnData` objects {pr}`1028` {user}`ilan-gold` {user}`isaac-virshup`
* Add `should_check_uniqueness` option to `anndata.settings` to override current behavior. Default is `True` (i.e., previous behavior). Please refer to the [documentation](https://anndata.readthedocs.io/en/latest/generated/anndata.settings.html) for usage. {pr}`1507` {user}`ilan-gold`
* Add functionality to write from GPU {class}`dask.array.Array` to disk {pr}`1550` {user}`ilan-gold`

#### Bugfix

23 changes: 23 additions & 0 deletions src/anndata/_io/specs/methods.py
@@ -662,6 +662,29 @@ def write_sparse_dataset(
f[k].attrs["encoding-version"] = "0.1.0"


@_REGISTRY.register_write(H5Group, (DaskArray, CupyArray), IOSpec("array", "0.2.0"))
@_REGISTRY.register_write(ZarrGroup, (DaskArray, CupyArray), IOSpec("array", "0.2.0"))
@_REGISTRY.register_write(
H5Group, (DaskArray, CupyCSRMatrix), IOSpec("csr_matrix", "0.1.0")
)
@_REGISTRY.register_write(
H5Group, (DaskArray, CupyCSCMatrix), IOSpec("csc_matrix", "0.1.0")
)
@_REGISTRY.register_write(
ZarrGroup, (DaskArray, CupyCSRMatrix), IOSpec("csr_matrix", "0.1.0")
)
@_REGISTRY.register_write(
ZarrGroup, (DaskArray, CupyCSCMatrix), IOSpec("csc_matrix", "0.1.0")
)
def write_cupy_dask_sparse(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})):
_writer.write_elem(
f,
k,
elem.map_blocks(lambda x: x.get(), dtype=elem.dtype, meta=elem._meta.get()),
dataset_kwargs=dataset_kwargs,
)


@_REGISTRY.register_write(
H5Group, (DaskArray, sparse.csr_matrix), IOSpec("csr_matrix", "0.1.0")
)
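The registered writer above never hands GPU buffers to h5py or zarr directly: every dask block is pulled back to host memory with `.get()` and the resulting CPU-backed dask array is delegated to the existing writers. A minimal sketch of that conversion, assuming `cupy`, `dask`, and a GPU are available (variable names are illustrative only):

```python
import cupy as cp
import dask.array as da
import numpy as np

# A dask array whose blocks live on the GPU as cupy.ndarray.
gpu_darr = da.ones((100, 50), chunks=(50, 50)).map_blocks(
    cp.asarray, dtype=np.float64, meta=cp.array(())
)

# The same move the writer makes: `.get()` copies every block back to a
# numpy.ndarray, and the meta is converted too, so downstream dispatch sees
# an ordinary CPU-backed dask array.
cpu_darr = gpu_darr.map_blocks(
    lambda x: x.get(), dtype=gpu_darr.dtype, meta=gpu_darr._meta.get()
)

assert isinstance(cpu_darr._meta, np.ndarray)
cpu_darr.compute()  # plain numpy array, ready for the existing h5py/zarr writers
```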
14 changes: 8 additions & 6 deletions src/anndata/_io/specs/registry.py
@@ -7,7 +7,7 @@
from typing import TYPE_CHECKING

from anndata._io.utils import report_read_key_on_error, report_write_key_on_error
from anndata.compat import _read_attr
from anndata.compat import DaskArray, _read_attr

if TYPE_CHECKING:
from collections.abc import Callable, Generator, Iterable
@@ -85,7 +85,7 @@ def __init__(self):
self.write: dict[
tuple[type, type | tuple[type, str], frozenset[str]], _WriteInternal
] = {}
self.write_specs: dict[type | tuple[type, str], IOSpec] = {}
self.write_specs: dict[type | tuple[type, str] | tuple[type, type], IOSpec] = {}

def register_write(
self,
@@ -201,10 +201,12 @@ def get_partial_read(
)

def get_spec(self, elem: Any) -> IOSpec:
if hasattr(elem, "dtype"):
typ = (type(elem), elem.dtype.kind)
if typ in self.write_specs:
return self.write_specs[typ]
if isinstance(elem, DaskArray):
if (typ_meta := (DaskArray, type(elem._meta))) in self.write_specs:
return self.write_specs[typ_meta]
elif hasattr(elem, "dtype"):
if (typ_kind := (type(elem), elem.dtype.kind)) in self.write_specs:
return self.write_specs[typ_kind]
return self.write_specs[type(elem)]


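The behavioural change here: `get_spec` now resolves a dask array by the type of its `_meta` (the empty block prototype) before falling back to the `(type, dtype.kind)` and plain-type lookups, so CuPy-backed and NumPy-backed dask arrays can map to different specs. A simplified, self-contained sketch of that lookup order (the dictionary below is a stand-in for `IORegistry.write_specs`, not the real registry):

```python
from __future__ import annotations

import dask.array as da
import numpy as np

# Hypothetical spec table for illustration only; real entries are IOSpec objects.
write_specs: dict[object, str] = {
    (da.Array, np.ndarray): "array v0.2.0",  # dask array with numpy blocks
    (np.ndarray, "f"): "array v0.2.0",       # in-memory float array
    np.ndarray: "array v0.2.0",              # plain-type fallback
}


def get_spec(elem) -> str:
    # Dask arrays dispatch on the type of the blocks they wrap (_meta).
    if isinstance(elem, da.Array):
        if (key := (da.Array, type(elem._meta))) in write_specs:
            return write_specs[key]
    elif hasattr(elem, "dtype"):
        if (key := (type(elem), elem.dtype.kind)) in write_specs:
            return write_specs[key]
    return write_specs[type(elem)]


print(get_spec(da.ones((4, 4))))  # resolved via the (DaskArray, meta type) key
print(get_spec(np.ones(3)))       # resolved via the (type, dtype.kind) key
```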
9 changes: 9 additions & 0 deletions src/anndata/compat/__init__.py
@@ -150,6 +150,15 @@ def __repr__():
from cupyx.scipy.sparse import (
spmatrix as CupySparseMatrix,
)

try:
import dask.array as da

da.register_chunk_type(CupyCSRMatrix)
da.register_chunk_type(CupyCSCMatrix)
except ImportError:
pass

except ImportError:

class CupySparseMatrix:
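Registering the CuPy sparse classes as chunk types is what lets a `dask.array.Array` carry `cupyx` CSR/CSC blocks at all; without the registration dask rejects them as invalid block types. A rough sketch of what this enables (GPU plus the optional `cupy`/`dask` dependencies required; the toy shapes are arbitrary):

```python
import cupy as cp
import cupyx.scipy.sparse as cpsparse
import dask.array as da

# Without these two calls dask refuses cupyx sparse matrices as chunks.
da.register_chunk_type(cpsparse.csr_matrix)
da.register_chunk_type(cpsparse.csc_matrix)

# Dense GPU-backed dask array, then convert each block to a cupyx CSR matrix.
dense_gpu = da.random.random((20, 10), chunks=(10, -1)).map_blocks(
    cp.asarray, meta=cp.array(())
)
sparse_gpu = dense_gpu.map_blocks(
    cpsparse.csr_matrix, meta=cpsparse.csr_matrix(cp.zeros((1, 1)))
)

print(type(sparse_gpu._meta))  # cupyx.scipy.sparse.csr_matrix
```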
149 changes: 127 additions & 22 deletions src/anndata/tests/helpers.py
@@ -693,24 +693,30 @@ def fmt_name(x):
)


def _half_chunk_size(a: tuple[int, ...]) -> tuple[int, ...]:
def half_rounded_up(x):
div, mod = divmod(x, 2)
return div + (mod > 0)

return tuple(half_rounded_up(x) for x in a)


@singledispatch
def as_dense_dask_array(a):
import dask.array as da

return da.asarray(a)
a = asarray(a)
return da.asarray(a, chunks=_half_chunk_size(a.shape))


@as_dense_dask_array.register(sparse.spmatrix)
def _(a):
return as_dense_dask_array(a.toarray())


def _half_chunk_size(a: tuple[int, ...]) -> tuple[int, ...]:
def half_rounded_up(x):
div, mod = divmod(x, 2)
return div + (mod > 0)

return tuple(half_rounded_up(x) for x in a)
@as_dense_dask_array.register(DaskArray)
def _(a):
return a.map_blocks(asarray, dtype=a.dtype, meta=np.ndarray)
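As a quick illustration of the chunking used by the dense helpers: `_half_chunk_size` splits every dimension into two chunks, rounding odd sizes up, so the test helpers always exercise genuinely multi-chunk arrays. An equivalent one-liner sketch:

```python
def half_chunk_size(shape: tuple[int, ...]) -> tuple[int, ...]:
    # Half of each dimension, rounded up, e.g. (15, 10) -> (8, 5).
    return tuple((x + 1) // 2 for x in shape)


assert half_chunk_size((15, 10)) == (8, 5)
assert half_chunk_size((1, 7)) == (1, 4)
```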


@singledispatch
@@ -739,6 +745,78 @@ def _(a):
return a.map_blocks(sparse.csr_matrix)


@singledispatch
def as_dense_cupy_dask_array(a):
import cupy as cp

return as_dense_dask_array(a).map_blocks(
cp.array, meta=cp.array((1.0), dtype=a.dtype), dtype=a.dtype
)


@as_dense_cupy_dask_array.register(CupyArray)
def _(a):
import cupy as cp
import dask.array as da

return da.from_array(
a,
chunks=_half_chunk_size(a.shape),
meta=cp.array((1.0), dtype=a.dtype),
)


@as_dense_cupy_dask_array.register(DaskArray)
def _(a):
import cupy as cp

if isinstance(a._meta, cp.ndarray):
return a.copy()
return a.map_blocks(
partial(as_cupy, typ=CupyArray),
dtype=a.dtype,
meta=cp.array((1.0), dtype=a.dtype),
)
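Taken together, `as_dense_cupy_dask_array` gives the tests a single entry point that accepts NumPy, CuPy, or dask inputs and always returns a dask array backed by `cupy.ndarray` blocks. A hedged usage sketch (GPU required; `X` is just a placeholder test matrix):

```python
import cupy as cp
import numpy as np

from anndata.tests.helpers import as_dense_cupy_dask_array

X = np.random.random((15, 10))          # placeholder in-memory matrix
gpu_darr = as_dense_cupy_dask_array(X)  # dask array with cupy.ndarray blocks

assert isinstance(gpu_darr._meta, cp.ndarray)
assert isinstance(gpu_darr.compute(), cp.ndarray)  # compute() stays on the GPU
```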


try:
import cupyx.scipy.sparse as cpsparse

format_to_memory_class = {"csr": cpsparse.csr_matrix, "csc": cpsparse.csc_matrix}
except ImportError:
format_to_memory_class = {}


# TODO: If there are chunks which divide along columns, then a coo_matrix is returned by compute
# We should try and fix this upstream in dask/ cupy
@singledispatch
def as_cupy_sparse_dask_array(a, format="csr"):
memory_class = format_to_memory_class[format]
cpu_da = as_sparse_dask_array(a)
return cpu_da.rechunk((cpu_da.chunks[0], -1)).map_blocks(
memory_class, dtype=a.dtype, meta=memory_class(cpu_da._meta)
)


@as_cupy_sparse_dask_array.register(CupyArray)
@as_cupy_sparse_dask_array.register(CupySparseMatrix)
def _(a, format="csr"):
import dask.array as da

memory_class = format_to_memory_class[format]
return da.from_array(memory_class(a), chunks=(_half_chunk_size(a.shape)[0], -1))


@as_cupy_sparse_dask_array.register(DaskArray)
def _(a, format="csr"):
memory_class = format_to_memory_class[format]
if isinstance(a._meta, memory_class):
return a.copy()
return a.rechunk((a.chunks[0], -1)).map_blocks(
partial(as_cupy, typ=memory_class), dtype=a.dtype
)
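The sparse counterpart rechunks so the column axis becomes a single chunk (blocks split only along rows), side-stepping the `coo_matrix` issue flagged in the TODO above. A rough usage sketch (GPU required; `format` may be `"csr"` or `"csc"`):

```python
import cupyx.scipy.sparse as cpsparse
import numpy as np

from anndata.tests.helpers import as_cupy_sparse_dask_array

X = np.random.random((20, 10))  # placeholder dense input
gpu_csr = as_cupy_sparse_dask_array(X, format="csr")
gpu_csc = as_cupy_sparse_dask_array(X, format="csc")

assert isinstance(gpu_csr._meta, cpsparse.csr_matrix)
assert isinstance(gpu_csc._meta, cpsparse.csc_matrix)
assert gpu_csr.chunks[1] == (10,)  # a single chunk along the column axis
```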


@contextmanager
def pytest_8_raises(exc_cls, *, match: str | re.Pattern = None):
"""Error handling using pytest 8's support for __notes__.
@@ -768,24 +846,32 @@ def check_error_or_notes_match(e: pytest.ExceptionInfo, pattern: str | re.Pattern
), f"Could not find pattern: '{pattern}' in error:\n\n{message}\n"


def as_cupy_type(val, typ=None):
def resolve_cupy_type(val):
if not isinstance(val, type):
input_typ = type(val)
else:
input_typ = val

if issubclass(input_typ, np.ndarray):
typ = CupyArray
elif issubclass(input_typ, sparse.csr_matrix):
typ = CupyCSRMatrix
elif issubclass(input_typ, sparse.csc_matrix):
typ = CupyCSCMatrix
else:
raise NotImplementedError(f"No default target type for input type {input_typ}")
return typ


@singledispatch
def as_cupy(val, typ=None):
"""
Rough conversion function
Will try to infer target type from input type if not specified.
"""
if typ is None:
input_typ = type(val)
if issubclass(input_typ, np.ndarray):
typ = CupyArray
elif issubclass(input_typ, sparse.csr_matrix):
typ = CupyCSRMatrix
elif issubclass(input_typ, sparse.csc_matrix):
typ = CupyCSCMatrix
else:
raise NotImplementedError(
f"No default target type for input type {input_typ}"
)
typ = resolve_cupy_type(val)

if issubclass(typ, CupyArray):
import cupy as cp
@@ -815,6 +901,14 @@ def as_cupy_type(val, typ=None):
)


# TODO: test
@as_cupy.register(DaskArray)
def as_cupy_dask(a, typ=None):
if typ is None:
typ = resolve_cupy_type(a._meta)
return a.map_blocks(partial(as_cupy, typ=typ), dtype=a.dtype)


@singledispatch
def shares_memory(x, y) -> bool:
return np.shares_memory(x, y)
@@ -844,20 +938,31 @@ def shares_memory_sparse(x, y):

CUPY_MATRIX_PARAMS = [
pytest.param(
partial(as_cupy_type, typ=CupyArray), id="cupy_array", marks=pytest.mark.gpu
partial(as_cupy, typ=CupyArray), id="cupy_array", marks=pytest.mark.gpu
),
pytest.param(
partial(as_cupy_type, typ=CupyCSRMatrix),
partial(as_cupy, typ=CupyCSRMatrix),
id="cupy_csr",
marks=pytest.mark.gpu,
),
pytest.param(
partial(as_cupy_type, typ=CupyCSCMatrix),
partial(as_cupy, typ=CupyCSCMatrix),
id="cupy_csc",
marks=pytest.mark.gpu,
),
]

DASK_CUPY_MATRIX_PARAMS = [
pytest.param(
as_dense_cupy_dask_array,
id="cupy_dense_dask_array",
marks=pytest.mark.gpu,
),
pytest.param(
as_cupy_sparse_dask_array, id="cupy_csr_dask_array", marks=pytest.mark.gpu
),
]

try:
import zarr

31 changes: 23 additions & 8 deletions tests/test_dask.py
@@ -4,17 +4,21 @@

from __future__ import annotations

import numpy as np
import pandas as pd
import pytest
from scipy import sparse

import anndata as ad
from anndata._core.anndata import AnnData
from anndata.compat import DaskArray
from anndata.compat import CupyArray, DaskArray
from anndata.experimental import read_elem, write_elem
from anndata.experimental.merge import as_group
from anndata.tests.helpers import (
GEN_ADATA_DASK_ARGS,
as_dense_cupy_dask_array,
as_dense_dask_array,
as_sparse_dask_array,
assert_equal,
gen_adata,
)
@@ -262,11 +266,22 @@ def test_assign_X(adata):


# Test if dask arrays turn into numpy arrays after to_memory is called
def test_dask_to_memory_unbacked():
import numpy as np

orig = gen_adata((15, 10), X_type=as_dense_dask_array, **GEN_ADATA_DASK_ARGS)
orig.uns = {"da": {"da": as_dense_dask_array(np.ones(12))}}
@pytest.mark.parametrize(
("array_func", "mem_type"),
[
pytest.param(as_dense_dask_array, np.ndarray, id="dense_dask_array"),
pytest.param(as_sparse_dask_array, sparse.csr_matrix, id="sparse_dask_array"),
pytest.param(
as_dense_cupy_dask_array,
CupyArray,
id="cupy_dense_dask_array",
marks=pytest.mark.gpu,
),
],
)
def test_dask_to_memory_unbacked(array_func, mem_type):
orig = gen_adata((15, 10), X_type=array_func, **GEN_ADATA_DASK_ARGS)
orig.uns = {"da": {"da": array_func(np.ones((4, 12)))}}

assert isinstance(orig.X, DaskArray)
assert isinstance(orig.obsm["da"], DaskArray)
@@ -277,11 +292,11 @@ def test_dask_to_memory_unbacked():
curr = orig.to_memory()

assert_equal(orig, curr)
assert isinstance(curr.X, np.ndarray)
assert isinstance(curr.X, mem_type)
assert isinstance(curr.obsm["da"], np.ndarray)
assert isinstance(curr.varm["da"], np.ndarray)
assert isinstance(curr.layers["da"], np.ndarray)
assert isinstance(curr.uns["da"]["da"], np.ndarray)
assert isinstance(curr.uns["da"]["da"], mem_type)
assert isinstance(orig.X, DaskArray)
assert isinstance(orig.obsm["da"], DaskArray)
assert isinstance(orig.layers["da"], DaskArray)
