# Reconstructed from the new-file hunk of a whitespace-mangled git diff
# (geospaas_processing/converters/syntool/extra_readers/compare_model_argo.py).
# NOTE(review): the surrounding .dockerignore hunk and the header of the
# geospaas_processing/tasks/syntool.py hunk in this span are diff metadata.
"""Generate side-by-side vertical profile plots comparing a TOPAZ5 model
dataset with in-situ ARGO profiles, laid out so Syntool can ingest them.

Usage: compare_model_argo.py <model.nc> <argo_path1,argo_path2,...> <output_dir>

This script is executed with python2 by the compare_profiles celery task,
hence the python2-style imports (ConfigParser) and 'wb' JSON writing.
"""
import ConfigParser
import json
import os
import os.path
import pkg_resources
import shutil
import sys
from datetime import datetime, timedelta

import matplotlib
matplotlib.use('Agg')  # headless backend; must be set before importing pyplot
import matplotlib.pyplot as plt
import netCDF4
import numpy
import pyproj
import pyproj.enums

from yapsy.PluginManager import PluginManager


class BadProfile(Exception):
    """Exception raised when a profile does not have the necessary fields"""


def get_profile_reader():
    """Return the syntool_ingestor reader plugin able to handle ARGO
    profiles in ERDDAP JSON format, or None if no such plugin is found.
    """
    manager = PluginManager()
    readers_dir = pkg_resources.resource_filename(
        'syntool_ingestor', os.path.join('share', 'plugins', 'readers'))
    manager.setPluginPlaces([readers_dir])
    manager.collectPlugins()
    for plugin_wrapper in manager.getAllPlugins():
        try:
            if plugin_wrapper.plugin_object.can_handle('argo_profile_erddap_json'):
                return plugin_wrapper.plugin_object
        except NotImplementedError:
            # plugins which do not implement can_handle() cannot be used
            pass
    return None


def find_closest_point(model_x, model_y, profile_x, profile_y):
    """Return [x_index, y_index] of the model grid cell whose center is
    closest to the (profile_x, profile_y) point.

    Coordinates must be in the same CRS; assumes the model axes are
    sorted in ascending order -- TODO confirm against the TOPAZ files.
    May return fewer than two indices if the point lies outside the
    grid on one axis.
    """
    closest_coordinates_indices = []
    for model_coordinate, profile_coordinate in ((model_x, profile_x), (model_y, profile_y)):
        for i, _ in enumerate(model_coordinate):
            previous_index = i - 1 if i > 0 else 0
            # renamed from "next" so the builtin is not shadowed
            following_index = i + 1 if i < len(model_coordinate) - 1 else -1
            # the point belongs to cell i if it lies between the midpoints
            # towards the previous and the next grid coordinates
            if (profile_coordinate >= model_coordinate[i] -
                    ((model_coordinate[i] - model_coordinate[previous_index]) / 2.0) and
                    profile_coordinate < model_coordinate[i] +
                    (model_coordinate[following_index] - model_coordinate[i]) / 2.0):
                closest_coordinates_indices.append(i)
                break
    return closest_coordinates_indices


