Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
jaklinger committed Apr 11, 2024
1 parent 1950e38 commit 5e685c3
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ output "changelog_key" {
output "bulk_trigger_prefix" {
value = local.bulk_trigger_prefix
}

output "notify_lambda_arn" {
value = module.notify.arn
}
2 changes: 1 addition & 1 deletion src/etl/notify/notify.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
def handler(event: list[dict], context):
def handler(event: list[dict], context=None):
for item in event:
if item.get("error_message") is not None:
return "fail"
Expand Down
76 changes: 76 additions & 0 deletions src/etl/notify/tests/test_notify_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import json
import boto3
import pytest
from test_helpers.terraform import read_terraform_output


STATE_MACHINE_INPUT_WITHOUT_ERROR_MESSAGE = [
{
"ETag": '"9b41ef67651c18488a8b08bb67c75699"',
"ServerSideEncryption": "AES256",
"VersionId": "VHBhCwygSeYxgEEecU7N.meZl3uKDoaA",
},
{
"stage_name": "load",
"processed_records": 0,
"unprocessed_records": 0,
"error_message": None,
},
]

STATE_MACHINE_INPUT_WITH_ERROR_MESSAGE = [
{
"ETag": '"9b41ef67651c18488a8b08bb67c75699"',
"ServerSideEncryption": "AES256",
"VersionId": "VHBhCwygSeYxgEEecU7N.meZl3uKDoaA",
},
{
"stage_name": "load",
"processed_records": 0,
"unprocessed_records": 0,
"error_message": "oops",
},
]


def test_notify_lambda_with_state_machine_input_without_error_message():
from etl.notify import notify

assert notify.handler(event=STATE_MACHINE_INPUT_WITHOUT_ERROR_MESSAGE) == "pass"


def test_notify_lambda_with_state_machine_input_with_error_message():
from etl.notify import notify

assert notify.handler(event=STATE_MACHINE_INPUT_WITH_ERROR_MESSAGE) == "fail"


@pytest.mark.integration
def test_notify_lambda_without_error_message():
notify_lambda_arn = read_terraform_output("sds_etl.value.notify_lambda_arn")

lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
FunctionName=notify_lambda_arn,
Payload=json.dumps([{"message": "test"}]).encode(),
)
assert response["Payload"].read() == "pass"


@pytest.mark.integration
def test_notify_lambda_with_error_message():
notify_lambda_arn = read_terraform_output("sds_etl.value.notify_lambda_arn")

lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
FunctionName=notify_lambda_arn,
Payload=json.dumps(
[
{
"message": "test",
"error_message": "this is an error",
}
]
).encode(),
)
assert response["Payload"].read() == "fail"
10 changes: 6 additions & 4 deletions src/layers/etl_utils/trigger/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ def notify(
response = lambda_client.invoke(
FunctionName=function_name,
Payload=json.dumps(
{
"message": f"{status} '{trigger_type}' trigger of state machine.",
"error_message": error_message,
}
[
{
"message": f"{status} '{trigger_type}' trigger of state machine.",
"error_message": error_message,
}
]
).encode(),
)
return response["Payload"].read()

0 comments on commit 5e685c3

Please sign in to comment.