Skip to content

Commit

Permalink
Merge pull request #116 from nansencenter/model_argo_comparison
Browse files Browse the repository at this point in the history
Model/Argo comparison
  • Loading branch information
aperrin66 authored Sep 30, 2024
2 parents aa38da1 + 0f825c9 commit c09cfc5
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 4 deletions.
2 changes: 0 additions & 2 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
.*
!.git
**/auxiliary
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
import ConfigParser
import json
import os
import os.path
import pkg_resources
import shutil
import sys
from datetime import datetime,timedelta

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import netCDF4
import numpy
import pyproj
import pyproj.enums

from yapsy.PluginManager import PluginManager


class BadProfile(Exception):
"""Exception raised when a profile does not have the necessary fields"""


def get_profile_reader():
manager = PluginManager()
readers_dir = pkg_resources.resource_filename('syntool_ingestor',
os.path.join('share', 'plugins', 'readers'))
manager.setPluginPlaces([readers_dir])
manager.collectPlugins()
for plugin_wrapper in manager.getAllPlugins():
try:
if plugin_wrapper.plugin_object.can_handle('argo_profile_erddap_json'):
return plugin_wrapper.plugin_object
except NotImplementedError:
pass
return None


def find_closest_point(model_x, model_y, profile_x, profile_y):
closest_coordinates_indices = []
for model_coordinate, profile_coordinate in ((model_x, profile_x), (model_y, profile_y)):
for i, _ in enumerate(model_coordinate):
prev = i-1 if i > 0 else 0
next = i+1 if i < len(model_coordinate) - 1 else -1
if (profile_coordinate >= model_coordinate[i] - ((model_coordinate[i] - model_coordinate[prev]) / 2.0) and
profile_coordinate < model_coordinate[i] + (model_coordinate[next] - model_coordinate[i]) / 2.0):
closest_coordinates_indices.append(i)
break
return closest_coordinates_indices


def plot_profile(to_plot, output_path):
figure, axes = plt.subplots(1, len(to_plot))

if not isinstance(axes, numpy.ndarray):
axes = [axes]

for k, profile_to_plot in enumerate(to_plot):
for field in profile_to_plot['fields']:
if field in ('pres', 'depth'):
y_field = field
break
else:
raise BadProfile("pres or depth field needs to be present")

axes[k].invert_yaxis()
axes[k].set_ylabel("{} ({})".format(
y_field, profile_to_plot['fields'][y_field]['units']))

fields_counter = 0
for field in profile_to_plot['fields']:
if field not in ('pres', 'depth'):
color = profile_to_plot['fields'][field]['color']
if fields_counter == 0:
axes[k].set_xlabel("{} {} ({})".format(
profile_to_plot.get('name', 'TOPAZ 5'),
field,
profile_to_plot['fields'][field]['units']))
axes[k].tick_params(axis='x', colors=color)
axes[k].get_xaxis().label.set_color(color)
axes[k].plot(profile_to_plot['fields'][field]['data'],
profile_to_plot['fields'][y_field]['data'],
color=color)
else:
additional_axes = axes[k].twiny()
additional_axes.axes.set_xlabel("{} {} ({})".format(
profile_to_plot.get('name', 'TOPAZ 5'),
field,
profile_to_plot['fields'][field]['units']))
additional_axes.tick_params(axis='x', colors=color)
additional_axes.get_xaxis().label.set_color(color)
additional_axes.plot(
profile_to_plot['fields'][field]['data'],
profile_to_plot['fields'][y_field]['data'],
color=color)
fields_counter += 1
figure.tight_layout()
figure.savefig(output_path)
plt.close(figure)


def create_profile_ini(features_dir):
config_parser = ConfigParser.ConfigParser()
config_parser.add_section('global')
config_parser.set('global', 'display_type', 'PROFILE')
config_parser.set('global', 'feature_type', 'profile')
config_parser.add_section('metadata')
config_parser.set('metadata', 'profile', 'features/profile.svg')
with open(os.path.join(features_dir, 'profile.ini'), 'w') as profile_file:
config_parser.write(profile_file)