def plot_profile(to_plot, output_path):
    """Plot profiles side by side and save the figure to output_path.

    to_plot is a list of dicts {'name': ..., 'fields': {field_name:
    {'data': ..., 'units': ..., 'color': ...}}}. Each dict gets its own
    subplot with 'pres' or 'depth' as the (inverted) vertical axis and
    one x axis per remaining field.

    Raises BadProfile when neither 'pres' nor 'depth' is present.
    """
    figure, axes = plt.subplots(1, len(to_plot))

    # with a single subplot, plt.subplots() returns a bare Axes object
    if not isinstance(axes, numpy.ndarray):
        axes = [axes]

    for k, profile_to_plot in enumerate(to_plot):
        for field in profile_to_plot['fields']:
            if field in ('pres', 'depth'):
                y_field = field
                break
        else:
            raise BadProfile("pres or depth field needs to be present")

        axes[k].invert_yaxis()  # depth/pressure increases downwards
        axes[k].set_ylabel("{} ({})".format(
            y_field, profile_to_plot['fields'][y_field]['units']))

        fields_counter = 0
        for field in profile_to_plot['fields']:
            if field not in ('pres', 'depth'):
                color = profile_to_plot['fields'][field]['color']
                if fields_counter == 0:
                    # first data field uses the subplot's main x axis
                    axes[k].set_xlabel("{} {} ({})".format(
                        profile_to_plot.get('name', 'TOPAZ 5'),
                        field,
                        profile_to_plot['fields'][field]['units']))
                    axes[k].tick_params(axis='x', colors=color)
                    axes[k].get_xaxis().label.set_color(color)
                    axes[k].plot(profile_to_plot['fields'][field]['data'],
                                 profile_to_plot['fields'][y_field]['data'],
                                 color=color)
                else:
                    # further data fields each get their own twin x axis
                    additional_axes = axes[k].twiny()
                    additional_axes.axes.set_xlabel("{} {} ({})".format(
                        profile_to_plot.get('name', 'TOPAZ 5'),
                        field,
                        profile_to_plot['fields'][field]['units']))
                    additional_axes.tick_params(axis='x', colors=color)
                    additional_axes.get_xaxis().label.set_color(color)
                    additional_axes.plot(
                        profile_to_plot['fields'][field]['data'],
                        profile_to_plot['fields'][y_field]['data'],
                        color=color)
                fields_counter += 1
    figure.tight_layout()
    figure.savefig(output_path)
    plt.close(figure)


def create_profile_ini(features_dir):
    """Write the profile.ini file expected by Syntool in features_dir."""
    config_parser = ConfigParser.ConfigParser()
    config_parser.add_section('global')
    config_parser.set('global', 'display_type', 'PROFILE')
    config_parser.set('global', 'feature_type', 'profile')
    config_parser.add_section('metadata')
    config_parser.set('metadata', 'profile', 'features/profile.svg')
    with open(os.path.join(features_dir, 'profile.ini'), 'w') as profile_file:
        config_parser.write(profile_file)


