Skip to content

Commit

Permalink
Refactor to clean things up
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci committed Oct 10, 2024
1 parent 1f69a0b commit d556e52
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 65 deletions.
11 changes: 4 additions & 7 deletions kerchunk/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import zarr
import numcodecs

from kerchunk.zarr import dict_to_store

from .codecs import FillStringsCodec
from .utils import _encode_for_JSON, encode_fill_value

Expand Down Expand Up @@ -107,13 +109,8 @@ def __init__(
raise NotImplementedError
self.vlen = vlen_encode
self.store_dict = out or {}
if Version(zarr.__version__) < Version("3.0.0.a0"):
self.store = zarr.storage.KVStore(self.store_dict)
self._zroot = zarr.group(store=self.store, overwrite=True)
else:
self.store = zarr.storage.MemoryStore(mode="a", store_dict=self.store_dict)
self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True)

self.store = dict_to_store(self.store_dict)
self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True)
self._uri = url
self.error = error
lggr.debug(f"HDF5 file URI: {self._uri}")
Expand Down
4 changes: 2 additions & 2 deletions kerchunk/netCDF3.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def translate(self):
fill = float(fill)
if fill is not None and var.data.dtype.kind == "i":
fill = int(fill)
arr = z.create_dataset(
arr = z.create_array(
name=dim,
shape=shape,
dtype=var.data.dtype,
Expand Down Expand Up @@ -252,7 +252,7 @@ def translate(self):
fill = float(fill)
if fill is not None and base.kind == "i":
fill = int(fill)
arr = z.create_dataset(
arr = z.create_array(
name=name,
shape=shape,
dtype=base,
Expand Down
90 changes: 41 additions & 49 deletions kerchunk/tests/test_hdf.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,24 @@
from typing import Any
import fsspec
import json
import os.path as osp

import fsspec.implementations
import fsspec.implementations.reference

import kerchunk.hdf
import numpy as np
import pytest
import xarray as xr
import zarr

from packaging.version import Version

from kerchunk.hdf import SingleHdf5ToZarr, has_visititems_links
from kerchunk.combine import MultiZarrToZarr, drop
from kerchunk.utils import refs_as_fs, refs_as_store
from kerchunk.zarr import fs_as_store

here = osp.dirname(__file__)


async def list_dir(store, path):
    """Collect and return all entries yielded by ``store.list_dir(path)``.

    Parameters
    ----------
    store:
        An object exposing an async ``list_dir(path)`` generator
        (presumably a zarr v3 store — confirm against callers).
    path: str
        The directory path to list.

    Returns
    -------
    list
        All entry names produced by the async generator, in yield order.
    """
    # The original evaluated the comprehension but discarded the result, so the
    # helper always returned None; returning the listing is backward-compatible
    # (callers that ignored the result are unaffected) and makes the helper usable.
    return [x async for x in store.list_dir(path)]


def create_store(test_dict: dict, remote_options: Any = None):
    """Build a store over a reference set, matched to the installed zarr version.

    With zarr < 3 this returns an fsspec mapper over the ``reference://``
    protocol (remote protocol fixed to s3); with zarr >= 3 it returns a
    read-only ``RemoteStore`` wrapping a ``ReferenceFileSystem``.
    """
    zarr_is_v3 = Version(zarr.__version__) >= Version("3.0.0.a0")
    if zarr_is_v3:
        ref_fs = fsspec.implementations.reference.ReferenceFileSystem(
            fo=test_dict, remote_options=remote_options
        )
        return zarr.storage.RemoteStore(ref_fs, mode="r")
    return fsspec.get_mapper(
        "reference://",
        fo=test_dict,
        remote_protocol="s3",
        remote_options=remote_options,
    )


def test_single():
"""Test creating references for a single HDF file"""
#url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp"
# url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp"
url = "s3://noaa-nos-ofs-pds/ngofs2/netcdf/202410/ngofs2.t03z.20241001.2ds.f020.nc"
so = dict(anon=True, default_fill_cache=False, default_cache_type="none")

Expand All @@ -47,9 +29,11 @@ def test_single():
with open("test_dict.json", "w") as f:
json.dump(test_dict, f)

store = create_store(test_dict)
store = refs_as_store(test_dict)

ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False))
ds = xr.open_dataset(
store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)
)

with fsspec.open(url, **so) as f:
expected = xr.open_dataset(f, engine="h5netcdf")
Expand All @@ -66,7 +50,7 @@ def test_single_direct_open():
h5f=url, inline_threshold=300, storage_options=so
).translate()