def main():
""""""
model_path = sys.argv[1]
argo_paths = sys.argv[2].split(',')
output_dir = sys.argv[3]

with netCDF4.Dataset(model_path, 'r') as model_dataset:
if 'o2' in model_dataset.variables:
product_name = '3413_Comparison_Bio_ARGO_TOPAZ5'
fields = {
'depth': {
'model_field': 'depth',
'reference': 'model',
},
'pres': {
'profile_field': 'pres',
'reference': 'profile',
},
'oxygen': {
'profile_field': 'doxy',
'model_field': 'o2',
'color': 'tab:blue',
},
'chlorophyll': {
'profile_field': 'chla',
'model_field': 'chl',
'color': 'tab:green',
}
}
elif 'so' in model_dataset.variables:
product_name = '3413_Comparison_ARGO_TOPAZ5'
fields = {
'depth': {
'model_field': 'depth',
'reference': 'model',
},
'pres': {
'profile_field': 'pres',
'reference': 'profile',
},
'salinity': {
'profile_field': 'psal',
'model_field': 'so',
'color': 'tab:blue',
},
'temperature': {
'profile_field': 'temp',
'model_field': 'thetao',
'color': 'tab:orange',
}
}

model_x = model_dataset.variables['x'][:] * 100000
model_y = model_dataset.variables['y'][:] * 100000
model_crs = pyproj.CRS.from_proj4(model_dataset.variables['stereographic'].proj4)

proj = pyproj.Transformer.from_crs(model_crs.geodetic_crs, model_crs, always_xy=True)

profile_reader = get_profile_reader()
reader_cfg = {'output_options': {}}

for argo_path in argo_paths:
for meta, profile in profile_reader.extract_from_dataset(argo_path, reader_cfg):
profile_id = "{}_{}".format(profile.platform_number, profile.cycle_number)
profile_x, profile_y = proj.transform(profile.longitude, profile.latitude)
model_point_indices = find_closest_point(model_x, model_y, profile_x, profile_y)

profile_fields_to_plot = {}
model_fields_to_plot = {}

for field, field_properties in fields.items():
# select profile fields to plot
if 'profile_field' in field_properties:
adjusted_field = "{}_adjusted".format(field_properties['profile_field'])
profile_field_name = None
if adjusted_field in profile.list_measurements_fields():
profile_field_name = adjusted_field
elif field in profile.list_measurements_fields():
profile_field_name = field_properties['profile_field']
if profile_field_name:
profile_fields_to_plot[field] = {
'data': profile.get_data_array(profile_field_name),
'units': profile.get_field_units(profile_field_name),
}
if not field_properties.get('reference') == 'profile':
profile_fields_to_plot[field]['color'] = field_properties['color']

# select model fields to plot
if 'model_field' in field_properties:
model_field_name = field_properties['model_field']
if field_properties.get('reference') == 'model':
model_fields_to_plot[field] = {
'data': model_dataset.variables[model_field_name][:],
'units': model_dataset.variables[model_field_name].units
}
else:
model_fields_to_plot[field] = {
'data': model_dataset.variables[model_field_name][0, :, model_point_indices[1], model_point_indices[0]],
'units': model_dataset.variables[model_field_name].units,
'color': field_properties['color'],
}

if not profile_fields_to_plot or profile_fields_to_plot.keys() == ['pres']:
print("No fields to plot for {}".format(profile_id))
continue

to_plot = [
{'name': profile_id, 'fields': profile_fields_to_plot},
{'name': 'TOPAZ 5', 'fields': model_fields_to_plot}
]

profile_name = '{}_{}_{}'.format(os.path.basename(model_path).rstrip('.nc'),
profile_id,
profile.time.strftime('%Y%m%d_%H%M%S'))
relative_ingested_path = os.path.join(product_name, profile_name)
ingested_path = os.path.join(output_dir, relative_ingested_path)
features_path = os.path.join(ingested_path, 'features')
profile_svg_path = os.path.join(features_path, 'profile.svg')
try:
os.makedirs(os.path.dirname(profile_svg_path))
except OSError:
pass

try:
plot_profile(to_plot, profile_svg_path)
except BadProfile as error:
print("Could not plot profile: {}".format(error.args[0]))
shutil.rmtree(ingested_path, ignore_errors=True)
continue
create_profile_ini(features_path)

