Commit 91cb20f
made some updates
Anu-Ra-g committed Jul 1, 2024
1 parent c6b9e1b commit 91cb20f
Showing 1 changed file with 44 additions and 72 deletions.
116 changes: 44 additions & 72 deletions kerchunk/grib2.py
@@ -3,8 +3,7 @@
 import io
 import logging
 from collections import defaultdict
-from typing import Iterable, List, Dict, Set, Optional
-import pandas as pd
+from typing import Iterable, List, Dict, Set, TYPE_CHECKING
 
 import ujson
 
@@ -585,91 +584,64 @@ def correct_hrrr_subhf_step(group: Dict) -> Dict:
     return group
 
 
+if TYPE_CHECKING:
+    import pandas as pd
+
+
 def parse_grib_idx(
     fs: fsspec.AbstractFileSystem,
     *,
     basename: str,
     suffix: str = "idx",
-    tstamp: Optional[pd.Timestamp] = None,
     validate: bool = False,
-) -> pd.DataFrame:
+) -> "pd.DataFrame":
"""
Standalone method used to extract metadata from a grib2 idx file(text) from NODD
:param fs: the file system to read from
:param basename: the base name is the full path to the grib file
:param suffix: the suffix is the ending for the idx file
:param tstamp: the timestamp to record for this index process
:return: the data frame containing the results
Standalone method used to extract metadata from a grib2 idx file(text) from NODD.
The function takes idx file, extracts the metadata especially the attrs (variables)
from each idx entry and converts it into pandas DataFrame. The dataframe is later
to build the one-to-one mapping to the grib file.
Parameters
----------
fs : fsspec.AbstractFileSystem
The file system to read from.
basename : str
The base name is the full path to the grib file.
suffix : str
The suffix is the ending for the idx file.
validate : bool
The validation if the metadata table has duplicate attrs.
Returns
-------
pandas.DataFrame : The data frame containing the results.
"""
import pandas as pd

fname = f"{basename}.{suffix}"
fs.invalidate_cache(fname)
fs.invalidate_cache(basename)

baseinfo = fs.info(basename)

with fs.open(fname, "r") as f:
splits = []
for line in f.readlines():
try:
idx, offset, date, attrs = line.split(":", maxsplit=3)
splits.append(
[
int(idx),
int(offset),
f"{date[2:6]}-{date[6:8]}-{date[8:10]}-{date[10:]}",
attrs,
]
)
except ValueError:
# Wrap the ValueError in a new one that includes the bad line
# If building the mapping, pick a different forecast run where the idx file is not broken
# If indexing a forecast using the mapping, fall back to reading the grib file
raise ValueError(f"Could not parse line: {line}")

result = pd.DataFrame(
data=splits,
columns=["idx", "offset", "date", "attrs"],
)
result = None

# Subtract the next offset to get the length using the filesize for the last value
result.loc[:, "length"] = (
result.offset.shift(periods=-1, fill_value=baseinfo["size"]) - result.offset
try:
result = pd.read_csv(fname, sep=":", header=None).iloc[:, :4]
result.columns = ["idx", "offset", "date", "attrs"]
result["offset"] = result["offset"].astype(int)
result["idx"] = result["idx"].astype(int)
except Exception as e:
raise ValueError(f"Error while parsing {fname} file") from e

result = result.assign(
length=(
result.offset.shift(periods=-1, fill_value=baseinfo["size"]) - result.offset
),
idx_uri=fname,
grib_uri=basename,
indexed_at=pd.Timestamp.now(),
)

result.loc[:, "idx_uri"] = fname
result.loc[:, "grib_uri"] = basename
if tstamp is None:
tstamp = pd.Timestamp.now()
result.loc[:, "indexed_at"] = tstamp

     if fs.protocol[0] == "gs":
         result.loc[:, "grib_crc32"] = baseinfo["crc32c"]
         result.loc[:, "grib_updated_at"] = pd.to_datetime(
             baseinfo["updated"]
         ).tz_localize(None)
 
         idxinfo = fs.info(fname)
         result.loc[:, "idx_crc32"] = idxinfo["crc32c"]
         result.loc[:, "idx_updated_at"] = pd.to_datetime(
             idxinfo["updated"]
         ).tz_localize(None)
     elif fs.protocol[0] == "s3":
         result.loc[:, "grib_Etag"] = baseinfo["ETag"]
         result.loc[:, "grib_updated_at"] = pd.to_datetime(
             baseinfo["LastModified"]
         ).tz_localize(None)
 
         idxinfo = fs.info(fname)
         result.loc[:, "idx_ETag"] = idxinfo["ETag"]
         result.loc[:, "idx_updated_at"] = pd.to_datetime(
             idxinfo["LastModified"]
         ).tz_localize(None)
     else:
         result.loc[:, "grib_crc32"] = None
         result.loc[:, "grib_updated_at"] = None
         result.loc[:, "idx_crc32"] = None
         result.loc[:, "idx_updated_at"] = None
 
     if validate and not result["attrs"].is_unique:
         raise ValueError(f"Attribute mapping for grib file {basename} is not unique")

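The import changes at the top of the diff follow a standard Python pattern for keeping pandas an optional dependency: the module-level import happens only under TYPE_CHECKING (visible to type checkers, never executed at runtime), while the function imports pandas lazily when it actually runs. A minimal sketch of the idiom, independent of kerchunk:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated by static type checkers only, never at runtime,
    # so importing this module does not require pandas.
    import pandas as pd


def load_table(path: str) -> "pd.DataFrame":
    # Deferred import: pandas is only needed when the function is called.
    import pandas as pd

    return pd.read_csv(path)

The quoted return annotation "pd.DataFrame" matters here: as a string it is never evaluated at runtime, which is why the real import can stay behind TYPE_CHECKING.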

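For context, a hedged sketch of how the revised parse_grib_idx might be called; the bucket and file path below are illustrative stand-ins for a real NODD GRIB file, not taken from this commit:

import fsspec

from kerchunk.grib2 import parse_grib_idx

# Anonymous access to a public bucket; this path is hypothetical.
fs = fsspec.filesystem("s3", anon=True)

df = parse_grib_idx(
    fs,
    basename="noaa-hrrr-bdp-pds/hrrr.20230615/conus/hrrr.t00z.wrfsfcf01.grib2",
    validate=True,
)

# One row per GRIB message: byte offset and length, plus the idx attrs string.
print(df[["idx", "offset", "length", "attrs"]].head())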