diff --git a/aapp_runner/helper_functions.py b/aapp_runner/helper_functions.py index 0c41456..3a6e1e5 100644 --- a/aapp_runner/helper_functions.py +++ b/aapp_runner/helper_functions.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (c) 2014 - 2021 Pytroll Community +# Copyright (c) 2014 - 2022 Pytroll Community # # Author(s): # @@ -30,6 +30,63 @@ LOGGER = logging.getLogger(__name__) +def check_if_scene_is_unique(config): + """Check if the Scene is unique. + + The scene is checked against the register (holding already processed + scenes). If it overlaps in time with a previously processed scene and the + area-id's (used to collect the data (granules) are the same, then return + False - the scene is then not unique and should not be processed further. + + """ + LOGGER.debug("config.job_register: %s", str(config.job_register)) + LOGGER.debug("config platform_name: %s", str(config['platform_name'])) + LOGGER.debug("config - collection_area_id: %s", str(config['collection_area_id'])) + + # Use sat id, start and end time and area_id as the unique identifier of the scene! + if (config['platform_name'] in config.job_register and + len(config.job_register[config['platform_name']]) > 0): + + # Go through list of start,end time tuples and see if the current + # scene overlaps with any - only if the area ids are the same + + # Get registered start and end times with area id equal to current area_id + registered_times = [] + for start_t, end_t, area_id in config.job_register[config['platform_name']]: + if area_id == config['collection_area_id']: + registered_times.append((start_t, end_t)) + + # Get overlap status + status = overlapping_timeinterval( + (config['starttime'], config['endtime']), registered_times) + + if status: + info_msg = ("Processing of scene " + config['platform_name'] + + " " + str(status[0]) + " " + str(status[1]) + + " with overlapping time has been" + " launched previously. Skip it!") + LOGGER.info(info_msg) + return False + + LOGGER.debug("No overlap with any recently processed scenes...") + + return True + + +def create_scene_id(config): + """Create a unique scene specific ID to identify the scene process for later. + + The id is created from the platform name and start and end times of the + scene available in the process config dictionary. + + """ + scene_id = (str(config['platform_name']) + '_' + + config['starttime'].strftime('%Y%m%d%H%M%S') + + '_' + config['endtime'].strftime('%Y%m%d%H%M%S')) + LOGGER.debug("scene_id = " + str(scene_id)) + return scene_id + + def overlapping_timeinterval(start_end_times, timelist): """From a list of start and end times check if the current time interval overlaps with one or more""" @@ -39,7 +96,7 @@ def overlapping_timeinterval(start_end_times, timelist): if ((tstart <= starttime and tend > starttime) or (tstart < endtime and tend >= endtime)): return tstart, tend - elif (tstart >= starttime and tend <= endtime): + if (tstart >= starttime and tend <= endtime): return tstart, tend return False @@ -50,7 +107,7 @@ def run_shell_command(command, use_shell=False, use_shlex=True, my_cwd=None, """Run the given command as a shell and get the return code, stdout and stderr Returns True/False and return code. """ - from subprocess import Popen, PIPE + from subprocess import PIPE, Popen if stdin is not None: stdin = stdin.encode('utf-8') diff --git a/aapp_runner/tests/test_helper_functions.py b/aapp_runner/tests/test_helper_functions.py new file mode 100644 index 0000000..7100e80 --- /dev/null +++ b/aapp_runner/tests/test_helper_functions.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2022 Pytroll developers + +# Author(s): + +# Adam Dybbroe + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Unittesting the helper functions for the AAPP-runner. +""" + +import logging +import unittest +from datetime import datetime +from unittest.mock import patch + +from aapp_runner.helper_functions import check_if_scene_is_unique +from aapp_runner.read_aapp_config import AappL1Config, AappRunnerConfig +from aapp_runner.tests.test_config import (TEST_YAML_CONTENT_OK, + create_config_from_yaml) + + +class TestProcessConfigChecking(unittest.TestCase): + """Test various functions checking on the (non-static) config during processing.""" + + def setUp(self): + self.config_complete = create_config_from_yaml(TEST_YAML_CONTENT_OK) + + @patch('aapp_runner.read_aapp_config.load_config_from_file') + def test_check_if_scene_is_unique_return_value(self, config): + """Test checking if the current scene is unique or if it has been processed earlier.""" + config.return_value = self.config_complete + myfilename = "/tmp/mytestfile" + aapp_run_config = AappRunnerConfig(myfilename, 'norrkoping', 'xl-band') + aapp_config = AappL1Config(aapp_run_config.config, 'xl-band') + + aapp_config['platform_name'] = 'metop03' + aapp_config['collection_area_id'] = 'euron1' + aapp_config['starttime'] = datetime(2022, 1, 8, 12, 49, 50) + aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0, 26) + + aapp_config.job_register = {} + + result = check_if_scene_is_unique(aapp_config) + assert result + + aapp_config.job_register = {'metop03': [(datetime(2022, 1, 8, 12, 49, 50), + datetime(2022, 1, 8, 13, 0, 26), 'euron1')]} + # An EARS scene (same platform and overlapping time interval and over + # the same area of interest) arrives shortly after: + aapp_config['platform_name'] = 'metop03' + aapp_config['collection_area_id'] = 'euron1' + aapp_config['starttime'] = datetime(2022, 1, 8, 12, 50) + aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0) + + result = check_if_scene_is_unique(aapp_config) + assert not result + + @patch('aapp_runner.read_aapp_config.load_config_from_file') + def test_check_if_scene_is_unique_logging(self, config): + """Test the logging when checking if the current scene is unique.""" + config.return_value = self.config_complete + myfilename = "/tmp/mytestfile" + aapp_run_config = AappRunnerConfig(myfilename, 'norrkoping', 'xl-band') + aapp_config = AappL1Config(aapp_run_config.config, 'xl-band') + + aapp_config.job_register = {'metop03': [(datetime(2022, 1, 8, 12, 49, 50), + datetime(2022, 1, 8, 13, 0, 26), 'euron1')]} + # An EARS scene (same platform and overlapping time interval and over + # the same area of interest) arrives shortly after: + aapp_config['platform_name'] = 'metop03' + aapp_config['collection_area_id'] = 'euron1' + aapp_config['starttime'] = datetime(2022, 1, 8, 12, 50) + aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0) + + expected_logging = ['INFO:aapp_runner.helper_functions:first message', + 'INFO:aapp_runner.helper_functions:Processing of scene metop03 2022-01-08 12:49:50 2022-01-08 13:00:26 with overlapping time has been launched previously. Skip it!'] + + with self.assertLogs('aapp_runner.helper_functions', level='INFO') as cm: + logging.getLogger('aapp_runner.helper_functions').info('first message') + _ = check_if_scene_is_unique(aapp_config) + + self.assertEqual(cm.output, expected_logging) + + with self.assertLogs('aapp_runner.helper_functions', level='WARNING') as cm: + logging.getLogger('aapp_runner.helper_functions').warning('first message') + _ = check_if_scene_is_unique(aapp_config) + + self.assertEqual(len(cm.output), 1) + + # Scene is different (different satellite) from previous: + aapp_config['platform_name'] = 'metop01' + aapp_config['collection_area_id'] = 'euron1' + aapp_config['starttime'] = datetime(2022, 1, 8, 12, 50) + aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0) + + with self.assertLogs('aapp_runner.helper_functions', level='INFO') as cm: + logging.getLogger('aapp_runner.helper_functions').info('first message') + result = check_if_scene_is_unique(aapp_config) + + assert result + self.assertEqual(len(cm.output), 1) diff --git a/bin/aapp_dr_runner.py b/bin/aapp_dr_runner.py index b8f4ffb..366cabf 100755 --- a/bin/aapp_dr_runner.py +++ b/bin/aapp_dr_runner.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright (c) 2014 - 2021 Pytroll Community +# Copyright (c) 2014 - 2022 Pytroll Community # Author(s): @@ -51,11 +51,12 @@ from posttroll.publisher import Publish from trollsift.parser import compose +from aapp_runner.aapp_runner_tools import set_collection_area_id from aapp_runner.config_helpers import generate_process_config from aapp_runner.do_commutation import do_decommutation from aapp_runner.exceptions import DecommutationError, SatposError, TleError -from aapp_runner.helper_functions import (overlapping_timeinterval, - run_shell_command) +from aapp_runner.helper_functions import (check_if_scene_is_unique, + create_scene_id, run_shell_command) from aapp_runner.read_aapp_config import AappL1Config, AappRunnerConfig from aapp_runner.tle_satpos_prepare import do_tle_satpos, do_tleing @@ -92,7 +93,6 @@ def reset_job_registry(objdict, key, start_end_times_area): return - def cleanup_aapp_logfiles_archive(config): """Loop over the aapp log files directories and remove expired directories accordingly.""" try: @@ -487,46 +487,6 @@ def check_pass_length(msg, config): return True -def create_and_check_scene_id(msg, config): - """Create a scene specific ID to identify the scene process for later.""" - LOG.debug("config.job_register: %s", str(config.job_register)) - LOG.debug("config platform_name: %s", str(config['platform_name'])) - LOG.debug("config - collection_area_id: %s", str(config['collection_area_id'])) - - # Use sat id, start and end time and area_id as the unique identifier of the scene! - if (config['platform_name'] in config.job_register and - len(config.job_register[config['platform_name']]) > 0): - - # Go through list of start,end time tuples and see if the current - # scene overlaps with any - only if the area ids are the same - - # Get registed start and end times with area id equal to current area_id - registed_times = [] - for start_t, end_t, area_id in config.job_register[config['platform_name']]: - if area_id == config['collection_area_id']: - registed_times.append((start_t, end_t)) - - # Get overlap status - status = overlapping_timeinterval( - (config['starttime'], config['endtime']), registed_times) - - if status: - LOG.warning("Processing of scene " + config['platform_name'] + - " " + str(status[0]) + " " + str(status[1]) + - " with overlapping time has been" - " launched previously") - LOG.info("Skip it...") - return False - else: - LOG.debug("No overlap with any recently processed scenes...") - - scene_id = (str(config['platform_name']) + '_' + - config['starttime'].strftime('%Y%m%d%H%M%S') + - '_' + config['endtime'].strftime('%Y%m%d%H%M%S')) - LOG.debug("scene_id = " + str(scene_id)) - return scene_id - - def which(program): """Check if executable is available in the system environment path.""" # Check if needed executable are available in the @@ -858,10 +818,11 @@ def publish_level1(publisher, config, msg, filelist, station_name, environment): if not generate_process_config(msg, aapp_config): continue - scene_id = create_and_check_scene_id(msg, aapp_config) - if not scene_id: + scene_is_unique = check_if_scene_is_unique(aapp_config) + if not scene_is_unique: continue + scene_id = create_scene_id(aapp_config) try: if not setup_aapp_processing(aapp_config): raise Exception("setup_aapp_processing returned False. See above lines for details.")