Skip to content

Commit

Permalink
Merge pull request #350 from florianziemen/compressors
Browse files Browse the repository at this point in the history
Basic support for HDF5 filters
  • Loading branch information
martindurant authored Aug 31, 2023
2 parents 559870b + 99b0f00 commit 4e42bdb
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 26 deletions.
92 changes: 66 additions & 26 deletions kerchunk/hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class SingleHdf5ToZarr:
to disable
storage_options: dict
passed to fsspec if h5f is a str
error: "warn" (default) | "pdb" | "ignore"
error: "warn" (default) | "pdb" | "ignore" | "raise"
vlen_encode: ["embed", "null", "leave", "encode"]
What to do with VLEN string variables or columns of tabular variables
leave: pass through the 16byte garbage IDs unaffected, but requires no codec
Expand Down Expand Up @@ -189,40 +189,84 @@ def _transfer_attrs(
f"TypeError transferring attr, skipping:\n {n}@{h5obj.name} = {v} ({type(v)})"
)

def _decode_filters(self, h5obj: Union[h5py.Dataset, h5py.Group]):
if h5obj.scaleoffset:
raise RuntimeError(
f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk"
)
if h5obj.compression in ("szip", "lzf"):
raise RuntimeError(
f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk"
)
filters = []
if h5obj.shuffle and h5obj.dtype.kind != "O":
# cannot use shuffle if we materialised objects
filters.append(numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize))
for filter_id, properties in h5obj._filters.items():
if str(filter_id) == "32001":
blosc_compressors = (
"blosclz",
"lz4",
"lz4hc",
"snappy",
"zlib",
"zstd",
)
(
_1,
_2,
bytes_per_num,
total_bytes,
clevel,
shuffle,
compressor,
) = properties
pars = dict(
blocksize=total_bytes,
clevel=clevel,
shuffle=shuffle,
cname=blosc_compressors[compressor],
)
filters.append(numcodecs.Blosc(**pars))
elif str(filter_id) == "32015":
filters.append(numcodecs.Zstd(level=properties[0]))
elif str(filter_id) == "gzip":
filters.append(numcodecs.Zlib(level=properties))
elif str(filter_id) == "32004":
raise RuntimeError(
f"{h5obj.name} uses lz4 compression - not supported by kerchunk"
)
elif str(filter_id) == "32008":
raise RuntimeError(
f"{h5obj.name} uses bitshuffle compression - not supported by kerchunk"
)
else:
breakpoint()
raise RuntimeError(
f"{h5obj.name} uses filter id {filter_id} with properties {properties},"
f" not supported by kerchunk., supported filters are {self.decoders.keys()}"
)
return filters

def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
"""Produce Zarr metadata for all groups and datasets in the HDF5 file."""
try: # method must not raise exception
kwargs = {}
if isinstance(h5obj, h5py.Dataset):
lggr.debug(f"HDF5 dataset: {h5obj.name}")
lggr.debug(f"HDF5 compression: {h5obj.compression}")
if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT:
# Only do if h5obj.nbytes < self.inline??
kwargs["data"] = h5obj[:]

filters = []
else:
#
# check for unsupported HDF encoding/filters
#
if h5obj.scaleoffset:
raise RuntimeError(
f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk"
)
if h5obj.compression in ("szip", "lzf"):
raise RuntimeError(
f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk"
)
if h5obj.compression == "gzip":
compression = numcodecs.Zlib(level=h5obj.compression_opts)
else:
compression = None
filters = []
filters = self._decode_filters(h5obj)
dt = None
# Get storage info of this HDF5 dataset...
cinfo = self._storage_info(h5obj)

if "data" in kwargs:
fill = None
compression = None
else:
# encodings
if h5obj.dtype.kind in "US":
Expand Down Expand Up @@ -380,12 +424,6 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
fill = None
else:
raise NotImplementedError
# Add filter for shuffle
if h5obj.shuffle and h5obj.dtype.kind != "O":
# cannot use shuffle if we materialised objects
filters.append(
numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize)
)

if h5py.h5ds.is_scale(h5obj.id) and not cinfo:
return
Expand All @@ -401,7 +439,7 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
dtype=dt or h5obj.dtype,
chunks=h5obj.chunks or False,
fill_value=fill,
compression=compression,
compression=None,
filters=filters,
overwrite=True,
**kwargs,
Expand Down Expand Up @@ -448,6 +486,8 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
import pdb

pdb.post_mortem()
elif self.error == "raise":
raise
else:
# "warn" or anything else, the default
import warnings
Expand Down
17 changes: 17 additions & 0 deletions kerchunk/tests/gen_hdf5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import numpy
import h5py
import hdf5plugin

# Mapping of test-file suffix -> hdf5plugin compression options, used to
# generate the sample files consumed by kerchunk's compression tests.
compressors = dict(
    zstd=hdf5plugin.Zstd(),
    bitshuffle=hdf5plugin.Bitshuffle(nelems=0, cname="lz4"),
    lz4=hdf5plugin.LZ4(nbytes=0),
    blosc_lz4_bitshuffle=hdf5plugin.Blosc(
        cname="blosclz", clevel=9, shuffle=hdf5plugin.Blosc.BITSHUFFLE
    ),
)

# Write one small file per compressor.  A context manager guarantees the
# file is closed even if create_dataset raises (the original leaked the
# handle on error); .items() avoids the second dict lookup.
for name, opts in compressors.items():
    with h5py.File(f"hdf5_compression_{name}.h5", "w") as f:
        f.create_dataset("data", data=numpy.arange(100), **opts)
Binary file added kerchunk/tests/hdf5_compression_bitshuffle.h5
Binary file not shown.
Binary file not shown.
Binary file added kerchunk/tests/hdf5_compression_lz4.h5
Binary file not shown.
Binary file added kerchunk/tests/hdf5_compression_zstd.h5
Binary file not shown.
16 changes: 16 additions & 0 deletions kerchunk/tests/test_hdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,22 @@ def test_compact():
assert np.allclose(g.ancillary_data.atlas_sdp_gps_epoch[:], 1.19880002e09)


def test_compress():
    """Round-trip each generated compression sample; unsupported codecs
    (lz4, bitshuffle) must raise RuntimeError at translate time."""
    import glob

    sample_files = glob.glob(osp.join(here, "hdf5_compression_*.h5"))
    for fname in sample_files:
        converter = kerchunk.hdf.SingleHdf5ToZarr(fname, error="raise")
        unsupported = any(
            tag in fname for tag in ("compression_lz4", "compression_bitshuffle")
        )
        if unsupported:
            with pytest.raises(RuntimeError):
                converter.translate()
        else:
            refs = converter.translate()
            mapper = fsspec.get_mapper("reference://", fo=refs)
            group = zarr.open(mapper)
            assert np.mean(group.data) == 49.5


def test_embed():
fn = osp.join(here, "NEONDSTowerTemperatureData.hdf5")
h = kerchunk.hdf.SingleHdf5ToZarr(fn, vlen_encode="embed")
Expand Down

0 comments on commit 4e42bdb

Please sign in to comment.