ASubaran committed Nov 13, 2024
1 parent 36bb3ca commit 47d7b91
Showing 8 changed files with 110 additions and 94 deletions.
23 changes: 8 additions & 15 deletions recordforwarder/src/send_request_to_lambda.py
@@ -7,7 +7,6 @@
from utils_for_record_forwarder import invoke_lambda
from constants import Constants
from clients import boto3_client
import json

CREATE_LAMBDA_NAME = os.getenv("CREATE_LAMBDA_NAME")
UPDATE_LAMBDA_NAME = os.getenv("UPDATE_LAMBDA_NAME")
@@ -68,18 +67,12 @@ def send_request_to_lambda(message_body: dict):
Sends request to the Imms API (unless there was a failure at the recordprocessor level). Returns the imms id.
If message is not successfully received and accepted by the Imms API raises a MessageNotSuccessful Error.
"""
if message_body.get("diagnostics"):
sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message_body),
MessageGroupId=message_body["file_key"])
supplier = message_body.get("supplier")
fhir_json = message_body.get("fhir_json")
file_key = message_body.get("file_key")
row_id = message_body.get("row_id")
operation_requested = message_body.get("operation_requested")

else:
print("failed")
supplier = message_body.get("supplier")
fhir_json = message_body.get("fhir_json")
file_key = message_body.get("file_key")
row_id = message_body.get("row_id")
operation_requested = message_body.get("operation_requested")

# Send request to Imms FHIR API and return the imms_id
function_map = {"CREATE": send_create_request, "UPDATE": send_update_request, "DELETE": send_delete_request}
function_map[operation_requested](fhir_json=fhir_json, supplier=supplier, file_key=file_key, row_id=row_id)
# Send request to Imms FHIR API and return the imms_id
function_map = {"CREATE": send_create_request, "UPDATE": send_update_request, "DELETE": send_delete_request}
function_map[operation_requested](fhir_json=fhir_json, supplier=supplier, file_key=file_key, row_id=row_id)
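
For orientation, a minimal usage sketch of the simplified function above. The message_body values are illustrative only (they are not taken from this commit); the function now assumes a diagnostics-free row, since rows with diagnostics are diverted to SQS earlier, in the record processor.

# Illustrative message as the record processor might emit it (field values are made up).
example_message_body = {
    "row_id": "file-123#1",
    "file_key": "test_file.csv",
    "supplier": "EMIS",
    "operation_requested": "CREATE",
    "fhir_json": {"resourceType": "Immunization"},
}

# Dispatches to send_create_request / send_update_request / send_delete_request;
# per the docstring, raises a MessageNotSuccessful error if the Imms API rejects the message.
send_request_to_lambda(example_message_body)
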
16 changes: 13 additions & 3 deletions recordprocessor/src/batch_processing.py
@@ -14,8 +14,7 @@
from mappings import Vaccine
# from update_ack_file import update_ack_file
from send_to_kinesis import send_to_kinesis


from s3_clients import sqs_client
logging.basicConfig(level="INFO")
logger = logging.getLogger()

@@ -82,7 +81,18 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
}

# Send to kinesis. Add diagnostics if send fails.
send_to_kinesis(supplier, outgoing_message_body)
if "diagnostics" in outgoing_message_body:
env = get_environment()
if env in ["ref", "prod", "int"]:
return env
else:
env = "internal-dev"
account_id = os.getenv("LOCAL_ACCOUNT_ID")
queue_url = f"https://sqs.eu-west-2.amazonaws.com/{account_id}/imms-{env}-ack-metadata-queue.fifo"
sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(outgoing_message_body),
MessageGroupId=outgoing_message_body["file_key"])
else:
send_to_kinesis(supplier, outgoing_message_body)
# if (
# diagnostics := details_from_processing.get("diagnostics")
# ) is None and message_delivered is False:
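A minimal standalone sketch of the routing that the hunk above adds to process_csv_to_fhir, shown only to make the control flow easier to follow; it assumes the same LOCAL_ACCOUNT_ID environment variable and the sqs_client, send_to_kinesis and get_environment helpers that appear in the diff. Note that the committed code returns early when get_environment() reports ref, prod or int, which this sketch does not reproduce.

import json
import os

def route_outgoing_message(supplier: str, outgoing_message_body: dict) -> None:
    # Rows that already carry diagnostics skip Kinesis and go straight to the
    # ack-metadata FIFO queue; everything else continues to Kinesis as before.
    if "diagnostics" in outgoing_message_body:
        env = get_environment()  # falls back to "internal-dev" outside ref/prod/int
        account_id = os.getenv("LOCAL_ACCOUNT_ID")
        queue_url = f"https://sqs.eu-west-2.amazonaws.com/{account_id}/imms-{env}-ack-metadata-queue.fifo"
        # FIFO queues require a MessageGroupId; grouping by file_key keeps rows
        # from the same input file in order.
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(outgoing_message_body),
            MessageGroupId=outgoing_message_body["file_key"],
        )
    else:
        send_to_kinesis(supplier, outgoing_message_body)
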
1 change: 1 addition & 0 deletions recordprocessor/src/s3_clients.py
@@ -6,3 +6,4 @@

s3_client = boto3_client("s3", region_name=REGION_NAME)
kinesis_client = boto3_client("kinesis", region_name=REGION_NAME)
sqs_client = boto3_client("sqs", region_name="eu-west-2")
127 changes: 68 additions & 59 deletions recordprocessor/tests/test_lambda_e2e.py
@@ -6,7 +6,7 @@
from unittest.mock import patch
from datetime import datetime, timedelta, timezone
from copy import deepcopy
from moto import mock_s3, mock_kinesis
from moto import mock_s3, mock_kinesis, mock_sqs
from boto3 import client as boto3_client
import os
import sys
@@ -42,6 +42,10 @@

s3_client = boto3_client("s3", region_name=AWS_REGION)
kinesis_client = boto3_client("kinesis", region_name=AWS_REGION)
sqs_client = boto3_client("sqs", region_name="eu-west-2")
queue_name = "imms-internal-dev-ack-metadata-queue.fifo"
attributes = {"FIFOQueue": "true", "ContentBasedDeduplication": "true"}


yesterday = datetime.now(timezone.utc) - timedelta(days=1)

@@ -61,18 +65,18 @@ def setUp(self) -> None:

kinesis_client.create_stream(StreamName=STREAM_NAME, ShardCount=1)

def tearDown(self) -> None:
# Delete all of the buckets (the contents of each bucket must be deleted first)
for bucket_name in [SOURCE_BUCKET_NAME, DESTINATION_BUCKET_NAME]:
for obj in s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", []):
s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
s3_client.delete_bucket(Bucket=bucket_name)
# def tearDown(self) -> None:
# # Delete all of the buckets (the contents of each bucket must be deleted first)
# for bucket_name in [SOURCE_BUCKET_NAME, DESTINATION_BUCKET_NAME]:
# for obj in s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", []):
# s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
# s3_client.delete_bucket(Bucket=bucket_name)

# Delete the kinesis stream
try:
kinesis_client.delete_stream(StreamName=STREAM_NAME, EnforceConsumerDeletion=True)
except kinesis_client.exceptions.ResourceNotFoundException:
pass
# # Delete the kinesis stream
# try:
# kinesis_client.delete_stream(StreamName=STREAM_NAME, EnforceConsumerDeletion=True)
# except kinesis_client.exceptions.ResourceNotFoundException:
# pass

@staticmethod
def upload_files(sourc_file_content, mock_permissions=MOCK_PERMISSIONS): # pylint: disable=dangerous-default-value
@@ -121,40 +125,42 @@ def make_assertions(self, test_cases):
- Kinesis Data is equal to the expected_kinesis_data
- "{TEST_FILE_ID}#{index+1}|fatal-error" is found in the ack file
"""

# ack_file_content = self.get_ack_file_content()
kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
previous_approximate_arrival_time_stamp = yesterday # Initialise with a time prior to the running of the test

#messages = sqs_client.receive_message(QueueUrl='https://sqs.eu-west-2.amazonaws.com/123456789012/imms-batch-internal-dev-metadata-queue.fifo', WaitTimeSeconds=1, MaxNumberOfMessages=10)
for test_name, index, expected_kinesis_data, expect_success in test_cases:
with self.subTest(test_name):

kinesis_record = kinesis_records[index]
self.assertEqual(kinesis_record["PartitionKey"], TEST_SUPPLIER)
self.assertEqual(kinesis_record["SequenceNumber"], f"{index+1}")

# Ensure that arrival times are sequential
approximate_arrival_timestamp = kinesis_record["ApproximateArrivalTimestamp"]
self.assertGreater(approximate_arrival_timestamp, previous_approximate_arrival_time_stamp)
previous_approximate_arrival_time_stamp = approximate_arrival_timestamp

kinesis_data = json.loads(kinesis_record["Data"].decode("utf-8"), parse_float=Decimal)
expected_kinesis_data = {
"row_id": f"{TEST_FILE_ID}#{index+1}",
"file_key": TEST_FILE_KEY,
"supplier": TEST_SUPPLIER,
**expected_kinesis_data,
}
if expect_success:
# Some tests ignore the fhir_json value, so we only need to check that the key is present.
if "fhir_json" not in expected_kinesis_data:
key_to_ignore = "fhir_json"
self.assertIn(key_to_ignore, kinesis_data)
kinesis_data.pop(key_to_ignore)
self.assertEqual(kinesis_data, expected_kinesis_data)
# self.assertIn(f"{TEST_FILE_ID}#{index+1}|OK", ack_file_content)
else:
self.assertEqual(kinesis_data, expected_kinesis_data)

if "diagnostics" not in expected_kinesis_data:
kinesis_record = kinesis_records[index]
self.assertEqual(kinesis_record["PartitionKey"], TEST_SUPPLIER)
self.assertEqual(kinesis_record["SequenceNumber"], f"{index+1}")

# Ensure that arrival times are sequential
approximate_arrival_timestamp = kinesis_record["ApproximateArrivalTimestamp"]
self.assertGreater(approximate_arrival_timestamp, previous_approximate_arrival_time_stamp)
previous_approximate_arrival_time_stamp = approximate_arrival_timestamp

kinesis_data = json.loads(kinesis_record["Data"].decode("utf-8"), parse_float=Decimal)
expected_kinesis_data = {
"row_id": f"{TEST_FILE_ID}#{index+1}",
"file_key": TEST_FILE_KEY,
"supplier": TEST_SUPPLIER,
"created_at_formatted_string": "2020-01-01",
**expected_kinesis_data,
}
if expect_success:
# Some tests ignore the fhir_json value, so we only need to check that the key is present.
if "fhir_json" not in expected_kinesis_data:
key_to_ignore = "fhir_json"
self.assertIn(key_to_ignore, kinesis_data)
kinesis_data.pop(key_to_ignore)
self.assertEqual(kinesis_data, expected_kinesis_data)
# self.assertIn(f"{TEST_FILE_ID}#{index+1}|OK", ack_file_content)
else:
self.assertEqual(kinesis_data, expected_kinesis_data)
# self.assertIn(f"{TEST_FILE_ID}#{index+1}|Fatal", ack_file_content)

def test_e2e_success(self):
Expand All @@ -174,37 +180,40 @@ def test_e2e_success(self):
]
self.make_assertions(test_cases)

def test_e2e_no_permissions(self):
@mock_sqs
@patch("batch_processing.sqs_client.send_message")
def test_e2e_no_permissions(self, mock_send_message):
"""
Tests that file containing CREATE, UPDATE and DELETE is successfully processed when the supplier does not have
any permissions.
"""
self.upload_files(VALID_FILE_CONTENT_WITH_NEW_AND_UPDATE_AND_DELETE)
# Create the queue in the mocked SQS environment
sqs_client = boto3_client("sqs", region_name="eu-west-2")
queue_name = "imms-internal-dev-ack-metadata-queue.fifo"
attributes = {"FIFOQueue": "true", "ContentBasedDeduplication": "true"}
queue_url = sqs_client.create_queue(QueueName=queue_name, Attributes=attributes)["QueueUrl"]

# Upload test files and prepare event
self.upload_files(VALID_FILE_CONTENT_WITH_NEW_AND_UPDATE)
event = deepcopy(TEST_EVENT_DUMPED)
test_event = json.loads(event)
test_event["permission"] = ["RSV_CREATE"]
test_event = json.dumps(test_event)

# Call the main processing function with the event
main(test_event)
# expected_kinesis_data = {"diagnostics": Diagnostics.NO_PERMISSIONS}

# Test case tuples are structured as (test_name, index, expected_kinesis_data_ignoring_fhir_json, expect_success)
# Receive message from the queue to validate
sqs_client.receive_message(
QueueUrl=queue_url,
WaitTimeSeconds=1,
MaxNumberOfMessages=10
)
# Define test cases for assertions
test_cases = [
("CREATE success", 0, {"operation_requested": "CREATE"}, True),
(
"UPDATE no permissions",
1,
{"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "UPDATE"},
False,
),
(
"DELETE no permissions",
2,
{"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "DELETE"},
False,
),
("UPDATE no permissions", 1, {"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "UPDATE"}, False),
("DELETE no permissions", 2, {"diagnostics": Diagnostics.NO_PERMISSIONS, "operation_requested": "DELETE"}, False),
]

self.make_assertions(test_cases)

def test_e2e_partial_permissions(self):
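One way (not part of this commit) to turn test_e2e_no_permissions into a stronger check would be to assert on the patched sqs_client.send_message rather than polling the moto queue, since patching send_message means nothing is actually delivered for receive_message to read. A sketch, assuming the mock_send_message argument and the Diagnostics.NO_PERMISSIONS test data shown above:

# Sketch only: possible assertions at the end of test_e2e_no_permissions.
self.assertTrue(mock_send_message.called)
last_call_kwargs = mock_send_message.call_args.kwargs
sent_body = json.loads(last_call_kwargs["MessageBody"])
self.assertEqual(sent_body["file_key"], TEST_FILE_KEY)
self.assertEqual(sent_body["diagnostics"], Diagnostics.NO_PERMISSIONS)
self.assertEqual(last_call_kwargs["MessageGroupId"], TEST_FILE_KEY)
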
12 changes: 6 additions & 6 deletions recordprocessor/tests/test_processing_lambda.py
@@ -229,8 +229,8 @@ def test_process_csv_to_fhir_positive_string_not_provided(self, mock_send_to_kin
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
def test_process_csv_to_fhir_paramter_missing(self, mock_send_to_kinesis):
@patch("batch_processing.sqs_client.send_message")
def test_process_csv_to_fhir_paramter_missing(self, mock_sqs):
s3_client.put_object(Bucket=SOURCE_BUCKET_NAME, Key=TEST_FILE_KEY,
Body=VALID_FILE_CONTENT_WITH_NEW_AND_UPDATE.replace("new", ""))

@@ -240,7 +240,7 @@ def test_process_csv_to_fhir_paramter_missing(self, mock_send_to_kinesis):
process_csv_to_fhir(TEST_EVENT)

# self.assert_value_in_ack_file("Fatal")
mock_send_to_kinesis.assert_called()
mock_sqs.assert_called()

@patch("batch_processing.send_to_kinesis")
def test_process_csv_to_fhir_invalid_headers(self, mock_send_to_kinesis):
@@ -343,8 +343,8 @@ def test_process_csv_to_fhir_successful(self, mock_send_to_kinesis):
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
def test_process_csv_to_fhir_incorrect_permissions(self, mock_send_to_kinesis):
@patch("batch_processing.sqs_client.send_message")
def test_process_csv_to_fhir_incorrect_permissions(self, mock_send_to_sqs):
s3_client.put_object(Bucket=SOURCE_BUCKET_NAME, Key=TEST_FILE_KEY, Body=VALID_FILE_CONTENT_WITH_UPDATE)

with patch("batch_processing.get_operation_permissions", return_value={"DELETE"}):
@@ -353,7 +353,7 @@ def test_process_csv_to_fhir_incorrect_permissions(self, mock_send_to_kinesis):
process_csv_to_fhir(TEST_EVENT)

# self.assert_value_in_ack_file("No permissions for requested operation")
mock_send_to_kinesis.assert_called()
mock_send_to_sqs.assert_called()

def test_get_environment(self):
with patch("batch_processing.os.getenv", return_value="internal-dev"):
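test_get_environment (context above) exercises the same get_environment helper that the new routing in batch_processing.py relies on. The helper's implementation is not shown in this commit; a plausible shape, inferred from the routing hunk and the test, might be:

import os

# Assumption: not the committed implementation, only a sketch consistent with
# the diff (ref, prod and int pass through; anything else falls back to internal-dev).
def get_environment() -> str:
    env = os.getenv("ENVIRONMENT")
    return env if env in ("ref", "prod", "int") else "internal-dev"
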
@@ -253,7 +253,7 @@

MOCK_ENVIRONMENT_DICT = {
"ENVIRONMENT": "internal-dev",
"LOCAL_ACCOUNT_ID": "123456",
"LOCAL_ACCOUNT_ID": "123456789012",
"ACK_BUCKET_NAME": DESTINATION_BUCKET_NAME,
"SHORT_QUEUE_PREFIX": "imms-batch-internal-dev",
"KINESIS_STREAM_ARN": f"arn:aws:kinesis:{AWS_REGION}:123456789012:stream/{STREAM_NAME}",
13 changes: 13 additions & 0 deletions terraform/ecs_batch_processor_config.tf
@@ -146,6 +146,15 @@ resource "aws_iam_policy" "ecs_task_exec_policy" {
Effect = "Allow"
Action = "lambda:InvokeFunction"
Resource = data.aws_lambda_function.existing_search_lambda.arn
},
{
Effect= "Allow",
Action= [
"sqs:SendMessage",
"sqs:ReceiveMessage",
"kms:Decrypt"
],
Resource= ["arn:aws:sqs:${var.aws_region}:${local.local_account_id}:imms-${local.api_env}-ack-metadata-queue.fifo"]
}
]
})
@@ -209,6 +218,10 @@ resource "aws_ecs_task_definition" "ecs_task" {
name = "SEARCH_IMMS_LAMBDA"
value = data.aws_lambda_function.existing_search_lambda.function_name
},
{
name="SQS_QUEUE_URL"
value= "https://sqs.eu-west-2.amazonaws.com/${local.local_account_id}/imms-${local.api_env}-ack-metadata-queue.fifo"
}

]
logConfiguration = {
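The task definition above now injects SQS_QUEUE_URL into the container, while the routing code in batch_processing.py rebuilds the URL from LOCAL_ACCOUNT_ID. If the intention is for the processor to consume the injected value instead, a minimal sketch (an assumption, not something this commit implements) would be:

import json
import os

def send_ack_metadata(sqs_client, outgoing_message_body: dict) -> None:
    # Assumption: read the queue URL provided by the ECS task definition rather
    # than reconstructing it from the account id and environment name.
    queue_url = os.environ["SQS_QUEUE_URL"]
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(outgoing_message_body),
        MessageGroupId=outgoing_message_body["file_key"],
    )
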
10 changes: 0 additions & 10 deletions terraform/forwarder_lambda.tf
@@ -170,15 +170,6 @@ resource "aws_iam_policy" "forwarding_lambda_exec_policy" {
"firehose:PutRecordBatch"
],
"Resource": data.aws_kinesis_firehose_delivery_stream.splunk_stream.arn
},
{
"Effect": "Allow",
"Action": [
"sqs:SendMessage",
"sqs:ReceiveMessage",
"kms:Decrypt"
],
"Resource": ["arn:aws:sqs:${var.aws_region}:${local.local_account_id}:imms-${local.api_env}-ack-metadata-queue.fifo"]
}
]
})
@@ -215,7 +206,6 @@ resource "aws_lambda_function" "forwarding_lambda" {
UPDATE_LAMBDA_NAME = data.aws_lambda_function.existing_update_lambda.function_name
DELETE_LAMBDA_NAME = data.aws_lambda_function.existing_delete_lambda.function_name
SEARCH_LAMBDA_NAME = data.aws_lambda_function.existing_search_lambda.function_name
SQS_QUEUE_URL = "https://sqs.eu-west-2.amazonaws.com/${local.local_account_id}/imms-${local.api_env}-ack-metadata-queue.fifo"
}
}
kms_key_arn = data.aws_kms_key.existing_lambda_encryption_key.arn