def main():
    """For each ARGO profile given on the command line, write a
    Syntool-ingestible directory (plot, profile.ini, metadata.json)
    comparing it with the closest point of the model dataset.
    """
    model_path = sys.argv[1]
    argo_paths = sys.argv[2].split(',')
    output_dir = sys.argv[3]

    with netCDF4.Dataset(model_path, 'r') as model_dataset:
        # pick the product and the fields to compare depending on the
        # variables present in the model file (biology vs physics)
        if 'o2' in model_dataset.variables:
            product_name = '3413_Comparison_Bio_ARGO_TOPAZ5'
            fields = {
                'depth': {
                    'model_field': 'depth',
                    'reference': 'model',
                },
                'pres': {
                    'profile_field': 'pres',
                    'reference': 'profile',
                },
                'oxygen': {
                    'profile_field': 'doxy',
                    'model_field': 'o2',
                    'color': 'tab:blue',
                },
                'chlorophyll': {
                    'profile_field': 'chla',
                    'model_field': 'chl',
                    'color': 'tab:green',
                }
            }
        elif 'so' in model_dataset.variables:
            product_name = '3413_Comparison_ARGO_TOPAZ5'
            fields = {
                'depth': {
                    'model_field': 'depth',
                    'reference': 'model',
                },
                'pres': {
                    'profile_field': 'pres',
                    'reference': 'profile',
                },
                'salinity': {
                    'profile_field': 'psal',
                    'model_field': 'so',
                    'color': 'tab:blue',
                },
                'temperature': {
                    'profile_field': 'temp',
                    'model_field': 'thetao',
                    'color': 'tab:orange',
                }
            }

        # presumably the model axes are expressed in units of 100 km and
        # converted to meters here -- TODO confirm against the TOPAZ files
        model_x = model_dataset.variables['x'][:] * 100000
        model_y = model_dataset.variables['y'][:] * 100000
        model_crs = pyproj.CRS.from_proj4(model_dataset.variables['stereographic'].proj4)

        # transformer from lon/lat to the model's projected coordinates
        proj = pyproj.Transformer.from_crs(model_crs.geodetic_crs, model_crs, always_xy=True)

        profile_reader = get_profile_reader()
        reader_cfg = {'output_options': {}}

        for argo_path in argo_paths:
            for meta, profile in profile_reader.extract_from_dataset(argo_path, reader_cfg):
                profile_id = "{}_{}".format(profile.platform_number, profile.cycle_number)
                profile_x, profile_y = proj.transform(profile.longitude, profile.latitude)
                model_point_indices = find_closest_point(model_x, model_y, profile_x, profile_y)

                profile_fields_to_plot = {}
                model_fields_to_plot = {}

                for field, field_properties in fields.items():
                    # select profile fields to plot, preferring the
                    # "adjusted" version of each measurement when available
                    if 'profile_field' in field_properties:
                        adjusted_field = "{}_adjusted".format(field_properties['profile_field'])
                        profile_field_name = None
                        if adjusted_field in profile.list_measurements_fields():
                            profile_field_name = adjusted_field
                        # bug fix: test the profile field name ('doxy'), not
                        # the generic field key ('oxygen'), otherwise the
                        # un-adjusted measurement is never found
                        elif field_properties['profile_field'] in profile.list_measurements_fields():
                            profile_field_name = field_properties['profile_field']
                        if profile_field_name:
                            profile_fields_to_plot[field] = {
                                'data': profile.get_data_array(profile_field_name),
                                'units': profile.get_field_units(profile_field_name),
                            }
                            # reference fields ('pres') have no color entry
                            if field_properties.get('reference') != 'profile':
                                profile_fields_to_plot[field]['color'] = field_properties['color']

                    # select model fields to plot
                    if 'model_field' in field_properties:
                        model_field_name = field_properties['model_field']
                        if field_properties.get('reference') == 'model':
                            # reference axis (depth): keep the whole axis
                            model_fields_to_plot[field] = {
                                'data': model_dataset.variables[model_field_name][:],
                                'units': model_dataset.variables[model_field_name].units
                            }
                        else:
                            # data field: extract the vertical profile at the
                            # model grid point closest to the ARGO profile
                            model_fields_to_plot[field] = {
                                'data': model_dataset.variables[model_field_name][
                                    0, :, model_point_indices[1], model_point_indices[0]],
                                'units': model_dataset.variables[model_field_name].units,
                                'color': field_properties['color'],
                            }

                # nothing to plot when there is no data beyond the pressure
                # axis (set comparison works on both python 2 and 3, unlike
                # the previous keys() == ['pres'] list comparison)
                if not profile_fields_to_plot or set(profile_fields_to_plot) == set(['pres']):
                    print("No fields to plot for {}".format(profile_id))
                    continue

                to_plot = [
                    {'name': profile_id, 'fields': profile_fields_to_plot},
                    {'name': 'TOPAZ 5', 'fields': model_fields_to_plot}
                ]

                # bug fix: use splitext() to drop the extension;
                # rstrip('.nc') strips any trailing '.', 'n' and 'c'
                # characters and mangles names like "model_ocean.nc"
                profile_name = '{}_{}_{}'.format(
                    os.path.splitext(os.path.basename(model_path))[0],
                    profile_id,
                    profile.time.strftime('%Y%m%d_%H%M%S'))
                relative_ingested_path = os.path.join(product_name, profile_name)
                ingested_path = os.path.join(output_dir, relative_ingested_path)
                features_path = os.path.join(ingested_path, 'features')
                profile_svg_path = os.path.join(features_path, 'profile.svg')
                try:
                    os.makedirs(os.path.dirname(profile_svg_path))
                except OSError:
                    # directory already exists (no makedirs(exist_ok=...) on
                    # python 2)
                    pass

                try:
                    plot_profile(to_plot, profile_svg_path)
                except BadProfile as error:
                    print("Could not plot profile: {}".format(error.args[0]))
                    # remove the partially created output directory
                    shutil.rmtree(ingested_path, ignore_errors=True)
                    continue
                create_profile_ini(features_path)

                profile_time = profile.time
                metadata_time_format = '%Y-%m-%d %H:%M:%S'

                # 100 x 100 (projected units) bounding box centered on the
                # profile position, plus a 1-second time span, as required
                # by the Syntool metadata format
                metadata = {
                    "product": product_name[5:].replace('_', ' '),
                    "max_zoom_level": 0,
                    "min_zoom_level": 0,
                    "bbox_str": "POLYGON(({} {},{} {},{} {},{} {},{} {}))".format(
                        profile_x - 50, profile_y - 50,
                        profile_x - 50, profile_y + 50,
                        profile_x + 50, profile_y + 50,
                        profile_x + 50, profile_y - 50,
                        profile_x - 50, profile_y - 50,
                    ),
                    "dataset": profile_name,
                    "shape_str": "POINT ({} {})".format(profile_x, profile_y),
                    "syntool_id": product_name,
                    "begin_datetime": profile_time.strftime(metadata_time_format),
                    "end_datetime": (
                        profile_time + timedelta(seconds=1)
                    ).strftime(metadata_time_format),
                    "output_type": "MOORED",
                    "resolutions": [],
                }

                # 'wb' is correct here because this script runs on python 2,
                # where json.dump writes str (bytes)
                with open(os.path.join(ingested_path, 'metadata.json'), 'wb') as metadata_file:
                    json.dump(metadata, metadata_file)


