diff --git a/kerchunk/_grib_idx.py b/kerchunk/_grib_idx.py
index 3500eb5..2eab91e 100644
--- a/kerchunk/_grib_idx.py
+++ b/kerchunk/_grib_idx.py
@@ -1,3 +1,57 @@
+"""
+_grib_idx.py
+============
+
+This module provides utility functions for building an index of the messages
+in GRIB2 files that are accompanied by idx files, and for utilizing that index.
+
+The module includes functions for extracting grib metadata, parsing the
+attributes in idx files, building a one-to-one mapping between the grib
+metadata and the idx data, and building the index for the grib files. It has
+been tested with GFS, HRRR, and GEFS model outputs.
+
+Attributes
+----------
+(to be added)
+
+
+Functions
+---------
+parse_grib_idx(basename, suffix, storage_options, validate)
+    Parses the idx file corresponding to the basename (grib file), using the
+    given storage options, and returns the idx data as a DataFrame.
+
+extract_dataset_chunk_index(dset, ref_store, grib)
+    Processes and extracts the kerchunk index for an xarray dataset or
+    datatree node, returning the index metadata as a list of dictionaries.
+
+extract_datatree_chunk_index(dtree, kerchunk_store, grib)
+    Recursively iterates through the xarray datatree and, with the help of
+    extract_dataset_chunk_index, returns the index metadata as a DataFrame.
+
+build_idx_grib_mapping(basename, storage_options, suffix, mapper, validate)
+    Creates a one-to-one mapping between the idx metadata and the grib data
+    for a single file, which is needed to build the index. The mapping is
+    returned as a DataFrame.
+
+map_from_index(run_time, mapping, idxdf, raw_merged)
+    Builds the index for each grib file using the mapping produced by
+    build_idx_grib_mapping and returns the index as a DataFrame.
+
+Notes
+-----
+The indexing process should only be applied to files belonging to the same
+horizon, or else the index won't work. This module is not meant to be used
+directly but through the kerchunk.grib2 module, where the necessary functions
+for building the index are exposed.
+
+
+References
+----------
+For more information on the indexing process, see:
+https://github.com/asascience-open/nextgen-dmac/pull/57
+
+"""
+
+
 import ujson
 import itertools
 import logging
@@ -18,6 +72,67 @@ class DynamicZarrStoreError(ValueError):
 logger = logging.getLogger("grib-indexing")
 
 
+def parse_grib_idx(
+    basename: str,
+    suffix: str = "idx",
+    storage_options: Optional[Dict] = None,
+    validate: bool = False,
+) -> "pd.DataFrame":
+    """
+    Parses per-message metadata from a grib2 idx file (plain text) into a dataframe
+    of attributes.
+
+    The function reads the idx file, extracts the metadata known as attrs (variables
+    with level and forecast time) from each idx entry and converts it into a pandas
+    DataFrame. The dataframe is later used to build the one-to-one mapping to the
+    grib file metadata.
+
+    Parameters
+    ----------
+    basename : str
+        The base name is the full path to the grib file.
+    suffix : str
+        The suffix is the ending for the idx file.
+    storage_options : dict
+        For accessing the data, passed to the filesystem.
+    validate : bool
+        If True, validate that the attrs in the metadata table are unique.
+
+    Returns
+    -------
+    pandas.DataFrame : The dataframe containing the results.
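+
+    Examples
+    --------
+    A minimal sketch; the GFS path below is illustrative, not a tested fixture:
+
+    >>> idxdf = parse_grib_idx(
+    ...     "s3://noaa-gfs-bdp-pds/gfs.20230901/00/atmos/gfs.t00z.pgrb2.0p25.f006",
+    ...     storage_options={"anon": True},
+    ... )
+    >>> list(idxdf.columns)
+    ['offset', 'date', 'attrs', 'length', 'idx_uri', 'grib_uri']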
+    """
+    import pandas as pd
+
+    fs, _ = fsspec.core.url_to_fs(basename, **(storage_options or {}))
+
+    fname = f"{basename}.{suffix}"
+
+    baseinfo = fs.info(basename)
+
+    with fs.open(fname) as f:
+        result = pd.read_csv(f, header=None, names=["raw_data"])
+        result[["idx", "offset", "date", "attrs"]] = result["raw_data"].str.split(
+            ":", expand=True, n=3
+        )
+        result["offset"] = result["offset"].astype(int)
+        result["idx"] = result["idx"].astype(int)
+
+        # dropping the original single "raw_data" column after formatting
+        result.drop(columns=["raw_data"], inplace=True)
+
+    result = result.assign(
+        length=(
+            result.offset.shift(periods=-1, fill_value=baseinfo["size"]) - result.offset
+        ),
+        idx_uri=fname,
+        grib_uri=basename,
+    )
+
+    if validate and not result["attrs"].is_unique:
+        raise ValueError(f"Attribute mapping for grib file {basename} is not unique")
+
+    return result.set_index("idx")
+
+
 def build_path(path: Iterable[str | None], suffix: Optional[str] = None) -> str:
     """
     Returns the path to access the values in a zarr store without a leading "/"
@@ -42,7 +157,7 @@ def extract_dataset_chunk_index(
     grib: bool = False,
 ) -> list[dict]:
     """
-    Process and extract a kerchunk index for an xarray dataset or datatree node.
+    Processes and extracts the kerchunk index for an xarray dataset or datatree node.
     The data_vars from the dataset will be indexed.
     The coordinate vars for each dataset will be used for indexing.
 
@@ -248,3 +363,180 @@ def _extract_single_group(grib_group: dict, idx: int, storage_options: Dict):
         ), f"expected a single variable grib group but produced: {k_ind}"
     k_ind.loc[:, "idx"] = idx
     return k_ind
+
+
+def build_idx_grib_mapping(
+    basename: str,
+    storage_options: Optional[Dict] = None,
+    suffix: str = "idx",
+    mapper: Optional[Callable] = None,
+    validate: bool = True,
+) -> "pd.DataFrame":
+    """
+    Combines the idx and grib metadata into a one-to-one mapping between the two
+    for a particular model horizon file. This mapping should be generally
+    applicable to all forecasts for the given horizon.
+
+    Parameters
+    ----------
+    basename : str
+        The full path to the grib2 file.
+    storage_options : dict
+        For accessing the data, passed to the filesystem.
+    suffix : str
+        The suffix is the ending for the idx file.
+    mapper : Optional[Callable]
+        An optional mapper to apply (used for HRRR subhf).
+    validate : bool
+        If True, assert that the mapping is correct or fail before returning.
+
+    Returns
+    -------
+    pandas.DataFrame : The merged dataframe with the results of the two operations
+    joined on the grib message (group) number.
+    """
+    import pandas as pd
+
+    grib_file_index = _map_grib_file_by_group(
+        fname=basename,
+        mapper=mapper,
+        storage_options=storage_options,
+    )
+    idx_file_index = parse_grib_idx(
+        basename=basename, suffix=suffix, storage_options=storage_options
+    )
+    result = idx_file_index.merge(
+        # Left merge because the idx file should be authoritative - one record per grib message
+        grib_file_index,
+        on="idx",
+        how="left",
+        suffixes=("_idx", "_grib"),
+    )
+
+    if validate:
+        # If any of these conditions fail - inspect the result manually on colab.
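+        # A message counts as matched when the idx and grib metadata agree on
+        # offset/length, when cfgrib produced no group for it (the _grib value
+        # is NaN), or when kerchunk inlined the value (inline_value is present).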
+        all_match_offset = (
+            (result.loc[:, "offset_idx"] == result.loc[:, "offset_grib"])
+            | pd.isna(result.loc[:, "offset_grib"])
+            | ~pd.isna(result.loc[:, "inline_value"])
+        )
+        all_match_length = (
+            (result.loc[:, "length_idx"] == result.loc[:, "length_grib"])
+            | pd.isna(result.loc[:, "length_grib"])
+            | ~pd.isna(result.loc[:, "inline_value"])
+        )
+
+        if not all_match_offset.all():
+            vcs = all_match_offset.value_counts()
+            raise ValueError(
+                f"Failed to match message offset mapping for grib file {basename}: "
+                f"{vcs[True]} matched, {vcs[False]} didn't"
+            )
+
+        if not all_match_length.all():
+            vcs = all_match_length.value_counts()
+            raise ValueError(
+                f"Failed to match message length mapping for grib file {basename}: "
+                f"{vcs[True]} matched, {vcs[False]} didn't"
+            )
+
+        if not result["attrs"].is_unique:
+            dups = result.loc[result["attrs"].duplicated(keep=False), :]
+            logger.warning(
+                "The idx attribute mapping for %s is not unique for %d variables: %s",
+                basename,
+                len(dups),
+                dups.varname.tolist(),
+            )
+
+        r_index = result.set_index(
+            ["varname", "typeOfLevel", "stepType", "level", "valid_time"]
+        )
+        if not r_index.index.is_unique:
+            dups = r_index.loc[r_index.index.duplicated(keep=False), :]
+            logger.warning(
+                "The grib hierarchy in %s is not unique for %d variables: %s",
+                basename,
+                len(dups),
+                dups.index.get_level_values("varname").tolist(),
+            )
+
+    return result
+
+
+def map_from_index(
+    run_time: "pd.Timestamp",
+    mapping: "pd.DataFrame",
+    idxdf: "pd.DataFrame",
+    raw_merged: bool = False,
+) -> "pd.DataFrame":
+    """
+    Main method used for building index dataframes from parsed idx files,
+    merged with the correct mapping for the horizon.
+
+    Parameters
+    ----------
+    run_time : pd.Timestamp
+        The run time timestamp of the idx data.
+    mapping : pd.DataFrame
+        The mapping data derived from comparing the idx attributes to the
+        CFGrib attributes for a given horizon.
+    idxdf : pd.DataFrame
+        The dataframe of offsets and lengths for each grib message and its
+        attributes, derived from an idx file.
+    raw_merged : bool
+        If True, return the raw merged dataframe with all the columns, for
+        debugging. By default, only the index columns with the corrected time
+        values plus the index metadata are returned.
+
+    Returns
+    -------
+    pd.DataFrame : The index dataframe that will be used to read variable data
+    from the grib file.
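+
+    Examples
+    --------
+    A minimal sketch; the file paths are illustrative, and both files are assumed
+    to belong to the same model horizon:
+
+    >>> import pandas as pd
+    >>> mapping = build_idx_grib_mapping(
+    ...     "s3://noaa-gfs-bdp-pds/gfs.20230901/00/atmos/gfs.t00z.pgrb2.0p25.f006"
+    ... )
+    >>> # map_from_index requires unique attrs, so drop any duplicates first
+    >>> mapping = mapping.loc[~mapping["attrs"].duplicated(keep="first"), :]
+    >>> idxdf = parse_grib_idx(
+    ...     "s3://noaa-gfs-bdp-pds/gfs.20230902/00/atmos/gfs.t00z.pgrb2.0p25.f006"
+    ... )
+    >>> index = map_from_index(pd.Timestamp("2023-09-02T00"), mapping, idxdf)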
+    """
+
+    idxdf = idxdf.reset_index().set_index("attrs")
+    mapping = mapping.reset_index().set_index("attrs")
+    mapping.drop(columns="uri", inplace=True)  # Drop the URI column from the mapping
+
+    if not idxdf.index.is_unique:
+        raise ValueError("Parsed idx data must have unique attrs to merge on!")
+
+    if not mapping.index.is_unique:
+        raise ValueError("Mapping data must have unique attrs to merge on!")
+
+    # Merge the offset and length from the idx file with the varname, step and level from the mapping
+
+    result = idxdf.merge(mapping, on="attrs", how="left", suffixes=("", "_mapping"))
+
+    if raw_merged:
+        return result
+    else:
+        # Get the grib_uri column from the idxdf and ignore the uri column from the mapping
+        # We want the offset, length and uri from the idx dataframe with the varname, step and level from the mapping
+        selected_results = result.rename(columns=dict(grib_uri="uri"))[
+            [
+                "varname",
+                "typeOfLevel",
+                "stepType",
+                "name",
+                "step",
+                "level",
+                "time",
+                "valid_time",
+                "uri",
+                "offset",
+                "length",
+                "inline_value",
+            ]
+        ]
+        # Drop the inline values from the mapping data
+        selected_results.loc[:, "inline_value"] = None
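+        # The time values inherited from the mapping describe the run it was built
+        # from, so overwrite them: time becomes the supplied run_time and
+        # valid_time is recomputed as run_time + step.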
+        selected_results.loc[:, "time"] = run_time
+        selected_results.loc[:, "valid_time"] = (
+            selected_results.time + selected_results.step
+        )
+        logger.info("Dropping %d nan varnames", selected_results.varname.isna().sum())
+        selected_results = selected_results.loc[~selected_results.varname.isna(), :]
+        return selected_results.reset_index(drop=True)
diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py
index 51233e3..16647de 100644
--- a/kerchunk/grib2.py
+++ b/kerchunk/grib2.py
@@ -3,12 +3,9 @@ import io
 import logging
 from collections import defaultdict
-from typing import Iterable, List, Dict, Set, TYPE_CHECKING, Optional, Callable
+from typing import Iterable, List, Dict, Set
 
 import ujson
 
-if TYPE_CHECKING:
-    import pandas as pd
-
 import fsspec
 import zarr
 import xarray
@@ -17,7 +14,7 @@
 from kerchunk.utils import class_factory, _encode_for_JSON
 from kerchunk.codecs import GRIBCodec
 from kerchunk.combine import MultiZarrToZarr, drop
-from kerchunk._grib_idx import _map_grib_file_by_group
+from kerchunk._grib_idx import build_idx_grib_mapping, map_from_index
 
 
 try:
@@ -586,163 +583,3 @@ def correct_hrrr_subhf_step(group: Dict) -> Dict:
     group["refs"]["step/0"] = enocded_val
 
     return group
-
-
-def parse_grib_idx(
-    basename: str,
-    suffix: str = "idx",
-    storage_options: Optional[Dict] = None,
-    validate: bool = False,
-) -> "pd.DataFrame":
-    """
-    Parses per-message metadata from a grib2.idx file (text-type) to a dataframe of attributes
-
-    The function uses the idx file, extracts the metadata known as attrs (variables with
-    level and forecast time) from each idx entry and converts it into pandas
-    DataFrame. The dataframe is later to build the one-to-one mapping to the grib file metadata.
-
-    Parameters
-    ----------
-    basename : str
-        The base name is the full path to the grib file.
-    suffix : str
-        The suffix is the ending for the idx file.
-    storage_options: dict
-        For accessing the data, passed to filesystem
-    validate : bool
-        The validation if the metadata table has duplicate attrs.
-
-    Returns
-    -------
-    pandas.DataFrame : The data frame containing the results.
-    """
-    import pandas as pd
-
-    fs, _ = fsspec.core.url_to_fs(basename, **(storage_options or {}))
-
-    fname = f"{basename}.{suffix}"
-
-    baseinfo = fs.info(basename)
-
-    with fs.open(fname) as f:
-        result = pd.read_csv(f, header=None, names=["raw_data"])
-        result[["idx", "offset", "date", "attrs"]] = result["raw_data"].str.split(
-            ":", expand=True, n=3
-        )
-        result["offset"] = result["offset"].astype(int)
-        result["idx"] = result["idx"].astype(int)
-
-        # dropping the original single "raw_data" column after formatting
-        result.drop(columns=["raw_data"], inplace=True)
-
-    result = result.assign(
-        length=(
-            result.offset.shift(periods=-1, fill_value=baseinfo["size"]) - result.offset
-        ),
-        idx_uri=fname,
-        grib_uri=basename,
-    )
-
-    if validate and not result["attrs"].is_unique:
-        raise ValueError(f"Attribute mapping for grib file {basename} is not unique")
-
-    return result.set_index("idx")
-
-
-def build_idx_grib_mapping(
-    basename: str,
-    storage_options: Optional[Dict] = None,
-    suffix: str = "idx",
-    mapper: Optional[Callable] = None,
-    validate: bool = True,
-) -> "pd.DataFrame":
-    """
-    Mapping method combines the idx and grib metadata to make a mapping from
-    one to the other for a particular model horizon file. This should be generally
-    applicable to all forecasts for the given horizon.
-
-    Parameters
-    ----------
-    basename : str
-        the full path for the grib2 file
-    storage_options: dict
-        For accessing the data, passed to filesystem
-    suffix : str
-        The suffix is the ending for the idx file.
-    mapper : Optional[Callable]
-        the mapper if any to apply (used for hrrr subhf)
-    validate : bool
-        to assert the mapping is correct or fail before returning
-
-    Returns
-    -------
-    pandas.Dataframe : The merged dataframe with the results of the two operations
-    joined on the grib message (group) number
-    """
-    import pandas as pd
-
-    grib_file_index = _map_grib_file_by_group(
-        fname=basename,
-        mapper=mapper,
-        storage_options=storage_options,
-    )
-    idx_file_index = parse_grib_idx(
-        basename=basename, suffix=suffix, storage_options=storage_options
-    )
-    result = idx_file_index.merge(
-        # Left merge because the idx file should be authoritative - one record per grib message
-        grib_file_index,
-        on="idx",
-        how="left",
-        suffixes=("_idx", "_grib"),
-    )
-
-    if validate:
-        # If any of these conditions fail - inspect the result manually on colab.
-        all_match_offset = (
-            (result.loc[:, "offset_idx"] == result.loc[:, "offset_grib"])
-            | pd.isna(result.loc[:, "offset_grib"])
-            | ~pd.isna(result.loc[:, "inline_value"])
-        )
-        all_match_length = (
-            (result.loc[:, "length_idx"] == result.loc[:, "length_grib"])
-            | pd.isna(result.loc[:, "length_grib"])
-            | ~pd.isna(result.loc[:, "inline_value"])
-        )
-
-        if not all_match_offset.all():
-            vcs = all_match_offset.value_counts()
-            raise ValueError(
-                f"Failed to match message offset mapping for grib file {basename}: "
-                f"{vcs[True]} matched, {vcs[False]} didn't"
-            )
-
-        if not all_match_length.all():
-            vcs = all_match_length.value_counts()
-            raise ValueError(
-                f"Failed to match message offset mapping for grib file {basename}: "
-                f"{vcs[True]} matched, {vcs[False]} didn't"
-            )
-
-        if not result["attrs"].is_unique:
-            dups = result.loc[result["attrs"].duplicated(keep=False), :]
-            logger.warning(
-                "The idx attribute mapping for %s is not unique for %d variables: %s",
-                basename,
-                len(dups),
-                dups.varname.tolist(),
-            )
-
-        r_index = result.set_index(
-            ["varname", "typeOfLevel", "stepType", "level", "valid_time"]
-        )
-        if not r_index.index.is_unique:
-            dups = r_index.loc[r_index.index.duplicated(keep=False), :]
-            logger.warning(
-                "The grib hierarchy in %s is not unique for %d variables: %s",
-                basename,
-                len(dups),
-                dups.index.get_level_values("varname").tolist(),
-            )
-
-    return result