store = create_store(test_dict)
store = refs_as_store(test_dict)

ds_direct = xr.open_dataset(
store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)
Expand All @@ -76,7 +60,7 @@ def test_single_direct_open():
h5chunks = SingleHdf5ToZarr(f, url, storage_options=so)
test_dict = h5chunks.translate()

store = create_store(test_dict)
store = refs_as_store(test_dict)

ds_from_file_opener = xr.open_dataset(
store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)
Expand All @@ -103,8 +87,10 @@ def test_multizarr(generate_mzz):
mzz = generate_mzz
test_dict = mzz.translate()

store = create_store(test_dict)
ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False))
store = refs_as_store(test_dict)
ds = xr.open_dataset(
store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)
)

with fsspec.open_files(urls, **so) as fs:
expts = [xr.open_dataset(f, engine="h5netcdf") for f in fs]
Expand Down Expand Up @@ -178,8 +164,10 @@ def test_times(times_data):
h5chunks = SingleHdf5ToZarr(f, url)
test_dict = h5chunks.translate()

store = create_store(test_dict)
result = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False))
store = refs_as_store(test_dict)
result = xr.open_dataset(
store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)
)
expected = x1.to_dataset()
xr.testing.assert_equal(result, expected)

Expand All @@ -191,8 +179,10 @@ def test_times_str(times_data):
h5chunks = SingleHdf5ToZarr(url)
test_dict = h5chunks.translate()

store = create_store(test_dict)
result = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False))
store = refs_as_store(test_dict)
result = xr.open_dataset(
store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)
)
expected = x1.to_dataset()
xr.testing.assert_equal(result, expected)

Expand All @@ -205,9 +195,10 @@ def test_string_embed():
fn = osp.join(here, "vlen.h5")
h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="embed")
out = h.translate()
fs = fsspec.filesystem("reference", fo=out)
fs = refs_as_fs(out)
assert txt in fs.references["vlen_str/0"]
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = fs_as_store(fs)
z = zarr.open(store, zarr_format=2)
assert z.vlen_str.dtype == "O"
assert z.vlen_str[0] == txt
assert (z.vlen_str[1:] == "").all()
Expand All @@ -217,8 +208,8 @@ def test_string_null():
fn = osp.join(here, "vlen.h5")
h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="null", inline_threshold=0)
out = h.translate()
fs = fsspec.filesystem("reference", fo=out)
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = refs_as_store(out)
z = zarr.open(store, zarr_format=2)
assert z.vlen_str.dtype == "O"
assert (z.vlen_str[:] == None).all()

Expand All @@ -230,8 +221,8 @@ def test_string_leave():
f, fn, vlen_encode="leave", inline_threshold=0
)
out = h.translate()
fs = fsspec.filesystem("reference", fo=out)
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = refs_as_store(out)
z = zarr.open(store, zarr_format=2)
assert z.vlen_str.dtype == "S16"
assert z.vlen_str[0] # some obscured ID
assert (z.vlen_str[1:] == b"").all()
Expand All @@ -244,9 +235,10 @@ def test_string_decode():
f, fn, vlen_encode="encode", inline_threshold=0
)
out = h.translate()
fs = fsspec.filesystem("reference", fo=out)
fs = refs_as_fs(out)
assert txt in fs.cat("vlen_str/.zarray").decode() # stored in filter def
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = fs_as_store(fs)
z = zarr.open(store, zarr_format=2)
assert z.vlen_str[0] == txt
assert (z.vlen_str[1:] == "").all()

