From 7ab3a84e230bce79100fefee3b7df95b74b32da1 Mon Sep 17 00:00:00 2001 From: Anu-Ra-g Date: Tue, 13 Aug 2024 17:27:02 +0530 Subject: [PATCH 1/2] added the mapping workflow for grib and idx data --- kerchunk/grib2.py | 168 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 167 insertions(+), 1 deletion(-) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index a5b1f56d..6a454dc4 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -3,7 +3,8 @@ import io import logging from collections import defaultdict -from typing import Iterable, List, Dict, Set, TYPE_CHECKING, Optional +import warnings +from typing import Iterable, List, Dict, Set, TYPE_CHECKING, Optional, Callable import ujson import itertools @@ -806,3 +807,168 @@ def extract_datatree_chunk_index( ) return pd.DataFrame.from_records(result) + + +def _map_grib_file_by_group( + fname: str, + mapper: Optional[Callable] = None, +) -> "pd.DataFrame": + """ + Helper method used to read the cfgrib metadata associated with each message (group) in the grib file + This method does not add metadata + + Parameters + ---------- + fname : str + the file name to read with scan_grib + mapper : Optional[Callable] + the mapper if any to apply (used for hrrr subhf) + + Returns + ------- + pandas.Dataframe : The intermediate dataframe constructed from the grib metadata + """ + import pandas as pd + + mapper = (lambda x: x) if mapper is None else mapper + + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + return pd.concat( + # grib idx is fortran indexed (from one not zero) + list( + filter( + lambda item: item is not None, + [ + _extract_single_group(mapper(group), i) + for i, group in enumerate(scan_grib(fname), start=1) + ], + ) + ) + ).set_index("idx") + + +def _extract_single_group(grib_group: dict, idx: int): + import datatree + + grib_tree_store = grib_tree( + [ + grib_group, + ] + ) + + if len(grib_tree_store["refs"]) <= 1: + logger.info("Empty DT: %s", grib_tree_store) + return None + + dt = datatree.open_datatree( + fsspec.filesystem("reference", fo=grib_tree_store).get_mapper(""), + engine="zarr", + consolidated=False, + ) + + k_ind = extract_datatree_chunk_index(dt, grib_tree_store, grib=True) + if k_ind.empty: + logger.warning("Empty Kind: %s", grib_tree_store) + return None + + assert ( + len(k_ind) == 1 + ), f"expected a single variable grib group but produced: {k_ind}" + k_ind.loc[:, "idx"] = idx + return k_ind + + +def build_idx_grib_mapping( + basename: str, + storage_options: Optional[Dict] = None, + suffix: str = "idx", + mapper: Optional[Callable] = None, + validate: bool = True, +) -> "pd.DataFrame": + """ + Mapping method combines the idx and grib metadata to make a mapping from + one to the other for a particular model horizon file. This should be generally + applicable to all forecasts for the given horizon. + + Parameters + ---------- + basename : str + the full path for the grib2 file + storage_options: dict + For accessing the data, passed to filesystem + suffix : str + The suffix is the ending for the idx file. + mapper : Optional[Callable] + the mapper if any to apply (used for hrrr subhf) + validate : bool + to assert the mapping is correct or fail before returning + + Returns + ------- + pandas.Dataframe : The merged dataframe with the results of the two operations + joined on the grib message (group) number + """ + import pandas as pd + + grib_file_index = _map_grib_file_by_group(fname=basename, mapper=mapper) + idx_file_index = parse_grib_idx( + basename=basename, suffix=suffix, storage_options=storage_options + ) + result = idx_file_index.merge( + # Left merge because the idx file should be authoritative - one record per grib message + grib_file_index, + on="idx", + how="left", + suffixes=("_idx", "_grib"), + ) + + if validate: + # If any of these conditions fail - inspect the result manually on colab. + all_match_offset = ( + (result.loc[:, "offset_idx"] == result.loc[:, "offset_grib"]) + | pd.isna(result.loc[:, "offset_grib"]) + | ~pd.isna(result.loc[:, "inline_value"]) + ) + all_match_length = ( + (result.loc[:, "length_idx"] == result.loc[:, "length_grib"]) + | pd.isna(result.loc[:, "length_grib"]) + | ~pd.isna(result.loc[:, "inline_value"]) + ) + + if not all_match_offset.all(): + vcs = all_match_offset.value_counts() + raise ValueError( + f"Failed to match message offset mapping for grib file {basename}: " + f"{vcs[True]} matched, {vcs[False]} didn't" + ) + + if not all_match_length.all(): + vcs = all_match_length.value_counts() + raise ValueError( + f"Failed to match message offset mapping for grib file {basename}: " + f"{vcs[True]} matched, {vcs[False]} didn't" + ) + + if not result["attrs"].is_unique: + dups = result.loc[result["attrs"].duplicated(keep=False), :] + logger.warning( + "The idx attribute mapping for %s is not unique for %d variables: %s", + basename, + len(dups), + dups.varname.tolist(), + ) + + r_index = result.set_index( + ["varname", "typeOfLevel", "stepType", "level", "valid_time"] + ) + if not r_index.index.is_unique: + dups = r_index.loc[r_index.index.duplicated(keep=False), :] + logger.warning( + "The grib hierarchy in %s is not unique for %d variables: %s", + basename, + len(dups), + dups.index.get_level_values("varname").tolist(), + ) + + return result From 7d16c73a402688db8e5ec398ec245aa2ad0cee87 Mon Sep 17 00:00:00 2001 From: Anu-Ra-g Date: Fri, 16 Aug 2024 18:20:22 +0530 Subject: [PATCH 2/2] added storage options argument to mapping --- kerchunk/grib2.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index 6a454dc4..2b84ceb7 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -810,8 +810,7 @@ def extract_datatree_chunk_index( def _map_grib_file_by_group( - fname: str, - mapper: Optional[Callable] = None, + fname: str, mapper: Optional[Callable] = None, storage_options=None ) -> "pd.DataFrame": """ Helper method used to read the cfgrib metadata associated with each message (group) in the grib file @@ -841,7 +840,9 @@ def _map_grib_file_by_group( lambda item: item is not None, [ _extract_single_group(mapper(group), i) - for i, group in enumerate(scan_grib(fname), start=1) + for i, group in enumerate( + scan_grib(fname, storage_options=storage_options), start=1 + ) ], ) ) @@ -911,7 +912,9 @@ def build_idx_grib_mapping( """ import pandas as pd - grib_file_index = _map_grib_file_by_group(fname=basename, mapper=mapper) + grib_file_index = _map_grib_file_by_group( + fname=basename, mapper=mapper, storage_options=storage_options + ) idx_file_index = parse_grib_idx( basename=basename, suffix=suffix, storage_options=storage_options )