if __name__ == '__main__':
    main()
import app, DATASET_LOCK_PREFIX + logger = celery.utils.log.get_task_logger(__name__) @@ -69,6 +74,52 @@ def convert(self, args, **kwargs): # pylint: disable=unused-argument save_results(dataset_id, converted_files) return (dataset_id, converted_files) +@app.task(base=FaultTolerantTask, bind=True, track_started=True) +def compare_profiles(self, args, **kwargs): + """Generate side-by-side profiles of a model and in-situ data. + """ + model_id, (model_path,) = args[0] + profiles = args[1] # iterable of (id, (path,)) tuples + + working_dir = Path(WORKING_DIRECTORY) + output_dir = Path(os.getenv('GEOSPAAS_PROCESSING_SYNTOOL_RESULTS_DIR', WORKING_DIRECTORY)) + + locks = [utils.redis_lock(f"{DATASET_LOCK_PREFIX}{model_id}", self.request.id)] + for profile_id, _ in profiles: + locks.append(utils.redis_lock(f"{DATASET_LOCK_PREFIX}{profile_id}", self.request.id)) + + with tempfile.TemporaryDirectory() as tmp_dir, \ + ExitStack() as stack: + for lock in locks: # lock all model and profile datasets + stack.enter_context(lock) + + command = [ + 'python2', + str(Path(geospaas_processing.converters.syntool.__file__).parent + / 'extra_readers' + / 'compare_model_argo.py'), + str(working_dir / model_path), + ','.join(str(working_dir / p[1][0]) for p in profiles), + tmp_dir + ] + try: + process = subprocess.run(command, capture_output=True) + except subprocess.CalledProcessError as error: + logger.error("Could not generate comparison profiles for dataset %s\nstdout: %s\nstderr: %s", + model_id, + error.output, + error.stderr) + raise + + results = [] + if process.returncode == 0: + for product_dir in Path(tmp_dir).iterdir(): + shutil.copytree(str(product_dir), str(output_dir / 'ingested' / product_dir.name), + dirs_exist_ok=True) + for granule_dir in product_dir.iterdir(): + results.append(str(Path('ingested', product_dir.name, granule_dir.name))) + save_results(model_id, results) + return (model_id, results) @app.task(base=FaultTolerantTask, bind=True, track_started=True) 
    # NOTE(review): reconstructed from a whitespace-mangled diff hunk of
    # tests/tasks/test_syntool_tasks.py; the enclosing TestCase class, the
    # import hunk and the trailing "class DBInsertTestCase" header in this
    # span are diff context outside this method.
    def test_compare_profiles(self):
        """Test that the right calls are made in compare_profiles:
        locking of every input dataset, invocation of the python2
        comparison script, copy of the generated product directories and
        registration of the results; and that a script failure is logged
        and re-raised.
        """
        with mock.patch('celery.Task.request') as mock_task_request, \
                mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \
                mock.patch('subprocess.run') as mock_run, \
                mock.patch('pathlib.Path.iterdir') as mock_iterdir, \
                mock.patch('shutil.copytree')as mock_copytree, \
                mock.patch('geospaas_processing.tasks.syntool.save_results') as mock_save_results, \
                mock.patch('geospaas_processing.utils.redis_lock') as mock_lock:

            with self.subTest('compare_profiles should succeed'):
                mock_task_request.id = 'task_id'
                mock_tmpdir.return_value.__enter__.return_value = '/tmp'
                # simulate a successful run of the comparison script
                mock_run.return_value.returncode = 0
                # first iterdir() call: product dirs in the tmp dir;
                # following calls: granule dirs inside each product dir
                mock_iterdir.side_effect = (
                    (Path('3413_product_1'), Path('3413_product_2')),
                    (Path('product_1_granule_1'), Path('product_1_granule_2')),
                    (Path('product_2_granule_1'),),
                )

                # args: (model_id, (path,)) then an iterable of profile tuples
                results = tasks_syntool.compare_profiles(((1, ('/foo',)),
                                                          ((2, ('/bar',)), (3, ('/baz',)))))
                self.assertTupleEqual(
                    results,
                    (1, ['ingested/3413_product_1/product_1_granule_1',
                         'ingested/3413_product_1/product_1_granule_2',
                         'ingested/3413_product_2/product_2_granule_1'])
                )
                # one lock per dataset: model (1) and profiles (2, 3)
                mock_lock.assert_has_calls((
                    mock.call('lock-1', 'task_id'),
                    mock.call('lock-2', 'task_id'),
                    mock.call('lock-3', 'task_id')
                ))
                mock_run.assert_called_once_with(
                    [
                        'python2',
                        str(Path(geospaas_processing.converters.syntool.__file__).parent /
                            Path('extra_readers', 'compare_model_argo.py')),
                        '/foo', '/bar,/baz', '/tmp'
                    ],
                    capture_output=True)
                # each product dir is copied under <results_dir>/ingested/
                mock_copytree.assert_has_calls((
                    mock.call('3413_product_1', '/tmp/test_data/ingested/3413_product_1',
                              dirs_exist_ok=True),
                    mock.call('3413_product_2', '/tmp/test_data/ingested/3413_product_2',
                              dirs_exist_ok=True)
                ))
                mock_save_results.assert_called_once_with(
                    1,
                    ['ingested/3413_product_1/product_1_granule_1',
                     'ingested/3413_product_1/product_1_granule_2',
                     'ingested/3413_product_2/product_2_granule_1']
                )
            with self.subTest('compare_profiles should fail'):
                # a CalledProcessError from the subprocess must be logged
                # as an error and re-raised
                mock_run.side_effect = subprocess.CalledProcessError(returncode=1, cmd='foo')
                with self.assertRaises(subprocess.CalledProcessError), \
                        self.assertLogs(tasks_syntool.logger, logging.ERROR):
                    tasks_syntool.compare_profiles(((1, ('/foo',)),
                                                    ((2, ('/bar',)), (3, ('/baz',)))))