diff --git a/ci/environment.yml b/ci/environment.yml
index a114b659..581e275f 100644
--- a/ci/environment.yml
+++ b/ci/environment.yml
@@ -14,7 +14,7 @@ dependencies:
   - packaging
   - universal_pathlib
   - hdf5plugin
-  - numcodecs
+  - numcodecs>=0.15.1
   - icechunk>=0.1.1
   # Testing
   - codecov[toml]
@@ -36,5 +36,6 @@ dependencies:
   # for opening FITS files
   - astropy
   - pip
+  - zarr>=3.0.2
   - pip:
-    - imagecodecs-numcodecs==2024.6.1
+    - imagecodecs-numcodecs==2024.6.1
diff --git a/ci/min-deps.yml b/ci/min-deps.yml
index a0a0cf20..02921b1e 100644
--- a/ci/min-deps.yml
+++ b/ci/min-deps.yml
@@ -6,9 +6,9 @@ dependencies:
   - h5py
   - hdf5
   - netcdf4
-  - xarray>=2024.10.0
+  - xarray>=2025.1.1
   - numpy>=2.0.0
-  - numcodecs
+  - numcodecs>=0.15.1
   - packaging
   - ujson
   - universal_pathlib
@@ -23,3 +23,4 @@ dependencies:
   - pytest
   - pooch
   - fsspec
+  - zarr>=3.0.2
diff --git a/ci/upstream.yml b/ci/upstream.yml
index f312de2c..c6c13d30 100644
--- a/ci/upstream.yml
+++ b/ci/upstream.yml
@@ -13,7 +13,7 @@ dependencies:
   - ujson
   - universal_pathlib
   - hdf5plugin
-  - numcodecs
+  - numcodecs>=0.15.1
   - imagecodecs>=2024.6.1
   # Testing
   - codecov[toml]
@@ -28,6 +28,7 @@ dependencies:
   - pooch
   - fsspec
   - pip
+  - zarr>=3.0.2
   - pip:
     - git+https://github.com/earth-mover/icechunk.git@main#subdirectory=icechunk-python # Installs zarr-python v3.0.0 as dependency
     - git+https://github.com/fsspec/kerchunk.git@main
diff --git a/conftest.py b/conftest.py
index 0781c37e..a06d8cf1 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,13 +1,25 @@
+"""Pytest configuration and fixtures for virtualizarr tests."""
+
+# Standard library imports
+import itertools
 from pathlib import Path
 from typing import Any, Callable, Mapping, Optional
 
-import h5py
+# Third-party imports
+import h5py  # type: ignore[import]
 import numpy as np
 import pytest
 import xarray as xr
 from xarray.core.variable import Variable
 
+# Local imports
+from virtualizarr.manifests import ChunkManifest, ManifestArray
+from virtualizarr.manifests.manifest import join
+from virtualizarr.manifests.utils import create_v3_array_metadata
+from virtualizarr.utils import ceildiv
+
+
+# Pytest configuration
 def pytest_addoption(parser):
     """Add command-line flags for pytest."""
     parser.addoption(
@@ -18,65 +30,152 @@ def pytest_runtest_setup(item):
 
 
 def pytest_runtest_setup(item):
-    # based on https://stackoverflow.com/questions/47559524
+    """Skip network tests unless explicitly enabled."""
     if "network" in item.keywords and not item.config.getoption("--run-network-tests"):
         pytest.skip(
             "set --run-network-tests to run tests requiring an internet connection"
         )
 
 
+# Common codec configurations
+DELTA_CODEC = {"name": "numcodecs.delta", "configuration": {"dtype": "<i8"}}
+ARRAYBYTES_CODEC = {"name": "bytes", "configuration": {"endian": "little"}}
+BLOSC_CODEC = {
+    "name": "blosc",
+    "configuration": {"cname": "lz4", "clevel": 5, "shuffle": "shuffle", "typesize": 4},
+}
+ZLIB_CODEC = {"name": "numcodecs.zlib", "configuration": {"level": 1}}
+
+
+def _generate_chunk_entries(
+    shape: tuple[int, ...],
+    chunks: tuple[int, ...],
+    entry_generator: Callable[[tuple[int, ...]], dict[str, Any]],
+) -> dict[str, dict[str, Any]]:
+    """
+    Generate chunk entries for a manifest based on shape and chunks.
+
+    Parameters
+    ----------
+    shape : tuple of int
+        The shape of the array
+    chunks : tuple of int
+        The chunk size for each dimension
+    entry_generator : callable
+        Function that takes chunk indices and returns an entry dict
+
+    Returns
+    -------
+    dict
+        Mapping of chunk keys to entry dictionaries
+    """
+    chunk_grid_shape = tuple(
+        ceildiv(axis_length, chunk_length)
+        for axis_length, chunk_length in zip(shape, chunks)
+    )
+
+    if chunk_grid_shape == ():
+        return {"0": entry_generator((0,))}
+
+    all_possible_combos = itertools.product(
+        *[range(length) for length in chunk_grid_shape]
+    )
+    return {join(ind): entry_generator(ind) for ind in all_possible_combos}
+
+
+def _offset_from_chunk_key(ind: tuple[int, ...]) -> int:
+    """Generate an offset value from chunk indices."""
+    return sum(ind) * 10
+
+
+def _length_from_chunk_key(ind: tuple[int, ...]) -> int:
+    """Generate a length value from chunk indices."""
+    return sum(ind) + 5
+
+
+def _entry_from_chunk_key(ind: tuple[int, ...]) -> dict[str, str | int]:
+    """Generate a (somewhat) unique manifest entry from a given chunk key."""
+    entry = {
+        "path": f"/foo.{str(join(ind))}.nc",
+        "offset": _offset_from_chunk_key(ind),
+        "length": _length_from_chunk_key(ind),
+    }
+    return entry  # type: ignore[return-value]
+
+
+def _generate_chunk_manifest(
+    netcdf4_file: str,
+    shape: tuple[int, ...],
+    chunks: tuple[int, ...],
+    offset: int = 6144,
+    length: int = 48,
+) -> ChunkManifest:
+    """Generate a chunk manifest with sequential offsets for each chunk."""
+    current_offset = [offset]  # Use list to allow mutation in closure
+
+    def sequential_entry_generator(ind: tuple[int, ...]) -> dict[str, Any]:
+        entry = {
+            "path": netcdf4_file,
+            "offset": current_offset[0],
+            "length": length,
+        }
+        current_offset[0] += length
+        return entry
+
+    entries = _generate_chunk_entries(shape, chunks, sequential_entry_generator)
+    return ChunkManifest(entries)
+
+
+# NetCDF file fixtures
 @pytest.fixture
 def empty_netcdf4_file(tmp_path: Path) -> str:
+    """Create an empty NetCDF4 file."""
     filepath = tmp_path / "empty.nc"
-
-    # Set up example xarray dataset
-    with xr.Dataset() as ds:  # Save it to disk as netCDF (in temporary directory)
+    with xr.Dataset() as ds:
         ds.to_netcdf(filepath, format="NETCDF4")
-
     return str(filepath)
 
 
 @pytest.fixture
 def netcdf4_file(tmp_path: Path) -> str:
+    """Create a NetCDF4 file with air temperature data."""
     filepath = tmp_path / "air.nc"
-
-    # Set up example xarray dataset
     with xr.tutorial.open_dataset("air_temperature") as ds:
-        # Save it to disk as netCDF (in temporary directory)
         ds.to_netcdf(filepath, format="NETCDF4")
-
     return str(filepath)
 
 
 @pytest.fixture
 def netcdf4_file_with_data_in_multiple_groups(tmp_path: Path) -> str:
+    """Create a NetCDF4 file with data in multiple groups."""
     filepath = tmp_path / "test.nc"
-
     ds1 = xr.DataArray([1, 2, 3], name="foo").to_dataset()
     ds1.to_netcdf(filepath)
     ds2 = xr.DataArray([4, 5], name="bar").to_dataset()
     ds2.to_netcdf(filepath, group="subgroup", mode="a")
-
     return str(filepath)
 
 
 @pytest.fixture
 def netcdf4_files_factory(tmp_path: Path) -> Callable:
+    """Factory fixture to create multiple NetCDF4 files."""
+
     def create_netcdf4_files(
         encoding: Optional[Mapping[str, Mapping[str, Any]]] = None,
     ) -> tuple[str, str]:
         filepath1 = tmp_path / "air1.nc"
         filepath2 = tmp_path / "air2.nc"
-
         with xr.tutorial.open_dataset("air_temperature") as ds:
-            # Split dataset into two parts
             ds1 = ds.isel(time=slice(None, 1460))
             ds2 = ds.isel(time=slice(1460, None))
-
-            # Save datasets to disk as NetCDF in the temporary directory with the provided encoding
             ds1.to_netcdf(filepath1, encoding=encoding)
             ds2.to_netcdf(filepath2, encoding=encoding)
-
         return str(filepath1), str(filepath2)
 
     return create_netcdf4_files
@@ -84,16 +183,16 @@ def create_netcdf4_files(
 
 @pytest.fixture
 def netcdf4_file_with_2d_coords(tmp_path: Path) -> str:
+    """Create a NetCDF4 file with 2D coordinates."""
     filepath = tmp_path / "ROMS_example.nc"
-
     with xr.tutorial.open_dataset("ROMS_example") as ds:
         ds.to_netcdf(filepath, format="NETCDF4")
-
     return str(filepath)
 
 
 @pytest.fixture
 def netcdf4_virtual_dataset(netcdf4_file):
+    """Create a virtual dataset from a NetCDF4 file."""
     from virtualizarr import open_virtual_dataset
 
     return open_virtual_dataset(netcdf4_file, indexes={})
@@ -101,52 +200,176 @@ def netcdf4_virtual_dataset(netcdf4_file):
 
 @pytest.fixture
 def netcdf4_inlined_ref(netcdf4_file):
+    """Create an inlined reference from a NetCDF4 file."""
    from kerchunk.hdf import SingleHdf5ToZarr
 
     return SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000).translate()
 
 
+# HDF5 file fixtures
 @pytest.fixture
 def hdf5_groups_file(tmp_path: Path) -> str:
+    """Create an HDF5 file with groups."""
     filepath = tmp_path / "air.nc"
-
-    # Set up example xarray dataset
     with xr.tutorial.open_dataset("air_temperature") as ds:
-        # Save it to disk as netCDF (in temporary directory)
         ds.to_netcdf(filepath, format="NETCDF4", group="test/group")
-
     return str(filepath)
 
 
 @pytest.fixture
 def hdf5_empty(tmp_path: Path) -> str:
+    """Create an empty HDF5 file."""
     filepath = tmp_path / "empty.nc"
-
     with h5py.File(filepath, "w") as f:
         dataset = f.create_dataset("empty", shape=(), dtype="float32")
         dataset.attrs["empty"] = "true"
-
     return str(filepath)
 
 
 @pytest.fixture
 def hdf5_scalar(tmp_path: Path) -> str:
+    """Create an HDF5 file with a scalar dataset."""
     filepath = tmp_path / "scalar.nc"
-
     with h5py.File(filepath, "w") as f:
         dataset = f.create_dataset("scalar", data=0.1, dtype="float32")
         dataset.attrs["scalar"] = "true"
-
     return str(filepath)
 
 
 @pytest.fixture
 def simple_netcdf4(tmp_path: Path) -> str:
+    """Create a simple NetCDF4 file with a single variable."""
     filepath = tmp_path / "simple.nc"
-
     arr = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4)
     var = Variable(data=arr, dims=["x", "y"])
     ds = xr.Dataset({"foo": var})
     ds.to_netcdf(filepath)
-
     return str(filepath)
+
+
+# Zarr ArrayV3Metadata, ManifestArray, virtual xr.Variable and virtual xr.Dataset fixtures
+@pytest.fixture
+def array_v3_metadata():
+    """Create V3 array metadata with sensible defaults."""
+
+    def _create_metadata(
+        shape: tuple = (5, 5),
+        chunks: tuple = (5, 5),
+        data_type: np.dtype = np.dtype("int32"),
+        codecs: list[dict] | None = None,
+        fill_value: int | None = None,
+    ):
+        codecs = codecs or [{"configuration": {"endian": "little"}, "name": "bytes"}]
+        return create_v3_array_metadata(
+            shape=shape,
+            chunk_shape=chunks,
+            data_type=data_type,
+            codecs=codecs,
+            fill_value=fill_value or 0,
+        )
+
+    return _create_metadata
+
+
+@pytest.fixture
+def manifest_array(array_v3_metadata):
+    """
+    Create an example ManifestArray with sensible defaults.
+
+    The manifest is populated with a (somewhat) unique path, offset, and length for each key.
+ """ + + def _manifest_array( + shape: tuple = (5, 5), + chunks: tuple = (5, 5), + codecs: list[dict] | None = [ARRAYBYTES_CODEC, ZLIB_CODEC], + ): + metadata = array_v3_metadata(shape=shape, chunks=chunks, codecs=codecs) + entries = _generate_chunk_entries(shape, chunks, _entry_from_chunk_key) + chunkmanifest = ChunkManifest(entries=entries) + return ManifestArray(chunkmanifest=chunkmanifest, metadata=metadata) + + return _manifest_array + + +@pytest.fixture +def virtual_variable(array_v3_metadata: Callable) -> Callable: + """Generate a virtual variable with configurable parameters.""" + + def _virtual_variable( + file_uri: str, + shape: tuple[int, ...] = (3, 4), + chunk_shape: tuple[int, ...] = (3, 4), + dtype: np.dtype = np.dtype("int32"), + codecs: Optional[list[dict[Any, Any]]] = None, + fill_value: Optional[str] = None, + encoding: Optional[dict] = None, + offset: int = 6144, + length: int = 48, + dims: list[str] = [], + attrs: dict[str, Any] = {}, + ) -> xr.Variable: + manifest = _generate_chunk_manifest( + file_uri, + shape=shape, + chunks=chunk_shape, + offset=offset, + length=length, + ) + metadata = array_v3_metadata( + shape=shape, + chunks=chunk_shape, + codecs=codecs, + data_type=dtype, + fill_value=fill_value, + ) + ma = ManifestArray(chunkmanifest=manifest, metadata=metadata) + return xr.Variable( + data=ma, + dims=dims, + encoding=encoding, + attrs=attrs, + ) + + return _virtual_variable + + +@pytest.fixture +def virtual_dataset(virtual_variable: Callable) -> Callable: + """Generate a virtual dataset with configurable parameters.""" + + def _virtual_dataset( + file_uri: str, + shape: tuple[int, ...] = (3, 4), + chunk_shape: tuple[int, ...] = (3, 4), + dtype: np.dtype = np.dtype("int32"), + codecs: Optional[list[dict[Any, Any]]] = None, + fill_value: Optional[str] = None, + encoding: Optional[dict] = None, + variable_name: str = "foo", + offset: int = 6144, + length: int = 48, + dims: Optional[list[str]] = None, + coords: Optional[xr.Coordinates] = None, + ) -> xr.Dataset: + with xr.open_dataset(file_uri) as ds: + var = virtual_variable( + file_uri=file_uri, + shape=shape, + chunk_shape=chunk_shape, + dtype=dtype, + codecs=codecs, + fill_value=fill_value, + encoding=encoding, + offset=offset, + length=length, + dims=dims or [str(name) for name in ds.dims], + attrs=ds[variable_name].attrs, + ) + return xr.Dataset( + {variable_name: var}, + coords=coords, + attrs=ds.attrs, + ) + + return _virtual_dataset diff --git a/docs/usage.md b/docs/usage.md index c783da9c..b610beb3 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -1,4 +1,5 @@ (usage)= + # Usage This page explains how to use VirtualiZarr today, by introducing the key concepts one-by-one. @@ -27,7 +28,6 @@ vds = open_virtual_dataset('air.nc') (Notice we did not have to explicitly indicate the file format, as {py:func}`open_virtual_dataset ` will attempt to automatically infer it.) - ```{note} In future we would like for it to be possible to just use `xr.open_dataset`, e.g. 
@@ -43,6 +43,7 @@ Printing this "virtual dataset" shows that although it is an instance of `xarray
 
 ```python
 vds
 ```
+
 ```
 <xarray.Dataset> Size: 8MB
 Dimensions:  (time: 2920, lat: 25, lon: 53)
@@ -69,12 +70,15 @@ As the manifest contains only addresses at which to find large binary chunks, th
 
 ```python
 ds.nbytes
 ```
+
 ```
 30975672
 ```
+
 ```python
 vds.virtualize.nbytes
 ```
+
 ```
 128
 ```
@@ -119,6 +123,7 @@ marr = vds['air'].data
 manifest = marr.manifest
 manifest.dict()
 ```
+
 ```python
 {'0.0.0': {'path': 'file:///work/data/air.nc', 'offset': 15419, 'length': 7738000}}
 ```
@@ -134,20 +139,49 @@ A Zarr array is defined not just by the location of its constituent chunk data,
 
 ```python
 marr
 ```
+
 ```
 ManifestArray<shape=(2920, 25, 53), dtype=int16, chunks=(2920, 25, 53)>
 ```
+
 ```python
 marr.manifest
 ```
+
 ```
 ChunkManifest<shape=(1, 1, 1)>
 ```
+
 ```python
-marr.zarray
+marr.metadata
 ```
+
 ```
-ZArray(shape=(2920, 25, 53), chunks=(2920, 25, 53), dtype=int16, compressor=None, filters=None, fill_value=None)
+ArrayV3Metadata(shape=(2920, 25, 53),
+                data_type=<DataType.float64: 'float64'>,
+                chunk_grid=RegularChunkGrid(chunk_shape=(2920, 25, 53)),
+                chunk_key_encoding=DefaultChunkKeyEncoding(name='default',
+                                                           separator='/'),
+                fill_value=np.float64(-327.67),
+                codecs=(FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0, 'dtype': '<f8', 'astype': '<i2'}),
+                        BytesCodec(endian=<Endian.little: 'little'>)),
+                attributes={'GRIB_id': 11,
+                            'GRIB_name': 'TMP',
+                            'actual_range': [185.16000366210938,
+                                             322.1000061035156],
+                            'dataset': 'NMC Reanalysis',
+                            'level_desc': 'Surface',
+                            'long_name': '4xDaily Air temperature at sigma '
+                                         'level 995',
+                            'parent_stat': 'Other',
+                            'precision': 2,
+                            'statistic': 'Individual Obs',
+                            'units': 'degK',
+                            'var_desc': 'Air temperature'},
+                dimension_names=None,
+                zarr_format=3,
+                node_type='array',
+                storage_transformers=())
 ```
 
 A `ManifestArray` can therefore be thought of as a virtualized representation of a single Zarr array.
@@ -160,12 +194,15 @@ import numpy as np
 concatenated = np.concatenate([marr, marr], axis=0)
 concatenated
 ```
+
 ```
 ManifestArray<shape=(5840, 25, 53), dtype=int16, chunks=(2920, 25, 53)>
 ```
+
 ```python
 concatenated.manifest.dict()
 ```
+
 ```
 {'0.0.0': {'path': 'file:///work/data/air.nc', 'offset': 15419, 'length': 7738000},
  '1.0.0': {'path': 'file:///work/data/air.nc', 'offset': 15419, 'length': 7738000}}
 ```
@@ -182,6 +219,7 @@ Remember that you cannot load values from a `ManifestArray` directly.
 
 ```python
 vds['air'].values
 ```
+
 ```python
 NotImplementedError: ManifestArrays can't be converted into numpy arrays or pandas Index objects
 ```
@@ -207,6 +245,7 @@ Whilst the values of virtual variables (i.e. those backed by `ManifestArray` obj
 
 ```python
 vds = open_virtual_dataset('air.nc', loadable_variables=['air', 'time'])
 ```
+
 ```python
 <xarray.Dataset> Size: 31MB
 Dimensions:  (time: 2920, lat: 25, lon: 53)
 Attributes:
     references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
     title:        4x daily NMC reanalysis (1948)
 ```
+
 You can see that the dataset contains a mixture of virtual variables backed by `ManifestArray` objects (`lat` and `lon`), and loadable variables backed by (lazy) numpy arrays (`air` and `time`).
 
 Loading variables can be useful in a few scenarios:
+
 1. You need to look at the actual values of a multi-dimensional variable in order to decide what to do next,
-2. You want in-memory indexes to use with ``xr.combine_by_coords``,
+2. You want in-memory indexes to use with `xr.combine_by_coords`,
 3. Storing a variable on-disk as a set of references would be inefficient, e.g. because it's a very small array (saving the values like this is similar to kerchunk's concept of "inlining" data),
 4. The variable has encoding, and the simplest way to decode it correctly is to let xarray's standard decoding machinery load it into memory and apply the decoding,
 5. Some of your variables have inconsistent-length chunks, and you want to be able to concatenate them together. For example you might have multiple virtual datasets with coordinates of inconsistent length (e.g., leap years within multi-year daily data).
@@ -250,6 +291,7 @@ vds = open_virtual_dataset(
     decode_times=True,
 )
 ```
+
 ```python
 <xarray.Dataset> Size: 31MB
 Dimensions:  (time: 2920, lat: 25, lon: 53)
@@ -300,6 +342,7 @@ As we know the correct order a priori, we can just combine along one dimension u
 combined_vds = xr.concat([vds1, vds2], dim='time', coords='minimal', compat='override')
 combined_vds
 ```
+
 ```
 <xarray.Dataset> Size: 8MB
 Dimensions:  (time: 2920, lat: 25, lon: 53)
@@ -322,6 +365,7 @@ We can see that the resulting combined manifest has two chunks, as expected.
 
 ```python
 combined_vds['air'].data.manifest.dict()
 ```
+
 ```
 {'0.0.0': {'path': 'file:///work/data/air1.nc', 'offset': 15419, 'length': 3869000},
  '1.0.0': {'path': 'file:///work/data/air2.nc', 'offset': 15419, 'length': 3869000}}
@@ -370,11 +414,12 @@ vds2 = open_virtual_dataset('air2.nc', loadable_variables=['time', 'lat', 'lon']
 
 combined_vds = xr.combine_by_coords([vds2, vds1], coords='minimal', compat='override')
 ```
 
-Notice we don't have to specify the concatenation dimension explicitly - xarray works out the correct ordering for us. Even though we actually passed in the virtual datasets in the wrong order just now, the manifest still has the chunks listed in the correct order such that the 1-dimensional ``time`` coordinate has ascending values:
+Notice we don't have to specify the concatenation dimension explicitly - xarray works out the correct ordering for us. Even though we actually passed in the virtual datasets in the wrong order just now, the manifest still has the chunks listed in the correct order such that the 1-dimensional `time` coordinate has ascending values:
 
 ```python
 combined_vds['air'].data.manifest.dict()
 ```
+
 ```
 {'0.0.0': {'path': 'file:///work/data/air1.nc', 'offset': 15419, 'length': 3869000},
  '1.0.0': {'path': 'file:///work/data/air2.nc', 'offset': 15419, 'length': 3869000}}
 ```
@@ -382,7 +427,7 @@ combined_vds['air'].data.manifest.dict()
 
 ### Ordering using metadata
 
-TODO: Use preprocess to create a new index from the metadata. Requires ``open_virtual_mfdataset`` to be implemented in [PR #349](https://github.com/zarr-developers/VirtualiZarr/pull/349).
+TODO: Use preprocess to create a new index from the metadata. Requires `open_virtual_mfdataset` to be implemented in [PR #349](https://github.com/zarr-developers/VirtualiZarr/pull/349).
 
 ## Writing virtual stores to disk
@@ -456,8 +501,6 @@ session.commit("Appended second dataset")
 
 See the [Icechunk documentation](https://icechunk.io/icechunk-python/virtual/#creating-a-virtual-dataset-with-virtualizarr) for more details.
 
-
-
 ## Opening Kerchunk references as virtual datasets
 
 You can open existing Kerchunk `json` or `parquet` references as Virtualizarr virtual datasets. This may be useful for converting existing Kerchunk formatted references to storage formats like [Icechunk](https://icechunk.io/).
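For context, the Kerchunk-to-Icechunk conversion that sentence describes can be sketched end-to-end. This is illustrative only, not part of the diff: it assumes a local Icechunk repository path and the `combined.json` references shown below, and uses only APIs that appear elsewhere in this PR's docs.

```python
import icechunk

from virtualizarr import open_virtual_dataset

# Open existing Kerchunk JSON references as a virtual dataset
vds = open_virtual_dataset('combined.json', filetype='kerchunk')

# Write the same virtual references into an Icechunk repository
storage = icechunk.local_filesystem_storage("./converted_store")  # hypothetical path
repo = icechunk.Repository.create(storage)
session = repo.writable_session("main")
vds.virtualize.to_icechunk(session.store)
session.commit("Converted Kerchunk references to Icechunk")
```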
@@ -468,9 +511,9 @@ vds = open_virtual_dataset('combined.json', filetype='kerchunk')
 vds = open_virtual_dataset('combined.parquet', filetype='kerchunk')
 ```
 
-One difference between the kerchunk references format and virtualizarr's internal manifest representation (as well as icechunk's format) is that paths in kerchunk references can be relative paths. Opening kerchunk references that contain relative local filepaths therefore requires supplying another piece of information: the directory of the ``fsspec`` filesystem which the filepath was defined relative to.
+One difference between the kerchunk references format and virtualizarr's internal manifest representation (as well as icechunk's format) is that paths in kerchunk references can be relative paths. Opening kerchunk references that contain relative local filepaths therefore requires supplying another piece of information: the directory of the `fsspec` filesystem which the filepath was defined relative to.
 
-You can dis-ambuiguate kerchunk references containing relative paths by passing the ``fs_root`` kwarg to ``virtual_backend_kwargs``.
+You can disambiguate kerchunk references containing relative paths by passing the `fs_root` kwarg to `virtual_backend_kwargs`.
 
 ```python
 # file `relative_refs.json` contains a path like './file.nc'
 
 vds = open_virtual_dataset(
     'relative_refs.json',
     filetype='kerchunk',
     virtual_backend_kwargs={'fs_root': 'file:///some_directory/'}
 )
 
 # the path in the virtual dataset will now be 'file:///some_directory/file.nc'
 ```
 
-Note that as the virtualizarr {py:meth}`vds.virtualize.to_kerchunk <virtualizarr.VirtualiZarrDatasetAccessor.to_kerchunk>` method only writes absolute paths, the only scenario in which you might come across references containing relative paths is if you are opening references that were previously created using the ``kerchunk`` library alone.
+Note that as the virtualizarr {py:meth}`vds.virtualize.to_kerchunk <virtualizarr.VirtualiZarrDatasetAccessor.to_kerchunk>` method only writes absolute paths, the only scenario in which you might come across references containing relative paths is if you are opening references that were previously created using the `kerchunk` library alone.
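Conversely, writing references back out with VirtualiZarr itself (so that they contain absolute paths) is a one-liner; a quick sketch, assuming the `vds` from above:

```python
# Write the virtual dataset back out as Kerchunk references.
# Paths in the output are absolute, per the note above;
# format may also be 'parquet' or 'dict'.
vds.virtualize.to_kerchunk('combined.json', format='json')
```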
 
 ## Rewriting existing manifests
 
@@ -505,10 +548,12 @@ def local_to_s3_url(old_local_path: str) -> str:
     filename = Path(old_local_path).name
     return str(new_s3_bucket_url / filename)
 ```
+
 ```python
 renamed_vds = vds.virtualize.rename_paths(local_to_s3_url)
 renamed_vds['air'].data.manifest.dict()
 ```
+
 ```
 {'0.0.0': {'path': 'http://s3.amazonaws.com/my_bucket/air.nc', 'offset': 15419, 'length': 7738000}}
 ```
diff --git a/pyproject.toml b/pyproject.toml
index e6256aff..b68c919b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,9 +23,10 @@ dependencies = [
     "xarray>=2025.1.1",
     "numpy>=2.0.0",
     "universal-pathlib",
-    "numcodecs",
+    "numcodecs>=0.15.1",
     "ujson",
     "packaging",
+    "zarr>=3.0.2",
 ]
 
 [project.optional-dependencies]
diff --git a/virtualizarr/codecs.py b/virtualizarr/codecs.py
index b340a239..5c9ebde0 100644
--- a/virtualizarr/codecs.py
+++ b/virtualizarr/codecs.py
@@ -1,109 +1,136 @@
-from typing import TYPE_CHECKING, Union
+from typing import TYPE_CHECKING, Any, Tuple, Union
 
-from virtualizarr.zarr import Codec
+import numpy as np
+import zarr
+from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec
+from zarr.abc.codec import Codec as ZarrCodec
+from zarr.core.codec_pipeline import BatchedCodecPipeline
+from zarr.core.metadata.v3 import ArrayV3Metadata
 
 if TYPE_CHECKING:
-    from zarr import Array  # type: ignore
-    from zarr.core.abc.codec import (  # type: ignore
-        Codec as ZarrCodec,
-    )
-
     from .manifests.array import ManifestArray
 
+CodecPipeline = Tuple[ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec, ...]
 
-def get_codecs(
-    array: Union["ManifestArray", "Array"],
-    normalize_to_zarr_v3: bool = False,
-) -> Union[Codec, tuple["ZarrCodec", ...]]:
-    """
-    Get the codecs for either a ManifestArray or a Zarr Array.
+DeconstructedCodecPipeline = tuple[
+    tuple[ArrayArrayCodec, ...],  # Array-to-array transformations
+    ArrayBytesCodec | None,  # Array-to-bytes conversion
+    tuple[BytesBytesCodec, ...],  # Bytes-to-bytes transformations
+]
 
-    Parameters:
-        array (Union[ManifestArray, ZarrArray]): The input array, either ManifestArray or Zarr Array.
 
-    Returns:
-        List[Optional[Codec]]: A list of codecs or an empty list if no codecs are found.
-
-    Raises:
-        ImportError: If `zarr` is required but not installed.
-        ValueError: If the array type is unsupported.
+def numcodec_config_to_configurable(num_codec: dict) -> dict:
     """
-    if _is_manifest_array(array):
-        return _get_manifestarray_codecs(array, normalize_to_zarr_v3)  # type: ignore[arg-type]
-
-    if _is_zarr_array(array):
-        return _get_zarr_array_codecs(array, normalize_to_zarr_v3)  # type: ignore[arg-type]
+    Convert a numcodecs codec into a zarr v3 configurable.
+    """
+    if num_codec["id"].startswith("numcodecs."):
+        return num_codec
+
+    num_codec_copy = num_codec.copy()
+    name = "numcodecs." + num_codec_copy.pop("id")
+    return {"name": name, "configuration": num_codec_copy}
+
+
+def extract_codecs(
+    codecs: CodecPipeline,
+) -> DeconstructedCodecPipeline:
+    """Extracts various codec types."""
+    arrayarray_codecs: tuple[ArrayArrayCodec, ...] = ()
+    arraybytes_codec: ArrayBytesCodec | None = None
+    bytesbytes_codecs: tuple[BytesBytesCodec, ...] = ()
+    for codec in codecs:
+        if isinstance(codec, ArrayArrayCodec):
+            arrayarray_codecs += (codec,)
+        if isinstance(codec, ArrayBytesCodec):
+            arraybytes_codec = codec
+        if isinstance(codec, BytesBytesCodec):
+            bytesbytes_codecs += (codec,)
+    return (arrayarray_codecs, arraybytes_codec, bytesbytes_codecs)
+
+
+def convert_to_codec_pipeline(
+    dtype: np.dtype,
+    codecs: list[dict] | None = [],
+) -> BatchedCodecPipeline:
+    """
+    Convert list of codecs to valid BatchedCodecPipeline.
 
-    raise ValueError("Unsupported array type or zarr is not installed.")
+    Parameters
+    ----------
+    dtype : np.dtype
+    codecs : list[dict] | None
 
+    Returns
+    -------
+    BatchedCodecPipeline
+    """
+    from zarr.core.array import _get_default_chunk_encoding_v3
+    from zarr.registry import get_codec_class
 
-def _is_manifest_array(array: object) -> bool:
-    """Check if the array is an instance of ManifestArray."""
-    try:
-        from .manifests.array import ManifestArray
+    zarr_codecs: tuple[ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec, ...] = ()
+    if codecs and len(codecs) > 0:
+        zarr_codecs = tuple(
+            get_codec_class(codec["name"]).from_dict(codec) for codec in codecs
+        )
 
-        return isinstance(array, ManifestArray)
-    except ImportError:
-        return False
+    # It would be nice to use zarr.core.codec_pipeline.codecs_from_list here but that function requires
+    # array array codecs and array bytes codecs to already be present in the list and in the correct order.
+    arrayarray_codecs, arraybytes_codec, bytesbytes_codecs = extract_codecs(zarr_codecs)
 
+    if arraybytes_codec is None:
+        arraybytes_codec = _get_default_chunk_encoding_v3(dtype)[1]
 
-def _get_manifestarray_codecs(
-    array: "ManifestArray",
-    normalize_to_zarr_v3: bool = False,
-) -> Union[Codec, tuple["ZarrCodec", ...]]:
-    """Get codecs for a ManifestArray based on its zarr_format."""
-    if normalize_to_zarr_v3 or array.zarray.zarr_format == 3:
-        return array.zarray._v3_codecs().into_v3_codecs(array.zarray.dtype)
-    elif array.zarray.zarr_format == 2:
-        return array.zarray.codec
-    else:
-        raise ValueError("Unsupported zarr_format for ManifestArray.")
-
+    codec_pipeline = BatchedCodecPipeline(
+        array_array_codecs=arrayarray_codecs,
+        array_bytes_codec=arraybytes_codec,
+        bytes_bytes_codecs=bytesbytes_codecs,
+        batch_size=1,
+    )
 
-def _is_zarr_array(array: object) -> bool:
-    """Check if the array is an instance of Zarr Array."""
-    try:
-        from zarr import Array
+    return codec_pipeline
 
-        return isinstance(array, Array)
-    except ImportError:
-        return False
 
+def get_codec_config(codec: ZarrCodec) -> dict[str, Any]:
+    """
+    Extract configuration from a codec, handling both zarr-python and numcodecs codecs.
+ """ + if hasattr(codec, "codec_config"): + return codec.codec_config + elif hasattr(codec, "get_config"): + return codec.get_config() + elif hasattr(codec, "codec_name"): + # If we can't get config, try to get the name and configuration directly + # This assumes the codec follows the v3 spec format + return { + "id": codec.codec_name.replace("numcodecs.", ""), + **getattr(codec, "configuration", {}), + } + else: + raise ValueError(f"Unable to parse codec configuration: {codec}") -def _get_zarr_array_codecs( - array: "Array", - normalize_to_zarr_v3: bool = False, -) -> Union[Codec, tuple["ZarrCodec", ...]]: - """Get codecs for a Zarr Array based on its format.""" - import zarr - from packaging import version - # Check that zarr-python v3 is installed - required_version = "3.0.0b" - installed_version = zarr.__version__ - if version.parse(installed_version) < version.parse(required_version): - raise NotImplementedError( - f"zarr-python v3 or higher is required, but version {installed_version} is installed." +def get_codecs(array: Union["ManifestArray", "zarr.Array"]) -> CodecPipeline: + """ + Get the zarr v3 codec pipeline for either a ManifestArray or a Zarr Array. + + Parameters + ---------- + array : Union[ManifestArray, Array] + The input array, either ManifestArray or Zarr Array. + + Returns + ------- + CodecPipeline + A tuple of zarr v3 codecs representing the codec pipeline. + + Raises + ------ + ValueError + If the array type is unsupported or the array's metadata is not in zarr v3 format. + """ + if not isinstance(array.metadata, ArrayV3Metadata): + raise ValueError( + "Only zarr v3 format arrays are supported. Please convert your array to v3 format." ) - from zarr.core.metadata import ( # type: ignore[import-untyped] - ArrayV2Metadata, - ArrayV3Metadata, - ) - # For zarr format v3 - if isinstance(array.metadata, ArrayV3Metadata): - return tuple(array.metadata.codecs) - # For zarr format v2 - elif isinstance(array.metadata, ArrayV2Metadata): - if normalize_to_zarr_v3: - # we could potentially normalize to v3 using ZArray._v3_codecs, but we don't have a use case for that. - raise NotImplementedError( - "Normalization to zarr v3 is not supported for zarr v2 array." 
-            )
-        else:
-            return Codec(
-                compressor=array.metadata.compressor,
-                filters=list(array.metadata.filters or ()),
-            )
-    else:
-        raise ValueError("Unsupported zarr_format for Zarr Array.")
+    return array.metadata.codecs
diff --git a/virtualizarr/manifests/array.py b/virtualizarr/manifests/array.py
index 89f648aa..f1c3c36d 100644
--- a/virtualizarr/manifests/array.py
+++ b/virtualizarr/manifests/array.py
@@ -2,13 +2,13 @@
 from typing import Any, Callable, Union
 
 import numpy as np
+from zarr.core.metadata.v3 import ArrayV3Metadata, RegularChunkGrid
 
 from virtualizarr.manifests.array_api import (
     MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS,
     _isnan,
 )
 from virtualizarr.manifests.manifest import ChunkManifest
-from virtualizarr.zarr import ZArray
 
 
 class ManifestArray:
@@ -24,11 +24,11 @@ class ManifestArray:
     """
 
     _manifest: ChunkManifest
-    _zarray: ZArray
+    _metadata: ArrayV3Metadata
 
     def __init__(
         self,
-        zarray: ZArray | dict,
+        metadata: ArrayV3Metadata | dict,
         chunkmanifest: dict | ChunkManifest,
     ) -> None:
         """
@@ -36,15 +36,20 @@ def __init__(
 
         Parameters
         ----------
-        zarray : dict or ZArray
+        metadata : dict or ArrayV3Metadata
        chunkmanifest : dict or ChunkManifest
         """
-        if isinstance(zarray, ZArray):
-            _zarray = zarray
+        if isinstance(metadata, ArrayV3Metadata):
+            _metadata = metadata
         else:
             # try unpacking the dict
-            _zarray = ZArray(**zarray)
+            _metadata = ArrayV3Metadata(**metadata)
+
+        if not isinstance(_metadata.chunk_grid, RegularChunkGrid):
+            raise NotImplementedError(
+                f"Only RegularChunkGrid is currently supported for chunk size, but got type {type(_metadata.chunk_grid)}"
+            )
 
         if isinstance(chunkmanifest, ChunkManifest):
             _chunkmanifest = chunkmanifest
@@ -55,10 +60,10 @@ def __init__(
             f"chunkmanifest arg must be of type ChunkManifest or dict, but got type {type(chunkmanifest)}"
         )
 
-        # TODO check that the zarray shape and chunkmanifest shape are consistent with one another
+        # TODO check that the metadata shape and chunkmanifest shape are consistent with one another
         # TODO also cover the special case of scalar arrays
 
-        self._zarray = _zarray
+        self._metadata = _metadata
         self._manifest = _chunkmanifest
 
     @property
@@ -66,21 +71,27 @@ def manifest(self) -> ChunkManifest:
         return self._manifest
 
     @property
-    def zarray(self) -> ZArray:
-        return self._zarray
+    def metadata(self) -> ArrayV3Metadata:
+        return self._metadata
 
     @property
     def chunks(self) -> tuple[int, ...]:
-        return tuple(self.zarray.chunks)
+        """
+        Individual chunk size by number of elements.
+        """
+        return self._metadata.chunks
 
     @property
     def dtype(self) -> np.dtype:
-        dtype_str = self.zarray.dtype
-        return np.dtype(dtype_str)
+        dtype_str = self.metadata.data_type
+        return dtype_str.to_numpy()
 
     @property
     def shape(self) -> tuple[int, ...]:
-        return tuple(int(length) for length in list(self.zarray.shape))
+        """
+        Array shape by number of elements along each dimension.
+ """ + return self.metadata.shape @property def ndim(self) -> int: @@ -155,7 +166,7 @@ def __eq__( # type: ignore[override] if self.shape != other.shape: raise NotImplementedError("Unsure how to handle broadcasting like this") - if self.zarray != other.zarray: + if self.metadata != other.metadata: return np.full(shape=self.shape, fill_value=False, dtype=np.dtype(bool)) else: if self.manifest == other.manifest: @@ -263,7 +274,7 @@ def rename_paths( ChunkManifest.rename_paths """ renamed_manifest = self.manifest.rename_paths(new) - return ManifestArray(zarray=self.zarray, chunkmanifest=renamed_manifest) + return ManifestArray(metadata=self.metadata, chunkmanifest=renamed_manifest) def _possibly_expand_trailing_ellipsis(key, ndim: int): diff --git a/virtualizarr/manifests/array_api.py b/virtualizarr/manifests/array_api.py index 4950b48c..ffc0e95f 100644 --- a/virtualizarr/manifests/array_api.py +++ b/virtualizarr/manifests/array_api.py @@ -2,7 +2,7 @@ import numpy as np -from virtualizarr.zarr import determine_chunk_grid_shape +from virtualizarr.utils import determine_chunk_grid_shape from .manifest import ChunkManifest from .utils import ( @@ -10,6 +10,7 @@ check_same_ndims, check_same_shapes, check_same_shapes_except_on_concat_axis, + copy_and_replace_metadata, ) if TYPE_CHECKING: @@ -53,6 +54,7 @@ def concatenate( The signature of this function is array API compliant, so that it can be called by `xarray.concat`. """ + from .array import ManifestArray if axis is None: @@ -100,12 +102,11 @@ def concatenate( lengths=concatenated_lengths, ) - # chunk shape has not changed, there are just now more chunks along the concatenation axis - new_zarray = first_arr.zarray.replace( - shape=tuple(new_shape), + new_metadata = copy_and_replace_metadata( + old_metadata=first_arr.metadata, new_shape=new_shape ) - return ManifestArray(chunkmanifest=concatenated_manifest, zarray=new_zarray) + return ManifestArray(chunkmanifest=concatenated_manifest, metadata=new_metadata) @implements(np.stack) @@ -120,6 +121,7 @@ def stack( The signature of this function is array API compliant, so that it can be called by `xarray.stack`. 
""" + from .array import ManifestArray if not isinstance(axis, int): @@ -170,12 +172,11 @@ def stack( new_chunks = list(old_chunks) new_chunks.insert(axis, 1) - new_zarray = first_arr.zarray.replace( - chunks=tuple(new_chunks), - shape=tuple(new_shape), + new_metadata = copy_and_replace_metadata( + old_metadata=first_arr.metadata, new_shape=new_shape, new_chunks=new_chunks ) - return ManifestArray(chunkmanifest=stacked_manifest, zarray=new_zarray) + return ManifestArray(chunkmanifest=stacked_manifest, metadata=new_metadata) @implements(np.expand_dims) @@ -236,12 +237,13 @@ def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArra lengths=broadcasted_lengths, ) - new_zarray = x.zarray.replace( - chunks=new_chunk_shape, - shape=new_shape, + new_metadata = copy_and_replace_metadata( + old_metadata=x.metadata, + new_shape=list(new_shape), + new_chunks=list(new_chunk_shape), ) - return ManifestArray(chunkmanifest=broadcasted_manifest, zarray=new_zarray) + return ManifestArray(chunkmanifest=broadcasted_manifest, metadata=new_metadata) def _prepend_singleton_dimensions(shape: tuple[int, ...], ndim: int) -> tuple[int, ...]: diff --git a/virtualizarr/manifests/utils.py b/virtualizarr/manifests/utils.py index 07cf2baf..b4dbfb17 100644 --- a/virtualizarr/manifests/utils.py +++ b/virtualizarr/manifests/utils.py @@ -1,15 +1,66 @@ -from typing import TYPE_CHECKING, Any, Iterable, Union +from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union import numpy as np +from zarr import Array +from zarr.core.metadata.v3 import ArrayV3Metadata -from virtualizarr.codecs import get_codecs +from virtualizarr.codecs import convert_to_codec_pipeline, get_codecs if TYPE_CHECKING: - from zarr import Array # type: ignore - from .array import ManifestArray +def create_v3_array_metadata( + shape: tuple[int, ...], + data_type: np.dtype, + chunk_shape: tuple[int, ...], + fill_value: Any = None, + codecs: Optional[list[Dict[str, Any]]] = None, + attributes: Optional[Dict[str, Any]] = None, +) -> ArrayV3Metadata: + """ + Create an ArrayV3Metadata instance with standard configuration. + This function encapsulates common patterns used across different readers. + + Parameters + ---------- + shape : tuple[int, ...] + The shape of the array + data_type : np.dtype + The numpy dtype of the array + chunk_shape : tuple[int, ...] 
+        The shape of each chunk
+    fill_value : Any, optional
+        The fill value for the array
+    codecs : list[Dict[str, Any]], optional
+        List of codec configurations
+    attributes : Dict[str, Any], optional
+        Additional attributes for the array
+
+    Returns
+    -------
+    ArrayV3Metadata
+        A configured ArrayV3Metadata instance with standard defaults
+    """
+    return ArrayV3Metadata(
+        shape=shape,
+        data_type=data_type,
+        chunk_grid={
+            "name": "regular",
+            "configuration": {"chunk_shape": chunk_shape},
+        },
+        chunk_key_encoding={"name": "default"},
+        fill_value=fill_value,
+        codecs=convert_to_codec_pipeline(
+            codecs=codecs or [],
+            dtype=data_type,
+        ),
+        attributes=attributes or {},
+        dimension_names=None,
+        storage_transformers=None,
+    )
+
+
 def check_same_dtypes(dtypes: list[np.dtype]) -> None:
     """Check all the dtypes are the same"""
@@ -116,3 +167,23 @@ def check_compatible_arrays(
     check_same_ndims([ma.ndim, existing_array.ndim])
     arr_shapes = [ma.shape, existing_array.shape]
     check_same_shapes_except_on_concat_axis(arr_shapes, append_axis)
+
+
+def copy_and_replace_metadata(
+    old_metadata: ArrayV3Metadata,
+    new_shape: list[int] | None = None,
+    new_chunks: list[int] | None = None,
+) -> ArrayV3Metadata:
+    """
+    Update metadata to reflect a new shape and/or chunk shape.
+    """
+    metadata_copy = old_metadata.to_dict().copy()
+    metadata_copy["shape"] = new_shape  # type: ignore[assignment]
+    if new_chunks is not None:
+        metadata_copy["chunk_grid"] = {
+            "name": "regular",
+            "configuration": {"chunk_shape": tuple(new_chunks)},
+        }
+    # ArrayV3Metadata.from_dict removes extra keys zarr_format and node_type
+    new_metadata = ArrayV3Metadata.from_dict(metadata_copy)
+    return new_metadata
diff --git a/virtualizarr/readers/dmrpp.py b/virtualizarr/readers/dmrpp.py
index 74e097ad..b5d2b020 100644
--- a/virtualizarr/readers/dmrpp.py
+++ b/virtualizarr/readers/dmrpp.py
@@ -9,10 +9,10 @@
 
 from virtualizarr.manifests import ChunkManifest, ManifestArray
 from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri
+from virtualizarr.manifests.utils import create_v3_array_metadata
 from virtualizarr.readers.common import VirtualBackend
 from virtualizarr.types import ChunkKey
 from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions
-from virtualizarr.zarr import ZArray
 
 
 class DMRPPVirtualBackend(VirtualBackend):
@@ -378,6 +378,7 @@ def _parse_variable(self, var_tag: ET.Element) -> Variable:
         -------
         xr.Variable
         """
+
         # Dimension info
         dims: dict[str, int] = {}
         dimension_tags = self._find_dimension_tags(var_tag)
@@ -390,7 +391,6 @@ def _parse_variable(self, var_tag: ET.Element) -> Variable:
             self._DAP_NP_DTYPE[var_tag.tag.removeprefix("{" + self._NS["dap"] + "}")]
         )
         # Chunks and Filters
-        filters = None
         shape: tuple[int, ...] = tuple(dims.values())
         chunks_shape = shape
         chunks_tag = var_tag.find("dmrpp:chunks", self._NS)
@@ -406,7 +406,7 @@ def _parse_variable(self, var_tag: ET.Element) -> Variable:
             chunks_shape = shape
         chunkmanifest = self._parse_chunks(chunks_tag, chunks_shape)
         # Filters
-        filters = self._parse_filters(chunks_tag, dtype)
+        codecs = self._parse_filters(chunks_tag, dtype)
         # Attributes
         attrs: dict[str, Any] = {}
         for attr_tag in var_tag.iterfind("dap:Attribute", self._NS):
@@ -414,16 +414,15 @@ def _parse_variable(self, var_tag: ET.Element) -> Variable:
         # Fill value is placed in zarr array's fill_value and variable encoding and removed from attributes
         encoding = {k: attrs.get(k) for k in self._ENCODING_KEYS if k in attrs}
         fill_value = attrs.pop("_FillValue", None)
-        # create ManifestArray and ZArray
-        zarray = ZArray(
-            chunks=chunks_shape,
-            dtype=dtype,
-            fill_value=fill_value,
-            filters=filters,
-            order="C",
+        # create ManifestArray
+        metadata = create_v3_array_metadata(
             shape=shape,
+            data_type=dtype,
+            chunk_shape=chunks_shape,
+            fill_value=fill_value,
+            codecs=codecs,
         )
-        marr = ManifestArray(zarray=zarray, chunkmanifest=chunkmanifest)
+        marr = ManifestArray(metadata=metadata, chunkmanifest=chunkmanifest)
         return Variable(dims=dims.keys(), data=marr, attrs=attrs, encoding=encoding)
 
     def _parse_attribute(self, attr_tag: ET.Element) -> dict[str, Any]:
@@ -490,16 +489,23 @@ def _parse_filters(
             compression_types = chunks_tag.attrib["compressionType"].split(" ")
             for c in compression_types:
                 if c == "shuffle":
-                    filters.append({"id": "shuffle", "elementsize": dtype.itemsize})
+                    filters.append(
+                        {
+                            "name": "numcodecs.shuffle",
+                            "configuration": {"elementsize": dtype.itemsize},
+                        }
+                    )
                 elif c == "deflate":
                     filters.append(
                         {
-                            "id": "zlib",
-                            "level": int(
-                                chunks_tag.attrib.get(
-                                    "deflateLevel", self._DEFAULT_ZLIB_VALUE
-                                )
-                            ),
+                            "name": "numcodecs.zlib",
+                            "configuration": {
+                                "level": int(
+                                    chunks_tag.attrib.get(
+                                        "deflateLevel", self._DEFAULT_ZLIB_VALUE
+                                    )
+                                ),
+                            },
                         }
                     )
         return filters
diff --git a/virtualizarr/readers/hdf/hdf.py b/virtualizarr/readers/hdf/hdf.py
index 2b45cd9a..9fc30c0a 100644
--- a/virtualizarr/readers/hdf/hdf.py
+++ b/virtualizarr/readers/hdf/hdf.py
@@ -14,12 +14,14 @@
 import numpy as np
 import xarray as xr
 
+from virtualizarr.codecs import numcodec_config_to_configurable
 from virtualizarr.manifests import (
     ChunkEntry,
     ChunkManifest,
     ManifestArray,
 )
 from virtualizarr.manifests.manifest import validate_and_normalize_path_to_uri
+from virtualizarr.manifests.utils import create_v3_array_metadata
 from virtualizarr.readers.common import (
     VirtualBackend,
     construct_virtual_dataset,
@@ -28,7 +30,6 @@
 from virtualizarr.readers.hdf.filters import cfcodec_from_dataset, codecs_from_dataset
 from virtualizarr.types import ChunkKey
 from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions, soft_import
-from virtualizarr.zarr import ZArray
 
 h5py = soft_import("h5py", "For reading hdf files", strict=False)
 
@@ -283,9 +284,6 @@ def _dataset_to_variable(
         list: xarray.Variable
             A list of xarray variables.
""" - # This chunk determination logic mirrors zarr-python's create - # https://github.com/zarr-developers/zarr-python/blob/main/zarr/creation.py#L62-L66 - chunks = dataset.chunks if dataset.chunks else dataset.shape codecs = codecs_from_dataset(dataset) cfcodec = cfcodec_from_dataset(dataset) @@ -305,21 +303,20 @@ def _dataset_to_variable( fill_value = float("nan") if isinstance(fill_value, np.generic): fill_value = fill_value.item() - filters = [codec.get_config() for codec in codecs] - zarray = ZArray( - chunks=chunks, # type: ignore - compressor=None, - dtype=dtype, - fill_value=fill_value, - filters=filters, - order="C", + codec_configs = [ + numcodec_config_to_configurable(codec.get_config()) for codec in codecs + ] + metadata = create_v3_array_metadata( shape=dataset.shape, - zarr_format=2, + data_type=dtype, + chunk_shape=chunks, + fill_value=fill_value, + codecs=codec_configs, ) dims = HDFVirtualBackend._dataset_dims(dataset, group=group) manifest = HDFVirtualBackend._dataset_chunk_manifest(path, dataset) if manifest: - marray = ManifestArray(zarray=zarray, chunkmanifest=manifest) + marray = ManifestArray(metadata=metadata, chunkmanifest=manifest) variable = xr.Variable(data=marray, dims=dims, attrs=attrs) else: variable = xr.Variable(data=np.empty(dataset.shape), dims=dims, attrs=attrs) diff --git a/virtualizarr/tests/__init__.py b/virtualizarr/tests/__init__.py index 258a9112..09d36d3d 100644 --- a/virtualizarr/tests/__init__.py +++ b/virtualizarr/tests/__init__.py @@ -1,15 +1,10 @@ import importlib -import itertools -import numpy as np import pytest from packaging.version import Version -from virtualizarr.manifests import ChunkManifest, ManifestArray -from virtualizarr.manifests.manifest import join from virtualizarr.readers import HDF5VirtualBackend from virtualizarr.readers.hdf import HDFVirtualBackend -from virtualizarr.zarr import ZArray, ceildiv requires_network = pytest.mark.network @@ -44,66 +39,8 @@ def _importorskip( has_imagecodecs, requires_imagecodecs = _importorskip("imagecodecs") has_hdf5plugin, requires_hdf5plugin = _importorskip("hdf5plugin") has_zarr_python, requires_zarr_python = _importorskip("zarr") -has_zarr_python_v3, requires_zarr_python_v3 = _importorskip("zarr", "3.0.0b") parametrize_over_hdf_backends = pytest.mark.parametrize( "hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend] if has_kerchunk else [HDFVirtualBackend], ) - - -def create_manifestarray( - shape: tuple[int, ...], chunks: tuple[int, ...] -) -> ManifestArray: - """ - Create an example ManifestArray with sensible defaults. - - The manifest is populated with a (somewhat) unique path, offset, and length for each key. 
- """ - - zarray = ZArray( - chunks=chunks, - compressor={"id": "blosc", "clevel": 5, "cname": "lz4", "shuffle": 1}, - dtype=np.dtype("float32"), - fill_value=0.0, - filters=None, - order="C", - shape=shape, - zarr_format=2, - ) - - chunk_grid_shape = tuple( - ceildiv(axis_length, chunk_length) - for axis_length, chunk_length in zip(shape, chunks) - ) - - if chunk_grid_shape == (): - d = {"0": entry_from_chunk_key((0,))} - else: - # create every possible combination of keys - all_possible_combos = itertools.product( - *[range(length) for length in chunk_grid_shape] - ) - d = {join(ind): entry_from_chunk_key(ind) for ind in all_possible_combos} - - chunkmanifest = ChunkManifest(entries=d) - - return ManifestArray(chunkmanifest=chunkmanifest, zarray=zarray) - - -def entry_from_chunk_key(ind: tuple[int, ...]) -> dict[str, str | int]: - """Generate a (somewhat) unique manifest entry from a given chunk key""" - entry = { - "path": f"/foo.{str(join(ind))}.nc", - "offset": offset_from_chunk_key(ind), - "length": length_from_chunk_key(ind), - } - return entry # type: ignore[return-value] - - -def offset_from_chunk_key(ind: tuple[int, ...]) -> int: - return sum(ind) * 10 - - -def length_from_chunk_key(ind: tuple[int, ...]) -> int: - return sum(ind) + 5 diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index 1b27b0f2..7308caea 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -230,9 +230,12 @@ class TestReadFromURL: "grib", "https://github.com/pydata/xarray-data/raw/master/era5-2mt-2019-03-uk.grib", ), - ( + pytest.param( "netcdf3", "https://github.com/pydata/xarray-data/raw/master/air_temperature.nc", + marks=pytest.mark.xfail( + reason="Big endian not yet supported by zarr-python 3.0" + ), # https://github.com/zarr-developers/zarr-python/issues/2324 ), ( "netcdf4", @@ -258,9 +261,14 @@ class TestReadFromURL: pytest.param( "fits", "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits", - marks=pytest.mark.skipif( - not has_astropy, reason="package astropy is not available" - ), + marks=[ + pytest.mark.skipif( + not has_astropy, reason="package astropy is not available" + ), + pytest.mark.xfail( + reason="Big endian not yet supported by zarr-python 3.0" + ), # https://github.com/zarr-developers/zarr-python/issues/2324 + ], ), ( "jpg", diff --git a/virtualizarr/tests/test_codecs.py b/virtualizarr/tests/test_codecs.py index 72f2a856..d05ea863 100644 --- a/virtualizarr/tests/test_codecs.py +++ b/virtualizarr/tests/test_codecs.py @@ -1,161 +1,131 @@ -from unittest.mock import patch - import numpy as np import pytest -from numcodecs import Blosc, Delta - -from virtualizarr import ChunkManifest, ManifestArray -from virtualizarr.codecs import get_codecs -from virtualizarr.tests import ( - requires_zarr_python, - requires_zarr_python_v3, +from zarr.codecs import BytesCodec +from zarr.core.codec_pipeline import BatchedCodecPipeline +from zarr.registry import get_codec_class + +from conftest import ( + ARRAYBYTES_CODEC, + BLOSC_CODEC, + DELTA_CODEC, + ZLIB_CODEC, ) -from virtualizarr.zarr import Codec +from virtualizarr.codecs import convert_to_codec_pipeline, get_codecs class TestCodecs: - def create_manifest_array(self, compressor=None, filters=None, zarr_format=2): - return ManifestArray( - chunkmanifest=ChunkManifest( - entries={"0.0": dict(path="/test.nc", offset=6144, length=48)} - ), - zarray=dict( - shape=(2, 3), - dtype=np.dtype("i4"), shape=(3,), chunks=(3,)) - expected_ma = 
ManifestArray(chunkmanifest=expected_manifest, zarray=expected_zarray) + metadata = array_v3_metadata(shape=(3,), chunks=(3,)) + expected_ma = ManifestArray(chunkmanifest=expected_manifest, metadata=metadata) expected_vds = xr.Dataset({"foo": xr.Variable(data=expected_ma, dims=["x"])}) xrt.assert_identical(vds, expected_vds) diff --git a/virtualizarr/tests/test_writers/conftest.py b/virtualizarr/tests/test_writers/conftest.py index 28c5b3db..1a8ffdbd 100644 --- a/virtualizarr/tests/test_writers/conftest.py +++ b/virtualizarr/tests/test_writers/conftest.py @@ -3,24 +3,22 @@ from xarray import Dataset from xarray.core.variable import Variable +from conftest import ARRAYBYTES_CODEC, ZLIB_CODEC from virtualizarr.manifests import ChunkManifest, ManifestArray @pytest.fixture -def vds_with_manifest_arrays() -> Dataset: +def vds_with_manifest_arrays(array_v3_metadata) -> Dataset: arr = ManifestArray( chunkmanifest=ChunkManifest( entries={"0.0": dict(path="/test.nc", offset=6144, length=48)} ), - zarray=dict( + metadata=array_v3_metadata( shape=(2, 3), - dtype=np.dtype(" ChunkManifest: - chunk_dict = {} - num_chunks = [shape[i] // chunks[i] for i in range(len(shape))] - offset = offset - - # Generate all possible chunk indices using Cartesian product - for chunk_indices in product(*[range(n) for n in num_chunks]): - chunk_index = ".".join(map(str, chunk_indices)) - chunk_dict[chunk_index] = { - "path": netcdf4_file, - "offset": offset, - "length": length, - } - offset += length # Increase offset for each chunk - - return ChunkManifest(chunk_dict) - - -def gen_virtual_variable( - file_uri: str, - shape: tuple[int, ...] = (3, 4), - chunk_shape: tuple[int, ...] = (3, 4), - dtype: np.dtype = np.dtype("int32"), - compressor: Optional[dict] = None, - filters: Optional[list[dict[Any, Any]]] = None, - fill_value: Optional[str] = None, - encoding: Optional[dict] = None, - offset: int = 6144, - length: int = 48, - dims: list[str] = [], - zarr_format: Literal[2, 3] = 2, - attrs: dict[str, Any] = {}, -) -> xr.Variable: - manifest = generate_chunk_manifest( - file_uri, - shape=shape, - chunks=chunk_shape, - offset=offset, - length=length, - ) - zarray = ZArray( - shape=shape, - chunks=chunk_shape, - dtype=dtype, - compressor=compressor, - filters=filters, - fill_value=fill_value, - zarr_format=zarr_format, - ) - ma = ManifestArray(chunkmanifest=manifest, zarray=zarray) - return xr.Variable( - data=ma, - dims=dims, - encoding=encoding, - attrs=attrs, - ) - - -def gen_virtual_dataset( - file_uri: str, - shape: tuple[int, ...] = (3, 4), - chunk_shape: tuple[int, ...] = (3, 4), - dtype: np.dtype = np.dtype("int32"), - compressor: Optional[dict] = None, - filters: Optional[list[dict[Any, Any]]] = None, - fill_value: Optional[str] = None, - encoding: Optional[dict] = None, - variable_name: str = "foo", - offset: int = 6144, - length: int = 48, - dims: Optional[list[str]] = None, - zarr_format: Literal[2, 3] = 2, - coords: Optional[xr.Coordinates] = None, -) -> xr.Dataset: - with xr.open_dataset(file_uri) as ds: - var = gen_virtual_variable( - file_uri, - shape=shape, - chunk_shape=chunk_shape, - dtype=dtype, - compressor=compressor, - filters=filters, - fill_value=fill_value, - encoding=encoding, - offset=offset, - length=length, - dims=dims or [str(name) for name in ds.dims], - zarr_format=zarr_format, - attrs=ds[variable_name].attrs, - ) - - return xr.Dataset( - {variable_name: var}, - coords=coords, - attrs=ds.attrs, - ) - - class TestAppend: """ Tests for appending to existing icechunk store. 
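With `gen_virtual_dataset` removed, tests now request the `virtual_dataset` fixture from the new top-level conftest.py instead. A minimal hypothetical test using it (a sketch based on the fixture defaults shown earlier, not a test from this diff):

```python
def test_fixture_roundtrip(simple_netcdf4, virtual_dataset):
    # virtual_dataset builds an xr.Dataset whose "foo" variable is backed by
    # a ManifestArray carrying zarr v3 ArrayV3Metadata (see conftest.py above);
    # its default shape and chunk shape are both (3, 4).
    vds = virtual_dataset(file_uri=simple_netcdf4)
    assert vds["foo"].data.metadata.shape == (3, 4)
```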
@@ -546,12 +430,15 @@ class TestAppend: # Success cases ## When appending to a single virtual ref without encoding, it succeeds def test_append_virtual_ref_without_encoding( - self, icechunk_repo: "Repository", simple_netcdf4: str + self, + icechunk_repo: "Repository", + simple_netcdf4: str, + virtual_dataset: Callable, ): import xarray.testing as xrt # generate virtual dataset - vds = gen_virtual_dataset(file_uri=simple_netcdf4) + vds = virtual_dataset(file_uri=simple_netcdf4) # Commit the first virtual dataset writable_session = icechunk_repo.writable_session("main") vds.virtualize.to_icechunk(writable_session.store) @@ -579,16 +466,18 @@ def test_append_virtual_ref_without_encoding( xrt.assert_identical(array, expected_array) def test_append_virtual_ref_with_encoding( - self, icechunk_repo: "Repository", netcdf4_files_factory: Callable + self, + icechunk_repo: "Repository", + netcdf4_files_factory: Callable, + virtual_dataset: Callable, ): import xarray.testing as xrt scale_factor = 0.01 encoding = {"air": {"scale_factor": scale_factor}} filepath1, filepath2 = netcdf4_files_factory(encoding=encoding) - vds1, vds2 = ( - gen_virtual_dataset( + virtual_dataset( file_uri=filepath1, shape=(1460, 25, 53), chunk_shape=(1460, 25, 53), @@ -599,7 +488,7 @@ def test_append_virtual_ref_with_encoding( offset=15419, length=15476000, ), - gen_virtual_dataset( + virtual_dataset( file_uri=filepath2, shape=(1460, 25, 53), chunk_shape=(1460, 25, 53), @@ -639,7 +528,11 @@ def test_append_virtual_ref_with_encoding( ## When appending to a virtual ref with encoding, it succeeds @pytest.mark.asyncio async def test_append_with_multiple_root_arrays( - self, icechunk_repo: "Repository", netcdf4_files_factory: Callable + self, + icechunk_repo: "Repository", + netcdf4_files_factory: Callable, + virtual_variable: Callable, + virtual_dataset: Callable, ): import xarray.testing as xrt from zarr.core.buffer import default_buffer_prototype @@ -648,7 +541,7 @@ async def test_append_with_multiple_root_arrays( encoding={"air": {"dtype": "float64", "chunksizes": (1460, 25, 53)}} ) - lon_manifest = gen_virtual_variable( + lon_manifest = virtual_variable( filepath1, shape=(53,), chunk_shape=(53,), @@ -657,7 +550,7 @@ async def test_append_with_multiple_root_arrays( length=212, dims=["lon"], ) - lat_manifest = gen_virtual_variable( + lat_manifest = virtual_variable( filepath1, shape=(25,), chunk_shape=(25,), @@ -673,7 +566,7 @@ async def test_append_with_multiple_root_arrays( "calendar": "standard", } time_manifest1, time_manifest2 = [ - gen_virtual_variable( + virtual_variable( filepath, shape=(1460,), chunk_shape=(1460,), @@ -694,7 +587,7 @@ async def test_append_with_multiple_root_arrays( for time_manifest in [time_manifest1, time_manifest2] ] vds1, vds2 = ( - gen_virtual_dataset( + virtual_dataset( file_uri=filepath1, shape=(1460, 25, 53), chunk_shape=(1460, 25, 53), @@ -705,7 +598,7 @@ async def test_append_with_multiple_root_arrays( length=15476000, coords=coords1, ), - gen_virtual_dataset( + virtual_dataset( file_uri=filepath2, shape=(1460, 25, 53), chunk_shape=(1460, 25, 53), @@ -748,12 +641,11 @@ async def test_append_with_multiple_root_arrays( xrt.assert_equal(ds, expected_ds) # When appending to a virtual ref with compression, it succeeds - @pytest.mark.parametrize("zarr_format", [2, 3]) def test_append_with_compression_succeeds( self, icechunk_repo: "Repository", netcdf4_files_factory: Callable, - zarr_format: Literal[2, 3], + virtual_dataset: Callable, ): import xarray.testing as xrt @@ -768,29 +660,33 @@ def 
test_append_with_compression_succeeds( file1, file2 = netcdf4_files_factory(encoding=encoding) # Generate compressed dataset vds1, vds2 = ( - gen_virtual_dataset( + virtual_dataset( file_uri=file1, shape=(1460, 25, 53), chunk_shape=(1460, 25, 53), - compressor={"id": "zlib", "level": 4}, + codecs=[ + {"name": "bytes", "configuration": {"endian": "little"}}, + {"name": "numcodecs.zlib", "configuration": {"level": 4}}, + ], dims=["time", "lat", "lon"], dtype=np.dtype("float64"), variable_name="air", offset=18043, length=3936114, - zarr_format=zarr_format, ), - gen_virtual_dataset( + virtual_dataset( file_uri=file2, shape=(1460, 25, 53), chunk_shape=(1460, 25, 53), - compressor={"id": "zlib", "level": 4}, + codecs=[ + {"name": "bytes", "configuration": {"endian": "little"}}, + {"name": "numcodecs.zlib", "configuration": {"level": 4}}, + ], dims=["time", "lat", "lon"], dtype=np.dtype("float64"), variable_name="air", offset=18043, length=3938672, - zarr_format=zarr_format, ), ) @@ -816,10 +712,13 @@ def test_append_with_compression_succeeds( ## When chunk shapes are different it fails def test_append_with_different_chunking_fails( - self, icechunk_repo: "Repository", simple_netcdf4: str + self, + icechunk_repo: "Repository", + simple_netcdf4: str, + virtual_dataset: Callable, ): # Generate a virtual dataset with specific chunking - vds = gen_virtual_dataset(file_uri=simple_netcdf4, chunk_shape=(3, 4)) + vds = virtual_dataset(file_uri=simple_netcdf4, chunk_shape=(3, 4)) # Commit the dataset icechunk_filestore = icechunk_repo.writable_session("main") @@ -827,7 +726,7 @@ def test_append_with_different_chunking_fails( icechunk_filestore.commit("test commit") # Try to append dataset with different chunking, expect failure - vds_different_chunking = gen_virtual_dataset( + vds_different_chunking = virtual_dataset( file_uri=simple_netcdf4, chunk_shape=(1, 1) ) icechunk_filestore_append = icechunk_repo.writable_session("main") @@ -840,15 +739,14 @@ def test_append_with_different_chunking_fails( ## When encoding is different it fails def test_append_with_different_encoding_fails( - self, icechunk_repo: "Repository", simple_netcdf4: str + self, + icechunk_repo: "Repository", + simple_netcdf4: str, + virtual_dataset: Callable, ): # Generate datasets with different encoding - vds1 = gen_virtual_dataset( - file_uri=simple_netcdf4, encoding={"scale_factor": 0.1} - ) - vds2 = gen_virtual_dataset( - file_uri=simple_netcdf4, encoding={"scale_factor": 0.01} - ) + vds1 = virtual_dataset(file_uri=simple_netcdf4, encoding={"scale_factor": 0.1}) + vds2 = virtual_dataset(file_uri=simple_netcdf4, encoding={"scale_factor": 0.01}) # Commit the first dataset icechunk_filestore = icechunk_repo.writable_session("main") @@ -864,15 +762,18 @@ def test_append_with_different_encoding_fails( vds2.virtualize.to_icechunk(icechunk_filestore_append.store, append_dim="x") def test_dimensions_do_not_align( - self, icechunk_repo: "Repository", simple_netcdf4: str + self, + icechunk_repo: "Repository", + simple_netcdf4: str, + virtual_dataset: Callable, ): # Generate datasets with different lengths on the non-append dimension (x) - vds1 = gen_virtual_dataset( + vds1 = virtual_dataset( # {'x': 5, 'y': 4} file_uri=simple_netcdf4, shape=(5, 4), ) - vds2 = gen_virtual_dataset( + vds2 = virtual_dataset( # {'x': 6, 'y': 4} file_uri=simple_netcdf4, shape=(6, 4), @@ -889,12 +790,15 @@ def test_dimensions_do_not_align( vds2.virtualize.to_icechunk(icechunk_filestore_append.store, append_dim="y") def test_append_dim_not_in_dims_raises_error( - 
self, icechunk_repo: "Repository", simple_netcdf4: str + self, + icechunk_repo: "Repository", + simple_netcdf4: str, + virtual_dataset: Callable, ): """ Test that attempting to append with an append_dim not present in dims raises a ValueError. """ - vds = gen_virtual_dataset( + vds = virtual_dataset( file_uri=simple_netcdf4, shape=(5, 4), chunk_shape=(5, 4), dims=["x", "y"] ) diff --git a/virtualizarr/tests/test_writers/test_kerchunk.py b/virtualizarr/tests/test_writers/test_kerchunk.py index 1e9b240c..20620a5d 100644 --- a/virtualizarr/tests/test_writers/test_kerchunk.py +++ b/virtualizarr/tests/test_writers/test_kerchunk.py @@ -1,27 +1,27 @@ import numpy as np import pandas as pd from xarray import Dataset +from zarr.core.metadata.v2 import ArrayV2Metadata from virtualizarr.manifests import ChunkManifest, ManifestArray from virtualizarr.tests import requires_fastparquet, requires_kerchunk +from virtualizarr.writers.kerchunk import convert_v3_to_v2_metadata @requires_kerchunk class TestAccessor: - def test_accessor_to_kerchunk_dict(self): + def test_accessor_to_kerchunk_dict(self, array_v3_metadata): manifest = ChunkManifest( entries={"0.0": dict(path="file:///test.nc", offset=6144, length=48)} ) arr = ManifestArray( chunkmanifest=manifest, - zarray=dict( + metadata=array_v3_metadata( shape=(2, 3), - dtype=np.dtype("<i8"), diff --git a/virtualizarr/translators/kerchunk.py b/virtualizarr/translators/kerchunk.py +def to_kerchunk_json(v2_metadata: ArrayV2Metadata) -> str: + """Convert V2 metadata to kerchunk JSON format.""" + import json + + from virtualizarr.writers.kerchunk import NumpyEncoder + + zarray_dict: dict[str, JSON] = v2_metadata.to_dict() + if v2_metadata.filters: + zarray_dict["filters"] = [ + # we could also cast to json, but get_config is intended for serialization + codec.get_config() + for codec in v2_metadata.filters + if codec is not None + ] # type: ignore[assignment] + if v2_metadata.compressor: + zarray_dict["compressor"] = v2_metadata.compressor.get_config() # type: ignore[assignment] + + return json.dumps(zarray_dict, separators=(",", ":"), cls=NumpyEncoder) + + +def from_kerchunk_refs(decoded_arr_refs_zarray) -> "ArrayV3Metadata": + """ + Convert a decoded zarr array (.zarray) reference to an ArrayV3Metadata object. + + Parameters + ---------- + decoded_arr_refs_zarray : dict + A dictionary containing the decoded Zarr array reference information. + Expected keys include "dtype", "fill_value", "zarr_format", "filters", + "compressor", "chunks", and "shape". + + Returns + ------- + ArrayV3Metadata + + Raises + ------ + ValueError + If the Zarr format specified in the input dictionary is not 2 or 3. 
+ """ + # coerce type of fill_value as kerchunk can be inconsistent with this + dtype = np.dtype(decoded_arr_refs_zarray["dtype"]) + fill_value = decoded_arr_refs_zarray["fill_value"] + if np.issubdtype(dtype, np.floating) and ( + fill_value is None or fill_value == "NaN" or fill_value == "nan" + ): + fill_value = np.nan + + zarr_format = int(decoded_arr_refs_zarray["zarr_format"]) + if zarr_format not in (2, 3): + raise ValueError(f"Zarr format must be 2 or 3, but got {zarr_format}") + filters = ( + decoded_arr_refs_zarray.get("filters", []) or [] + ) # Ensure filters is a list + compressor = decoded_arr_refs_zarray.get("compressor") # Might be None + + # Ensure compressor is a list before unpacking + codec_configs = [*filters, *(compressor if compressor is not None else [])] + numcodec_configs = [ + numcodec_config_to_configurable(config) for config in codec_configs + ] + return create_v3_array_metadata( + chunk_shape=tuple(decoded_arr_refs_zarray["chunks"]), + data_type=dtype, + codecs=numcodec_configs, + fill_value=fill_value, + shape=tuple(decoded_arr_refs_zarray["shape"]), + ) def virtual_vars_and_metadata_from_kerchunk_refs( @@ -162,23 +241,27 @@ def variable_from_kerchunk_refs( """Create a single xarray Variable by reading specific keys of a kerchunk references dict.""" arr_refs = extract_array_refs(refs, var_name) - chunk_dict, zarray, zattrs = parse_array_refs(arr_refs) + chunk_dict, metadata, zattrs = parse_array_refs(arr_refs) # we want to remove the _ARRAY_DIMENSIONS from the final variables' .attrs dims = zattrs.pop("_ARRAY_DIMENSIONS") if chunk_dict: manifest = manifest_from_kerchunk_chunk_dict(chunk_dict, fs_root=fs_root) - varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) - elif len(zarray.shape) != 0: + varr = virtual_array_class(metadata=metadata, chunkmanifest=manifest) + elif len(metadata.shape) != 0: # empty variables don't have physical chunks, but zarray shows that the variable # is at least 1D - shape = determine_chunk_grid_shape(zarray.shape, zarray.chunks) + + shape = determine_chunk_grid_shape( + metadata.shape, + metadata.chunks, + ) manifest = ChunkManifest(entries={}, shape=shape) - varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest) + varr = virtual_array_class(metadata=metadata, chunkmanifest=manifest) else: # This means we encountered a scalar variable of dimension 0, # very likely that it actually has no numeric value and its only purpose # is to communicate dataset attributes. - varr = zarray.fill_value + varr = metadata.fill_value return Variable(data=varr, dims=dims, attrs=zattrs) @@ -263,12 +346,12 @@ def extract_array_refs( def parse_array_refs( arr_refs: KerchunkArrRefs, -) -> tuple[dict, ZArray, ZAttrs]: - zarray = ZArray.from_kerchunk_refs(arr_refs.pop(".zarray")) +) -> tuple[dict, ArrayV3Metadata, dict[str, JSON]]: + metadata = from_kerchunk_refs(arr_refs.pop(".zarray")) zattrs = arr_refs.pop(".zattrs", {}) chunk_dict = arr_refs - return chunk_dict, zarray, zattrs + return chunk_dict, metadata, zattrs def fully_decode_arr_refs(d: dict) -> KerchunkArrRefs: diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py index b5ae3447..4f6c4773 100644 --- a/virtualizarr/utils.py +++ b/virtualizarr/utils.py @@ -100,3 +100,19 @@ def soft_import(name: str, reason: str, strict: Optional[bool] = True): ) else: return None + + +def ceildiv(a: int, b: int) -> int: + """ + Ceiling division operator for integers. 
+ + See https://stackoverflow.com/questions/14822184/is-there-a-ceiling-equivalent-of-operator-in-python + """ + return -(a // -b) + + +def determine_chunk_grid_shape( + shape: tuple[int, ...], chunks: tuple[int, ...] +) -> tuple[int, ...]: + """Calculate the shape of the chunk grid based on array shape and chunk size.""" + return tuple(ceildiv(length, chunksize) for length, chunksize in zip(shape, chunks)) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index b5ec09a0..1c044465 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -16,7 +16,6 @@ check_same_ndims, check_same_shapes_except_on_concat_axis, ) -from virtualizarr.zarr import encode_dtype if TYPE_CHECKING: from icechunk import IcechunkStore # type: ignore[import-not-found] @@ -194,7 +193,7 @@ def check_compatible_arrays( ): arrays: List[Union[ManifestArray, Array]] = [ma, existing_array] check_same_dtypes([arr.dtype for arr in arrays]) - check_same_codecs([get_codecs(arr, normalize_to_zarr_v3=True) for arr in arrays]) + check_same_codecs([get_codecs(arr) for arr in arrays]) check_same_chunk_shapes([arr.chunks for arr in arrays]) check_same_ndims([ma.ndim, existing_array.ndim]) arr_shapes = [ma.shape, existing_array.shape] @@ -212,8 +211,10 @@ def write_virtual_variable_to_icechunk( """Write a single virtual variable into an icechunk store""" from zarr import Array + from virtualizarr.codecs import extract_codecs + ma = cast(ManifestArray, var.data) - zarray = ma.zarray + metadata = ma.metadata dims: list[str] = cast(list[str], list(var.dims)) existing_num_chunks = 0 @@ -242,21 +243,17 @@ def write_virtual_variable_to_icechunk( ) else: append_axis = None - - # Get the codecs and convert them to zarr v3 format - codecs = zarray._v3_codecs() - - # create array if it doesn't already exist + # TODO: Should codecs be an argument to zarr's AsyncGroup.create_array? + filters, _, compressors = extract_codecs(metadata.codecs) arr = group.require_array( name=name, - shape=zarray.shape, - chunks=zarray.chunks, - dtype=encode_dtype(zarray.dtype), - compressors=codecs.compressors, - serializer=codecs.serializer, - filters=codecs.filters, + shape=metadata.shape, + chunks=metadata.chunks, + dtype=metadata.data_type.to_numpy(), + filters=filters, + compressors=compressors, dimension_names=var.dims, - fill_value=zarray.fill_value, + fill_value=metadata.fill_value, ) arr.update_attributes( diff --git a/virtualizarr/writers/kerchunk.py b/virtualizarr/writers/kerchunk.py index b6d302b2..520c7ccb 100644 --- a/virtualizarr/writers/kerchunk.py +++ b/virtualizarr/writers/kerchunk.py @@ -1,19 +1,31 @@ import base64 import json -from typing import cast +from typing import Any, cast import numpy as np from xarray import Dataset from xarray.coding.times import CFDatetimeCoder from xarray.core.variable import Variable +from zarr.abc.codec import ArrayArrayCodec, BytesBytesCodec +from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata +from virtualizarr.codecs import extract_codecs, get_codec_config from virtualizarr.manifests.manifest import join from virtualizarr.types.kerchunk import KerchunkArrRefs, KerchunkStoreRefs -from virtualizarr.zarr import ZArray class NumpyEncoder(json.JSONEncoder): - # TODO I don't understand how kerchunk gets around this problem of encoding numpy types (in the zattrs) whilst only using ujson + """JSON encoder that handles common scientific Python types found in attributes. 
+ + This encoder converts various Python types to JSON-serializable formats: + - NumPy arrays and scalars to Python lists and native types + - NumPy dtypes to strings + - Sets to lists + - Other objects that implement __array__ to lists + - Objects with to_dict method (like pandas objects) + - Anything else to its string representation as a fallback + """ + def default(self, obj): if isinstance(obj, np.ndarray): return obj.tolist() # Convert NumPy array to Python list @@ -21,7 +33,19 @@ def default(self, obj): return obj.item() # Convert NumPy scalar to Python scalar elif isinstance(obj, np.dtype): return str(obj) - return json.JSONEncoder.default(self, obj) + elif isinstance(obj, set): + return list(obj) # Convert sets to lists + elif hasattr(obj, "__array__"): + return np.asarray(obj).tolist() # Handle array-like objects + elif hasattr(obj, "to_dict"): + return obj.to_dict() # Handle objects with to_dict method + + try: + return json.JSONEncoder.default(self, obj) + except TypeError: + # Every object defines __str__, so fall back to it directly + return str(obj) def dataset_to_kerchunk_refs(ds: Dataset) -> KerchunkStoreRefs: @@ -38,7 +62,6 @@ def dataset_to_kerchunk_refs(ds: Dataset) -> KerchunkStoreRefs: prepended_with_var_name = { f"{var_name}/{key}": val for key, val in arr_refs.items() } - all_arr_refs.update(prepended_with_var_name) zattrs = ds.attrs @@ -67,6 +90,57 @@ def remove_file_uri_prefix(path: str): return path +def convert_v3_to_v2_metadata( + v3_metadata: ArrayV3Metadata, fill_value: Any = None +) -> ArrayV2Metadata: + """ + Convert ArrayV3Metadata to ArrayV2Metadata. + + Parameters + ---------- + v3_metadata : ArrayV3Metadata + The metadata object in v3 format. + fill_value : Any, optional + Override the fill value from v3 metadata. + + Returns + ------- + ArrayV2Metadata + The metadata object in v2 format. + """ + import warnings + + array_filters: tuple[ArrayArrayCodec, ...] + bytes_compressors: tuple[BytesBytesCodec, ...] + array_filters, _, bytes_compressors = extract_codecs(v3_metadata.codecs) + + # Handle compressor configuration + compressor_config: dict[str, Any] | None = None + if bytes_compressors: + if len(bytes_compressors) > 1: + warnings.warn( + "Multiple compressors found in v3 metadata. Using the first compressor, " + "others will be ignored. This may affect data compatibility.", + UserWarning, + ) + compressor_config = get_codec_config(bytes_compressors[0]) + + # Handle filter configurations + filter_configs = [get_codec_config(filter_) for filter_ in array_filters] + v2_metadata = ArrayV2Metadata( + shape=v3_metadata.shape, + dtype=v3_metadata.data_type.to_numpy(), + chunks=v3_metadata.chunks, + fill_value=fill_value if fill_value is not None else v3_metadata.fill_value, + compressor=compressor_config, + filters=filter_configs, + order="C", + attributes=v3_metadata.attributes, + dimension_separator=".", # Assuming '.' as default dimension separator + ) + return v2_metadata + + +def variable_to_kerchunk_arr_refs(var: Variable, var_name: str) -> KerchunkArrRefs: """ Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array). @@ -74,6 +148,7 @@ def variable_to_kerchunk_arr_refs(var: Variable, var_name: str) -> KerchunkArrRe Partially encodes the inner dicts to json to match kerchunk behaviour (see https://github.com/fsspec/kerchunk/issues/415). 
""" from virtualizarr.manifests import ManifestArray + from virtualizarr.translators.kerchunk import to_kerchunk_json if isinstance(var.data, ManifestArray): marr = var.data @@ -86,9 +161,7 @@ def variable_to_kerchunk_arr_refs(var: Variable, var_name: str) -> KerchunkArrRe ] for chunk_key, entry in marr.manifest.dict().items() } - - zarray = marr.zarray.replace(zarr_format=2) - + array_v2_metadata = convert_v3_to_v2_metadata(marr.metadata) else: try: np_arr = var.to_numpy() @@ -118,15 +191,17 @@ def variable_to_kerchunk_arr_refs(var: Variable, var_name: str) -> KerchunkArrRe # TODO will this fail for a scalar? arr_refs = {join(0 for _ in np_arr.shape): inlined_data} - zarray = ZArray( + from zarr.core.metadata.v2 import ArrayV2Metadata + + array_v2_metadata = ArrayV2Metadata( chunks=np_arr.shape, shape=np_arr.shape, dtype=np_arr.dtype, order="C", - fill_value=None, + fill_value=var.encoding.get("fill_value", None), ) - zarray_dict = zarray.to_kerchunk_json() + zarray_dict = to_kerchunk_json(array_v2_metadata) arr_refs[".zarray"] = zarray_dict zattrs = {**var.attrs, **var.encoding} diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py deleted file mode 100644 index c2244001..00000000 --- a/virtualizarr/zarr.py +++ /dev/null @@ -1,303 +0,0 @@ -import dataclasses -from typing import TYPE_CHECKING, Any, Literal, NewType, cast - -import numpy as np - -if TYPE_CHECKING: - try: - from zarr.abc.codec import Codec as ZarrCodec - from zarr.core.array import CompressorLike, FiltersLike, SerializerLike - except ImportError: - pass - -# TODO replace these with classes imported directly from Zarr? (i.e. Zarr Object Models) -ZAttrs = NewType( - "ZAttrs", dict[str, Any] -) # just the .zattrs (for one array or for the whole store/group) -FillValueT = bool | str | float | int | list | None -ZARR_FORMAT = Literal[2, 3] - -ZARR_DEFAULT_FILL_VALUE: dict[str, FillValueT] = { - # numpy dtypes's hierarchy lets us avoid checking for all the widths - # https://numpy.org/doc/stable/reference/arrays.scalars.html - np.dtype("bool").kind: False, - np.dtype("int").kind: 0, - np.dtype("float").kind: 0.0, - np.dtype("complex").kind: [0.0, 0.0], - np.dtype("datetime64").kind: 0, -} -""" -The value and format of the fill_value depend on the `data_type` of the array. -See here for spec: -https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#fill-value -""" - - -@dataclasses.dataclass -class Codec: - compressor: dict | None = None - filters: list[dict] | None = None - - -@dataclasses.dataclass -class ZarrV3Codecs: - filters: "FiltersLike" - compressors: "CompressorLike" - serializer: "SerializerLike" - - def into_v3_codecs(self, dtype: np.dtype) -> tuple["ZarrCodec", ...]: - try: - from zarr.core.array import _parse_chunk_encoding_v3 - except ImportError: - raise ImportError("zarr v3 is required to generate v3 codecs") - - codecs = _parse_chunk_encoding_v3( - serializer=self.serializer, - compressors=self.compressors, - filters=self.filters, - dtype=dtype, - ) - - return cast(tuple["ZarrCodec", ...], (*codecs[0], codecs[1], *codecs[2])) - - -@dataclasses.dataclass -class ZArray: - """Just the .zarray information""" - - # TODO will this work for V3? - - shape: tuple[int, ...] - chunks: tuple[int, ...] 
- dtype: np.dtype - fill_value: FillValueT = dataclasses.field(default=None) - order: Literal["C", "F"] = "C" - compressor: dict | None = None - filters: list[dict] | None = None - zarr_format: Literal[2, 3] = 2 - - def __post_init__(self) -> None: - if len(self.shape) != len(self.chunks): - raise ValueError( - "Dimension mismatch between array shape and chunk shape. " - f"Array shape {self.shape} has ndim={self.shape} but chunk shape {self.chunks} has ndim={len(self.chunks)}" - ) - - if isinstance(self.dtype, str): - # Convert dtype string to numpy.dtype - self.dtype = np.dtype(self.dtype) - - if self.fill_value is None: - self.fill_value = ZARR_DEFAULT_FILL_VALUE.get(self.dtype.kind, 0.0) - - @property - def codec(self) -> Codec: - """For comparison against other arrays.""" - return Codec(compressor=self.compressor, filters=self.filters) - - @classmethod - def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": - # coerce type of fill_value as kerchunk can be inconsistent with this - dtype = np.dtype(decoded_arr_refs_zarray["dtype"]) - fill_value = decoded_arr_refs_zarray["fill_value"] - if np.issubdtype(dtype, np.floating) and ( - fill_value is None or fill_value == "NaN" or fill_value == "nan" - ): - fill_value = np.nan - - compressor = decoded_arr_refs_zarray["compressor"] - zarr_format = int(decoded_arr_refs_zarray["zarr_format"]) - if zarr_format not in (2, 3): - raise ValueError(f"Zarr format must be 2 or 3, but got {zarr_format}") - - return ZArray( - chunks=tuple(decoded_arr_refs_zarray["chunks"]), - compressor=compressor, - dtype=dtype, - fill_value=fill_value, - filters=decoded_arr_refs_zarray["filters"], - order=decoded_arr_refs_zarray["order"], - shape=tuple(decoded_arr_refs_zarray["shape"]), - zarr_format=cast(ZARR_FORMAT, zarr_format), - ) - - def dict(self) -> dict[str, Any]: - zarray_dict = dataclasses.asdict(self) - zarray_dict["dtype"] = encode_dtype(zarray_dict["dtype"]) - return zarray_dict - - def to_kerchunk_json(self) -> str: - import ujson - - zarray_dict = self.dict() - if zarray_dict["fill_value"] is np.nan: - zarray_dict["fill_value"] = None - return ujson.dumps(zarray_dict) - - # ZArray.dict seems to shadow "dict", so we need the type ignore in - # the signature below. - def replace( - self, - shape: tuple[int, ...] | None = None, - chunks: tuple[int, ...] | None = None, - dtype: np.dtype | str | None = None, - fill_value: FillValueT = None, - order: Literal["C", "F"] | None = None, - compressor: "dict | None" = None, # type: ignore[valid-type] - filters: list[dict] | None = None, # type: ignore[valid-type] - zarr_format: Literal[2, 3] | None = None, - ) -> "ZArray": - """ - Convenience method to create a new ZArray from an existing one by altering only certain attributes. 
- """ - replacements: dict[str, Any] = {} - if shape is not None: - replacements["shape"] = shape - if chunks is not None: - replacements["chunks"] = chunks - if dtype is not None: - replacements["dtype"] = dtype - if fill_value is not None: - replacements["fill_value"] = fill_value - if order is not None: - replacements["order"] = order - if compressor is not None: - replacements["compressor"] = compressor - if filters is not None: - replacements["filters"] = filters - if zarr_format is not None: - replacements["zarr_format"] = zarr_format - return dataclasses.replace(self, **replacements) - - def _v3_codecs(self) -> ZarrV3Codecs: - """ - VirtualiZarr internally uses the `filters`, `compressor`, and `order` attributes - from zarr v2, but to create conformant zarr v3 metadata those 3 must be turned into `codecs` objects. - Not all codecs are created equal though: https://github.com/zarr-developers/zarr-python/issues/1943 - An array _must_ declare a single ArrayBytes codec, and 0 or more ArrayArray, BytesBytes codecs. - Roughly, this is the mapping: - ``` - filters: Iterable[ArrayArrayCodec] #optional - compressor: ArrayBytesCodec #mandatory - post_compressor: Iterable[BytesBytesCodec] #optional - ``` - """ - try: - from zarr.core.metadata.v3 import ( # type: ignore[import-untyped] - parse_codecs, - ) - except ImportError: - raise ImportError("zarr v3 is required to generate v3 codecs") - - codec_configs = [] - - # https://zarr-specs.readthedocs.io/en/latest/v3/codecs/transpose/v1.0.html#transpose-codec-v1 - # Either "C" or "F", defining the layout of bytes within each chunk of the array. - # "C" means row-major order, i.e., the last dimension varies fastest; - # "F" means column-major order, i.e., the first dimension varies fastest. - # For now, we only need transpose if the order is not "C" - if self.order == "F": - order = tuple(reversed(range(len(self.shape)))) - transpose = dict(name="transpose", configuration=dict(order=order)) - codec_configs.append(transpose) - - # Noting here that zarr v3 has very few codecs specificed in the official spec, - # and that there are far more codecs in `numcodecs`. We take a gamble and assume - # that the codec names and configuration are simply mapped into zarrv3 "configurables". - if self.filters: - codec_configs.extend( - [_num_codec_config_to_configurable(filter) for filter in self.filters] - ) - - if self.compressor: - codec_configs.append(_num_codec_config_to_configurable(self.compressor)) - - # convert the pipeline repr into actual v3 codec objects - codecs = parse_codecs(codec_configs) - filters = v3_codecs_to_filters(codecs) - compressors = v3_codecs_to_compressors(codecs) - serializer = v3_codecs_to_serializer(codecs) - return ZarrV3Codecs( - filters=filters, compressors=compressors, serializer=serializer - ) - - -def encode_dtype(dtype: np.dtype) -> str: - # TODO not sure if there is a better way to get the ' int: - """ - Ceiling division operator for integers. - - See https://stackoverflow.com/questions/14822184/is-there-a-ceiling-equivalent-of-operator-in-python - """ - return -(a // -b) - - -def determine_chunk_grid_shape( - shape: tuple[int, ...], chunks: tuple[int, ...] -) -> tuple[int, ...]: - return tuple(ceildiv(length, chunksize) for length, chunksize in zip(shape, chunks)) - - -def _num_codec_config_to_configurable(num_codec: dict) -> dict: - """ - Convert a numcodecs codec into a zarr v3 configurable. - """ - if num_codec["id"].startswith("numcodecs."): - return num_codec - - num_codec_copy = num_codec.copy() - name = "numcodecs." 
+ num_codec_copy.pop("id") - return {"name": name, "configuration": num_codec_copy} - - -def v3_codecs_to_compressors( - codecs: tuple["ZarrCodec", ...], -) -> "CompressorLike": - """ - Parse out the codecs and return the compressors. These are only the BytesBytes codecs. - """ - try: - from zarr.abc.codec import BytesBytesCodec - except ImportError: - raise ImportError("zarr v3 is required to generate v3 codec pipelines") - compressors = [codec for codec in codecs if isinstance(codec, BytesBytesCodec)] - return compressors - - -def v3_codecs_to_filters( - codecs: tuple["ZarrCodec", ...], -) -> "FiltersLike": - """ - Parse out the codecs and return the filters. These are only the ArrayArray codecs. - """ - try: - from zarr.abc.codec import ArrayArrayCodec - except ImportError: - raise ImportError("zarr v3 is required to generate v3 codec pipelines") - - filters = [codec for codec in codecs if isinstance(codec, ArrayArrayCodec)] - return filters - - -def v3_codecs_to_serializer( - codecs: tuple["ZarrCodec", ...], -) -> "SerializerLike": - """ - Parse out the codecs and return the serializer. This is only the Bytes codec. - """ - try: - from zarr.abc.codec import ArrayBytesCodec - except ImportError: - raise ImportError("zarr v3 is required to generate v3 codec pipelines") - - serializer = [codec for codec in codecs if isinstance(codec, ArrayBytesCodec)] - if len(serializer) == 0: - # We can just use auto because the default zarr 3 serializer is the Bytes codec - return "auto" - - # There can only be one serializer, so we return the first one - return serializer[0]
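
A note on the chunk-grid arithmetic that this diff centralizes in virtualizarr/utils.py: determine_chunk_grid_shape allocates one grid cell per chunk along each axis, counting a trailing partial chunk as a full cell. Below is a minimal self-contained sketch of the two helpers, with a worked example; the (100, 25, 53) chunk shape is illustrative only, not taken from the tests.

    # Sketch of the helpers added to virtualizarr/utils.py.
    def ceildiv(a: int, b: int) -> int:
        # Integer ceiling division without floats: -(a // -b) == ceil(a / b).
        return -(a // -b)


    def determine_chunk_grid_shape(
        shape: tuple[int, ...], chunks: tuple[int, ...]
    ) -> tuple[int, ...]:
        # One grid cell per (possibly partial) chunk along each dimension.
        return tuple(
            ceildiv(length, chunksize) for length, chunksize in zip(shape, chunks)
        )


    # A (1460, 25, 53) array chunked as (100, 25, 53) needs ceil(1460 / 100) = 15
    # chunks along time and exactly one chunk along lat and lon.
    assert determine_chunk_grid_shape((1460, 25, 53), (100, 25, 53)) == (15, 1, 1)

This is what lets variable_from_kerchunk_refs build an empty ChunkManifest with the correct grid shape even when a variable has no physical chunks.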
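from_kerchunk_refs relies on a simple rule for turning numcodecs-style configs into zarr v3 "configurables": the "id" key becomes a "numcodecs."-prefixed "name", and the remaining keys become the "configuration". A sketch of that mapping, mirroring the deleted _num_codec_config_to_configurable; the numcodec_config_to_configurable helper the new translator imports is assumed to behave the same way.

    # Sketch of the numcodecs -> zarr v3 "configurable" mapping used when
    # translating a kerchunk .zarray's "compressor" and "filters" entries.
    def numcodec_config_to_configurable(num_codec: dict) -> dict:
        if num_codec["id"].startswith("numcodecs."):
            return num_codec  # passed through unchanged, as in the deleted helper
        num_codec_copy = num_codec.copy()
        name = "numcodecs." + num_codec_copy.pop("id")
        return {"name": name, "configuration": num_codec_copy}


    # The zlib compressor from the compression tests maps like this:
    assert numcodec_config_to_configurable({"id": "zlib", "level": 4}) == {
        "name": "numcodecs.zlib",
        "configuration": {"level": 4},
    }

The result matches the codec list that the updated test_append_with_compression_succeeds now passes explicitly.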
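The fill-value coercion at the top of from_kerchunk_refs is easy to misread: kerchunk may spell a missing or NaN fill value for float arrays as null, "NaN", or "nan", and all three collapse to np.nan. A tiny sketch of just that branch; the coerce_fill_value name is hypothetical, for illustration only.

    import numpy as np


    def coerce_fill_value(fill_value, dtype: np.dtype):
        # Mirror the coercion in from_kerchunk_refs: kerchunk is inconsistent
        # about how it encodes a NaN fill value for floating-point arrays.
        if np.issubdtype(dtype, np.floating) and fill_value in (None, "NaN", "nan"):
            return np.nan
        return fill_value


    assert np.isnan(coerce_fill_value(None, np.dtype("float64")))
    assert np.isnan(coerce_fill_value("NaN", np.dtype("float32")))
    assert coerce_fill_value(0, np.dtype("int64")) == 0  # integers untouched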
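Finally, a quick usage sketch of the extended NumpyEncoder, which the kerchunk writer uses to serialize attributes that may contain NumPy objects; the attribute values here are made up for illustration.

    import json

    import numpy as np

    from virtualizarr.writers.kerchunk import NumpyEncoder

    attrs = {
        "scale_factor": np.float32(0.01),   # NumPy scalar -> native float via .item()
        "valid_range": np.array([0, 100]),  # NumPy array -> [0, 100]
        "dtype": np.dtype("float64"),       # dtype -> "float64"
        "coordinates": {"lat"},             # set -> ["lat"]
    }
    print(json.dumps(attrs, cls=NumpyEncoder))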