Expand All @@ -256,8 +248,8 @@ def test_compound_string_null():
with open(fn, "rb") as f:
h = kerchunk.hdf.SingleHdf5ToZarr(f, fn, vlen_encode="null", inline_threshold=0)
out = h.translate()
fs = fsspec.filesystem("reference", fo=out)
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = refs_as_store(out)
z = zarr.open(store, zarr_format=2)
assert z.vlen_str[0].tolist() == (10, None)
assert (z.vlen_str["ints"][1:] == 0).all()
assert (z.vlen_str["strs"][1:] == None).all()
Expand All @@ -270,8 +262,8 @@ def test_compound_string_leave():
f, fn, vlen_encode="leave", inline_threshold=0
)
out = h.translate()
fs = fsspec.filesystem("reference", fo=out)
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = refs_as_store(out)
z = zarr.open(store, zarr_format=2)
assert z.vlen_str["ints"][0] == 10
assert z.vlen_str["strs"][0] # random ID
assert (z.vlen_str["ints"][1:] == 0).all()
Expand All @@ -285,8 +277,8 @@ def test_compound_string_encode():
f, fn, vlen_encode="encode", inline_threshold=0
)
out = h.translate()
fs = fsspec.filesystem("reference", fo=out)
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = refs_as_store(out)
z = zarr.open(store, zarr_format=2)
assert z.vlen_str["ints"][0] == 10
assert z.vlen_str["strs"][0] == "water"
assert (z.vlen_str["ints"][1:] == 0).all()
Expand Down Expand Up @@ -316,7 +308,7 @@ def test_compress():
h.translate()
continue
out = h.translate()
store = create_store(out)
store = refs_as_store(out)
g = zarr.open(store, zarr_format=2)
assert np.mean(g.data) == 49.5

Expand All @@ -326,7 +318,7 @@ def test_embed():
h = kerchunk.hdf.SingleHdf5ToZarr(fn, vlen_encode="embed")
out = h.translate()

