
Commit

Issues/3: Prep for production readiness review (#28)
frankinspace authored Aug 30, 2024
1 parent 8965ac5 commit 8d3685a
Showing 12 changed files with 219 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- [issues/25](https://github.com/podaac/net2cog/issues/25): Converted harmony adapter to operate on STAC catalog
- [issues/3](https://github.com/podaac/net2cog/issues/3): Improved error handling and updated test cases to use new-style harmony execution

## [0.3.0]
### Changed
2 changes: 1 addition & 1 deletion cmr/ops_associations.txt
@@ -1 +1 @@
C1234410736-POCLOUD
C1940468263-POCLOUD
3 changes: 2 additions & 1 deletion cmr/uat_associations.txt
@@ -1 +1,2 @@
C1247485690-LARC_CLOUD
C1247485690-LARC_CLOUD
C1234410736-POCLOUD
8 changes: 5 additions & 3 deletions docker/Dockerfile
@@ -15,18 +15,20 @@ FROM python:3.10-slim
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get upgrade -y \
gcc \
g++ \
libnetcdf-dev \
libhdf5-dev \
hdf5-helpers \
libgdal-dev \
&& pip3 install --upgrade pip \
&& pip3 install cython \
&& apt-get clean

# Create a new user
RUN adduser --quiet --disabled-password --shell /bin/sh --home /home/dockeruser --gecos "" --uid 1000 dockeruser
USER dockeruser
ENV HOME /home/dockeruser
ENV PYTHONPATH "${PYTHONPATH}:/home/dockeruser/.local/bin"
ENV HOME=/home/dockeruser
ENV PYTHONPATH="${PYTHONPATH}:/home/dockeruser/.local/bin"
ENV PATH="/home/dockeruser/.local/bin:${PATH}"

# The 'SOURCE' argument is what will be used in 'pip install'.
@@ -43,7 +45,7 @@ WORKDIR /worker

COPY --chown=dockeruser $DIST_PATH $DIST_PATH
USER dockeruser
RUN pip3 install --no-cache-dir --force --user --index-url https://pypi.org/simple/ --extra-index-url https://test.pypi.org/simple/ $SOURCE \
RUN pip install --no-cache-dir --force --user --index-url https://pypi.org/simple/ --extra-index-url https://test.pypi.org/simple/ $SOURCE \
&& rm -rf $DIST_PATH

COPY --chown=dockeruser ./docker/docker-entrypoint.sh docker-entrypoint.sh
27 changes: 18 additions & 9 deletions net2cog/netcdf_convert.py
@@ -7,30 +7,36 @@
Functions related to converting a NetCDF file to other formats.
"""

import logging
import os
import pathlib
from os.path import join as pjoin, basename, dirname, exists, splitext
import subprocess
from subprocess import check_call

import logging
import tempfile
from os.path import join as pjoin, basename, dirname, exists, splitext
from subprocess import check_call
from typing import List

import xarray as xr
import rasterio
import rioxarray # noqa
import xarray as xr
from rasterio import CRS

from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles

import rioxarray # noqa
from rioxarray.exceptions import DimensionError

LOGGER = logging.getLogger(__name__)
EXCLUDE_VARS = ['lon', 'lat', 'longitude', 'latitude', 'time']


class Net2CogError(Exception):
"""
Exception raised when an error occurs while converting a NetCDF file to COG
"""

def __init__(self, msg):
super().__init__(msg)


def run_command(command, work_dir):
"""
A simple utility to execute a subprocess command.
@@ -188,7 +194,10 @@ def netcdf_converter(input_nc_file: pathlib.Path, output_cog_pathname: pathlib.P
# xds_reversed = xds.reindex(lat=xds.lat[::-1])
LOGGER.info("Writing COG to %s", basename(gtiff_fname))
if var_list:
xds = xds[var_list]
try:
xds = xds[var_list]
except KeyError as error:
raise Net2CogError(f"Variable {error} not found in dataset") from error
return _write_cogtiff(gtiff_fname, xds)
LOGGER.error("%s: NetCDF file does not contain spatial dimensions such as lat / lon "
"or x / y", netcdf_file)
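
The hunk above introduces Net2CogError and converts a missing-variable KeyError into that domain error. A minimal sketch of how a caller might handle it; the paths and variable name below are hypothetical, not taken from the commit:

```python
# Minimal sketch, not part of the commit: paths and the variable name are hypothetical.
import pathlib

from net2cog import netcdf_convert
from net2cog.netcdf_convert import Net2CogError

try:
    cogs = netcdf_convert.netcdf_converter(
        pathlib.Path("granule.nc"),          # hypothetical input NetCDF
        pathlib.Path("output/granule.tif"),  # hypothetical output pathname
        var_list=["sss_smap"],               # variables to convert
    )
except Net2CogError as error:
    # Raised when a requested variable is not present in the dataset
    print(f"net2cog conversion failed: {error}")
else:
    print(f"Generated COGs: {cogs}")
```
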
33 changes: 19 additions & 14 deletions net2cog/netcdf_convert_harmony.py
@@ -21,6 +21,7 @@
from pystac import Asset

from net2cog import netcdf_convert
from net2cog.netcdf_convert import Net2CogError

DATA_DIRECTORY_ENV = "DATA_DIRECTORY"

@@ -31,8 +32,8 @@ class NetcdfConverterService(harmony.BaseHarmonyAdapter):
for documentation and examples.
"""

def __init__(self, message):
super().__init__(message)
def __init__(self, message, catalog=None, config=None):
super().__init__(message, catalog, config)

self.data_dir = os.getenv(DATA_DIRECTORY_ENV, '/home/dockeruser/data')
pathlib.Path(self.data_dir).mkdir(parents=True, exist_ok=True)
@@ -60,9 +61,9 @@ def process_item(self, item: pystac.Item, source: harmony.message.Source) -> pys
result = item.clone()
result.assets = {}
output_dir = self.job_data_dir

self.logger.info('Input item %s', json.dumps(item.to_dict()))
try:
self.logger.info('Input item: %s', json.dumps(item.to_dict()))
self.logger.info('Input source: %s', source)
# Get the data file
asset = next(v for k, v in item.assets.items() if 'data' in (v.roles or []))
self.logger.info('Downloading %s to %s', asset.href, output_dir)
@@ -74,26 +75,34 @@

# Generate output filename
output_filename, output_file_ext = os.path.splitext(
harmony.adapter.util.generate_output_filename(input_filename, ext='tif'))
harmony.adapter.util.generate_output_filename(asset.href, ext='tif'))
output_filename = f'{output_filename}_converted{output_file_ext}'

# Determine variables that need processing
self.logger.info('Generating COG(s) for %s output will be saved to %s', input_filename, output_filename)
var_list = source.process('variables')
if not isinstance(var_list, list):
var_list = [var_list]
if len(var_list) > 1:
if len(var_list) != 1:
raise HarmonyException(
'net2cog harmony adapter currently only supports processing one variable at a time. '
'Please specify a single variable in your Harmony request.')
var_list = list(map(lambda var: var.name, var_list))
self.logger.info('Processing variables %s', var_list)

# Run the netcdf converter for the complete netcdf granule
cog_generated = next(iter(netcdf_convert.netcdf_converter(pathlib.Path(input_filename),
pathlib.Path(output_dir).joinpath(
output_filename),
var_list=var_list)), [])
try:
cog_generated = next(iter(netcdf_convert.netcdf_converter(pathlib.Path(input_filename),
pathlib.Path(output_dir).joinpath(
output_filename),
var_list=var_list)), [])
except Net2CogError as error:
raise HarmonyException(
f'net2cog failed to convert {asset.title}: {error}') from error
except Exception as uncaught_exception:
raise HarmonyException(str(f'Uncaught error in net2cog. '
f'Notify net2cog service provider. '
f'Message: {uncaught_exception}')) from uncaught_exception

# Stage the output file with a conventional filename
self.logger.info('Generated COG %s', cog_generated)
@@ -113,10 +122,6 @@ def process_item(self, item: pystac.Item, source: harmony.message.Source) -> pys
# Return the STAC record
self.logger.info('Processed item %s', json.dumps(result.to_dict()))
return result
except Exception as uncaught_exception:
raise HarmonyException(str(f'Uncaught error in net2cog. '
f'Notify net2cog service provider. '
f'Message: {uncaught_exception}')) from uncaught_exception
finally:
# Clean up any intermediate resources
shutil.rmtree(self.job_data_dir)
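
The widened __init__(message, catalog=None, config=None) signature matches the new-style harmony execution, where the adapter receives the STAC catalog up front and invoke() runs process_item over each item. A hedged sketch of how a test might drive it; the message fixture path and the config() call are assumptions, not taken from this commit:

```python
# Hedged sketch of new-style execution; the message fixture path is hypothetical.
import json

import pystac
from harmony.message import Message
from harmony.util import config

from net2cog.netcdf_convert_harmony import NetcdfConverterService

with open("tests/data/harmony_message.json") as f:  # hypothetical fixture path
    message = Message(json.load(f))

catalog = pystac.Catalog.from_file(
    "tests/data/SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/catalog.json")

adapter = NetcdfConverterService(message, catalog=catalog,
                                 config=config(validate=False))
result = adapter.invoke()  # processes every STAC item via process_item
```
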
38 changes: 38 additions & 0 deletions tests/data/SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.json
@@ -0,0 +1,38 @@
{
"type": "Feature",
"stac_version": "1.0.0-beta.2",
"id": "RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0",
"properties": {
"start_datetime": "2001-01-01T01:01:01Z",
"end_datetime": "2002-02-02T02:02:02Z",
"datetime": null
},
"geometry": null,
"links": [
{
"rel": "root",
"href": "../catalog.json",
"type": "application/json"
},
{
"rel": "parent",
"href": "../catalog.json",
"type": "application/json"
}
],
"assets": {
"data": {
"href": "!!REPLACED IN TEST CASE!!",
"title": "Example2",
"roles": [
"data"
]
}
},
"bbox": [
-1,
-2,
3,
4
]
}
17 changes: 17 additions & 0 deletions tests/data/SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/catalog.json
@@ -0,0 +1,17 @@
{
"id": "test",
"stac_version": "1.0.0-beta.2",
"description": "test",
"links": [
{
"rel": "root",
"href": "./catalog.json",
"type": "application/json"
},
{
"rel": "item",
"href": "./RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.json",
"type": "application/json"
}
]
}
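
The item this catalog links to (added above with a "!!REPLACED IN TEST CASE!!" data href) presumably gets rewritten at test time to point at a local granule. A hedged sketch of one way to do that with pystac; the item path is inferred from the catalog's item link and the granule location follows the test data layout, but neither is taken from the commit:

```python
# Hedged sketch; the item path is inferred from the catalog's "item" link,
# and the granule location is an assumption.
import pathlib

import pystac

item = pystac.Item.from_file(
    "tests/data/SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/"
    "RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0/"
    "RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.json")

granule = pathlib.Path(
    "tests/data/SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/"
    "RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc")
item.assets["data"].href = granule.resolve().as_uri()  # replaces the placeholder href
```
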
@@ -9,10 +9,10 @@
"requestId": "0001111-2222-3333-4444-5555666677777",
"sources": [
{
"collection": "C0001-EXAMPLE",
"collection": "C1940468263-POCLOUD",
"variables": [
{
"id": "V0001-EXAMPLE",
"id": "V2093907958-POCLOUD",
"name": "sss_smap",
"fullPath": "sss_smap"
}
@@ -21,7 +21,7 @@
{
"id": "RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0",
"name": "Example2",
"url": "file:///home/tests/data/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc",
"url": "!!REPLACED IN TEST CASE!!",
"temporal": {
"start": "2001-01-01T01:01:01Z",
"end": "2002-02-02T02:02:02Z"
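
The updated message fixture swaps in real CMR concept IDs (C1940468263-POCLOUD, V2093907958-POCLOUD) and keeps a single sss_smap variable, which satisfies the adapter's new one-variable guard. A hedged sketch of how that message resolves to the variable list the adapter checks; the fixture path is hypothetical:

```python
# Hedged sketch; the fixture path is hypothetical.
import json

from harmony.message import Message

with open("tests/data/harmony_message.json") as f:  # hypothetical fixture path
    message = Message(json.load(f))

source = message.sources[0]
variables = source.process('variables')  # same call the adapter makes
print([v.name for v in variables])       # expected: ['sss_smap']
```
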
74 changes: 41 additions & 33 deletions tests/test_netcdf_convert.py
@@ -14,6 +14,7 @@
import pytest

from net2cog import netcdf_convert
from net2cog.netcdf_convert import Net2CogError


@pytest.fixture(scope='session')
@@ -29,63 +30,70 @@ def output_basedir(tmp_path):


@pytest.mark.parametrize('data_file', [
'RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc'
'SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc'
])
def test_cog_generation(data_file, data_dir, output_basedir):
"""
Test that the conversion works and the output is a valid cloud optimized geotiff
"""
test_file = pathlib.Path(data_dir, data_file)
output_dir = pathlib.Path(output_basedir, pathlib.Path(data_file).stem)

netcdf_convert.netcdf_converter(test_file, pathlib.Path(output_basedir, data_file), [])
results = netcdf_convert.netcdf_converter(test_file, pathlib.Path(output_basedir, data_file), [])

assert os.path.isdir(output_dir)
output_files = os.listdir(output_dir)
assert len(output_files) > 0
assert len(results) > 0

with os.scandir(output_dir) as outdir:
for entry in outdir:
if entry.is_file():
for entry in results:
if pathlib.Path(entry).is_file():
cogtif_val = [
'rio',
'cogeo',
'validate',
entry
]

cogtif_val = [
'rio',
'cogeo',
'validate',
entry.path
]
process = subprocess.run(cogtif_val, check=True, stdout=subprocess.PIPE, universal_newlines=True)
cog_test = process.stdout
cog_test = cog_test.replace("\n", "")

process = subprocess.run(cogtif_val, check=True, stdout=subprocess.PIPE, universal_newlines=True)
cog_test = process.stdout
cog_test = cog_test.replace("\n", "")
valid_cog = entry + " is a valid cloud optimized GeoTIFF"
assert cog_test == valid_cog

valid_cog = entry.path + " is a valid cloud optimized GeoTIFF"
assert cog_test == valid_cog


def test_band_selection(data_dir, output_basedir):
@pytest.mark.parametrize(['data_file', 'in_bands'], [
['SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc', ['gland', 'fland', 'sss_smap']]
])
def test_band_selection(data_file, in_bands, data_dir, output_basedir):
"""
Verify the correct bands asked for by the user are being converted
"""

in_bands = sorted(['gland', 'fland', 'sss_smap'])
data_file = 'RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc'
in_bands = sorted(in_bands)
test_file = pathlib.Path(data_dir, data_file)
output_dir = pathlib.Path(output_basedir, pathlib.Path(data_file).stem)

results = netcdf_convert.netcdf_converter(test_file, pathlib.Path(output_basedir, data_file), in_bands)

assert os.path.isdir(output_dir)
output_files = os.listdir(output_dir)
assert len(output_files) == 3
assert len(results) == 3

out_bands = []
with os.scandir(output_dir) as outdir:
for entry in outdir:
if entry.is_file():
band_completed = entry.name.split("4.0_")[-1].replace(".tif", "")
out_bands.append(band_completed)
for entry in results:
if pathlib.Path(entry).is_file():
band_completed = entry.split("4.0_")[-1].replace(".tif", "")
out_bands.append(band_completed)

out_bands.sort()
assert in_bands == out_bands


@pytest.mark.parametrize(['data_file', 'in_bands'], [
['SMAP_RSS_L3_SSS_SMI_8DAY-RUNNINGMEAN_V4/RSS_smap_SSS_L3_8day_running_2020_005_FNL_v04.0.nc', ['waldo']]
])
def test_unknown_band_selection(data_file, in_bands, data_dir, output_basedir):
"""
Verify an incorrect band asked for by the user raises an exception
"""

in_bands = sorted(in_bands)
test_file = pathlib.Path(data_dir, data_file)

with pytest.raises(Net2CogError):
netcdf_convert.netcdf_converter(test_file, pathlib.Path(output_basedir, data_file), in_bands)
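
The updated tests still shell out to rio cogeo validate; the same check can be done in-process with rio-cogeo's Python API. A sketch of that alternative (not what the commit does):

```python
# Sketch of an in-process equivalent of `rio cogeo validate`; not used by the commit.
from rio_cogeo.cogeo import cog_validate


def assert_valid_cog(path: str) -> None:
    """Fail if `path` is not a valid Cloud Optimized GeoTIFF."""
    is_valid, errors, warnings = cog_validate(path)
    assert is_valid, f"{path} is not a valid COG: errors={errors} warnings={warnings}"


# Usage inside the test loop:
# for cog_path in results:
#     assert_valid_cog(cog_path)
```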