diff --git a/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf b/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf index 9b7a4ed79..5a8e10dc6 100644 --- a/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf +++ b/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf @@ -13,3 +13,7 @@ output "changelog_key" { output "bulk_trigger_prefix" { value = local.bulk_trigger_prefix } + +output "notify_lambda_arn" { + value = module.notify.arn +} diff --git a/src/etl/notify/notify.py b/src/etl/notify/notify.py index 1ea02d025..646e2415e 100644 --- a/src/etl/notify/notify.py +++ b/src/etl/notify/notify.py @@ -1,4 +1,4 @@ -def handler(event: list[dict], context): +def handler(event: list[dict], context=None): for item in event: if item.get("error_message") is not None: return "fail" diff --git a/src/etl/notify/tests/test_notify_lambda.py b/src/etl/notify/tests/test_notify_lambda.py new file mode 100644 index 000000000..7ce3ba1b8 --- /dev/null +++ b/src/etl/notify/tests/test_notify_lambda.py @@ -0,0 +1,76 @@ +import json +import boto3 +import pytest +from test_helpers.terraform import read_terraform_output + + +STATE_MACHINE_INPUT_WITHOUT_ERROR_MESSAGE = [ + { + "ETag": '"9b41ef67651c18488a8b08bb67c75699"', + "ServerSideEncryption": "AES256", + "VersionId": "VHBhCwygSeYxgEEecU7N.meZl3uKDoaA", + }, + { + "stage_name": "load", + "processed_records": 0, + "unprocessed_records": 0, + "error_message": None, + }, +] + +STATE_MACHINE_INPUT_WITH_ERROR_MESSAGE = [ + { + "ETag": '"9b41ef67651c18488a8b08bb67c75699"', + "ServerSideEncryption": "AES256", + "VersionId": "VHBhCwygSeYxgEEecU7N.meZl3uKDoaA", + }, + { + "stage_name": "load", + "processed_records": 0, + "unprocessed_records": 0, + "error_message": "oops", + }, +] + + +def test_notify_lambda_with_state_machine_input_without_error_message(): + from etl.notify import notify + + assert notify.handler(event=STATE_MACHINE_INPUT_WITHOUT_ERROR_MESSAGE) == "pass" + + +def test_notify_lambda_with_state_machine_input_with_error_message(): + from etl.notify import notify + + assert notify.handler(event=STATE_MACHINE_INPUT_WITH_ERROR_MESSAGE) == "fail" + + +@pytest.mark.integration +def test_notify_lambda_without_error_message(): + notify_lambda_arn = read_terraform_output("sds_etl.value.notify_lambda_arn") + + lambda_client = boto3.client("lambda") + response = lambda_client.invoke( + FunctionName=notify_lambda_arn, + Payload=json.dumps([{"message": "test"}]).encode(), + ) + assert response["Payload"].read() == "pass" + + +@pytest.mark.integration +def test_notify_lambda_with_error_message(): + notify_lambda_arn = read_terraform_output("sds_etl.value.notify_lambda_arn") + + lambda_client = boto3.client("lambda") + response = lambda_client.invoke( + FunctionName=notify_lambda_arn, + Payload=json.dumps( + [ + { + "message": "test", + "error_message": "this is an error", + } + ] + ).encode(), + ) + assert response["Payload"].read() == "fail" diff --git a/src/layers/etl_utils/trigger/notify.py b/src/layers/etl_utils/trigger/notify.py index ccc4b35ce..66c6065ca 100644 --- a/src/layers/etl_utils/trigger/notify.py +++ b/src/layers/etl_utils/trigger/notify.py @@ -18,10 +18,12 @@ def notify( response = lambda_client.invoke( FunctionName=function_name, Payload=json.dumps( - { - "message": f"{status} '{trigger_type}' trigger of state machine.", - "error_message": error_message, - } + [ + { + "message": f"{status} '{trigger_type}' trigger of state machine.", + "error_message": error_message, + } + ] ).encode(), ) return response["Payload"].read()