store = create_store(out)
store = refs_as_store(out)
z = zarr.open(store, zarr_format=2)
data = z["Domain_10"]["STER"]["min_1"]["boom_1"]["temperature"][:]
assert data[0].tolist() == [
Expand Down Expand Up @@ -361,8 +353,8 @@ def test_translate_links():
out = kerchunk.hdf.SingleHdf5ToZarr(fn, inline_threshold=50).translate(
preserve_linked_dsets=True
)
fs = fsspec.filesystem("reference", fo=out)
z = zarr.open(fs.get_mapper(), zarr_format=2)
store = refs_as_store(out)
z = zarr.open(store, zarr_format=2)

# 1. Test the hard linked datasets were translated correctly
# 2. Test the soft linked datasets were translated correctly
Expand Down
37 changes: 30 additions & 7 deletions kerchunk/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,28 @@
import numpy as np
import zarr

from kerchunk.zarr import fs_as_store


def refs_as_fs(refs, remote_protocol=None, remote_options=None, **kwargs):
    """Wrap a reference set in an fsspec "reference" filesystem.

    Parameters
    ----------
    refs: dict-like
        The kerchunk reference set to expose as a filesystem.
    remote_protocol: str, optional
        Protocol of the backing storage the references point at.
    remote_options: dict, optional
        Options forwarded to the backing storage filesystem.
    **kwargs:
        Extra arguments passed straight through to ``fsspec.filesystem``.

    Returns
    -------
    fsspec.AbstractFileSystem
        A reference filesystem over *refs*.
    """
    return fsspec.filesystem(
        "reference",
        fo=refs,
        remote_protocol=remote_protocol,
        remote_options=remote_options,
        **kwargs,
    )


def refs_as_store(refs, remote_protocol=None, remote_options=None):
    """Convert a reference set into a zarr-compatible store.

    The references are first wrapped in an fsspec reference filesystem
    (via ``refs_as_fs``), which is then adapted to whatever store type the
    installed zarr version expects (via ``fs_as_store``).
    """
    reference_fs = refs_as_fs(
        refs,
        remote_protocol=remote_protocol,
        remote_options=remote_options,
    )
    return fs_as_store(reference_fs)


def class_factory(func):
"""Experimental uniform API across function-based file scanners"""
Expand Down Expand Up @@ -74,7 +96,7 @@ def rename_target(refs, renames):
-------
dict: the altered reference set, which can be saved
"""
fs = fsspec.filesystem("reference", fo=refs) # to produce normalised refs
fs = refs_as_fs(refs) # to produce normalised refs
refs = fs.references
out = {}
for k, v in refs.items():
Expand Down Expand Up @@ -136,7 +158,6 @@ def _encode_for_JSON(store):
return store



def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any:
# early out
if v is None:
Expand Down Expand Up @@ -190,6 +211,9 @@ def do_inline(store, threshold, remote_options=None, remote_protocol=None):
remote_options=remote_options,
remote_protocol=remote_protocol,
)
fs = refs_as_fs(
store, remote_protocol=remote_protocol, remote_options=remote_options
)
out = fs.references.copy()

# Inlining is done when one of two conditions are satisfied:
Expand Down Expand Up @@ -267,10 +291,9 @@ def inline_array(store, threshold=1000, names=None, remote_options=None):
-------
amended references set (simple style)
"""
fs = fsspec.filesystem(
"reference", fo=store, **(remote_options or {}), skip_instance_cache=True
)
g = zarr.open_group(fs.get_mapper(), mode="r+", zarr_format=2)
fs = refs_as_fs(store, remote_options=remote_options or {})
zarr_store = fs_as_store(store, mode="r+", remote_options=remote_options or {})
g = zarr.open_group(zarr_store, mode="r+", zarr_format=2)
_inline_array(g, threshold, names=names or [])
return fs.references

Expand All @@ -293,7 +316,7 @@ def subchunk(store, variable, factor):
-------
modified store
"""
fs = fsspec.filesystem("reference", fo=store)
fs = refs_as_fs(store)
store = fs.references
meta_file = f"{variable}/.zarray"
meta = ujson.loads(fs.cat(meta_file))
Expand Down
35 changes: 35 additions & 0 deletions kerchunk/zarr.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
from packaging.version import Version

import fsspec
from fsspec.implementations.reference import LazyReferenceMapper
import zarr

import kerchunk.utils


def is_zarr3():
    """Return True when the installed zarr package is version 3 (including 3.x pre-releases)."""
    installed = Version(zarr.__version__)
    return installed >= Version("3.0.0.a0")


def dict_to_store(store_dict: dict):
    """Create an in-memory zarr store backed by the given dictionary.

    Returns a ``MemoryStore`` under zarr v3 and a ``KVStore`` under zarr v2,
    so callers get a version-appropriate store type; either way the store
    reads and writes through *store_dict*.
    """
    if not is_zarr3():
        return zarr.storage.KVStore(store_dict)
    return zarr.storage.MemoryStore(mode="a", store_dict=store_dict)


def fs_as_store(fs, mode='r', remote_protocol=None, remote_options=None):
    """Adapt an fsspec filesystem to a zarr store.

    Parameters
    ----------
    fs: fsspec.AbstractFileSystem
        The filesystem to wrap.
    mode: str
        Access mode for the zarr v3 ``RemoteStore`` (default read-only);
        ignored under zarr v2.
    remote_protocol, remote_options:
        Currently unused; accepted for interface compatibility.

    Returns
    -------
    zarr.storage.RemoteStore (zarr v3) or an fsspec mapper (zarr v2).
    """
    if not is_zarr3():
        return fs.get_mapper()
    return zarr.storage.RemoteStore(fs, mode=mode)


def single_zarr(
uri_or_store,
storage_options=None,
Expand Down

0 comments on commit d556e52

Please sign in to comment.