Explicitly calculate dtype element size in netCDF3 records #466

Merged: 7 commits, Jun 24, 2024
1 change: 0 additions & 1 deletion kerchunk/combine.py
@@ -412,7 +412,6 @@ def store_coords(self):
elif k in z:
# Fall back to existing fill value
kw["fill_value"] = z[k].fill_value

arr = group.create_dataset(
name=k,
data=data,
32 changes: 14 additions & 18 deletions kerchunk/netCDF3.py
@@ -1,12 +1,11 @@
import base64
from functools import reduce
from operator import mul

import numpy as np
from fsspec.implementations.reference import LazyReferenceMapper
import fsspec

from kerchunk.utils import _encode_for_JSON
from kerchunk.utils import _encode_for_JSON, inline_array

try:
from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
@@ -202,21 +201,11 @@ def translate(self):
)
part = ".".join(["0"] * len(shape)) or "0"
k = f"{dim}/{part}"
if self.threshold and int(self.chunks[dim][1]) < self.threshold:
self.fp.seek(int(self.chunks[dim][0]))
data = self.fp.read(int(self.chunks[dim][1]))
try:
# easiest way to test if data is ascii
data.decode("ascii")
except UnicodeDecodeError:
data = b"base64:" + base64.b64encode(data)
out[k] = data
else:
out[k] = [
self.filename,
int(self.chunks[dim][0]),
int(self.chunks[dim][1]),
]
out[k] = [
self.filename,
int(self.chunks[dim][0]),
int(self.chunks[dim][1]),
]
arr.attrs.update(
{
k: v.decode() if isinstance(v, bytes) else str(v)
@@ -232,7 +221,8 @@ def translate(self):
# native chunks version (no codec, no options)
start, size, dt = self.chunks["record_array"][0]
dt = np.dtype(dt)
outer_shape = size // dt.itemsize
itemsize = sum(dt[_].itemsize for _ in dt.names)
outer_shape = size // itemsize
offset = start
for name in dt.names:
dtype = dt[name]
@@ -294,6 +284,12 @@ def translate(self):
if k != "filename" # special "attribute"
}
)
if self.threshold:
out = inline_array(
out,
self.threshold,
remote_options=dict(remote_options=self.storage_options),
)

if isinstance(out, LazyReferenceMapper):
out.flush()
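The record-size change above replaces `dt.itemsize` with an explicit sum over the named fields. A minimal sketch of how the two can differ, using a made-up structured dtype rather than one read from a real netCDF3 file: `dt.itemsize` is the declared record width, which may include padding bytes, while summing `dt[name].itemsize` counts only the bytes the fields actually describe.

```python
import numpy as np

# Made-up structured dtype whose declared itemsize (12 bytes) exceeds the
# sum of its field sizes (4 + 1 = 5 bytes) because of trailing padding.
dt = np.dtype({"names": ["a", "b"], "formats": ["<i4", "u1"], "itemsize": 12})

declared = dt.itemsize                                # 12
packed = sum(dt[name].itemsize for name in dt.names)  # 5

size = 60  # hypothetical byte count of the record block
print(size // declared)  # 5 records
print(size // packed)    # 12 records
```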
2 changes: 1 addition & 1 deletion kerchunk/tests/test_grib.py
@@ -123,7 +123,7 @@ def test_grib_tree():
"atmosphere latitude longitude step time valid_time".split()
)
# Assert that the fill value is set correctly
assert zg.refc.instant.atmosphere.step.fill_value is np.NaN
assert zg.refc.instant.atmosphere.step.fill_value is np.nan


# The following two tests use json fixture data generated from calling scan grib
9 changes: 5 additions & 4 deletions kerchunk/tests/test_utils.py
@@ -1,6 +1,7 @@
import io

import fsspec
import json
import kerchunk.utils
import kerchunk.zarr
import numpy as np
@@ -72,16 +73,16 @@ def test_inline_array():
"data/.zattrs": '{"foo": "bar"}',
}
fs = fsspec.filesystem("reference", fo=refs)
out1 = kerchunk.utils.inline_array(refs, threshold=1000) # does nothing
out1 = kerchunk.utils.inline_array(refs, threshold=1) # does nothing
assert out1 == refs
out2 = kerchunk.utils.inline_array(refs, threshold=1000, names=["data"]) # explicit
out2 = kerchunk.utils.inline_array(refs, threshold=1, names=["data"]) # explicit
assert "data/1" not in out2
assert out2["data/.zattrs"] == refs["data/.zattrs"]
assert json.loads(out2["data/.zattrs"]) == json.loads(refs["data/.zattrs"])
fs = fsspec.filesystem("reference", fo=out2)
g = zarr.open(fs.get_mapper())
assert g.data[:].tolist() == [1, 2]

out3 = kerchunk.utils.inline_array(refs, threshold=1) # inlines because of size
out3 = kerchunk.utils.inline_array(refs, threshold=1000) # inlines because of size
assert "data/1" not in out3
fs = fsspec.filesystem("reference", fo=out3)
g = zarr.open(fs.get_mapper())
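For context on the swapped thresholds in the test above: `threshold` is a size in bytes, and (per the `_inline_array` condition changed in `kerchunk/utils.py` below) an array is inlined only when its total `nbytes` falls below it. A rough sketch, assuming the fixture's two-element array is int64; the exact dtype does not change the point.

```python
import numpy as np

# The fixture's "data" array holds two values in two chunks; assume int64.
nbytes = 2 * np.dtype("<i8").itemsize   # 16 bytes

# _inline_array keeps or inlines chunks based on `thing.nbytes < threshold`:
print(nbytes < 1)     # False -> threshold=1 leaves the chunk references alone
print(nbytes < 1000)  # True  -> threshold=1000 inlines "data/0" and "data/1"
```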
44 changes: 36 additions & 8 deletions kerchunk/utils.py
@@ -182,7 +182,7 @@ def _inline_array(group, threshold, names, prefix=""):
if isinstance(thing, zarr.Group):
_inline_array(thing, threshold=threshold, prefix=prefix1, names=names)
else:
cond1 = threshold and thing.nbytes < threshold and thing.nchunks > 1
cond1 = threshold and thing.nbytes < threshold
cond2 = prefix1 in names
if cond1 or cond2:
original_attrs = dict(thing.attrs)
@@ -194,6 +194,7 @@
chunks=thing.shape,
compression=None,
overwrite=True,
fill_value=thing.fill_value,
)
arr.attrs.update(original_attrs)

@@ -249,35 +250,62 @@ def subchunk(store, variable, factor):
modified store
"""
fs = fsspec.filesystem("reference", fo=store)
store = copy.deepcopy(store)
store = fs.references
meta_file = f"{variable}/.zarray"
meta = ujson.loads(fs.cat(meta_file))
if meta["compressor"] is not None:
raise ValueError("Can only subchunk an uncompressed array")
chunks_orig = meta["chunks"]
if chunks_orig[0] % factor == 0:
chunk_new = [chunks_orig[0] // factor] + chunks_orig[1:]
else:
raise ValueError("Must subchunk by exact integer factor")
chunk_new = []
# plan
multi = None
for ind, this_chunk in enumerate(chunks_orig):
if this_chunk == 1:
chunk_new.append(1)
continue
elif this_chunk % factor == 0:
chunk_new.extend([this_chunk // factor] + chunks_orig[ind + 1 :])
break
elif factor % this_chunk == 0:
# if factor // chunks_orig[0] > 1:
chunk_new.append(1)
if multi is None:
multi = this_chunk
factor //= this_chunk
else:
raise ValueError("Must subchunk by exact integer factor")

if multi:
# TODO: this reloads the referenceFS; *maybe* reuses it
return subchunk(store, variable, multi)

# execute
meta["chunks"] = chunk_new
store = copy.deepcopy(store)
store[meta_file] = ujson.dumps(meta)

for k, v in store.copy().items():
if k.startswith(f"{variable}/"):
kpart = k[len(variable) + 1 :]
if kpart.startswith(".z"):
continue
sep = "." if "." in k else "/"
sep = "." if "." in kpart else "/"
chunk_index = [int(_) for _ in kpart.split(sep)]
if isinstance(v, (str, bytes)):
# TODO: check this early, as some refs might have been edited already
raise ValueError("Refusing to sub-chunk inlined data")
if len(v) > 1:
url, offset, size = v
else:
(url,) = v
offset = 0
size = fs.size(k)
for subpart in range(factor):
new_index = [chunk_index[0] * factor + subpart] + chunk_index[1:]
new_index = (
chunk_index[:ind]
+ [chunk_index[ind] * factor + subpart]
+ chunk_index[ind + 1 :]
)
newpart = sep.join(str(_) for _ in new_index)
newv = [url, offset + subpart * size // factor, size // factor]
store[f"{variable}/{newpart}"] = newv
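A standalone sketch of just the chunk-planning loop added to `subchunk` above, with the reference rewriting and the recursive `multi` restart omitted; the helper function and the example shapes are illustrative, not part of the library.

```python
def plan_subchunk(chunks_orig, factor):
    """Mirror of the planning loop only: return the new chunk shape and, when
    the factor spans more than the first splittable axis, the sub-factor
    ("multi") that the real routine restarts with."""
    chunk_new, multi = [], None
    for ind, this_chunk in enumerate(chunks_orig):
        if this_chunk == 1:
            chunk_new.append(1)
        elif this_chunk % factor == 0:
            # remaining factor fits inside this axis: split it and stop
            chunk_new.extend([this_chunk // factor] + list(chunks_orig[ind + 1:]))
            break
        elif factor % this_chunk == 0:
            # axis is consumed whole; keep spending the rest of the factor
            chunk_new.append(1)
            multi = this_chunk if multi is None else multi
            factor //= this_chunk
        else:
            raise ValueError("Must subchunk by exact integer factor")
    return chunk_new, multi

print(plan_subchunk([10, 20, 5], 2))  # ([5, 20, 5], None)
print(plan_subchunk([4, 6, 5], 8))    # ([1, 3, 5], 4)
print(plan_subchunk([1, 6, 5], 3))    # ([1, 2, 5], None)
```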