From 47d7b9143f74b29485eaf99a934f51e4a38fb17f Mon Sep 17 00:00:00 2001 From: ASubaran Date: Wed, 13 Nov 2024 15:28:43 +0100 Subject: [PATCH] Changes --- recordforwarder/src/send_request_to_lambda.py | 23 ++-- recordprocessor/src/batch_processing.py | 16 ++- recordprocessor/src/s3_clients.py | 1 + recordprocessor/tests/test_lambda_e2e.py | 127 ++++++++++-------- .../tests/test_processing_lambda.py | 12 +- .../values_for_recordprocessor_tests.py | 2 +- terraform/ecs_batch_processor_config.tf | 13 ++ terraform/forwarder_lambda.tf | 10 -- 8 files changed, 110 insertions(+), 94 deletions(-) diff --git a/recordforwarder/src/send_request_to_lambda.py b/recordforwarder/src/send_request_to_lambda.py index b95f8f9c..5b5ae5a9 100644 --- a/recordforwarder/src/send_request_to_lambda.py +++ b/recordforwarder/src/send_request_to_lambda.py @@ -7,7 +7,6 @@ from utils_for_record_forwarder import invoke_lambda from constants import Constants from clients import boto3_client -import json CREATE_LAMBDA_NAME = os.getenv("CREATE_LAMBDA_NAME") UPDATE_LAMBDA_NAME = os.getenv("UPDATE_LAMBDA_NAME") @@ -68,18 +67,12 @@ def send_request_to_lambda(message_body: dict): Sends request to the Imms API (unless there was a failure at the recordprocessor level). Returns the imms id. If message is not successfully received and accepted by the Imms API raises a MessageNotSuccessful Error. """ - if message_body.get("diagnostics"): - sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message_body), - MessageGroupId=message_body["file_key"]) + supplier = message_body.get("supplier") + fhir_json = message_body.get("fhir_json") + file_key = message_body.get("file_key") + row_id = message_body.get("row_id") + operation_requested = message_body.get("operation_requested") - else: - print("failed") - supplier = message_body.get("supplier") - fhir_json = message_body.get("fhir_json") - file_key = message_body.get("file_key") - row_id = message_body.get("row_id") - operation_requested = message_body.get("operation_requested") - - # Send request to Imms FHIR API and return the imms_id - function_map = {"CREATE": send_create_request, "UPDATE": send_update_request, "DELETE": send_delete_request} - function_map[operation_requested](fhir_json=fhir_json, supplier=supplier, file_key=file_key, row_id=row_id) + # Send request to Imms FHIR API and return the imms_id + function_map = {"CREATE": send_create_request, "UPDATE": send_update_request, "DELETE": send_delete_request} + function_map[operation_requested](fhir_json=fhir_json, supplier=supplier, file_key=file_key, row_id=row_id) diff --git a/recordprocessor/src/batch_processing.py b/recordprocessor/src/batch_processing.py index d4a8c90c..33f18832 100644 --- a/recordprocessor/src/batch_processing.py +++ b/recordprocessor/src/batch_processing.py @@ -14,8 +14,7 @@ from mappings import Vaccine # from update_ack_file import update_ack_file from send_to_kinesis import send_to_kinesis - - +from s3_clients import sqs_client logging.basicConfig(level="INFO") logger = logging.getLogger() @@ -82,7 +81,18 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None: } # Send to kinesis. Add diagnostics if send fails. 
- send_to_kinesis(supplier, outgoing_message_body) + if "diagnostics" in outgoing_message_body: + env = get_environment() + if env in ["ref", "prod", "int"]: + return env + else: + env = "internal-dev" + account_id = os.getenv("LOCAL_ACCOUNT_ID") + queue_url = f"https://sqs.eu-west-2.amazonaws.com/{account_id}/imms-{env}-ack-metadata-queue.fifo" + sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(outgoing_message_body), + MessageGroupId=outgoing_message_body["file_key"]) + else: + send_to_kinesis(supplier, outgoing_message_body) # if ( # diagnostics := details_from_processing.get("diagnostics") # ) is None and message_delivered is False: diff --git a/recordprocessor/src/s3_clients.py b/recordprocessor/src/s3_clients.py index db7ccded..1f74287f 100644 --- a/recordprocessor/src/s3_clients.py +++ b/recordprocessor/src/s3_clients.py @@ -6,3 +6,4 @@ s3_client = boto3_client("s3", region_name=REGION_NAME) kinesis_client = boto3_client("kinesis", region_name=REGION_NAME) +sqs_client = boto3_client("sqs", region_name="eu-west-2") diff --git a/recordprocessor/tests/test_lambda_e2e.py b/recordprocessor/tests/test_lambda_e2e.py index 8e61b963..5275e345 100644 --- a/recordprocessor/tests/test_lambda_e2e.py +++ b/recordprocessor/tests/test_lambda_e2e.py @@ -6,7 +6,7 @@ from unittest.mock import patch from datetime import datetime, timedelta, timezone from copy import deepcopy -from moto import mock_s3, mock_kinesis +from moto import mock_s3, mock_kinesis, mock_sqs from boto3 import client as boto3_client import os import sys @@ -42,6 +42,10 @@ s3_client = boto3_client("s3", region_name=AWS_REGION) kinesis_client = boto3_client("kinesis", region_name=AWS_REGION) +sqs_client = boto3_client("sqs", region_name="eu-west-2") +queue_name = "imms-internal-dev-ack-metadata-queue.fifo" +attributes = {"FIFOQueue": "true", "ContentBasedDeduplication": "true"} + yesterday = datetime.now(timezone.utc) - timedelta(days=1) @@ -61,18 +65,18 @@ def setUp(self) -> None: kinesis_client.create_stream(StreamName=STREAM_NAME, ShardCount=1) - def tearDown(self) -> None: - # Delete all of the buckets (the contents of each bucket must be deleted first) - for bucket_name in [SOURCE_BUCKET_NAME, DESTINATION_BUCKET_NAME]: - for obj in s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", []): - s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"]) - s3_client.delete_bucket(Bucket=bucket_name) + # def tearDown(self) -> None: + # # Delete all of the buckets (the contents of each bucket must be deleted first) + # for bucket_name in [SOURCE_BUCKET_NAME, DESTINATION_BUCKET_NAME]: + # for obj in s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", []): + # s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"]) + # s3_client.delete_bucket(Bucket=bucket_name) - # Delete the kinesis stream - try: - kinesis_client.delete_stream(StreamName=STREAM_NAME, EnforceConsumerDeletion=True) - except kinesis_client.exceptions.ResourceNotFoundException: - pass + # # Delete the kinesis stream + # try: + # kinesis_client.delete_stream(StreamName=STREAM_NAME, EnforceConsumerDeletion=True) + # except kinesis_client.exceptions.ResourceNotFoundException: + # pass @staticmethod def upload_files(sourc_file_content, mock_permissions=MOCK_PERMISSIONS): # pylint: disable=dangerous-default-value @@ -121,40 +125,42 @@ def make_assertions(self, test_cases): - Kinesis Data is equal to the expected_kinesis_data - "{TEST_FILE_ID}#{index+1}|fatal-error" is found in the ack file """ - + # ack_file_content = 
self.get_ack_file_content() kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"] previous_approximate_arrival_time_stamp = yesterday # Initialise with a time prior to the running of the test - + #messages = sqs_client.receive_message(QueueUrl='https://sqs.eu-west-2.amazonaws.com/123456789012/imms-batch-internal-dev-metadata-queue.fifo', WaitTimeSeconds=1, MaxNumberOfMessages=10) for test_name, index, expected_kinesis_data, expect_success in test_cases: with self.subTest(test_name): - - kinesis_record = kinesis_records[index] - self.assertEqual(kinesis_record["PartitionKey"], TEST_SUPPLIER) - self.assertEqual(kinesis_record["SequenceNumber"], f"{index+1}") - - # Ensure that arrival times are sequential - approximate_arrival_timestamp = kinesis_record["ApproximateArrivalTimestamp"] - self.assertGreater(approximate_arrival_timestamp, previous_approximate_arrival_time_stamp) - previous_approximate_arrival_time_stamp = approximate_arrival_timestamp - - kinesis_data = json.loads(kinesis_record["Data"].decode("utf-8"), parse_float=Decimal) - expected_kinesis_data = { - "row_id": f"{TEST_FILE_ID}#{index+1}", - "file_key": TEST_FILE_KEY, - "supplier": TEST_SUPPLIER, - **expected_kinesis_data, - } - if expect_success: - # Some tests ignore the fhir_json value, so we only need to check that the key is present. - if "fhir_json" not in expected_kinesis_data: - key_to_ignore = "fhir_json" - self.assertIn(key_to_ignore, kinesis_data) - kinesis_data.pop(key_to_ignore) - self.assertEqual(kinesis_data, expected_kinesis_data) - # self.assertIn(f"{TEST_FILE_ID}#{index+1}|OK", ack_file_content) - else: - self.assertEqual(kinesis_data, expected_kinesis_data) + + if "diagnostics" not in expected_kinesis_data: + kinesis_record = kinesis_records[index] + self.assertEqual(kinesis_record["PartitionKey"], TEST_SUPPLIER) + self.assertEqual(kinesis_record["SequenceNumber"], f"{index+1}") + + # Ensure that arrival times are sequential + approximate_arrival_timestamp = kinesis_record["ApproximateArrivalTimestamp"] + self.assertGreater(approximate_arrival_timestamp, previous_approximate_arrival_time_stamp) + previous_approximate_arrival_time_stamp = approximate_arrival_timestamp + + kinesis_data = json.loads(kinesis_record["Data"].decode("utf-8"), parse_float=Decimal) + expected_kinesis_data = { + "row_id": f"{TEST_FILE_ID}#{index+1}", + "file_key": TEST_FILE_KEY, + "supplier": TEST_SUPPLIER, + "created_at_formatted_string": "2020-01-01", + **expected_kinesis_data, + } + if expect_success: + # Some tests ignore the fhir_json value, so we only need to check that the key is present. + if "fhir_json" not in expected_kinesis_data: + key_to_ignore = "fhir_json" + self.assertIn(key_to_ignore, kinesis_data) + kinesis_data.pop(key_to_ignore) + self.assertEqual(kinesis_data, expected_kinesis_data) + # self.assertIn(f"{TEST_FILE_ID}#{index+1}|OK", ack_file_content) + else: + self.assertEqual(kinesis_data, expected_kinesis_data) # self.assertIn(f"{TEST_FILE_ID}#{index+1}|Fatal", ack_file_content) def test_e2e_success(self): @@ -174,37 +180,40 @@ def test_e2e_success(self): ] self.make_assertions(test_cases) - def test_e2e_no_permissions(self): + @mock_sqs + @patch("batch_processing.sqs_client.send_message") + def test_e2e_no_permissions(self, mock_send_message): """ Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier does not have any permissions. 
""" - self.upload_files(VALID_FILE_CONTENT_WITH_NEW_AND_UPDATE_AND_DELETE) + # Create the queue in the mocked SQS environment + sqs_client = boto3_client("sqs", region_name="eu-west-2") + queue_name = "imms-internal-dev-ack-metadata-queue.fifo" + attributes = {"FIFOQueue": "true", "ContentBasedDeduplication": "true"} + queue_url = sqs_client.create_queue(QueueName=queue_name, Attributes=attributes)["QueueUrl"] + + # Upload test files and prepare event + self.upload_files(VALID_FILE_CONTENT_WITH_NEW_AND_UPDATE) event = deepcopy(TEST_EVENT_DUMPED) test_event = json.loads(event) test_event["permission"] = ["RSV_CREATE"] test_event = json.dumps(test_event) - + # Call the main processing function with the event main(test_event) - # expected_kinesis_data = {"diagnostics": Diagnostics.NO_PERMISSIONS} - # Test case tuples are stuctured as (test_name, index, expected_kinesis_data_ignoring_fhir_json,expect_success) + # Receive message from the queue to validate + sqs_client.receive_message( + QueueUrl=queue_url, + WaitTimeSeconds=1, + MaxNumberOfMessages=10 + ) + # Define test cases for assertions test_cases = [ ("CREATE success", 0, {"operation_requested": "CREATE"}, True), - ( - "UPDATE no permissions", - 1, - {"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "UPDATE"}, - False, - ), - ( - "DELETE no permissions", - 2, - {"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "DELETE"}, - False, - ), + ("UPDATE no permissions", 1, {"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "UPDATE"}, False), + ("DELETE no permissions", 2, {"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "DELETE"}, False), ] - self.make_assertions(test_cases) def test_e2e_partial_permissions(self): diff --git a/recordprocessor/tests/test_processing_lambda.py b/recordprocessor/tests/test_processing_lambda.py index fcc37143..6c3c3d4b 100644 --- a/recordprocessor/tests/test_processing_lambda.py +++ b/recordprocessor/tests/test_processing_lambda.py @@ -229,8 +229,8 @@ def test_process_csv_to_fhir_positive_string_not_provided(self, mock_send_to_kin # self.assert_value_in_ack_file("Success") mock_send_to_kinesis.assert_called() - @patch("batch_processing.send_to_kinesis") - def test_process_csv_to_fhir_paramter_missing(self, mock_send_to_kinesis): + @patch("batch_processing.sqs_client.send_message") + def test_process_csv_to_fhir_paramter_missing(self, mock_sqs): s3_client.put_object(Bucket=SOURCE_BUCKET_NAME, Key=TEST_FILE_KEY, Body=VALID_FILE_CONTENT_WITH_NEW_AND_UPDATE.replace("new", "")) @@ -240,7 +240,7 @@ def test_process_csv_to_fhir_paramter_missing(self, mock_send_to_kinesis): process_csv_to_fhir(TEST_EVENT) # self.assert_value_in_ack_file("Fatal") - mock_send_to_kinesis.assert_called() + mock_sqs.assert_called() @patch("batch_processing.send_to_kinesis") def test_process_csv_to_fhir_invalid_headers(self, mock_send_to_kinesis): @@ -343,8 +343,8 @@ def test_process_csv_to_fhir_successful(self, mock_send_to_kinesis): # self.assert_value_in_ack_file("Success") mock_send_to_kinesis.assert_called() - @patch("batch_processing.send_to_kinesis") - def test_process_csv_to_fhir_incorrect_permissions(self, mock_send_to_kinesis): + @patch("batch_processing.sqs_client.send_message") + def test_process_csv_to_fhir_incorrect_permissions(self, mock_send_to_sqs): s3_client.put_object(Bucket=SOURCE_BUCKET_NAME, Key=TEST_FILE_KEY, Body=VALID_FILE_CONTENT_WITH_UPDATE) with patch("batch_processing.get_operation_permissions", return_value={"DELETE"}): @@ -353,7 +353,7 @@ def 
test_process_csv_to_fhir_incorrect_permissions(self, mock_send_to_kinesis): process_csv_to_fhir(TEST_EVENT) # self.assert_value_in_ack_file("No permissions for requested operation") - mock_send_to_kinesis.assert_called() + mock_send_to_sqs.assert_called() def test_get_environment(self): with patch("batch_processing.os.getenv", return_value="internal-dev"): diff --git a/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py b/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py index 9e646dc6..7961d787 100644 --- a/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py +++ b/recordprocessor/tests/utils_for_recordprocessor_tests/values_for_recordprocessor_tests.py @@ -253,7 +253,7 @@ MOCK_ENVIRONMENT_DICT = { "ENVIRONMENT": "internal-dev", - "LOCAL_ACCOUNT_ID": "123456", + "LOCAL_ACCOUNT_ID": "123456789012", "ACK_BUCKET_NAME": DESTINATION_BUCKET_NAME, "SHORT_QUEUE_PREFIX": "imms-batch-internal-dev", "KINESIS_STREAM_ARN": f"arn:aws:kinesis:{AWS_REGION}:123456789012:stream/{STREAM_NAME}", diff --git a/terraform/ecs_batch_processor_config.tf b/terraform/ecs_batch_processor_config.tf index 85c1f91e..8acdb6f5 100644 --- a/terraform/ecs_batch_processor_config.tf +++ b/terraform/ecs_batch_processor_config.tf @@ -146,6 +146,15 @@ resource "aws_iam_policy" "ecs_task_exec_policy" { Effect = "Allow" Action = "lambda:InvokeFunction" Resource = data.aws_lambda_function.existing_search_lambda.arn + }, + { + Effect= "Allow", + Action= [ + "sqs:SendMessage", + "sqs:ReceiveMessage", + "kms:Decrypt" + ], + Resource= ["arn:aws:sqs:${var.aws_region}:${local.local_account_id}:imms-${local.api_env}-ack-metadata-queue.fifo"] } ] }) @@ -209,6 +218,10 @@ resource "aws_ecs_task_definition" "ecs_task" { name = "SEARCH_IMMS_LAMBDA" value = data.aws_lambda_function.existing_search_lambda.function_name }, + { + name="SQS_QUEUE_URL" + value= "https://sqs.eu-west-2.amazonaws.com/${local.local_account_id}/imms-${local.api_env}-ack-metadata-queue.fifo" + } ] logConfiguration = { diff --git a/terraform/forwarder_lambda.tf b/terraform/forwarder_lambda.tf index 76a9428b..3db40d69 100644 --- a/terraform/forwarder_lambda.tf +++ b/terraform/forwarder_lambda.tf @@ -170,15 +170,6 @@ resource "aws_iam_policy" "forwarding_lambda_exec_policy" { "firehose:PutRecordBatch" ], "Resource": data.aws_kinesis_firehose_delivery_stream.splunk_stream.arn - }, - { - "Effect": "Allow", - "Action": [ - "sqs:SendMessage", - "sqs:ReceiveMessage", - "kms:Decrypt" - ], - "Resource": ["arn:aws:sqs:${var.aws_region}:${local.local_account_id}:imms-${local.api_env}-ack-metadata-queue.fifo"] } ] }) @@ -215,7 +206,6 @@ resource "aws_lambda_function" "forwarding_lambda" { UPDATE_LAMBDA_NAME = data.aws_lambda_function.existing_update_lambda.function_name DELETE_LAMBDA_NAME = data.aws_lambda_function.existing_delete_lambda.function_name SEARCH_LAMBDA_NAME = data.aws_lambda_function.existing_search_lambda.function_name - SQS_QUEUE_URL = "https://sqs.eu-west-2.amazonaws.com/${local.local_account_id}/imms-${local.api_env}-ack-metadata-queue.fifo" } } kms_key_arn = data.aws_kms_key.existing_lambda_encryption_key.arn
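
Notes:

1. In the new diagnostics branch of process_csv_to_fhir, "return env" exits the
   function as soon as the environment is ref, prod or int, so rows with
   diagnostics would never reach the ack-metadata queue in those environments
   (it reads like a copy-paste of get_environment's body). A minimal sketch of
   the intended routing, assuming the early return is unintended and reusing the
   helpers this patch already wires up (sqs_client, send_to_kinesis,
   get_environment); import paths for the helpers are assumed:

    import json
    import os

    from s3_clients import sqs_client             # SQS client added by this patch
    from send_to_kinesis import send_to_kinesis   # existing recordprocessor helper
    from batch_processing import get_environment  # existing helper; import path assumed

    def route_outgoing_message(outgoing_message_body: dict, supplier: str) -> None:
        """Rows with diagnostics go to the ack-metadata FIFO queue; everything else to Kinesis."""
        if "diagnostics" in outgoing_message_body:
            env = get_environment()
            if env not in ("ref", "prod", "int"):  # keep the non-prod fallback, but do not return early
                env = "internal-dev"
            account_id = os.getenv("LOCAL_ACCOUNT_ID")
            queue_url = f"https://sqs.eu-west-2.amazonaws.com/{account_id}/imms-{env}-ack-metadata-queue.fifo"
            sqs_client.send_message(
                QueueUrl=queue_url,
                MessageBody=json.dumps(outgoing_message_body),
                MessageGroupId=outgoing_message_body["file_key"],  # required for FIFO queues
            )
        else:
            send_to_kinesis(supplier, outgoing_message_body)

   Because the queue is FIFO, the real queue also needs ContentBasedDeduplication
   enabled (as the mocked test queue has) or a MessageDeduplicationId must be
   supplied on each send.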
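
2. The ECS task definition now injects SQS_QUEUE_URL, but batch_processing.py
   rebuilds the URL from LOCAL_ACCOUNT_ID and the environment name. A small
   sketch of preferring the injected value, with the constructed URL kept only
   as a fallback (the helper name and the fallback are suggestions, not part of
   the patch):

    import os

    def get_ack_metadata_queue_url(env: str) -> str:
        """Prefer the SQS_QUEUE_URL set on the ECS task; otherwise rebuild it
        in the same format this patch uses."""
        configured = os.getenv("SQS_QUEUE_URL")
        if configured:
            return configured
        account_id = os.getenv("LOCAL_ACCOUNT_ID")
        return f"https://sqs.eu-west-2.amazonaws.com/{account_id}/imms-{env}-ack-metadata-queue.fifo"

   Keeping the URL in one place makes it harder for the Terraform resource
   (imms-${api_env}-ack-metadata-queue.fifo) and the Python code to drift apart.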
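
3. In test_e2e_no_permissions the mocked FIFO queue is created and
   receive_message is called, but the response is discarded, so the diagnostics
   rows are never actually checked against the queue. A sketch of an assertion
   helper, assuming moto's @mock_sqs is active and main(test_event) has already
   run (the helper name is illustrative):

    import json
    from boto3 import client as boto3_client

    def assert_diagnostics_message_on_queue(expected_file_key: str) -> dict:
        """Fetch messages from the mocked ack-metadata queue and check the first one."""
        sqs = boto3_client("sqs", region_name="eu-west-2")
        queue_url = sqs.get_queue_url(QueueName="imms-internal-dev-ack-metadata-queue.fifo")["QueueUrl"]
        response = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=1, MaxNumberOfMessages=10)
        messages = response.get("Messages", [])
        assert messages, "expected at least one diagnostics message on the ack-metadata queue"
        body = json.loads(messages[0]["Body"])
        assert body["file_key"] == expected_file_key
        assert "diagnostics" in body
        return body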