Add-concat-on-disk-examples #1161

Merged · 10 commits · Oct 5, 2023

Changes from all commits
2 changes: 1 addition & 1 deletion anndata/_core/anndata.py
@@ -1681,7 +1681,7 @@ def concatenate(
... dict(var_names=['d', 'c', 'b'], annoA=[0, 1, 2]),
... )
>>> adata3 = AnnData(
... np.array([[1, 2, 3], [4, 5, 6]]),
... np.array([[1, 2, 3], [4, 5, 6]]),
... dict(obs_names=['s1', 's2'], anno2=['d3', 'd4']),
... dict(var_names=['d', 'c', 'b'], annoA=[0, 2, 3], annoB=[0, 1, 2]),
... )
114 changes: 76 additions & 38 deletions anndata/experimental/merge.py
@@ -2,7 +2,7 @@

import os
import shutil
from collections.abc import Collection, Iterable, Mapping, MutableMapping, Sequence
from collections.abc import Collection, Iterable, Mapping, Sequence
from functools import singledispatch
from pathlib import Path
from typing import (
@@ -105,7 +105,9 @@ def as_group(store, *args, **kwargs) -> ZarrGroup | H5Group:


@as_group.register(os.PathLike)
def _(store: os.PathLike, *args, **kwargs) -> ZarrGroup | H5Group:
@as_group.register(str)
def _(store: os.PathLike | str, *args, **kwargs) -> ZarrGroup | H5Group:
store = Path(store)
if store.suffix == ".h5ad":
import h5py

@@ -115,11 +117,6 @@ def _(store: os.PathLike, *args, **kwargs) -> ZarrGroup | H5Group:
return zarr.open_group(store, *args, **kwargs)


@as_group.register(str)
def _(store: str, *args, **kwargs) -> ZarrGroup | H5Group:
return as_group(Path(store), *args, **kwargs)


@as_group.register(ZarrGroup)
@as_group.register(H5Group)
def _(store, *args, **kwargs):
@@ -395,33 +392,33 @@ def _write_dim_annot(groups, output_group, dim, concat_indices, label, label_col


def concat_on_disk(
in_files: Collection[str | os.PathLike] | MutableMapping[str, str | os.PathLike],
in_files: Collection[str | os.PathLike] | Mapping[str, str | os.PathLike],
out_file: str | os.PathLike,
*,
overwrite: bool = False,
max_loaded_elems: int = 100_000_000,
axis: Literal[0, 1] = 0,
join: Literal["inner", "outer"] = "inner",
merge: StrategiesLiteral | Callable[[Collection[Mapping]], Mapping] | None = None,
uns_merge: StrategiesLiteral
| Callable[[Collection[Mapping]], Mapping]
| None = None,
uns_merge: (
StrategiesLiteral | Callable[[Collection[Mapping]], Mapping] | None
) = None,
label: str | None = None,
keys: Collection[str] | None = None,
index_unique: str | None = None,
fill_value: Any | None = None,
pairwise: bool = False,
) -> None:
"""Concatenates multiple AnnData objects along a specified axis using their
"""\
Concatenates multiple AnnData objects along a specified axis using their
corresponding stores or paths, and writes the resulting AnnData object
to a target location on disk.

Unlike the `concat` function, this method does not require
Unlike :func:`anndata.concat`, this method does not require
loading the input AnnData objects into memory,
making it a memory-efficient alternative for large datasets.
The resulting object written to disk should be equivalent
to the concatenation of the loaded AnnData objects using
the `concat` function.
:func:`anndata.concat`.

To adjust the maximum amount of data loaded in memory: for sparse
arrays, use the max_loaded_elems argument; for dense arrays
@@ -436,19 +433,16 @@ def concat_on_disk(
argument and values are concatenated.
out_file
The target path or store to write the result in.
overwrite
If `False` while a file already exists it will raise an error,
otherwise it will overwrite.
max_loaded_elems
The maximum number of elements to load in memory when concatenating
sparse arrays. Note that this number also includes the empty entries.
Set to 100m by default, meaning roughly 400MB will be loaded
to memory at simultaneously.
to memory simultaneously.
axis
Which axis to concatenate along.
join
How to align values when concatenating. If "outer", the union of the other axis
is taken. If "inner", the intersection. See :doc:`concatenation <../concatenation>`
How to align values when concatenating. If `"outer"`, the union of the other axis
is taken. If `"inner"`, the intersection. See :doc:`concatenation <../concatenation>`
for more.
merge
How elements not aligned to the axis being concatenated along are selected.
@@ -471,7 +465,7 @@ def concat_on_disk(
incrementing integer labels.
index_unique
Whether to make the index unique by using the keys. If provided, this
is the delimiter between "{orig_idx}{index_unique}{key}". When `None`,
is the delimiter between `"{orig_idx}{index_unique}{key}"`. When `None`,
the original indices are kept.
fill_value
When `join="outer"`, this is the value that will be used to fill the introduced
@@ -483,13 +477,58 @@ def concat_on_disk(

Notes
-----

.. warning::

If you use `join='outer'` this fills 0s for sparse data when
variables are absent in a batch. Use this with care. Dense data is
filled with `NaN`.
If you use `join='outer'` this fills 0s for sparse data when
variables are absent in a batch. Use this with care. Dense data is
filled with `NaN`.

Examples
--------

See :func:`anndata.concat` for the semantics.
The following examples highlight how this function differs.

First, let’s get some “big” datasets with a compatible ``var`` axis:

>>> import httpx
>>> import scanpy as sc
>>> api_url = "https://api.cellxgene.cziscience.com/curation/v1"
>>> def get_cellxgene_data(id_: str):
... out_path = sc.settings.datasetdir / f'{id_}.h5ad'
... if out_path.exists():
... return out_path
... ds_versions = httpx.get(f'{api_url}/datasets/{id_}/versions').raise_for_status().json()
... ds = ds_versions[0] # newest
... file_url = next(a['url'] for a in ds['assets'] if a['filetype'] == 'H5AD')
... sc.settings.datasetdir.mkdir(parents=True, exist_ok=True)
... with httpx.stream('GET', file_url) as r, out_path.open('wb') as f:
... r.raise_for_status()
... for data in r.iter_bytes():
... f.write(data)
... return out_path
>>> path_b_cells = get_cellxgene_data('0895c838-e550-48a3-a777-dbcd35d30272')
>>> path_fetal = get_cellxgene_data('08e94873-c2a6-4f7d-ab72-aeaff3e3f929')

Now we can concatenate them on-disk:

>>> import anndata as ad
>>> ad.experimental.concat_on_disk(
... dict(b_cells=path_b_cells, fetal=path_fetal),
... 'merged.h5ad',
... label='dataset',
... )
>>> adata = ad.read_h5ad('merged.h5ad', backed=True)
>>> adata.X
CSRDataset: backend hdf5, shape (490, 15585), data_dtype float32
>>> adata.obs['dataset'].value_counts()
dataset
fetal 344
b_cells 146
Name: count, dtype: int64
"""
if len(in_files) == 0:
raise ValueError("No objects to concatenate.")

# Argument normalization
if pairwise:
raise NotImplementedError("pairwise concatenation not yet implemented")
@@ -498,14 +537,11 @@ def concat_on_disk(

merge = resolve_merge_strategy(merge)
uns_merge = resolve_merge_strategy(uns_merge)
if len(in_files) <= 1:
if len(in_files) == 1:
if not overwrite and Path(out_file).is_file():
raise FileExistsError(
f"File “{out_file}” already exists and `overwrite` is set to False"
)
shutil.copy2(in_files[0], out_file)
return

out_file = Path(out_file)
if not out_file.parent.exists():
raise FileNotFoundError(f"Parent directory of {out_file} does not exist.")

if isinstance(in_files, Mapping):
if keys is not None:
raise TypeError(
@@ -516,15 +552,17 @@ def concat_on_disk(
else:
in_files = list(in_files)

if len(in_files) == 1:
shutil.copy2(in_files[0], out_file)
return

if keys is None:
keys = np.arange(len(in_files)).astype(str)

_, dim = _resolve_dim(axis=axis)
_, alt_dim = _resolve_dim(axis=1 - axis)

mode = "w" if overwrite else "w-"

output_group = as_group(out_file, mode=mode)
output_group = as_group(out_file, mode="w")
groups = [as_group(f) for f in in_files]

use_reindexing = False
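The `as_group` change above collapses two dispatch targets into one handler by stacking `functools.singledispatch` registrations, so `str` and `os.PathLike` inputs share a body that normalizes to `Path` before branching on the suffix. A minimal standalone sketch of that pattern (illustrative names and return values, not the anndata implementation):

```python
import os
from functools import singledispatch
from pathlib import Path


@singledispatch
def as_group(store, *args, **kwargs):
    # Fallback for unregistered types.
    raise NotImplementedError(f"No handler for {type(store)}")


# Stacked registrations: str and os.PathLike both dispatch here,
# and the body normalizes to Path before choosing a backend.
@as_group.register(os.PathLike)
@as_group.register(str)
def _(store, *args, **kwargs):
    store = Path(store)
    backend = "h5py" if store.suffix == ".h5ad" else "zarr"
    return backend, store


print(as_group("data.h5ad"))        # ('h5py', PosixPath('data.h5ad'))
print(as_group(Path("data.zarr")))  # ('zarr', PosixPath('data.zarr'))
```

This removes the indirection of the old `str` handler, which re-dispatched through `as_group(Path(store), ...)`.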
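The new docstring example requires a network download. For reference, a smaller offline sketch of the same API (hypothetical file names; assumes scipy and an anndata version exposing `anndata.experimental.concat_on_disk`):

```python
import anndata as ad
import numpy as np
from scipy import sparse

# Two tiny on-disk inputs with a compatible var axis.
a = ad.AnnData(sparse.random(50, 10, format="csr", dtype=np.float32))
b = ad.AnnData(sparse.random(80, 10, format="csr", dtype=np.float32))
a.write_h5ad("a.h5ad")
b.write_h5ad("b.h5ad")

# Mapping keys become the categories of the new `dataset` column.
ad.experimental.concat_on_disk(
    dict(a="a.h5ad", b="b.h5ad"),
    "merged.h5ad",
    label="dataset",
    index_unique="-",
)

adata = ad.read_h5ad("merged.h5ad", backed="r")
print(adata.shape)                          # (130, 10)
print(adata.obs["dataset"].value_counts())  # a: 50, b: 80
```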
15 changes: 15 additions & 0 deletions anndata/tests/test_concatenate_disk.py
@@ -250,3 +250,18 @@ def gen_index(n):

def test_concatenate_obsm_inner(obsm_adatas, tmp_path, file_format):
assert_eq_concat_on_disk(obsm_adatas, tmp_path, file_format, join="inner")


def test_output_dir_exists(tmp_path):
in_pth = tmp_path / "in.h5ad"
out_pth = tmp_path / "does_not_exist" / "out.h5ad"

AnnData(X=np.ones((5, 1))).write_h5ad(in_pth)

with pytest.raises(FileNotFoundError, match=f"{out_pth}"):
concat_on_disk([in_pth], out_pth)


def test_failure_w_no_args(tmp_path):
with pytest.raises(ValueError, match="No objects to concatenate"):
concat_on_disk([], tmp_path / "out.h5ad")
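A quick sketch of the two failure modes these tests pin down (hypothetical paths; error messages per the merge.py changes above):

```python
import numpy as np
import anndata as ad
from anndata.experimental import concat_on_disk

# An empty collection fails fast, before anything is opened or written.
try:
    concat_on_disk([], "out.h5ad")
except ValueError as e:
    print(e)  # No objects to concatenate.

# A missing parent directory is now caught up front as FileNotFoundError,
# rather than surfacing later from the storage backend.
ad.AnnData(X=np.ones((5, 1))).write_h5ad("in.h5ad")
try:
    concat_on_disk(["in.h5ad"], "does_not_exist/out.h5ad")
except FileNotFoundError as e:
    print(e)  # Parent directory of does_not_exist/out.h5ad does not exist.
```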
1 change: 1 addition & 0 deletions pyproject.toml
@@ -87,6 +87,7 @@ test = [
"joblib",
"boltons",
"scanpy",
"httpx", # For data downloading
"dask[array,distributed]",
"awkward>=2.3",
"pytest_memray",