diff --git a/requirements.txt b/requirements.txt
index 28ead88..64385fc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,5 +5,6 @@ numpy
 scipy
 xarray
 zarr
-bgen_reader>=4.0.5
+bgen_reader==4.0.5
+git+https://github.com/pangeo-data/rechunker
 git+https://github.com/pystatgen/sgkit
diff --git a/setup.cfg b/setup.cfg
index 35ae76e..86641e9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -54,7 +54,7 @@ ignore =
 [isort]
 default_section = THIRDPARTY
 known_first_party = sgkit
-known_third_party = bgen_reader,dask,numpy,pytest,setuptools,xarray
+known_third_party = bgen_reader,dask,numpy,pytest,rechunker,setuptools,xarray,zarr
 multi_line_output = 3
 include_trailing_comma = True
 force_grid_wrap = 0
@@ -67,9 +67,16 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-setuptools.*]
 ignore_missing_imports = True
+[mypy-rechunker.*]
+ignore_missing_imports = True
 [mypy-bgen_reader.*]
 ignore_missing_imports = True
 [mypy-sgkit.*]
 ignore_missing_imports = True
+[mypy-zarr.*]
+ignore_missing_imports = True
 [mypy-sgkit_bgen.tests.*]
 disallow_untyped_defs = False
+[mypy-sgkit_bgen.*]
+allow_redefinition = True
+warn_unused_ignores = False
diff --git a/sgkit_bgen/__init__.py b/sgkit_bgen/__init__.py
index aa9465b..45f9f44 100644
--- a/sgkit_bgen/__init__.py
+++ b/sgkit_bgen/__init__.py
@@ -1,3 +1,3 @@
-from .bgen_reader import read_bgen  # noqa: F401
+from .bgen_reader import bgen_to_zarr, read_bgen, rechunk_bgen  # noqa: F401

-__all__ = ["read_bgen"]
+__all__ = ["read_bgen", "rechunk_bgen", "bgen_to_zarr"]
diff --git a/sgkit_bgen/bgen_reader.py b/sgkit_bgen/bgen_reader.py
index f90babb..20566a0 100644
--- a/sgkit_bgen/bgen_reader.py
+++ b/sgkit_bgen/bgen_reader.py
@@ -1,10 +1,13 @@
 """BGEN reader implementation (using bgen_reader)"""
+import tempfile
 from pathlib import Path
-from typing import Any, Dict, Tuple, Union
+from typing import Any, Dict, Hashable, Mapping, MutableMapping, Optional, Tuple, Union

 import dask.array as da
 import dask.dataframe as dd
 import numpy as np
+import xarray as xr
+import zarr
 from bgen_reader._bgen_file import bgen_file
 from bgen_reader._bgen_metafile import bgen_metafile
 from bgen_reader._metafile import create_metafile
@@ -13,9 +16,11 @@
 from xarray import Dataset

 from sgkit import create_genotype_dosage_dataset
-from sgkit.typing import ArrayLike
+from sgkit.typing import ArrayLike, DType
 from sgkit.utils import encode_array

+from .rechunker_api import rechunk_dataset  # type: ignore[attr-defined]
+
 PathType = Union[str, Path]

@@ -38,6 +43,13 @@ def _to_dict(df: dd.DataFrame, dtype: Any = None) -> Dict[str, da.Array]:
 VARIANT_DF_DTYPE = dict([(f[0], f[1]) for f in VARIANT_FIELDS])
 VARIANT_ARRAY_DTYPE = dict([(f[0], f[2]) for f in VARIANT_FIELDS])

+GT_DATA_VARS = [
+    "call_genotype_probability",
+    "call_genotype_probability_mask",
+    "call_dosage",
+    "call_dosage_mask",
+]
+

 class BgenReader:

@@ -79,15 +91,7 @@ def split(allele_row: np.ndarray) -> np.ndarray:
             return np.apply_along_axis(split, 1, alleles[:, np.newaxis])

-        variant_alleles = variant_arrs["allele_ids"].map_blocks(split_alleles)
-
-        def max_str_len(arr: ArrayLike) -> Any:
-            return arr.map_blocks(
-                lambda s: np.char.str_len(s.astype(str)), dtype=np.int8
-            ).max()
-
-        max_allele_length = max(max_str_len(variant_alleles).compute())
-        self.variant_alleles = variant_alleles.astype(f"S{max_allele_length}")
+        self.variant_alleles = variant_arrs["allele_ids"].map_blocks(split_alleles)

         with bgen_file(self.path) as bgen:
             sample_path = self.path.with_suffix(".sample")
@@ -172,6 +176,7 @@ def read_bgen(
     chunks: Union[str, int, Tuple[int, ...]] = "auto",
     lock: bool = False,
     persist: bool = True,
+    dtype: Any = "float32",
 ) -> Dataset:
     """Read BGEN dataset.

@@ -194,23 +199,23 @@ def read_bgen(
         memory, by default True. This is an important performance
         consideration as the metadata file for this data will be read
         multiple times when False.
+    dtype : Any
+        Genotype probability array data type, by default float32.

     Warnings
     --------
     Only bi-allelic, diploid BGEN files are currently supported.
     """
-    bgen_reader = BgenReader(path, persist)
+    bgen_reader = BgenReader(path, persist, dtype=dtype)

     variant_contig, variant_contig_names = encode_array(bgen_reader.contig.compute())
     variant_contig_names = list(variant_contig_names)
     variant_contig = variant_contig.astype("int16")
-
-    variant_position = np.array(bgen_reader.pos, dtype=int)
-    variant_alleles = np.array(bgen_reader.variant_alleles, dtype="S1")
-    variant_id = np.array(bgen_reader.variant_id, dtype=str)
-
-    sample_id = np.array(bgen_reader.sample_id, dtype=str)
+    variant_position = np.asarray(bgen_reader.pos, dtype=int)
+    variant_alleles = np.asarray(bgen_reader.variant_alleles, dtype="S")
+    variant_id = np.asarray(bgen_reader.variant_id, dtype=str)
+    sample_id = np.asarray(bgen_reader.sample_id, dtype=str)

     call_genotype_probability = da.from_array(
         bgen_reader,
@@ -234,3 +239,271 @@ def read_bgen(
     )

     return ds
+
+
+def encode_variables(
+    ds: Dataset,
+    chunk_length: int,
+    chunk_width: int,
+    compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2),
+    probability_dtype: Optional[Any] = "uint8",
+) -> Dict[Hashable, Dict[str, Any]]:
+    encoding = {}
+    for v in ds:
+        e = {}
+        if compressor is not None:
+            e.update({"compressor": compressor})
+        if v in GT_DATA_VARS:
+            e.update({"chunks": (chunk_length, chunk_width) + ds[v].shape[2:]})
+        if probability_dtype is not None and v == "call_genotype_probability":
+            dtype = np.dtype(probability_dtype)
+            # Xarray will decode into float32, so any integer type wider than
+            # 16 bits will cause overflow/underflow.
+            # See https://en.wikipedia.org/wiki/Floating-point_arithmetic#Internal_representation
+            # ("bits precision" column for single-precision floats)
+            if dtype not in [np.uint8, np.uint16]:
+                raise ValueError(
+                    "Probability integer dtype invalid, must "
+                    f"be uint8 or uint16 not {probability_dtype}"
+                )
+            divisor = np.iinfo(dtype).max - 1
+            e.update(
+                {
+                    "dtype": probability_dtype,
+                    "add_offset": -1.0 / divisor,
+                    "scale_factor": 1.0 / divisor,
+                    "_FillValue": 0,
+                }
+            )
+        if e:
+            encoding[v] = e
+    return encoding
+
+
+def pack_variables(ds: Dataset) -> Dataset:
+    # Remove dosage as it is unnecessary and should be redefined
+    # based on encoded probabilities later (w/ reduced precision)
+    ds = ds.drop_vars(["call_dosage", "call_dosage_mask"], errors="ignore")
+
+    # Remove homozygous reference GP and redefine mask
+    gp = ds["call_genotype_probability"][..., 1:]
+    gp_mask = ds["call_genotype_probability_mask"].any(dim="genotypes")
+    ds = ds.drop_vars(["call_genotype_probability", "call_genotype_probability_mask"])
+    ds = ds.assign(call_genotype_probability=gp, call_genotype_probability_mask=gp_mask)
+    return ds
+
+
+def unpack_variables(ds: Dataset, dtype: DType = "float32") -> Dataset:
+    # Restore homozygous reference GP
+    gp = ds["call_genotype_probability"].astype(dtype)  # type: ignore[no-untyped-call]
+    if gp.sizes["genotypes"] != 2:
+        raise ValueError(
+            "Expecting variable 'call_genotype_probability' to have genotypes "
+            f"dimension of size 2 (received sizes = {dict(gp.sizes)})"
+        )
ds.drop_vars("call_genotype_probability") + ds["call_genotype_probability"] = xr.concat( + [1 - gp.sum(dim="genotypes", skipna=False), gp], dim="genotypes" + ) + + # Restore dosage + ds["call_dosage"] = gp[..., 0] + 2 * gp[..., 1] + ds["call_dosage_mask"] = ds["call_genotype_probability_mask"] + ds["call_genotype_probability_mask"] = ds[ + "call_genotype_probability_mask" + ].broadcast_like(ds["call_genotype_probability"]) + return ds + + +def rechunk_bgen( + ds: Dataset, + output: Union[PathType, MutableMapping[str, bytes]], + *, + chunk_length: int = 10_000, + chunk_width: int = 1_000, + compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2), + probability_dtype: Optional[DType] = "uint8", + max_mem: str = "4GB", + pack: bool = True, + tempdir: Optional[PathType] = None, +) -> Dataset: + """Rechunk BGEN dataset as Zarr. + + This function will use the algorithm https://rechunker.readthedocs.io/en/latest/ + to rechunk certain fields in a provided Dataset for better downstream performance. + Depending on the system memory available (and the `max_mem` setting) this + rechunking may occur without the need of any intermediate data store. Otherwise, + approximately as much disk space is required as was needed to store the original + bgen data. Experiments show that this Zarr representation is ~20% larger even + with all available optimizations and fairly aggressive compression (i.e. the + default `clevel` 7). + + Note that this function is not evaluated lazily. The rechunking algorithm + will run inline so calls to it may be slow. The resulting Dataset is + generated based on the final, serialized Zarr data. + + Parameters + ---------- + ds : Dataset + Dataset to rechunk, typically the result from `read_bgen`. + output : Union[PathType, MutableMapping[str, bytes]] + Zarr store or path to directory in file system. + chunk_length : int + Length (number of variants) of chunks in which data are stored, by default 10_000. + chunk_width : int + Width (number of samples) to use when storing chunks in output, by default 1_000. + compressor : Optional[Any] + Zarr compressor, no compression is used when set as None. + probability_dtype : DType + Data type used to encode genotype probabilities, must be either uint8 or uint16. + Setting this parameter results in a loss of precision. If None, probabilities + will not be altered when stored. + max_mem : str + The amount of memory (in bytes) that workers are allowed to use. A string + (e.g. 100MB) can also be used. + pack : bool + Whether or not to optimize variable representations by removing unnecessary + dimensions and elements. This includes storing 2 genotypes instead of 3, omitting + dosage and collapsing the genotype probability mask to 2 dimensions. All of + the above are restored in the resulting Dataset at the expense of extra + computations on read. + tempdir : Optional[PathType] + Temporary directory where intermediate files are stored. The default None means + use the system default temporary directory. + + Warnings + -------- + This functional is only applicable to diploid, bi-allelic bgen datasets. + + Returns + ------- + Dataset + The rechunked dataset. 
+ """ + if isinstance(output, Path): + output = str(output) + + chunk_length = min(chunk_length, ds.dims["variants"]) + chunk_width = min(chunk_width, ds.dims["samples"]) + + if pack: + ds = pack_variables(ds) + + encoding = encode_variables( + ds, + chunk_length=chunk_length, + chunk_width=chunk_width, + compressor=compressor, + probability_dtype=probability_dtype, + ) + with tempfile.TemporaryDirectory( + prefix="bgen_to_zarr_", suffix=".zarr", dir=tempdir + ) as tmpdir: + rechunked = rechunk_dataset( + ds, + encoding=encoding, + max_mem=max_mem, + target_store=output, + temp_store=tmpdir, + executor="dask", + ) + rechunked.execute() + + ds: Dataset = xr.open_zarr(output, concat_characters=False) # type: ignore[no-untyped-call] + if pack: + ds = unpack_variables(ds) + + return ds + + +def bgen_to_zarr( + input: PathType, + output: Union[PathType, MutableMapping[str, bytes]], + region: Optional[Mapping[Hashable, Any]] = None, + chunk_length: int = 10_000, + chunk_width: int = 1_000, + temp_chunk_length: int = 100, + compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2), + probability_dtype: Optional[DType] = "uint8", + max_mem: str = "4GB", + pack: bool = True, + tempdir: Optional[PathType] = None, +) -> Dataset: + """Rechunk BGEN dataset as Zarr. + + This function will use the algorithm https://rechunker.readthedocs.io/en/latest/ + to rechunk certain fields in a provided Dataset for better downstream performance. + Depending on the system memory available (and the `max_mem` setting) this + rechunking may occur without the need of any intermediate data store. Otherwise, + approximately as much disk space is required as was needed to store the original + bgen data. Experiments show that this Zarr representation is ~20% larger even + with all available optimizations and fairly aggressive compression (i.e. the + default `clevel` 7). + + Note that this function is not evaluated lazily. The rechunking algorithm + will run inline so calls to it may be slow. The resulting Dataset is + generated based on the final, serialized Zarr data. + + Parameters + ---------- + input : PathType + Path to local bgen dataset. + output : Union[PathType, MutableMapping[str, bytes]] + Zarr store or path to directory in file system. + region : Optional[Mapping[Hashable, Any]] + Indexers on dataset dimensions used to define a subset of data to convert. + Must be None or a dict with keys matching dimension names and values + equal to integers or slice objects. This is passed directly to `Dataset.isel` + so it has the same semantics. + chunk_length : int + Length (number of variants) of chunks in which data are stored, by default 10_000. + chunk_width : int + Width (number of samples) to use when storing chunks in output, by default 1_000. + temp_chunk_length : int + Length of chunks used in raw bgen read, by default 100. This defines the vertical + chunking (i.e. in the variants dimension) used when reading the raw data and because + there is no horizontal chunking at this phase (i.e. in the samples dimension), this + value should be much smaller than the target `chunk_length`. + compressor : Optional[Any] + Zarr compressor, by default Blosc + zstd with compression level 7. No compression + is used when set as None. + probability_dtype : DType + Data type used to encode genotype probabilities, must be either uint8 or uint16. + Setting this parameter results in a loss of precision. If None, probabilities + will not be altered when stored. 
+    max_mem : str
+        The amount of memory (in bytes) that workers are allowed to use. A size string
+        (e.g. "100MB") can also be used, by default "4GB".
+    pack : bool
+        Whether or not to optimize variable representations by removing unnecessary
+        dimensions and elements. This includes storing 2 genotypes instead of 3, omitting
+        dosage and collapsing the genotype probability mask to 2 dimensions. All of
+        the above are restored in the resulting Dataset at the expense of extra
+        computations on read.
+    tempdir : Optional[PathType]
+        Temporary directory where intermediate files are stored. The default None means
+        use the system default temporary directory.
+
+    Warnings
+    --------
+    This function is only applicable to diploid, bi-allelic BGEN datasets.
+
+    Returns
+    -------
+    Dataset
+        The rechunked dataset.
+    """
+    ds = read_bgen(input, chunks=(temp_chunk_length, -1, -1))
+    if region is not None:
+        ds = ds.isel(indexers=region)
+    return rechunk_bgen(
+        ds,
+        output,
+        chunk_length=chunk_length,
+        chunk_width=chunk_width,
+        compressor=compressor,
+        probability_dtype=probability_dtype,
+        max_mem=max_mem,
+        pack=pack,
+        tempdir=tempdir,
+    )
diff --git a/sgkit_bgen/rechunker_api.py b/sgkit_bgen/rechunker_api.py
new file mode 100644
index 0000000..8fbbb13
--- /dev/null
+++ b/sgkit_bgen/rechunker_api.py
@@ -0,0 +1,83 @@
+# Temporary workaround until https://github.com/pangeo-data/rechunker/pull/52 is in and released
+# type: ignore
+
+import tempfile
+from typing import Mapping, Union
+
+import dask
+import xarray
+import zarr
+from rechunker.api import Rechunked, _get_executor, _setup_array_rechunk
+from rechunker.types import Executor
+from xarray.backends.zarr import (
+    DIMENSION_KEY,
+    encode_zarr_attr_value,
+    encode_zarr_variable,
+    extract_zarr_variable_encoding,
+)
+
+
+def rechunk_dataset(
+    source: xarray.Dataset,
+    encoding: Mapping,
+    max_mem,
+    target_store,
+    temp_store=None,
+    executor: Union[str, Executor] = "dask",
+):
+    def _encode_zarr_attributes(attrs):
+        return {k: encode_zarr_attr_value(v) for k, v in attrs.items()}
+
+    if isinstance(executor, str):
+        executor = _get_executor(executor)
+    if temp_store:
+        temp_group = zarr.group(temp_store)
+    else:
+        temp_group = zarr.group(
+            tempfile.mkdtemp(".zarr", "temp_store_")
+        )  # pragma: no cover
+    target_group = zarr.group(target_store)
+    target_group.attrs.update(_encode_zarr_attributes(source.attrs))
+
+    copy_specs = []
+    for variable in source:
+        array = source[variable].copy()
+
+        # Update the array encoding with provided parameters and apply it
+        has_chunk_encoding = "chunks" in array.encoding
+        array.encoding.update(encoding.get(variable, {}))
+        array = encode_zarr_variable(array)
+
+        # Determine target chunking for array and remove it prior to
+        # validation/extraction ONLY if the array isn't also coming
+        # from a Zarr store (otherwise blocks need to be checked for overlap)
+        target_chunks = array.encoding.get("chunks")
+        if not has_chunk_encoding:
+            array.encoding.pop("chunks", None)
+        array_encoding = extract_zarr_variable_encoding(
+            array, raise_on_invalid=True, name=variable
+        )
+
+        # Default to chunking based on array shape if not explicitly provided
+        default_chunks = array_encoding.pop("chunks")
+        target_chunks = target_chunks or default_chunks
+
+        # Extract array attributes along with reserved property for
+        # xarray dimension names
+        array_attrs = _encode_zarr_attributes(array.attrs)
+        array_attrs[DIMENSION_KEY] = encode_zarr_attr_value(array.dims)
+
+        copy_spec = _setup_array_rechunk(
+            dask.array.asarray(array),
+            target_chunks,
+            max_mem,
+            target_group,
+            target_options=array_encoding,
+            temp_store_or_group=temp_group,
+            temp_options=array_encoding,
+            name=variable,
+        )
+        copy_spec.write.array.attrs.update(array_attrs)  # type: ignore
+        copy_specs.append(copy_spec)
+    plan = executor.prepare_plan(copy_specs)
+    return Rechunked(executor, plan, source, temp_group, target_group)
diff --git a/sgkit_bgen/tests/data/.gitignore b/sgkit_bgen/tests/data/.gitignore
new file mode 100644
index 0000000..4685de1
--- /dev/null
+++ b/sgkit_bgen/tests/data/.gitignore
@@ -0,0 +1,2 @@
+*.metadata2.mmm
+*.metafile
diff --git a/sgkit_bgen/tests/test_bgen_reader.py b/sgkit_bgen/tests/test_bgen_reader.py
index 7f91d77..bf1406d 100644
--- a/sgkit_bgen/tests/test_bgen_reader.py
+++ b/sgkit_bgen/tests/test_bgen_reader.py
@@ -1,8 +1,18 @@
+from pathlib import Path
+from typing import Any, Tuple
+
 import numpy as np
 import numpy.testing as npt
 import pytest
+import xarray as xr
 from sgkit_bgen import read_bgen
-from sgkit_bgen.bgen_reader import BgenReader
+from sgkit_bgen.bgen_reader import (
+    GT_DATA_VARS,
+    BgenReader,
+    bgen_to_zarr,
+    rechunk_bgen,
+    unpack_variables,
+)

 CHUNKS = [
     (100, 200, 3),
@@ -34,6 +44,12 @@
     [np.nan, 1.018, 0.010, 0.160, 0.991]  # Generated using bgen-reader directly
 )

+EXPECTED_DIMS = dict(variants=199, samples=500, genotypes=3, alleles=2)
+
+
+def _shape(*dims: str) -> Tuple[int, ...]:
+    return tuple(EXPECTED_DIMS[d] for d in dims)
+

 @pytest.mark.parametrize("chunks", CHUNKS)
 def test_read_bgen(shared_datadir, chunks):
@@ -41,12 +57,14 @@ def test_read_bgen(shared_datadir, chunks):
     ds = read_bgen(path, chunks=chunks)

     # check some of the data (in different chunks)
-    assert ds["call_dosage"].shape == (199, 500)
+    assert ds["call_dosage"].shape == _shape("variants", "samples")
     npt.assert_almost_equal(ds["call_dosage"].values[1][0], 1.987, decimal=3)
     npt.assert_almost_equal(ds["call_dosage"].values[100][0], 0.160, decimal=3)
     npt.assert_array_equal(ds["call_dosage_mask"].values[0, 0], [True])
     npt.assert_array_equal(ds["call_dosage_mask"].values[0, 1], [False])
-    assert ds["call_genotype_probability"].shape == (199, 500, 3)
+    assert ds["call_genotype_probability"].shape == _shape(
+        "variants", "samples", "genotypes"
+    )
     npt.assert_almost_equal(
         ds["call_genotype_probability"].values[1][0], [0.005, 0.002, 0.992], decimal=3
     )
@@ -61,7 +79,7 @@
     )


-def test_read_bgen_with_sample_file(shared_datadir):
+def test_read_bgen__with_sample_file(shared_datadir):
     # The example file was generated using
     # qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-separate-samples.bgen -os sgkit_bgen/tests/data/example-separate-samples.sample -incl-samples sgkit_bgen/tests/data/samples
     # Then editing example-separate-samples.sample to change the sample IDs
@@ -71,7 +89,7 @@ def test_read_bgen_with_sample_file(shared_datadir):
     assert ds["sample_id"].values.tolist() == ["s1", "s2", "s3", "s4", "s5"]


-def test_read_bgen_with_no_samples(shared_datadir):
+def test_read_bgen__with_no_samples(shared_datadir):
     # The example file was generated using
     # qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-no-samples.bgen -os sgkit_bgen/tests/data/example-no-samples.sample -bgen-omit-sample-identifier-block -incl-samples sgkit_bgen/tests/data/samples
     # Then deleting example-no-samples.sample
@@ -88,7 +106,7 @@ def test_read_bgen_with_no_samples(shared_datadir):


 @pytest.mark.parametrize("chunks", CHUNKS)
-def test_read_bgen_fancy_index(shared_datadir, chunks):
+def test_read_bgen__fancy_index(shared_datadir, chunks):
     path = shared_datadir / "example.bgen"
     ds = read_bgen(path, chunks=chunks)
     npt.assert_almost_equal(
@@ -98,7 +116,7 @@


 @pytest.mark.parametrize("chunks", CHUNKS)
-def test_read_bgen_scalar_index(shared_datadir, chunks):
+def test_read_bgen__scalar_index(shared_datadir, chunks):
     path = shared_datadir / "example.bgen"
     ds = read_bgen(path, chunks=chunks)
     for i, ix in enumerate(INDEXES):
@@ -116,7 +134,7 @@
     )


-def test_read_bgen_raise_on_invalid_indexers(shared_datadir):
+def test_read_bgen__raise_on_invalid_indexers(shared_datadir):
     path = shared_datadir / "example.bgen"
     reader = BgenReader(path)
     with pytest.raises(IndexError, match="Indexer must be tuple"):
@@ -125,3 +143,99 @@
         reader[(slice(None),)]
     with pytest.raises(IndexError, match="Indexer must contain only slices or ints"):
         reader[([0], [0], [0])]
+
+
+def _rechunk_bgen(
+    shared_datadir: Path, tmp_path: Path, **kwargs: Any
+) -> Tuple[xr.Dataset, xr.Dataset, str]:
+    path = shared_datadir / "example.bgen"
+    ds = read_bgen(path, chunks=(10, -1, -1))
+    store = tmp_path / "example.zarr"
+    dsr = rechunk_bgen(ds, store, **kwargs)
+    return ds, dsr, str(store)
+
+
+def _open_zarr(store: str, **kwargs: Any) -> xr.Dataset:
+    # Force concat_characters=False to avoid https://github.com/pydata/xarray/issues/4405
+    return xr.open_zarr(store, concat_characters=False, **kwargs)  # type: ignore[no-any-return,no-untyped-call]
+
+
+@pytest.mark.parametrize("target_chunks", [(10, 10), (50, 50), (100, 50), (50, 100)])
+def test_rechunk_bgen__target_chunks(shared_datadir, tmp_path, target_chunks):
+    _, dsr, store = _rechunk_bgen(
+        shared_datadir,
+        tmp_path,
+        chunk_length=target_chunks[0],
+        chunk_width=target_chunks[1],
+        pack=False,
+    )
+    for v in GT_DATA_VARS:
+        assert dsr[v].data.chunksize[:2] == target_chunks
+
+
+def test_rechunk_from_zarr__self_consistent(shared_datadir, tmp_path):
+    # With no probability dtype or packing, the rechunking round trip is a no-op
+    ds, dsr, store = _rechunk_bgen(
+        shared_datadir, tmp_path, probability_dtype=None, pack=False
+    )
+    xr.testing.assert_allclose(ds.compute(), dsr.compute())  # type: ignore[no-untyped-call]
+
+
+@pytest.mark.parametrize("dtype", ["uint8", "uint16"])
+def test_rechunk_bgen__probability_encoding(shared_datadir, tmp_path, dtype):
+    ds, _, store = _rechunk_bgen(
+        shared_datadir, tmp_path, probability_dtype=dtype, pack=False
+    )
+    dsr = _open_zarr(store, mask_and_scale=False)
+    v = "call_genotype_probability"
+    assert dsr[v].shape == ds[v].shape
+    assert dsr[v].dtype == dtype
+    dsr = _open_zarr(store, mask_and_scale=True)
+    # There are two missing calls, which equates to
+    # 6 total nan values across 3 possible genotypes
+    assert np.isnan(dsr[v].values).sum() == 6
+    tolerance = 1.0 / (np.iinfo(dtype).max - 1)
+    np.testing.assert_allclose(ds[v], dsr[v], atol=tolerance)
+
+
+def test_rechunk_bgen__variable_packing(shared_datadir, tmp_path):
+    ds, dsr, store = _rechunk_bgen(
+        shared_datadir, tmp_path, probability_dtype=None, pack=True
+    )
+    # A minor tolerance is necessary here when packing is enabled
+    # because one of the genotype probabilities is constructed from the others
+    xr.testing.assert_allclose(ds.compute(), dsr.compute(), atol=1e-6)  # type: ignore[no-untyped-call]
+
+
+@pytest.mark.parametrize("dtype", ["uint32", "int8", "float32"])
+def test_rechunk_bgen__invalid_probability_type(shared_datadir, tmp_path, dtype):
+    with pytest.raises(ValueError, match="Probability integer dtype invalid"):
+        _rechunk_bgen(shared_datadir, tmp_path, probability_dtype=dtype)
+
+
+def test_unpack_variables__invalid_gp_dims(shared_datadir, tmp_path):
+    # Validate that an error is thrown when variables are
+    # unpacked without being packed in the first place
+    _, dsr, store = _rechunk_bgen(shared_datadir, tmp_path, pack=False)
+    with pytest.raises(
+        ValueError,
+        match="Expecting variable 'call_genotype_probability' to have genotypes dimension of size 2",
+    ):
+        unpack_variables(dsr)
+
+
+@pytest.mark.parametrize(
+    "region", [None, dict(variants=slice(0, 100)), dict(samples=slice(0, 50))]
+)
+def test_bgen_to_zarr(shared_datadir, tmp_path, region):
+    input = shared_datadir / "example.bgen"
+    output = tmp_path / "example.zarr"
+    ds = bgen_to_zarr(input, output, region=region)
+    expected_dims = {
+        k: EXPECTED_DIMS[k]
+        if region is None or k not in region
+        else region[k].stop - region[k].start
+        for k in EXPECTED_DIMS
+    }
+    actual_dims = {k: v for k, v in ds.dims.items() if k in expected_dims}
+    assert actual_dims == expected_dims
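
Usage sketch for the new entry points (not part of the patch above). It assumes a local bi-allelic, diploid BGEN file; the paths "example.bgen" and "example.zarr" are hypothetical placeholders.

    from sgkit_bgen import bgen_to_zarr, read_bgen, rechunk_bgen

    # One-step conversion: read with small variant chunks, then rechunk to the target layout
    ds = bgen_to_zarr(
        "example.bgen",             # hypothetical input path
        "example.zarr",             # Zarr store (directory) to write
        chunk_length=10_000,        # variants per output chunk
        chunk_width=1_000,          # samples per output chunk
        temp_chunk_length=100,      # variant chunking used for the raw read
        probability_dtype="uint8",  # quantize probabilities to 8 bits
        pack=True,                  # store 2 of the 3 genotype probabilities
    )

    # Equivalent two-step form, useful when the Dataset should be inspected or subset first
    ds = read_bgen("example.bgen", chunks=(100, -1, -1))
    ds = ds.isel(variants=slice(0, 100))  # optional subset, mirroring the `region` argument
    dsr = rechunk_bgen(ds, "example-subset.zarr", chunk_length=50, chunk_width=50)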
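
The probability encoding in encode_variables relies on xarray's CF-style scale_factor/add_offset decoding, with the stored integer 0 reserved as _FillValue for missing calls. The standalone numpy sketch below re-derives that arithmetic (it is not the patch's code path) to show why the round-trip error is bounded by 1 / (iinfo(dtype).max - 1), the same tolerance used in the tests.

    import numpy as np

    dtype = np.uint8
    divisor = np.iinfo(dtype).max - 1   # 254 for uint8
    scale_factor = 1.0 / divisor
    add_offset = -1.0 / divisor         # maps the stored value 0 below any valid probability

    def encode(p):
        # CF convention: stored = round((value - add_offset) / scale_factor)
        return np.round((p - add_offset) / scale_factor).astype(dtype)

    def decode(stored):
        # CF convention: value = stored * scale_factor + add_offset
        return stored * scale_factor + add_offset

    p = np.array([0.0, 0.005, 0.5, 0.992, 1.0])
    assert np.allclose(p, decode(encode(p)), atol=1.0 / divisor)  # quantization error bound
    assert decode(np.array([0], dtype=dtype))[0] == add_offset    # 0 stays free for _FillValue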
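
pack_variables and unpack_variables rest on the identity that, for a bi-allelic diploid call, the three genotype probabilities sum to 1, so the homozygous-reference probability and the dosage can be recomputed from the two stored values. A small numpy illustration of that identity (the probability values are made up):

    import numpy as np

    gp = np.array([[0.90, 0.08, 0.02],   # two calls x three genotypes (hom ref, het, hom alt)
                   [0.01, 0.19, 0.80]])

    packed = gp[:, 1:]                        # what pack=True stores: het and hom-alt only
    hom_ref = 1.0 - packed.sum(axis=-1)       # restored on read
    unpacked = np.concatenate([hom_ref[:, None], packed], axis=-1)
    dosage = packed[:, 0] + 2 * packed[:, 1]  # call_dosage reconstruction

    assert np.allclose(unpacked, gp)
    assert np.allclose(dosage, [0.12, 1.79])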
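
Finally, a sketch of reading the store back, mirroring the _open_zarr test helper. It assumes a store written with probability_dtype="uint8" and pack=False; the store path is hypothetical and the exact decoded dtype depends on the xarray version.

    import xarray as xr

    # Raw view: encoded integers, with the fill value 0 marking missing calls
    raw = xr.open_zarr("example.zarr", concat_characters=False, mask_and_scale=False)
    print(raw["call_genotype_probability"].dtype)   # expected: uint8

    # Decoded view: scale_factor/add_offset/_FillValue applied, missing calls become NaN
    dec = xr.open_zarr("example.zarr", concat_characters=False, mask_and_scale=True)
    print(dec["call_genotype_probability"].dtype)   # expected: a floating-point dtype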