Skip to content

Commit

Permalink
Corrupt grib file check for regrid pipeline (#448)
Browse files Browse the repository at this point in the history
* Added grib file validity check.

* Used wgrib2 utility to check for corrupt file.

* GitHub CI actions: Installed wgrib2 utility.

* Added sudo permissions

* Added sudo privileges for make command

* Installing wgrib2 with a different method

* Added Dockerfile's wgrib2 installation steps

* Installed webp package

* Cloned wgrib2 build from a GitHub repo

* Reverted wgrib2 changes. Used grib_ls to check file validity.

* Added test cases for corrupt grib files check.

* Version bump
  • Loading branch information
j9sh264 authored Mar 11, 2024
1 parent 5aed3a6 commit 7b579d3
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,4 @@ jobs:
run: |
conda run -n weather-tools pip install -e .[test] --use-deprecated=legacy-resolver
- name: Run type checker
run: conda run -n weather-tools pytype
run: conda run -n weather-tools pytype
18 changes: 17 additions & 1 deletion weather_mv/loader_pipeline/regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import os.path
import shutil
import subprocess
import tempfile
import typing as t
import warnings
Expand Down Expand Up @@ -233,7 +234,16 @@ def target_from(self, uri: str) -> str:
no_ext, _ = os.path.splitext(in_dest)
return f'{no_ext}.nc'

def apply(self, uri: str):
def is_grib_file_corrupt(self, local_grib: str) -> bool:
    """Report whether `grib_ls` fails to read the GRIB file at `local_grib`.

    The `grib_ls` command is run on the file; a non-zero exit status is
    treated as evidence that the file is corrupt.

    Args:
        local_grib: Path of the GRIB file on local disk.

    Returns:
        True if `grib_ls` exited with a non-zero status, False otherwise.

    Raises:
        OSError: If the `grib_ls` command itself cannot be launched (for
            example, it is not on the PATH). This is deliberately not
            caught: a missing tool says nothing about the file's validity.
    """
    try:
        # Only the exit status matters, so discard the listing instead of
        # buffering it in memory (it grows with the number of messages).
        subprocess.run(
            ['grib_ls', local_grib],
            stdout=subprocess.DEVNULL,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logger.info(f"Encountered error while reading GRIB: {e}.")
        return True
    return False

def apply(self, uri: str) -> None:
logger.info(f'Regridding from {uri!r} to {self.target_from(uri)!r}.')

if self.dry_run:
Expand All @@ -247,6 +257,12 @@ def apply(self, uri: str):
logger.info(f'Copying grib from {uri!r} to local disk.')

with open_local(uri) as local_grib:
logger.info(f"Checking for {uri}'s validity...")
if self.is_grib_file_corrupt(local_grib):
logger.info(f"Corrupt GRIB file found: {uri}.")
return
logger.info(f"No issues found with {uri}.")

logger.info(f'Regridding {uri!r}.')
fs = mv.bindings.Fieldset(path=local_grib)
fieldset = mv.regrid(data=fs, **self.regrid_kwargs)
Expand Down
7 changes: 7 additions & 0 deletions weather_mv/loader_pipeline/regrid_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ def test_zarr__coarsen(self):
except: # noqa
self.fail('Cannot open Zarr with Xarray.')

def test_corrupt_grib_file(self):
    """The corruption check passes a valid GRIB file and flags a corrupt one."""
    data_dir = self.test_data_folder
    valid_grib = os.path.join(data_dir, 'test_data_grib_single_timestep')
    broken_grib = os.path.join(data_dir, 'test_data_corrupt_grib')

    # The valid fixture must not be reported as corrupt ...
    self.assertFalse(self.Op.is_grib_file_corrupt(valid_grib))
    # ... while the deliberately broken fixture must be.
    self.assertTrue(self.Op.is_grib_file_corrupt(broken_grib))


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion weather_mv/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
packages=find_packages(),
author='Anthromets',
author_email='[email protected]',
version='0.2.20',
version='0.2.21',
url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
description='A tool to load weather data into BigQuery.',
install_requires=beam_gcp_requirements + base_requirements,
Expand Down
Binary file added weather_mv/test_data/test_data_corrupt_grib
Binary file not shown.

0 comments on commit 7b579d3

Please sign in to comment.