feat: Daac sender + receiver logic (#402)
* chore: method to subscribe to sns from sqs

* feat: adding infrastructure for archive daac

* feat: adding percolator

* feat: add method to migrate data

* feat: add migration logic to the real code

* chore: move granules index to correct location

* feat: (in progress) adding daac config crud ops

* feat: finished adding CRUDs for daac config

* fix: authorizer needs to check if user is authorized for current collection + set tenant & venue for DB

* fix: updating errors based on test case

* fix: adding log statement

* fix: mistaken percolator alias vs. normal alias

* fix: saved searches are not in the correct place in mapping

* chore: adding log statement to see the problem

* fix: add it at the correct place

* fix: add test case + update errors based on those

* feat: adding mock daac lambda logic + terraform

* fix: adding iam creations

* fix: add vpc related iam permissions

* fix: add test case and check some bugs + get terraform working

* fix: disable s3 logic for now

* fix: update terraform to get things running

* feat: add logic to send message to daac sns

* fix: update granule index update logic

* fix: some bugs from eye test + add response logic

* feat: add lambda entry points + update terraform

* feat: auto request part

* fix: update errors from renaming

* fix: updating many errors

* fix: update mock daac sns + store error details on cnm_s_failure

* fix: archiving_types needs to be a dict

* fix: piggyback on granule-to-es to trigger daac archive

* fix: update logic to get cnm response

* fix: updating daac CRUD for demo

* fix: still updating daac crud

* fix: still updating daac crud

* fix: still updating daac crud

* fix: still updating daac crud

* fix: allowing everyone to overwrite one another's config

* fix: ignore subject in sns msg

* fix: daac round trip errors

* feat: manual archive + refactor

* feat: add test case

* feat: compare s3 size in mock daac

* fix: accept either s3 or https uri

* fix: need version in crud ops of archive config

* chore: update test

* fix: iam issues for cross s3 accounts

* fix: bug in mock daac

* fix: when updating to "success", delete prior failed-status detail messages

* fix: make granules stac compliant by moving archiving keys to properties
wphyojpl authored Aug 12, 2024
1 parent cbaa753 commit 8ccc5a7
Showing 22 changed files with 779 additions and 103 deletions.
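
Taken together, the changes add a DAAC archiving round trip. On the send side, a successful granule indexing triggers DaacArchiverLogic, which locates the granule's CNM response file in S3, matches the granule against per-tenant/venue archive configurations via an OpenSearch percolator, and publishes a CNM-S message to the configured DAAC SNS topic. On the receive side, the DAAC's CNM response arrives through SNS/SQS and is recorded on the granule as cnm_r_success or cnm_r_failed. A condensed sketch of the flow (names taken from the code below):

    send:    granules_indexer.start() -> get_cnm_response_json_file() -> send_to_daac_internal()
             -> SNS publish (CNM-S) -> archive_status = cnm_s_success | cnm_s_failed
    receive: DAAC SNS -> SQS -> lambda_handler_response() -> receive_from_daac()
             -> archive_status = cnm_r_success | cnm_r_failed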
151 changes: 151 additions & 0 deletions cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
@@ -0,0 +1,151 @@
import json
import os
from time import sleep

import requests
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers
from cumulus_lambda_functions.lib.json_validator import JsonValidator

from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.lib.aws.aws_sns import AwsSns
from cumulus_lambda_functions.lib.time_utils import TimeUtils
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections
from cumulus_lambda_functions.lib.uds_db.archive_index import UdsArchiveConfigIndex

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class DaacArchiverLogic:
    def __init__(self):
        self.__es_url, self.__es_port = os.getenv('ES_URL'), int(os.getenv('ES_PORT', '443'))
        self.__archive_index_logic = UdsArchiveConfigIndex(self.__es_url, self.__es_port)
        self.__granules_index = GranulesDbIndex()
        self.__sns = AwsSns()
        self.__s3 = AwsS3()
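
    # The OpenSearch endpoint is read from the environment; a deployment is
    # assumed to provide something like the following (placeholder values):
    #   ES_URL=vpc-example-domain.us-west-2.es.amazonaws.com
    #   ES_PORT=443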

    def get_cnm_response_json_file(self, potential_file, granule_id):
        if 'href' not in potential_file:
            raise ValueError(f'missing href in potential_file: {potential_file}')
        self.__s3.set_s3_url(potential_file['href'])
        LOGGER.debug(f'attempting to retrieve cnm response from: {granule_id} & {potential_file}')
        cnm_response_keys = [k for k, _ in self.__s3.get_child_s3_files(self.__s3.target_bucket, os.path.dirname(self.__s3.target_key)) if k.lower().endswith('.cnm.json')]
        if len(cnm_response_keys) < 1:
            LOGGER.debug(f'missing cnm response file: {os.path.dirname(self.__s3.target_key)}.. trying again in 30 seconds.')
            sleep(30)  # waiting 30 seconds should be enough.
            cnm_response_keys = [k for k, _ in self.__s3.get_child_s3_files(self.__s3.target_bucket, os.path.dirname(self.__s3.target_key)) if k.lower().endswith('.cnm.json')]
            if len(cnm_response_keys) < 1:
                LOGGER.debug(f'missing cnm response file after 2nd try: {os.path.dirname(self.__s3.target_key)}.. quitting.')
                return None
        if len(cnm_response_keys) > 1:
            LOGGER.warning(f'more than 1 cnm response file: {cnm_response_keys}')
        cnm_response_key = cnm_response_keys[0]
        LOGGER.debug(f'cnm_response_key: {cnm_response_key}')
        local_file = self.__s3.set_s3_url(f's3://{self.__s3.target_bucket}/{cnm_response_key}').download('/tmp')
        cnm_response_json = FileUtils.read_json(local_file)
        FileUtils.remove_if_exists(local_file)
        return cnm_response_json
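
    # Layout assumption behind the lookup above (paths are illustrative only):
    # the CNM response is expected to be written as a sibling object of the
    # granule's files, e.g.
    #   s3://staging-bucket/.../granule-prefix/abcd.1234.efgh.test_file05.nc
    #   s3://staging-bucket/.../granule-prefix/abcd.1234.efgh.test_file05.cnm.json
    # so listing the parent prefix for keys ending in '.cnm.json' finds it.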

    def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
        granule_files = uds_cnm_json['product']['files']
        if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1:
            return granule_files  # TODO remove missing md5?
        archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']}
        result_files = []
        for each_file in granule_files:
            """
            Example entry in granule_files:
            {
                "type": "data",
                "name": "abcd.1234.efgh.test_file05.nc",
                "uri": "https://uds-distribution-placeholder/uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05/abcd.1234.efgh.test_file05.nc",
                "checksumType": "md5",
                "checksum": "unknown",
                "size": -1
            }
            """
            if each_file['type'] not in archiving_types:
                continue
            file_extensions = archiving_types[each_file['type']]
            if len(file_extensions) < 1:
                result_files.append(each_file)  # TODO remove missing md5?
                continue
            temp_filename = each_file['name'].upper().strip()
            if any([temp_filename.endswith(k.upper()) for k in file_extensions]):
                result_files.append(each_file)  # TODO remove missing md5?
        return result_files
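
    # For illustration, an archiving_types fragment that the filter above
    # would accept (hypothetical values; only 'data_type' and 'file_extension'
    # are read here):
    #   "archiving_types": [
    #       {"data_type": "data", "file_extension": [".nc", ".h5"]},  # keep only matching data files
    #       {"data_type": "metadata"}  # no file_extension: keep every metadata file
    #   ]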

    def send_to_daac_internal(self, uds_cnm_json: dict):
        # decode_identifier is normally meant for collection IDs; since our granule ID
        # is prefixed with the collection ID, it works here as well.
        granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier'])
        self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
        daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
        if daac_config is None:
            LOGGER.debug(f'uds_cnm_json is not configured for archival. uds_cnm_json: {uds_cnm_json}')
            return
        daac_config = daac_config[0]  # TODO This currently does not support more than 1 DAAC.
        try:
            self.__sns.set_topic_arn(daac_config['daac_sns_topic_arn'])
            daac_cnm_message = {
                "collection": daac_config['daac_collection_name'],
                "identifier": uds_cnm_json['identifier'],
                "submissionTime": f'{TimeUtils.get_current_time()}Z',
                "provider": granule_identifier.tenant,
                "version": "1.6.0",  # TODO this is hardcoded?
                "product": {
                    "name": granule_identifier.id,
                    "dataVersion": daac_config['daac_data_version'],
                    'files': self.__extract_files(uds_cnm_json, daac_config),
                }
            }
            self.__sns.publish_message(json.dumps(daac_cnm_message))
            self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
                'archive_status': 'cnm_s_success',
                'archive_error_message': '',
                'archive_error_code': '',
            }, uds_cnm_json['identifier'])
        except Exception as e:
            LOGGER.exception('failed during archival process')
            self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
                'archive_status': 'cnm_s_failed',
                'archive_error_message': str(e),
            }, uds_cnm_json['identifier'])
        return

    def send_to_daac(self, event: dict):
        LOGGER.debug(f'send_to_daac#event: {event}')
        uds_cnm_json = AwsMessageTransformers().sqs_sns(event)
        LOGGER.debug(f'sns_msg: {uds_cnm_json}')
        self.send_to_daac_internal(uds_cnm_json)
        return

    def receive_from_daac(self, event: dict):
        LOGGER.debug(f'receive_from_daac#event: {event}')
        sns_msg = AwsMessageTransformers().sqs_sns(event)
        LOGGER.debug(f'sns_msg: {sns_msg}')
        cnm_notification_msg = sns_msg

        cnm_msg_schema = requests.get('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json')
        cnm_msg_schema.raise_for_status()
        cnm_msg_schema = json.loads(cnm_msg_schema.text)
        result = JsonValidator(cnm_msg_schema).validate(cnm_notification_msg)
        if result is not None:
            raise ValueError(f'input cnm event has cnm_msg_schema validation errors: {result}')
        if 'response' not in cnm_notification_msg:
            raise ValueError(f'missing response in {cnm_notification_msg}')
        # decode_identifier is normally meant for collection IDs; since our granule ID
        # is prefixed with the collection ID, it works here as well.
        granule_identifier = UdsCollections.decode_identifier(cnm_notification_msg['identifier'])
        if cnm_notification_msg['response']['status'] == 'SUCCESS':
            self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
                'archive_status': 'cnm_r_success',
                'archive_error_message': '',
                'archive_error_code': '',
            }, cnm_notification_msg['identifier'])
            return
        self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
            'archive_status': 'cnm_r_failed',
            'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in cnm_notification_msg['response'] else 'unknown',
            'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg['response'] else 'unknown',
        }, cnm_notification_msg['identifier'])
        return
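
    # For reference, sketches of the two response shapes handled above
    # (identifier and detail values are made-up placeholders; the real shapes
    # are governed by the v1.6.1 schema fetched above):
    #   success: {"identifier": "URN:...:granule-1", "response": {"status": "SUCCESS"}}
    #   failure: {"identifier": "URN:...:granule-1",
    #             "response": {"status": "FAILURE",
    #                          "errorCode": "...",  # surfaced as archive_error_code
    #                          "errorMessage": "..."}}  # surfaced as archive_error_message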
17 changes: 15 additions & 2 deletions cumulus_lambda_functions/daac_archiver/lambda_function.py
@@ -1,13 +1,26 @@
+from cumulus_lambda_functions.daac_archiver.daac_archiver_logic import DaacArchiverLogic
 from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
 
 
-def lambda_handler(event, context):
+def lambda_handler_request(event, context):
     """
     :param event:
     :param context:
     :return:
     {'Records': [{'messageId': '6ff7c6fd-4053-4ab4-bc12-c042be1ed275', 'receiptHandle': 'AQEBYASiFPjQT5JBI2KKCTF/uQhHfJt/tHhgucslQQdvkNVxcXCNi2E5Ux4U9N0eu7RfvlnvtycjUh0gdL7jIeoyH+VRKSF61uAJuT4p31BsNe0GYu49N9A6+kxjP/RrykR7ZofmQRdHToX1ugRc76SMRic4H/ZZ89YAHA2QeomJFMrYywIxlk8OAzYaBf2dQI7WexjY5u1CW00XNMbTGyTo4foVPxcSn6bdFpfgxW/L7yJMX/0YQvrA9ruiuQ+lrui+6fWYh5zEk3f5v1bYtUQ6DtyyfbtMHZQJTJpUlWAFRzzN+3melilH7FySyOGDXhPb0BOSzmdKq9wBbfLW/YPb7l99ejq4GfRfj8LyI4EtB96vTeUw4LCgUqbZcBrxbGBLUXMacweh+gCjHav9ylqr2SeOiqG3vWPq9pwFYQIDqNE=', 'body': '{\n "Type" : "Notification",\n "MessageId" : "33e1075a-435c-5217-a33d-59fae85e19b2",\n "TopicArn" : "arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester",\n "Subject" : "Amazon S3 Notification",\n "Message" : "{\\"Service\\":\\"Amazon S3\\",\\"Event\\":\\"s3:TestEvent\\",\\"Time\\":\\"2024-04-22T18:13:22.416Z\\",\\"Bucket\\":\\"uds-sbx-cumulus-staging\\",\\"RequestId\\":\\"DQ4T0GRVFPSX45C9\\",\\"HostId\\":\\"gHBFnYNmfnGDZBmqoQwA3RScjtjBk5lr426moGxu8IDpe5UhWAqNTxHqilWBoPN1njzIrzNrf8c=\\"}",\n "Timestamp" : "2024-04-22T18:13:22.434Z",\n "SignatureVersion" : "1",\n "Signature" : "RvSxqpU7J7CCJXbin9cXqTxzjMjgAUFtk/n454mTMcOe5x3Ay1w4AHfzyeYQCFBdLHNBa8n3OdMDoDlJqyVQMb8k+nERaiZWN2oqFVDRqT9pqSr89b+4FwlhPv6TYy2pBa/bgjZ4cOSYsey1uSQ3hjl0idfssvuV5cCRxQScbA+yu8Gcv9K7Oqgy01mC0sDHiuPIifhFXxupG5ygbjqoHIB+1gdMEbBwyixoY5GOpHM/O2uHNF+dJDjax1WMxQ2FzVjiFeCa+tNcjovF059+tx2v1YmDq/kEAFrN6DAtP6R4zKag62P9jkvjU/wHYJ2jjXmZAqoG+nuzAo24HiZPSw==",\n "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem",\n "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester:76cbefa1-addf-45c2-97e1-ae16986b195b"\n}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1713809602474', 'SenderId': 'AIDAIYLAVTDLUXBIEIX46', 'ApproximateFirstReceiveTimestamp': '1713809602483'}, 'messageAttributes': {}, 'md5OfBody': 'c6d06d1b742ad5bd2cfe5f542640aad2', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester', 'awsRegion': 'us-west-2'}]}
     """
     LambdaLoggerGenerator.remove_default_handlers()
-    print('To be implemented later')
+    DaacArchiverLogic().send_to_daac(event)
     return
+
+
+def lambda_handler_response(event, context):
+    """
+    :param event:
+    :param context:
+    :return:
+    {'Records': [{'messageId': '6ff7c6fd-4053-4ab4-bc12-c042be1ed275', 'receiptHandle': 'AQEBYASiFPjQT5JBI2KKCTF/uQhHfJt/tHhgucslQQdvkNVxcXCNi2E5Ux4U9N0eu7RfvlnvtycjUh0gdL7jIeoyH+VRKSF61uAJuT4p31BsNe0GYu49N9A6+kxjP/RrykR7ZofmQRdHToX1ugRc76SMRic4H/ZZ89YAHA2QeomJFMrYywIxlk8OAzYaBf2dQI7WexjY5u1CW00XNMbTGyTo4foVPxcSn6bdFpfgxW/L7yJMX/0YQvrA9ruiuQ+lrui+6fWYh5zEk3f5v1bYtUQ6DtyyfbtMHZQJTJpUlWAFRzzN+3melilH7FySyOGDXhPb0BOSzmdKq9wBbfLW/YPb7l99ejq4GfRfj8LyI4EtB96vTeUw4LCgUqbZcBrxbGBLUXMacweh+gCjHav9ylqr2SeOiqG3vWPq9pwFYQIDqNE=', 'body': '{\n "Type" : "Notification",\n "MessageId" : "33e1075a-435c-5217-a33d-59fae85e19b2",\n "TopicArn" : "arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester",\n "Subject" : "Amazon S3 Notification",\n "Message" : "{\\"Service\\":\\"Amazon S3\\",\\"Event\\":\\"s3:TestEvent\\",\\"Time\\":\\"2024-04-22T18:13:22.416Z\\",\\"Bucket\\":\\"uds-sbx-cumulus-staging\\",\\"RequestId\\":\\"DQ4T0GRVFPSX45C9\\",\\"HostId\\":\\"gHBFnYNmfnGDZBmqoQwA3RScjtjBk5lr426moGxu8IDpe5UhWAqNTxHqilWBoPN1njzIrzNrf8c=\\"}",\n "Timestamp" : "2024-04-22T18:13:22.434Z",\n "SignatureVersion" : "1",\n "Signature" : "RvSxqpU7J7CCJXbin9cXqTxzjMjgAUFtk/n454mTMcOe5x3Ay1w4AHfzyeYQCFBdLHNBa8n3OdMDoDlJqyVQMb8k+nERaiZWN2oqFVDRqT9pqSr89b+4FwlhPv6TYy2pBa/bgjZ4cOSYsey1uSQ3hjl0idfssvuV5cCRxQScbA+yu8Gcv9K7Oqgy01mC0sDHiuPIifhFXxupG5ygbjqoHIB+1gdMEbBwyixoY5GOpHM/O2uHNF+dJDjax1WMxQ2FzVjiFeCa+tNcjovF059+tx2v1YmDq/kEAFrN6DAtP6R4zKag62P9jkvjU/wHYJ2jjXmZAqoG+nuzAo24HiZPSw==",\n "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem",\n "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester:76cbefa1-addf-45c2-97e1-ae16986b195b"\n}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1713809602474', 'SenderId': 'AIDAIYLAVTDLUXBIEIX46', 'ApproximateFirstReceiveTimestamp': '1713809602483'}, 'messageAttributes': {}, 'md5OfBody': 'c6d06d1b742ad5bd2cfe5f542640aad2', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester', 'awsRegion': 'us-west-2'}]}
+    """
+    LambdaLoggerGenerator.remove_default_handlers()
+    DaacArchiverLogic().receive_from_daac(event)
+    return
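
Each handler above is intended to be wired to its own SQS queue subscribed to the corresponding SNS topic. A minimal local smoke test might look like the following sketch (sqs_event.json is a hypothetical file holding a record shaped like the docstring examples):

    import json

    from cumulus_lambda_functions.daac_archiver.lambda_function import lambda_handler_request, lambda_handler_response

    with open('sqs_event.json') as f:  # hypothetical sample event file
        event = json.load(f)
    lambda_handler_request(event, None)   # sender side: publishes CNM-S to the DAAC topic
    lambda_handler_response(event, None)  # receiver side: records cnm_r_success / cnm_r_failed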
@@ -1,4 +1,7 @@
 class GranulesIndexMapping:
+    archiving_keys = [
+        'archive_status', 'archive_error_message', 'archive_error_code'
+    ]
     percolator_mappings = {
         "daac_collection_name": {
             "type": "keyword"
@@ -24,6 +27,10 @@ class GranulesIndexMapping:
         },
     }
     stac_mappings = {
+        "archive_status": {"type": "keyword"},
+        "archive_error_message": {"type": "text"},
+        "archive_error_code": {"type": "keyword"},
+
         "event_time": {"type": "long"},
         "type": {"type": "keyword"},
         "stac_version": {"type": "keyword"},
10 changes: 10 additions & 0 deletions cumulus_lambda_functions/granules_to_es/granules_indexer.py
@@ -1,5 +1,9 @@
 import json
 import os
+from time import sleep
+
+from cumulus_lambda_functions.daac_archiver.daac_archiver_logic import DaacArchiverLogic
+from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
 
 from cumulus_lambda_functions.cumulus_stac.item_transformer import ItemTransformer
 
@@ -99,4 +103,10 @@ def start(self):
             self.__cumulus_record['granuleId']
         )
         LOGGER.debug(f'added to GranulesDbIndex')
+        daac_archiver = DaacArchiverLogic()
+        cnm_response = daac_archiver.get_cnm_response_json_file(list(stac_item['assets'].values())[0], stac_item['id'])
+        if cnm_response is None:
+            LOGGER.error(f'no CNM Response file. Not continuing to DAAC Archiving')
+            return self
+        daac_archiver.send_to_daac_internal(cnm_response)
         return self
cumulus_lambda_functions/lib/aws/aws_message_transformers.py
@@ -48,7 +48,7 @@ class AwsMessageTransformers:
     "Type": {"type": "string"},
     "MessageId": {"type": "string"},
     "TopicArn": {"type": "string"},
-    "Subject": {"type": "string"},
+    # "Subject": {"type": "string"},
     "Timestamp": {"type": "string"},
     "SignatureVersion": {"type": "string"},
     "Signature": {"type": "string"},
