Skip to content

Commit

Permalink
change datasets config to YAML, add filename regex tester
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkralidis committed Jan 29, 2024
1 parent 861e737 commit d39d073
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 5 deletions.
2 changes: 1 addition & 1 deletion msc-wis2node.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ export MSC_WIS2NODE_BROKER_PORT=8883
export MSC_WIS2NODE_BROKER_USERNAME=username
export MSC_WIS2NODE_BROKER_PASSWORD=password
export MSC_WIS2NODE_MSC_DATAMART_AMQP=amqps://dd.weather.gc.ca
export MSC_WIS2NODE_DATASET_CONFIG=/opt/msc-wis2node/conf/datasets.csv
export MSC_WIS2NODE_DATASET_CONFIG=/opt/msc-wis2node/conf/datasets.yaml
export MSC_WIS2NODE_DISCOVERY_METADATA_ZIP_URL=https://example.org/discovery-metadata.zip
export MSC_WIS2NODE_TOPIC_PREFIX=origin/a/wis2/ca-eccc-msc
7 changes: 4 additions & 3 deletions msc_wis2node/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
###############################################################################

from io import BytesIO
import json
import logging
from pathlib import Path
import tempfile
Expand Down Expand Up @@ -93,8 +92,10 @@ def create_datasets_conf(metadata_zipfile: Union[Path, None]) -> None:
except AttributeError as err:
LOGGER.warning(f'Missing distribution: {err}')

with Path(DATASET_CONFIG).open('w') as fh:
json.dump(datasets_conf, fh, indent=4)
LOGGER.debug('Dumping YAML document')
with Path(DATASET_CONFIG).open('wb') as fh:
yaml.dump(datasets_conf, fh, sort_keys=False, encoding='utf8',
indent=4, default_flow_style=False)


@click.group()
Expand Down
43 changes: 42 additions & 1 deletion msc_wis2node/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#
###############################################################################

from datetime import date, datetime, timezone
import json
import logging
import random
Expand All @@ -29,6 +30,7 @@
from paho.mqtt import publish
from pywis_pubsub.publish import create_message
from sarracenia.flowcb import FlowCB
import yaml

from msc_wis2node.env import (BROKER_HOSTNAME, BROKER_PORT, BROKER_USERNAME,
BROKER_PASSWORD, DATASET_CONFIG, TOPIC_PREFIX)
Expand Down Expand Up @@ -85,7 +87,7 @@ def __init__(self):
}

with open(DATASET_CONFIG) as fh:
self.datasets = json.load(fh)['datasets']
self.datasets = yaml.load(fh, Loader=yaml.SafeLoader)['datasets']

def publish(self, base_url: str, relative_path: str) -> bool:
"""
Expand Down Expand Up @@ -122,6 +124,7 @@ def identify(self, path: str) -> Union[dict, None]:
"""

for dataset in self.datasets:
LOGGER.debug(f"DATASETS: {self.datasets}")
match = False
subtopic_dirpath = self._subtopic2dirpath(dataset['subtopic'])

Expand Down Expand Up @@ -160,9 +163,13 @@ def publish_to_wis2(self, dataset: str, url: str) -> None:
topic = f"{TOPIC_PREFIX}/{dataset['wis2-topic']}"
LOGGER.info(f'TOPIC: {topic}')

datetime_ = self._topic_regex2datetime(
topic, dataset.get('msc-filename-datetime'))

message = create_message(
identifier=str(uuid.uuid4()),
# metadata_id=dataset['metadata-id'],
datetime_=datetime_,
topic=topic,
content_type=dataset['media-type'],
url=url
Expand Down Expand Up @@ -207,5 +214,39 @@ def _subtopic2dirpath(self, subtopic: str) -> str:

return dirpath

def _topic_regex2datetime(self, topic: str,
pattern: Union[str, None]) -> Union[str, None]:
"""
Generate RFC3339 string
:param topic: topic
:param pattern: regular expression of date pattern
:returns: `str` of resulting RFC3339 datetime, or `None` if not found
"""

if pattern is None:
return None

match = re.search(topic, pattern)

if match is None:
LOGGER.debug(f'No match ({pattern} not in {topic})')
return None

groups = [int(m) for m in match.groups()]
LOGGER.debug(f'datetime regex groups found: {groups}')

if len(groups) < 3:
LOGGER.debug('Casting date')
obj = date(*groups)
value = obj.isoformat()
else:
LOGGER.debug('Casting datetime')
dt = datetime(*groups, tzinfo=timezone.utc)
value = f'{dt.isoformat()}Z'

return value

def __repr__(self):
return '<WIS2Publisher>'

0 comments on commit d39d073

Please sign in to comment.