profile_time = profile.time
metadata_time_format = '%Y-%m-%d %H:%M:%S'

metadata = {
"product": product_name[5:].replace('_', ' '),
"max_zoom_level": 0,
"min_zoom_level": 0,
"bbox_str": "POLYGON(({} {},{} {},{} {},{} {},{} {}))".format(
profile_x - 50, profile_y - 50,
profile_x - 50, profile_y + 50,
profile_x + 50, profile_y + 50,
profile_x + 50, profile_y - 50,
profile_x - 50, profile_y - 50,
),
"dataset": profile_name,
"shape_str": "POINT ({} {})".format(profile_x, profile_y),
"syntool_id": product_name,
"begin_datetime": profile_time.strftime(metadata_time_format),
"end_datetime": (
profile_time + timedelta(seconds=1)
).strftime(metadata_time_format),
"output_type": "MOORED",
"resolutions": [],
}

with open(os.path.join(ingested_path, 'metadata.json'), 'wb') as metadata_file:
json.dump(metadata, metadata_file)


if __name__ == '__main__':
main()
55 changes: 53 additions & 2 deletions geospaas_processing/tasks/syntool.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
"""Tasks related to Syntool"""
import os
import re
import shutil
import subprocess
import re
import tempfile
from contextlib import ExitStack
from pathlib import Path

import celery

import geospaas_processing.converters.syntool
import geospaas_processing.utils as utils
from geospaas.catalog.models import Dataset
from geospaas_processing.tasks import lock_dataset_files, FaultTolerantTask, WORKING_DIRECTORY
from ..converters.syntool.converter import SyntoolConversionManager
from ..models import ProcessingResult
from . import app
from . import app, DATASET_LOCK_PREFIX


logger = celery.utils.log.get_task_logger(__name__)

Expand Down Expand Up @@ -69,6 +74,52 @@ def convert(self, args, **kwargs): # pylint: disable=unused-argument
save_results(dataset_id, converted_files)
return (dataset_id, converted_files)

@app.task(base=FaultTolerantTask, bind=True, track_started=True)
def compare_profiles(self, args, **kwargs):
"""Generate side-by-side profiles of a model and in-situ data.
"""
model_id, (model_path,) = args[0]
profiles = args[1] # iterable of (id, (path,)) tuples

working_dir = Path(WORKING_DIRECTORY)
output_dir = Path(os.getenv('GEOSPAAS_PROCESSING_SYNTOOL_RESULTS_DIR', WORKING_DIRECTORY))

locks = [utils.redis_lock(f"{DATASET_LOCK_PREFIX}{model_id}", self.request.id)]
for profile_id, _ in profiles:
locks.append(utils.redis_lock(f"{DATASET_LOCK_PREFIX}{profile_id}", self.request.id))

with tempfile.TemporaryDirectory() as tmp_dir, \
ExitStack() as stack:
for lock in locks: # lock all model and profile datasets
stack.enter_context(lock)

command = [
'python2',
str(Path(geospaas_processing.converters.syntool.__file__).parent
/ 'extra_readers'
/ 'compare_model_argo.py'),
str(working_dir / model_path),
','.join(str(working_dir / p[1][0]) for p in profiles),
tmp_dir
]
try:
process = subprocess.run(command, capture_output=True)
except subprocess.CalledProcessError as error:
logger.error("Could not generate comparison profiles for dataset %s\nstdout: %s\nstderr: %s",
model_id,
error.output,
error.stderr)
raise

results = []
if process.returncode == 0:
for product_dir in Path(tmp_dir).iterdir():
shutil.copytree(str(product_dir), str(output_dir / 'ingested' / product_dir.name),
dirs_exist_ok=True)
for granule_dir in product_dir.iterdir():
results.append(str(Path('ingested', product_dir.name, granule_dir.name)))
save_results(model_id, results)
return (model_id, results)

@app.task(base=FaultTolerantTask, bind=True, track_started=True)
@lock_dataset_files
Expand Down
Loading

0 comments on commit c09cfc5

Please sign in to comment.