Skip to content

Commit

Permalink
Commented the update ack file
Browse files Browse the repository at this point in the history
  • Loading branch information
ASubaran committed Nov 12, 2024
1 parent 0387a70 commit 8639663
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 30 deletions.
36 changes: 18 additions & 18 deletions recordprocessor/src/batch_processing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Functions for processing the file on a row-by-row basis"""

import json
from io import StringIO
# from io import StringIO
import os
import time
import logging
Expand All @@ -12,7 +12,7 @@
from get_operation_permissions import get_operation_permissions
from process_row import process_row
from mappings import Vaccine
from update_ack_file import update_ack_file
# from update_ack_file import update_ack_file
from send_to_kinesis import send_to_kinesis


Expand Down Expand Up @@ -61,8 +61,8 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
make_and_upload_ack_file(
file_id, file_key, True, True, created_at_formatted_string
)
accumulated_ack_file_content = StringIO()
accumulated_ack_file_content.write("|".join(Constants.ack_headers) + "\n")
# accumulated_ack_file_content = StringIO()
# accumulated_ack_file_content.write("|".join(Constants.ack_headers) + "\n")

row_count = 0 # Initialize a counter for rows
for row in csv_reader:
Expand All @@ -81,22 +81,22 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
}

# Send to kinesis. Add diagnostics if send fails.
message_delivered = send_to_kinesis(supplier, outgoing_message_body)
if (
diagnostics := details_from_processing.get("diagnostics")
) is None and message_delivered is False:
diagnostics = "Unsupported file type received as an attachment"
send_to_kinesis(supplier, outgoing_message_body)
# if (
# diagnostics := details_from_processing.get("diagnostics")
# ) is None and message_delivered is False:
# diagnostics = "Unsupported file type received as an attachment"

# Update the ack file
accumulated_ack_file_content = update_ack_file(
file_key,
bucket_name,
accumulated_ack_file_content,
row_id,
message_delivered,
diagnostics,
outgoing_message_body.get("imms_id"),
)
# accumulated_ack_file_content = update_ack_file(
# file_key,
# bucket_name,
# accumulated_ack_file_content,
# row_id,
# message_delivered,
# diagnostics,
# outgoing_message_body.get("imms_id"),
# )

logger.info("Total rows processed: %s", row_count)

Expand Down
8 changes: 4 additions & 4 deletions recordprocessor/tests/test_lambda_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def make_assertions(self, test_cases):
- "{TEST_FILE_ID}#{index+1}|fatal-error" is found in the ack file
"""

ack_file_content = self.get_ack_file_content()
# ack_file_content = self.get_ack_file_content()
kinesis_records = kinesis_client.get_records(ShardIterator=self.get_shard_iterator(), Limit=10)["Records"]
previous_approximate_arrival_time_stamp = yesterday # Initialise with a time prior to the running of the test

Expand Down Expand Up @@ -152,10 +152,10 @@ def make_assertions(self, test_cases):
self.assertIn(key_to_ignore, kinesis_data)
kinesis_data.pop(key_to_ignore)
self.assertEqual(kinesis_data, expected_kinesis_data)
self.assertIn(f"{TEST_FILE_ID}#{index+1}|OK", ack_file_content)
# self.assertIn(f"{TEST_FILE_ID}#{index+1}|OK", ack_file_content)
else:
self.assertEqual(kinesis_data, expected_kinesis_data)
self.assertIn(f"{TEST_FILE_ID}#{index+1}|Fatal", ack_file_content)
# self.assertIn(f"{TEST_FILE_ID}#{index+1}|Fatal", ack_file_content)

def test_e2e_success(self):
"""
Expand Down Expand Up @@ -304,7 +304,7 @@ def test_e2e_kinesis_failed(self):

main(TEST_EVENT_DUMPED)

self.assertIn("Fatal", self.get_ack_file_content())
# self.assertIn("Fatal", self.get_ack_file_content())


if __name__ == "__main__":
Expand Down
16 changes: 8 additions & 8 deletions recordprocessor/tests/test_processing_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def test_process_csv_to_fhir(self, mock_send_to_kinesis):
with patch("batch_processing.get_operation_permissions", return_value={"CREATE", "UPDATE", "DELETE"}):
process_csv_to_fhir(TEST_EVENT)

self.assert_value_in_ack_file("Success")
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
Expand All @@ -196,7 +196,7 @@ def test_process_csv_to_fhir_(self, mock_send_to_kinesis):

process_csv_to_fhir(TEST_EVENT_PERMISSION)

self.assert_value_in_ack_file("Success")
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
Expand All @@ -206,7 +206,7 @@ def test_process_csv_to_fhir_positive_string_provided(self, mock_send_to_kinesis
with patch("batch_processing.get_operation_permissions", return_value={"CREATE", "UPDATE", "DELETE"}):
process_csv_to_fhir(TEST_EVENT)

self.assert_value_in_ack_file("Success")
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
Expand All @@ -216,7 +216,7 @@ def test_process_csv_to_fhir_only_mandatory(self, mock_send_to_kinesis):
with patch("batch_processing.get_operation_permissions", return_value={"CREATE", "UPDATE", "DELETE"}):
process_csv_to_fhir(TEST_EVENT)

self.assert_value_in_ack_file("Success")
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
Expand All @@ -226,7 +226,7 @@ def test_process_csv_to_fhir_positive_string_not_provided(self, mock_send_to_kin
with patch("batch_processing.get_operation_permissions", return_value={"CREATE", "UPDATE", "DELETE"}):
process_csv_to_fhir(TEST_EVENT)

self.assert_value_in_ack_file("Success")
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
Expand All @@ -239,7 +239,7 @@ def test_process_csv_to_fhir_paramter_missing(self, mock_send_to_kinesis):
):
process_csv_to_fhir(TEST_EVENT)

self.assert_value_in_ack_file("Fatal")
# self.assert_value_in_ack_file("Fatal")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
Expand Down Expand Up @@ -340,7 +340,7 @@ def test_process_csv_to_fhir_successful(self, mock_send_to_kinesis):
mock_csv_reader_instance.__iter__.return_value = iter(TestValues.mock_update_request)
process_csv_to_fhir(TEST_EVENT)

self.assert_value_in_ack_file("Success")
# self.assert_value_in_ack_file("Success")
mock_send_to_kinesis.assert_called()

@patch("batch_processing.send_to_kinesis")
Expand All @@ -352,7 +352,7 @@ def test_process_csv_to_fhir_incorrect_permissions(self, mock_send_to_kinesis):
mock_csv_reader_instance.__iter__.return_value = iter(TestValues.mock_update_request)
process_csv_to_fhir(TEST_EVENT)

self.assert_value_in_ack_file("No permissions for requested operation")
# self.assert_value_in_ack_file("No permissions for requested operation")
mock_send_to_kinesis.assert_called()

def test_get_environment(self):
Expand Down

0 comments on commit 8639663

Please sign in to comment.