Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PPS product and level1c collector #62

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions bin/pps_l1c_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2019 - 2021 Pytroll

# Author(s):

# Erik Johansson <Firstname.Lastname at smhi.se>
# Adam Dybbroe <Firstname.Lastname at smhi.se>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Collector to add PPS level1c file to message with PPS files."""

import argparse
import logging

from nwcsafpps_runner.logger import setup_logging
from nwcsafpps_runner.pps_collector_lib import pps_collector_runner


LOOP = True

LOG = logging.getLogger('pps-l1c-collector')


def get_arguments():
"""Get command line arguments."""
parser = argparse.ArgumentParser()

parser.add_argument("-l", "--log-config",
help="Log config file to use instead of the standard logging.")
parser.add_argument('-c', '--config_file',
type=str,
dest='config_file',
default='config.yaml',
help="The file containing " +
"configuration parameters, \n" +
"default = ./config.yaml",
required=True)
parser.add_argument("-v", "--verbose", dest="verbosity", action="count", default=0,
help="Verbosity (between 1 and 2 occurrences with more leading to more "
"verbose logging). WARN=0, INFO=1, "
"DEBUG=2. This is overridden by the log config file if specified.")

args = parser.parse_args()
setup_logging(args)

if 'template' in args.config_file:
raise IOError("Template file given as master config, aborting!")

return args.config_file


if __name__ == '__main__':

config_file = get_arguments()
pps_collector_runner(config_file)
59 changes: 57 additions & 2 deletions nwcsafpps_runner/message_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,65 @@
import os

from posttroll.message import Message
from nwcsafpps_runner.utils import create_pps_file_from_lvl1c


LOG = logging.getLogger(__name__)


def remove_non_pps_products(msg_data):
""" Remove non-PPS files from datasetlis.t"""
msg_data["dataset"] = [item for item in msg_data["dataset"] if "S_NWC" in item["uid"]]


def get_pps_sensor_from_msg(sensor_msg):
""" Get pps sensor from msg sensor."""
sensor = None
if type(sensor_msg) is list and len(sensor_msg) == 1:
sensor = sensor_msg[0]
if sensor is None:
for pps_sensor in ['viirs', 'avhrr', 'modis', 'mersi2', 'metimage', 'slstr']:
if pps_sensor in sensor_msg:
sensor = pps_sensor
if "avhrr/3" in sensor_msg:
sensor = "avhrr"
return sensor


def add_lvl1c_to_msg(msg_data, options):
"""Add PPS lvl1c file to a collection of PPS products."""
level1c_path = os.environ.get('SM_IMAGER_DIR', options.get('pps_lvl1c_dir', './'))
sensor = options.get('sensor', get_pps_sensor_from_msg(msg_data["sensor"]))
num_files = len(msg_data['dataset'])
to_add = {}
for item in msg_data['dataset']:
lvl1c_file = create_pps_file_from_lvl1c(item["uri"], level1c_path,
name_tag=sensor, file_type='.nc')
to_add[lvl1c_file] = {
"uri": lvl1c_file,
"uid": os.path.basename(lvl1c_file)}
msg_data['dataset'].extend(to_add.values())


def flatten_collection(msg_data):
"""Flatten collection msg to dataset msg."""
if "collection" in msg_data:
collection = msg_data.pop("collection")
msg_data["dataset"] = []
for ind in range(0, len(collection)):
for item in collection[ind]["dataset"]:
if type(item) == dict:
msg_data["dataset"].append(item)


def prepare_pps_collector_message(msg, options):
to_send = msg.data.copy()
flatten_collection(to_send)
remove_non_pps_products(to_send)
add_lvl1c_to_msg(to_send, options)
return to_send


def prepare_nwp_message(result_file, publish_topic):
"""Prepare message for NWP files."""
to_send = {}
Expand Down Expand Up @@ -73,10 +128,10 @@ def prepare_l1c_message(result_file, mda, **kwargs):
return to_send


def publish_l1c(publisher, publish_msg, publish_topic):
def publish_l1c(publisher, publish_msg, publish_topic, msg_type="file"):
"""Publish the messages that l1c files are ready."""
LOG.debug('Publish topic = %s', publish_topic)
for topic in publish_topic:
msg = Message(topic, "file", publish_msg).encode()
msg = Message(topic, msg_type, publish_msg).encode()
LOG.debug("sending: %s", str(msg))
publisher.send(msg)
67 changes: 67 additions & 0 deletions nwcsafpps_runner/pps_collector_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2018 - 2021 Pytroll Developers

# Author(s):

# Nina Hakansson <Firstname.Lastname at smhi.se>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import signal
import logging

from contextlib import closing, suppress
from posttroll.publisher import create_publisher_from_dict_config
from posttroll.subscriber import Subscribe
from nwcsafpps_runner.config import get_config

from nwcsafpps_runner.message_utils import (publish_l1c,
prepare_pps_collector_message)

LOG = logging.getLogger(__name__)
LOOP = True


def _run_subscribe_publisher(subscriber, publisher, options):
"""The posttroll subscribe/publisher runner."""
def signal_handler(sig, frame):
LOG.warning('You pressed Ctrl+C!')
global LOOP
LOOP = False

