Skip to content

Commit

Permalink
Merge pull request #492 from Anu-Ra-g/grib_mapping
Browse files Browse the repository at this point in the history
added the mapping workflow for grib and idx data
  • Loading branch information
martindurant authored Aug 30, 2024
2 parents eaae20f + 7d16c73 commit 3a6ef99
Showing 1 changed file with 170 additions and 1 deletion.
171 changes: 170 additions & 1 deletion kerchunk/grib2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import io
import logging
from collections import defaultdict
from typing import Iterable, List, Dict, Set, TYPE_CHECKING, Optional
import warnings
from typing import Iterable, List, Dict, Set, TYPE_CHECKING, Optional, Callable
import ujson
import itertools

Expand Down Expand Up @@ -807,3 +808,171 @@ def extract_datatree_chunk_index(
)

return pd.DataFrame.from_records(result)


def _map_grib_file_by_group(
fname: str, mapper: Optional[Callable] = None, storage_options=None
) -> "pd.DataFrame":
"""
Helper method used to read the cfgrib metadata associated with each message (group) in the grib file
This method does not add metadata
Parameters
----------
fname : str
the file name to read with scan_grib
mapper : Optional[Callable]
the mapper if any to apply (used for hrrr subhf)
Returns
-------
pandas.Dataframe : The intermediate dataframe constructed from the grib metadata
"""
import pandas as pd

mapper = (lambda x: x) if mapper is None else mapper

with warnings.catch_warnings():
warnings.simplefilter("ignore")
return pd.concat(
# grib idx is fortran indexed (from one not zero)
list(
filter(
lambda item: item is not None,
[
_extract_single_group(mapper(group), i)
for i, group in enumerate(
scan_grib(fname, storage_options=storage_options), start=1
)
],
)
)
).set_index("idx")


def _extract_single_group(grib_group: dict, idx: int):
import datatree

grib_tree_store = grib_tree(
[
grib_group,
]
)

if len(grib_tree_store["refs"]) <= 1:
logger.info("Empty DT: %s", grib_tree_store)
return None

dt = datatree.open_datatree(
fsspec.filesystem("reference", fo=grib_tree_store).get_mapper(""),
engine="zarr",
consolidated=False,
)

k_ind = extract_datatree_chunk_index(dt, grib_tree_store, grib=True)
if k_ind.empty:
logger.warning("Empty Kind: %s", grib_tree_store)
return None

assert (
len(k_ind) == 1
), f"expected a single variable grib group but produced: {k_ind}"
k_ind.loc[:, "idx"] = idx
return k_ind


def build_idx_grib_mapping(
basename: str,
storage_options: Optional[Dict] = None,
suffix: str = "idx",
mapper: Optional[Callable] = None,
validate: bool = True,
) -> "pd.DataFrame":
"""
Mapping method combines the idx and grib metadata to make a mapping from
one to the other for a particular model horizon file. This should be generally
applicable to all forecasts for the given horizon.
Parameters
----------
basename : str
the full path for the grib2 file
storage_options: dict
For accessing the data, passed to filesystem
suffix : str
The suffix is the ending for the idx file.
mapper : Optional[Callable]
the mapper if any to apply (used for hrrr subhf)
validate : bool
to assert the mapping is correct or fail before returning
Returns
-------
pandas.Dataframe : The merged dataframe with the results of the two operations
joined on the grib message (group) number
"""
import pandas as pd

grib_file_index = _map_grib_file_by_group(
fname=basename, mapper=mapper, storage_options=storage_options
)
idx_file_index = parse_grib_idx(
basename=basename, suffix=suffix, storage_options=storage_options
)
result = idx_file_index.merge(
# Left merge because the idx file should be authoritative - one record per grib message
grib_file_index,
on="idx",
how="left",
suffixes=("_idx", "_grib"),
)

if validate:
# If any of these conditions fail - inspect the result manually on colab.
all_match_offset = (
(result.loc[:, "offset_idx"] == result.loc[:, "offset_grib"])
| pd.isna(result.loc[:, "offset_grib"])
| ~pd.isna(result.loc[:, "inline_value"])
)
all_match_length = (
(result.loc[:, "length_idx"] == result.loc[:, "length_grib"])
| pd.isna(result.loc[:, "length_grib"])
| ~pd.isna(result.loc[:, "inline_value"])
)

if not all_match_offset.all():
vcs = all_match_offset.value_counts()
raise ValueError(
f"Failed to match message offset mapping for grib file {basename}: "
f"{vcs[True]} matched, {vcs[False]} didn't"
)

if not all_match_length.all():
vcs = all_match_length.value_counts()
raise ValueError(
f"Failed to match message offset mapping for grib file {basename}: "
f"{vcs[True]} matched, {vcs[False]} didn't"
)

if not result["attrs"].is_unique:
dups = result.loc[result["attrs"].duplicated(keep=False), :]
logger.warning(
"The idx attribute mapping for %s is not unique for %d variables: %s",
basename,
len(dups),
dups.varname.tolist(),
)

r_index = result.set_index(
["varname", "typeOfLevel", "stepType", "level", "valid_time"]
)
if not r_index.index.is_unique:
dups = r_index.loc[r_index.index.duplicated(keep=False), :]
logger.warning(
"The grib hierarchy in %s is not unique for %d variables: %s",
basename,
len(dups),
dups.index.get_level_values("varname").tolist(),
)

return result

0 comments on commit 3a6ef99

Please sign in to comment.