signal.signal(signal.SIGINT, signal_handler)

while LOOP:
for msg in subscriber.recv():
LOG.debug(
"Received message data = %s", msg)
pub_msg = prepare_pps_collector_message(msg, options)
publish_l1c(publisher, pub_msg, publish_topic=[options["publish_topic"]], msg_type="dataset")
LOG.info("L1c and PPS products collected.")

def pps_collector_runner(config_file):
"""The live runner for collecting the NWCSAF/PPS l1c and lvl2 products."""
LOG.info("Start the NWCSAF/PPS products and level-1c collector runner")

options = get_config(config_file)
settings = {"name": 'pps-collector-runner',
"nameservers": False,
"port": options.get("publish_port", 3002)}
with Subscribe('', options["subscribe_topics"], True) as sub:
with closing(create_publisher_from_dict_config(settings)) as pub:
pub.start()
_run_subscribe_publisher(sub, pub, options)
14 changes: 11 additions & 3 deletions nwcsafpps_runner/pps_posttroll_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
'ppsCtype': 'CT',
'ppsCpp': 'CPP',
'ppsCmic': 'CMIC',
'ppsHrw': 'HRW',
'ppsPrecip': 'PC',
'ppsPrecipPrepare': 'PC-PRE'}

Expand Down Expand Up @@ -247,6 +248,8 @@ def send(self):
# Error
# pubmsg = self.create_message("FAILED", self.metadata)
LOG.warning("Module %s failed, so no message sent", self.metadata.get('module', 'unknown'))
elif self.metadata["filename"] == "" or self.metadata["filename"] == []:
LOG.info("Module %s did not create any files, so no message sent", self.metadata.get('module', 'unknown'))
else:
# Ok
pubmsg = self.create_message("OK")
Expand Down Expand Up @@ -287,8 +290,10 @@ def create_message(self, status):
self.clean_unused_keys_in_message()

publish_topic = self._create_message_topic()

return {'header': publish_topic, 'type': 'file', 'content': self._to_send}
msg_type = 'file'
if "dataset" in self._to_send:
msg_type = 'dataset'
return {'header': publish_topic, 'type': msg_type, 'content': self._to_send}

def _create_message_topic(self):
"""Create the publish topic from yaml file items and PPS metadata."""
Expand Down Expand Up @@ -349,7 +354,10 @@ def get_message_with_uri_and_uid(self):
LOG.debug("Servername = %s", str(servername))

msg = {}
if isinstance(self.metadata['filename'], list):

if self.metadata['filename'] == '':
return {}
elif isinstance(self.metadata['filename'], list):
dataset = []
for filename in self.metadata['filename']:
uri = os.path.abspath(filename)
Expand Down
30 changes: 28 additions & 2 deletions nwcsafpps_runner/tests/test_pps_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,19 @@ def test_send_method(self, mandatory_param, filename):

mandatory_param.return_value = True
filename.return_value = True
my_metadata = self.metadata.copy()
my_metadata["filename"] = "dummy"

with patch.object(PostTrollMessage, 'publish_message', return_value=None) as mock_method_publish:
with patch.object(PostTrollMessage, 'create_message', return_value=None) as mock_method_create:
posttroll_message = PostTrollMessage(0, self.metadata)
posttroll_message = PostTrollMessage(0, my_metadata)
posttroll_message.send()
self.assertEqual(mock_method_publish.call_count, 1)
self.assertEqual(mock_method_create.call_count, 1)

with patch.object(PostTrollMessage, 'publish_message', return_value=None) as mock_method_publish:
with patch.object(PostTrollMessage, 'create_message', return_value=None) as mock_method_create:
posttroll_message = PostTrollMessage(1, self.metadata)
posttroll_message = PostTrollMessage(1, my_metadata)
posttroll_message.send()
self.assertEqual(mock_method_publish.call_count, 0)
self.assertEqual(mock_method_create.call_count, 0)
Expand Down Expand Up @@ -344,6 +346,30 @@ def test_create_message_with_topic(self, socket_gethostname):
expected_message_header = "/my/pps/publish/topic/UNKNOWN/"
self.assertEqual(expected_message_header, result_message['header'])

@patch('socket.gethostname')
def test_create_message_without_filename(self, socket_gethostname):
"""Test creating a message with header/topic, type and content."""
from nwcsafpps_runner.pps_posttroll_hook import PostTrollMessage

socket_gethostname.return_value = 'TEST_SERVERNAME'

metadata = {'publish_topic': '/my/pps/publish/topic/{pps_product}/',
'output_format': 'CF',
'level': '2',
'variant': 'DR',
'geo_or_polar': 'polar',
'software': 'NWCSAF-PPSv2018',
'start_time': START_TIME1, 'end_time': END_TIME1,
'sensor': 'viirs',
'filename': '',
'platform_name': 'npp'}

posttroll_message = PostTrollMessage(0, metadata)
with patch.object(PostTrollMessage, 'is_segment', return_value=False):
result_message = posttroll_message.create_message('OK')
self.assertFalse("uri" in result_message["content"])


@patch('socket.gethostname')
def test_create_message_with_topic_pattern(self, socket_gethostname):
"""Test creating a message with header/topic that is a pattern, type and content."""
Expand Down
Loading
Loading