From e848d0a508d1f9597a5d2c340d7254506948e388 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 2 Sep 2022 15:36:53 -0800 Subject: [PATCH 01/22] Compute processing time from multiple Batch jobs --- .../src/check_processing_time.py | 16 +++-- tests/test_check_processing_time.py | 67 +++++++++++-------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py index b5e653f13..128d7d9b1 100644 --- a/apps/check-processing-time/src/check_processing_time.py +++ b/apps/check-processing-time/src/check_processing_time.py @@ -1,7 +1,7 @@ import json -def get_time_from_attempts(attempts): +def get_time_from_attempts(attempts: list[dict]) -> float: if len(attempts) == 0: raise ValueError('no Batch job attempts') attempts.sort(key=lambda attempt: attempt['StoppedAt']) @@ -9,10 +9,14 @@ def get_time_from_attempts(attempts): return (final_attempt['StoppedAt'] - final_attempt['StartedAt']) / 1000 -def lambda_handler(event, context): - results = event['processing_results'] - if 'Attempts' in results: - attempts = results['Attempts'] +def get_time_from_result(result: dict) -> float: + if 'Attempts' in result: + attempts = result['Attempts'] else: - attempts = json.loads(results['Cause'])['Attempts'] + attempts = json.loads(result['Cause'])['Attempts'] return get_time_from_attempts(attempts) + + +def lambda_handler(event, context): + results = event['processing_results'] + return sum(map(get_time_from_result, results)) diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index ae202a67e..bdeaa4902 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -5,8 +5,7 @@ def test_single_attempt(): attempts = [{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 2800}] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 2.3 + assert check_processing_time.get_time_from_attempts(attempts) == 2.3 def test_multiple_attempts(): @@ -14,8 +13,7 @@ def test_multiple_attempts(): {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} ] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 5.7 + assert check_processing_time.get_time_from_attempts(attempts) == 5.7 def test_unsorted_attempts(): @@ -25,8 +23,7 @@ def test_unsorted_attempts(): {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}, {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000} ] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 5.7 + assert check_processing_time.get_time_from_attempts(attempts) == 5.7 def test_missing_start_time(): @@ -37,8 +34,7 @@ def test_missing_start_time(): {'Container': {}, 'StatusReason': '', 'StoppedAt': 8700}, {'Container': {}, 'StartedAt': 12000, 'StatusReason': '', 'StoppedAt': 15200} ] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 3.2 + assert check_processing_time.get_time_from_attempts(attempts) == 3.2 def test_no_attempts(): @@ -46,28 +42,43 @@ def test_no_attempts(): check_processing_time.get_time_from_attempts([]) -def test_lambda_handler_with_normal_results(): - event = { - 'processing_results': { - 'Attempts': [ - {'Container': {}, 'StartedAt': 1644609403693, 'StatusReason': '', 'StoppedAt': 1644609919331}, - {'Container': {}, 'StartedAt': 1644610107570, 'StatusReason': '', 'StoppedAt': 1644611472015} - ] - } +def test_get_time_from_result(): + result = { + 'Attempts': [ + {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, + {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} + ] + } + assert check_processing_time.get_time_from_result(result) == 5.7 + + +def test_get_time_from_result_failed(): + result = { + 'Error': 'States.TaskFailed', + 'Cause': '{"Attempts": [' + '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, ' + '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, ' + '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}' } - response = check_processing_time.lambda_handler(event, None) - assert response == (1644611472015 - 1644610107570) / 1000 + assert check_processing_time.get_time_from_result(result) == 6.4 -def test_lambda_handler_with_failed_results(): +def test_lambda_handler(): event = { - 'processing_results': { - 'Error': 'States.TaskFailed', - 'Cause': '{"Attempts": [' - '{"Container": {}, "StartedAt": 1643834765893, "StatusReason": "", "StoppedAt": 1643834766455}, ' - '{"Container": {}, "StartedAt": 1643834888866, "StatusReason": "", "StoppedAt": 1643834889448}, ' - '{"Container": {}, "StartedAt": 1643834907858, "StatusReason": "", "StoppedAt": 1643834908466}]}' - } + 'processing_results': [ + { + 'Attempts': [ + {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, + {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} + ] + }, + { + 'Error': 'States.TaskFailed', + 'Cause': '{"Attempts": [' + '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, ' + '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, ' + '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}' + }, + ] } - response = check_processing_time.lambda_handler(event, None) - assert response == (1643834908466 - 1643834907858) / 1000 + assert check_processing_time.lambda_handler(event, None) == 5.7 + 6.4 From de5722fe534be41794548d1aefbebe1279f1be81 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Tue, 6 Sep 2022 09:21:10 -0800 Subject: [PATCH 02/22] Store result for each Batch job --- .../src/check_processing_time.py | 4 +- apps/set-status/set-status-cf.yml.j2 | 80 +++++++++++++++++++ apps/set-status/src/set_status.py | 2 + apps/step-function.json.j2 | 36 +++++++-- apps/upload-log/src/upload_log.py | 18 +++-- 5 files changed, 127 insertions(+), 13 deletions(-) create mode 100644 apps/set-status/set-status-cf.yml.j2 create mode 100644 apps/set-status/src/set_status.py diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py index 128d7d9b1..e8e19f054 100644 --- a/apps/check-processing-time/src/check_processing_time.py +++ b/apps/check-processing-time/src/check_processing_time.py @@ -18,5 +18,7 @@ def get_time_from_result(result: dict) -> float: def lambda_handler(event, context): - results = event['processing_results'] + results_dict = event['processing_results'] + results = [results_dict[i] for i in sorted(results_dict.keys())] + # TODO return list return sum(map(get_time_from_result, results)) diff --git a/apps/set-status/set-status-cf.yml.j2 b/apps/set-status/set-status-cf.yml.j2 new file mode 100644 index 000000000..3a87583b5 --- /dev/null +++ b/apps/set-status/set-status-cf.yml.j2 @@ -0,0 +1,80 @@ +AWSTemplateFormatVersion: 2010-09-09 + +{% if security_environment == 'EDC' %} +Parameters: + + PermissionsBoundaryPolicyArn: + Type: String + + SecurityGroupId: + Type: String + + SubnetIds: + Type: CommaDelimitedList +{% endif %} + +Outputs: + + LambdaArn: + Value: !GetAtt Lambda.Arn + +Resources: + + LogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${Lambda}" + RetentionInDays: 90 + + Role: + Type: {{ 'Custom::JplRole' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::Role' }} + Properties: + {% if security_environment in ('JPL', 'JPL-public') %} + ServiceToken: !ImportValue Custom::JplRole::ServiceToken + Path: /account-managed/hyp3/ + {% endif %} + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + Action: sts:AssumeRole + Principal: + Service: lambda.amazonaws.com + Effect: Allow + ManagedPolicyArns: + - !Ref Policy + {% if security_environment == 'EDC' %} + - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole + PermissionsBoundary: !Ref PermissionsBoundaryPolicyArn + {% endif %} + + Policy: + Type: {{ 'Custom::JplPolicy' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::ManagedPolicy' }} + Properties: + {% if security_environment in ('JPL', 'JPL-public') %} + ServiceToken: !ImportValue Custom::JplPolicy::ServiceToken + Path: /account-managed/hyp3/ + {% endif %} + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - logs:CreateLogStream + - logs:PutLogEvents + Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" + + Lambda: + Type: AWS::Lambda::Function + Properties: + Code: src/ + Handler: set_status.lambda_handler + MemorySize: 128 + Role: !GetAtt Role.Arn + Runtime: python3.9 + Timeout: 30 + {% if security_environment == 'EDC' %} + VpcConfig: + SecurityGroupIds: + - !Ref SecurityGroupId + SubnetIds: !Ref SubnetIds + {% endif %} diff --git a/apps/set-status/src/set_status.py b/apps/set-status/src/set_status.py new file mode 100644 index 000000000..d1ccded0a --- /dev/null +++ b/apps/set-status/src/set_status.py @@ -0,0 +1,2 @@ +def lambda_handler(event, context) -> bool: + return any('Error' in result for result in event['processing_results'].values()) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index 8d29bf4e1..c1fd11892 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -115,7 +115,7 @@ "Attempts": 3 } }, - "ResultPath": "$.results.processing_results", + "ResultPath": "$.results.processing_results.{{ loop.index0 }}", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -138,7 +138,7 @@ "States.ALL" ], "Next": "UPLOAD_LOG", - "ResultPath": "$.results.processing_results" + "ResultPath": "$.results.processing_results.{{ loop.index0 }}" } ] }, @@ -222,13 +222,39 @@ } ], "ResultPath": "$.results.processing_time_in_seconds", - "Next": "CHECK_STATUS" + "Next": "SET_STATUS" }, + "SET_STATUS": { + "Type": "Task", + "Resource": "${SetStatusLambdaArn}", + "Parameters": { + "processing_results.$": "$.results.processing_results" + }, + "Retry": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "MaxAttempts": 2 + } + ], + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "JOB_FAILED", + "ResultPath": "$.results.set_status_error" + } + ], + "ResultPath": "$.results.status_is_failed", + "Next": "CHECK_STATUS" + } "CHECK_STATUS": { "Type" : "Choice", "Choices": [{ - "Variable": "$.results.processing_results.Error", - "IsPresent": true, + "Variable": "$.results.status_is_failed", + "BooleanEquals": true, "Next": "JOB_FAILED" }], "Default": "JOB_SUCCEEDED" diff --git a/apps/upload-log/src/upload_log.py b/apps/upload-log/src/upload_log.py index 541bcdcec..16309cad9 100644 --- a/apps/upload-log/src/upload_log.py +++ b/apps/upload-log/src/upload_log.py @@ -10,10 +10,10 @@ S3 = boto3.client('s3') -def get_log_stream(processing_results: dict) -> Optional[str]: - if 'Error' in processing_results: - processing_results = json.loads(processing_results['Cause']) - return processing_results['Container'].get('LogStreamName') +def get_log_stream(result: dict) -> Optional[str]: + if 'Error' in result: + result = json.loads(result['Cause']) + return result['Container'].get('LogStreamName') def get_log_content(log_group, log_stream): @@ -58,9 +58,13 @@ def write_log_to_s3(bucket, prefix, content): def lambda_handler(event, context): + # TODO handle all results, not just the last one + results_dict = event['processing_results'] + result = results_dict[len(results_dict) - 1] + log_content = None - log_stream = get_log_stream(event['processing_results']) + log_stream = get_log_stream(result) if log_stream is not None: try: log_content = get_log_content(event['log_group'], log_stream) @@ -69,7 +73,7 @@ def lambda_handler(event, context): raise if log_content is None: - assert 'Error' in event['processing_results'] - log_content = get_log_content_from_failed_attempts(json.loads(event['processing_results']['Cause'])) + assert 'Error' in result + log_content = get_log_content_from_failed_attempts(json.loads(result['Cause'])) write_log_to_s3(environ['BUCKET'], event['prefix'], log_content) From 72b1131349e8b4ba4974b556ef1245eeedf0b12c Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Tue, 6 Sep 2022 09:24:35 -0800 Subject: [PATCH 03/22] Store processing time for each Batch job --- .../check-processing-time/src/check_processing_time.py | 3 +-- tests/test_check_processing_time.py | 10 +++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py index e8e19f054..a5d41b953 100644 --- a/apps/check-processing-time/src/check_processing_time.py +++ b/apps/check-processing-time/src/check_processing_time.py @@ -20,5 +20,4 @@ def get_time_from_result(result: dict) -> float: def lambda_handler(event, context): results_dict = event['processing_results'] results = [results_dict[i] for i in sorted(results_dict.keys())] - # TODO return list - return sum(map(get_time_from_result, results)) + return list(map(get_time_from_result, results)) diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index bdeaa4902..29b39c9ad 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -65,20 +65,20 @@ def test_get_time_from_result_failed(): def test_lambda_handler(): event = { - 'processing_results': [ - { + 'processing_results': { + '0': { 'Attempts': [ {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} ] }, - { + '1': { 'Error': 'States.TaskFailed', 'Cause': '{"Attempts": [' '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, ' '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, ' '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}' }, - ] + } } - assert check_processing_time.lambda_handler(event, None) == 5.7 + 6.4 + assert check_processing_time.lambda_handler(event, None) == [5.7, 6.4] From 55b7d591d50514a813c8264372892970579c9e6a Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Tue, 6 Sep 2022 11:30:51 -0800 Subject: [PATCH 04/22] Fix upload_log handling of processing results --- apps/upload-log/src/upload_log.py | 2 +- tests/test_upload_log.py | 63 +++++++++++++++++-------------- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/apps/upload-log/src/upload_log.py b/apps/upload-log/src/upload_log.py index 16309cad9..058c5935a 100644 --- a/apps/upload-log/src/upload_log.py +++ b/apps/upload-log/src/upload_log.py @@ -60,7 +60,7 @@ def write_log_to_s3(bucket, prefix, content): def lambda_handler(event, context): # TODO handle all results, not just the last one results_dict = event['processing_results'] - result = results_dict[len(results_dict) - 1] + result = results_dict[str(len(results_dict) - 1)] log_content = None diff --git a/tests/test_upload_log.py b/tests/test_upload_log.py index aa59226a2..b6c3204b4 100644 --- a/tests/test_upload_log.py +++ b/tests/test_upload_log.py @@ -22,24 +22,24 @@ def s3_stubber(): def test_get_log_stream(): - processing_results = { + result = { 'Container': { 'LogStreamName': 'mySucceededLogStream', }, } - assert upload_log.get_log_stream(processing_results) == 'mySucceededLogStream' + assert upload_log.get_log_stream(result) == 'mySucceededLogStream' - processing_results = { + result = { 'Error': 'States.TaskFailed', 'Cause': '{"Container": {"LogStreamName": "myFailedLogStream"}}', } - assert upload_log.get_log_stream(processing_results) == 'myFailedLogStream' + assert upload_log.get_log_stream(result) == 'myFailedLogStream' - processing_results = { + result = { 'Error': 'States.TaskFailed', 'Cause': '{"Container": {}}', } - assert upload_log.get_log_stream(processing_results) is None + assert upload_log.get_log_stream(result) is None def test_get_log_content(cloudwatch_stubber): @@ -117,7 +117,7 @@ def test_lambda_handler(mock_get_log_content: MagicMock, mock_write_log_to_s3: M event = { 'prefix': 'test-prefix', 'log_group': 'test-log-group', - 'processing_results': {'Container': {'LogStreamName': 'test-log-stream'}} + 'processing_results': {'0': {'Container': {'LogStreamName': 'test-log-stream'}}} } upload_log.lambda_handler(event, None) @@ -133,11 +133,14 @@ def test_lambda_handler_no_log_stream(mock_write_log_to_s3: MagicMock): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - 'Error': '', - 'Cause': '{"Container": {},' - '"Status": "FAILED",' - '"StatusReason": "foo reason",' - '"Attempts": []}' + '0': + { + 'Error': '', + 'Cause': '{"Container": {},' + '"Status": "FAILED",' + '"StatusReason": "foo reason",' + '"Attempts": []}' + } } } @@ -159,14 +162,16 @@ def mock_get_log_events(**kwargs): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - 'Error': '', - 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' - '"Status": "FAILED",' - '"StatusReason": "Task failed to start",' - '"Attempts": [' - '{"Container": {"Reason": "error message 1"}},' - '{"Container": {"Reason": "error message 2"}},' - '{"Container": {"Reason": "error message 3"}}]}' + '0': { + 'Error': '', + 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' + '"Status": "FAILED",' + '"StatusReason": "Task failed to start",' + '"Attempts": [' + '{"Container": {"Reason": "error message 1"}},' + '{"Container": {"Reason": "error message 2"}},' + '{"Container": {"Reason": "error message 3"}}]}' + } } } @@ -194,14 +199,16 @@ def mock_get_log_events(**kwargs): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - 'Error': '', - 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' - '"Status": "FAILED",' - '"StatusReason": "Task failed to start",' - '"Attempts": [' - '{"Container": {"Reason": "error message 1"}},' - '{"Container": {"Reason": "error message 2"}},' - '{"Container": {"Reason": "error message 3"}}]}' + '0': { + 'Error': '', + 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' + '"Status": "FAILED",' + '"StatusReason": "Task failed to start",' + '"Attempts": [' + '{"Container": {"Reason": "error message 1"}},' + '{"Container": {"Reason": "error message 2"}},' + '{"Container": {"Reason": "error message 3"}}]}' + } } } From fb5e100e47728f39d57f1bbad52ce9befce02893 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Tue, 6 Sep 2022 11:36:28 -0800 Subject: [PATCH 05/22] Rename lambda --- .../check-for-processing-errors.yml.j2} | 2 +- .../src/check_for_processing_errors.py} | 0 apps/step-function.json.j2 | 12 ++++++------ 3 files changed, 7 insertions(+), 7 deletions(-) rename apps/{set-status/set-status-cf.yml.j2 => check-for-processing-errors/check-for-processing-errors.yml.j2} (97%) rename apps/{set-status/src/set_status.py => check-for-processing-errors/src/check_for_processing_errors.py} (100%) diff --git a/apps/set-status/set-status-cf.yml.j2 b/apps/check-for-processing-errors/check-for-processing-errors.yml.j2 similarity index 97% rename from apps/set-status/set-status-cf.yml.j2 rename to apps/check-for-processing-errors/check-for-processing-errors.yml.j2 index 3a87583b5..671df68e8 100644 --- a/apps/set-status/set-status-cf.yml.j2 +++ b/apps/check-for-processing-errors/check-for-processing-errors.yml.j2 @@ -67,7 +67,7 @@ Resources: Type: AWS::Lambda::Function Properties: Code: src/ - Handler: set_status.lambda_handler + Handler: check_for_processing_errors.lambda_handler MemorySize: 128 Role: !GetAtt Role.Arn Runtime: python3.9 diff --git a/apps/set-status/src/set_status.py b/apps/check-for-processing-errors/src/check_for_processing_errors.py similarity index 100% rename from apps/set-status/src/set_status.py rename to apps/check-for-processing-errors/src/check_for_processing_errors.py diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index c1fd11892..e6bc35c01 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -222,11 +222,11 @@ } ], "ResultPath": "$.results.processing_time_in_seconds", - "Next": "SET_STATUS" + "Next": "CHECK_FOR_PROCESSING_ERRORS" }, - "SET_STATUS": { + "CHECK_FOR_PROCESSING_ERRORS": { "Type": "Task", - "Resource": "${SetStatusLambdaArn}", + "Resource": "${CheckForProcessingErrorsLambdaArn}", "Parameters": { "processing_results.$": "$.results.processing_results" }, @@ -244,16 +244,16 @@ "States.ALL" ], "Next": "JOB_FAILED", - "ResultPath": "$.results.set_status_error" + "ResultPath": "$.results.check_for_processing_errors_error" } ], - "ResultPath": "$.results.status_is_failed", + "ResultPath": "$.results.processing_has_errors", "Next": "CHECK_STATUS" } "CHECK_STATUS": { "Type" : "Choice", "Choices": [{ - "Variable": "$.results.status_is_failed", + "Variable": "$.results.processing_has_errors", "BooleanEquals": true, "Next": "JOB_FAILED" }], From 383d272c5087a5a1a24117f124cee1b30b86c293 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Tue, 6 Sep 2022 11:43:05 -0800 Subject: [PATCH 06/22] Add check_processing_time return type --- apps/check-processing-time/src/check_processing_time.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py index a5d41b953..c35c17d4c 100644 --- a/apps/check-processing-time/src/check_processing_time.py +++ b/apps/check-processing-time/src/check_processing_time.py @@ -17,7 +17,7 @@ def get_time_from_result(result: dict) -> float: return get_time_from_attempts(attempts) -def lambda_handler(event, context): +def lambda_handler(event, context) -> list[float]: results_dict = event['processing_results'] results = [results_dict[i] for i in sorted(results_dict.keys())] return list(map(get_time_from_result, results)) From b9c4e2eb0b6e3d43758232535134ee724703a609 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 16:01:34 -0800 Subject: [PATCH 07/22] Add PROCESSING_FAILED step --- .../check-for-processing-errors.yml.j2 | 80 ------------------- .../src/check_for_processing_errors.py | 2 - apps/step-function.json.j2 | 41 +++------- 3 files changed, 11 insertions(+), 112 deletions(-) delete mode 100644 apps/check-for-processing-errors/check-for-processing-errors.yml.j2 delete mode 100644 apps/check-for-processing-errors/src/check_for_processing_errors.py diff --git a/apps/check-for-processing-errors/check-for-processing-errors.yml.j2 b/apps/check-for-processing-errors/check-for-processing-errors.yml.j2 deleted file mode 100644 index 671df68e8..000000000 --- a/apps/check-for-processing-errors/check-for-processing-errors.yml.j2 +++ /dev/null @@ -1,80 +0,0 @@ -AWSTemplateFormatVersion: 2010-09-09 - -{% if security_environment == 'EDC' %} -Parameters: - - PermissionsBoundaryPolicyArn: - Type: String - - SecurityGroupId: - Type: String - - SubnetIds: - Type: CommaDelimitedList -{% endif %} - -Outputs: - - LambdaArn: - Value: !GetAtt Lambda.Arn - -Resources: - - LogGroup: - Type: AWS::Logs::LogGroup - Properties: - LogGroupName: !Sub "/aws/lambda/${Lambda}" - RetentionInDays: 90 - - Role: - Type: {{ 'Custom::JplRole' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::Role' }} - Properties: - {% if security_environment in ('JPL', 'JPL-public') %} - ServiceToken: !ImportValue Custom::JplRole::ServiceToken - Path: /account-managed/hyp3/ - {% endif %} - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - Action: sts:AssumeRole - Principal: - Service: lambda.amazonaws.com - Effect: Allow - ManagedPolicyArns: - - !Ref Policy - {% if security_environment == 'EDC' %} - - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole - PermissionsBoundary: !Ref PermissionsBoundaryPolicyArn - {% endif %} - - Policy: - Type: {{ 'Custom::JplPolicy' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::ManagedPolicy' }} - Properties: - {% if security_environment in ('JPL', 'JPL-public') %} - ServiceToken: !ImportValue Custom::JplPolicy::ServiceToken - Path: /account-managed/hyp3/ - {% endif %} - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - logs:CreateLogStream - - logs:PutLogEvents - Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" - - Lambda: - Type: AWS::Lambda::Function - Properties: - Code: src/ - Handler: check_for_processing_errors.lambda_handler - MemorySize: 128 - Role: !GetAtt Role.Arn - Runtime: python3.9 - Timeout: 30 - {% if security_environment == 'EDC' %} - VpcConfig: - SecurityGroupIds: - - !Ref SecurityGroupId - SubnetIds: !Ref SubnetIds - {% endif %} diff --git a/apps/check-for-processing-errors/src/check_for_processing_errors.py b/apps/check-for-processing-errors/src/check_for_processing_errors.py deleted file mode 100644 index d1ccded0a..000000000 --- a/apps/check-for-processing-errors/src/check_for_processing_errors.py +++ /dev/null @@ -1,2 +0,0 @@ -def lambda_handler(event, context) -> bool: - return any('Error' in result for result in event['processing_results'].values()) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index e6bc35c01..fa73ac44c 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -8,7 +8,8 @@ "get_files": { "logs": [], "expiration_time": null - } + }, + "processing_failed": false }, "ResultPath": "$.results", "Next": "EXECUTION_STARTED" @@ -137,13 +138,19 @@ "ErrorEquals": [ "States.ALL" ], - "Next": "UPLOAD_LOG", + "Next": "PROCESSING_FAILED", "ResultPath": "$.results.processing_results.{{ loop.index0 }}" } ] }, {% endfor %} {% endfor %} + "PROCESSING_FAILED": { + "Type": "Pass", + "Result": true, + "ResultPath": "$.results.processing_failed", + "Next": "UPLOAD_LOG" + } "UPLOAD_LOG":{ "Type": "Task", "Resource": "${UploadLogLambdaArn}", @@ -222,38 +229,12 @@ } ], "ResultPath": "$.results.processing_time_in_seconds", - "Next": "CHECK_FOR_PROCESSING_ERRORS" - }, - "CHECK_FOR_PROCESSING_ERRORS": { - "Type": "Task", - "Resource": "${CheckForProcessingErrorsLambdaArn}", - "Parameters": { - "processing_results.$": "$.results.processing_results" - }, - "Retry": [ - { - "ErrorEquals": [ - "States.ALL" - ], - "MaxAttempts": 2 - } - ], - "Catch": [ - { - "ErrorEquals": [ - "States.ALL" - ], - "Next": "JOB_FAILED", - "ResultPath": "$.results.check_for_processing_errors_error" - } - ], - "ResultPath": "$.results.processing_has_errors", "Next": "CHECK_STATUS" - } + }, "CHECK_STATUS": { "Type" : "Choice", "Choices": [{ - "Variable": "$.results.processing_has_errors", + "Variable": "$.results.processing_failed", "BooleanEquals": true, "Next": "JOB_FAILED" }], From d1284108846112cf66afc2ab0bcd08d958b91303 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 16:34:34 -0800 Subject: [PATCH 08/22] Replace processing_time_in_seconds with processing_times --- .../src/hyp3_api/api-spec/openapi-spec.yml.j2 | 19 ++++++++++++++----- .../src/check_processing_time.py | 2 +- apps/step-function.json.j2 | 10 +++++----- tests/test_check_processing_time.py | 3 +-- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 index ded2e606b..10faaf9a3 100644 --- a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 +++ b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 @@ -434,8 +434,8 @@ components: expiration_time: $ref: "#/components/schemas/datetime" nullable: true - processing_time_in_seconds: - $ref: "#/components/schemas/processing_time_in_seconds" + processing_times: + $ref: "#/components/schemas/processing_times" priority: $ref: "#/components/schemas/priority" @@ -543,11 +543,20 @@ components: minimum: 0 maximum: 9999 + processing_times: + description: > + A list of run times for the job's processing steps in the order that they were executed. For example, + a job comprised of a single processing step would yield a list containing one processing time, while a job + comprised on three processing steps would yield a list containing three processing times. An empty list + represents a failure to calculate processing times. + type: array + items: + $ref: '#/components/schemas/processing_time_in_seconds' + processing_time_in_seconds: description: > - Run time in seconds for the final processing attempt (regardless of whether it succeeded). A value of zero - indicates that the job failed before reaching the processing step or that there was an error in calculating - processing time. + Run time in seconds for a particular processing step's final attempt (regardless of whether it succeeded). + A value of zero indicates that there were no attempts. type: number minimum: 0 diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py index c35c17d4c..5d1d16b92 100644 --- a/apps/check-processing-time/src/check_processing_time.py +++ b/apps/check-processing-time/src/check_processing_time.py @@ -3,7 +3,7 @@ def get_time_from_attempts(attempts: list[dict]) -> float: if len(attempts) == 0: - raise ValueError('no Batch job attempts') + return 0 attempts.sort(key=lambda attempt: attempt['StoppedAt']) final_attempt = attempts[-1] return (final_attempt['StoppedAt'] - final_attempt['StartedAt']) / 1000 diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index fa73ac44c..d21eea7f3 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -4,7 +4,7 @@ "SET_DEFAULT_RESULTS": { "Type": "Pass", "Result": { - "processing_time_in_seconds": 0, + "processing_times": [], "get_files": { "logs": [], "expiration_time": null @@ -225,10 +225,10 @@ "States.ALL" ], "Next": "JOB_FAILED", - "ResultPath": "$.results.processing_time_in_seconds_error" + "ResultPath": "$.results.check_processing_time_error" } ], - "ResultPath": "$.results.processing_time_in_seconds", + "ResultPath": "$.results.processing_times", "Next": "CHECK_STATUS" }, "CHECK_STATUS": { @@ -251,7 +251,7 @@ "thumbnail_images.$": "$.results.get_files.thumbnail_images", "logs.$": "$.results.get_files.logs", "expiration_time.$": "$.results.get_files.expiration_time", - "processing_time_in_seconds.$": "$.results.processing_time_in_seconds" + "processing_times.$": "$.results.processing_times" }, "Retry": [ { @@ -272,7 +272,7 @@ "status_code": "FAILED", "logs.$": "$.results.get_files.logs", "expiration_time.$": "$.results.get_files.expiration_time", - "processing_time_in_seconds.$": "$.results.processing_time_in_seconds" + "processing_times.$": "$.results.processing_times" }, "Retry": [ { diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index 29b39c9ad..c12d72ff7 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -38,8 +38,7 @@ def test_missing_start_time(): def test_no_attempts(): - with pytest.raises(ValueError, match='no Batch job attempts'): - check_processing_time.get_time_from_attempts([]) + assert check_processing_time.get_time_from_attempts([]) == 0 def test_get_time_from_result(): From 71ec404455d6566f0ad60e430fa7a6f4976ac13b Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 16:35:23 -0800 Subject: [PATCH 09/22] Fix typo --- apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 index 10faaf9a3..64e2e413a 100644 --- a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 +++ b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 @@ -547,7 +547,7 @@ components: description: > A list of run times for the job's processing steps in the order that they were executed. For example, a job comprised of a single processing step would yield a list containing one processing time, while a job - comprised on three processing steps would yield a list containing three processing times. An empty list + comprised of three processing steps would yield a list containing three processing times. An empty list represents a failure to calculate processing times. type: array items: From 363cddb8482fb2241be30d2c723a1fef27a6012a Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 16:43:37 -0800 Subject: [PATCH 10/22] Update Changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf79dac9b..7c3e2a609 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.21.0] +### Added +- Added `processing_times` field to the Job schema in the API in order to support jobs with multiple processing steps. +### Removed +- Removed `processing_time_in_seconds` field from the Job schema. + ## [2.20.0] ### Added - New `RIVER_WIDTH` job type. From 7bd7e43e37884e205394f5691cfb4aeae295ec7e Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 16:46:44 -0800 Subject: [PATCH 11/22] Remove unused import --- tests/test_check_processing_time.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index c12d72ff7..d2d75efa8 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -1,5 +1,3 @@ -import pytest - import check_processing_time From e2a0cc5e6f867f2d9481608a2d312b4b1fa137fd Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 16:53:05 -0800 Subject: [PATCH 12/22] Add missing comma --- apps/step-function.json.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index d21eea7f3..c50744138 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -150,7 +150,7 @@ "Result": true, "ResultPath": "$.results.processing_failed", "Next": "UPLOAD_LOG" - } + }, "UPLOAD_LOG":{ "Type": "Task", "Resource": "${UploadLogLambdaArn}", From 052c324e29f4c8539c68de607e1b3feb41046a85 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 16:59:06 -0800 Subject: [PATCH 13/22] Fix processing_results path --- apps/step-function.json.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index c50744138..03f964af5 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -116,7 +116,7 @@ "Attempts": 3 } }, - "ResultPath": "$.results.processing_results.{{ loop.index0 }}", + "ResultPath": "$.results.processing_results['{{ loop.index0 }}']", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$.results.processing_results.{{ loop.index0 }}" + "ResultPath": "$.results.processing_results['{{ loop.index0 }}']", } ] }, From 33f4615f4a0b6cafb3fe255485f167460ffabd31 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:01:12 -0800 Subject: [PATCH 14/22] Remove trailing comma --- apps/step-function.json.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index 03f964af5..42e645976 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$.results.processing_results['{{ loop.index0 }}']", + "ResultPath": "$.results.processing_results['{{ loop.index0 }}']" } ] }, From 3c4d68f76cac2e2a5a115153d087453b2c65b424 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:06:06 -0800 Subject: [PATCH 15/22] Fix processing_results path again --- apps/step-function.json.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index 42e645976..c427f3573 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -116,7 +116,7 @@ "Attempts": 3 } }, - "ResultPath": "$.results.processing_results['{{ loop.index0 }}']", + "ResultPath": "$['results']['processing_results']['{{ loop.index0 }}']", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$.results.processing_results['{{ loop.index0 }}']" + "ResultPath": "$['results']['processing_results']['{{ loop.index0 }}']" } ] }, From d62dc56207841c7667144eee1893040a3291a83a Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:11:44 -0800 Subject: [PATCH 16/22] Try to get statelint to accept the processing_results path! --- apps/step-function.json.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index c427f3573..e7fc83c55 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -116,7 +116,7 @@ "Attempts": 3 } }, - "ResultPath": "$['results']['processing_results']['{{ loop.index0 }}']", + "ResultPath": "$['results']['processing_results']['s{{ loop.index0 }}']", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$['results']['processing_results']['{{ loop.index0 }}']" + "ResultPath": "$['results']['processing_results']['s{{ loop.index0 }}']" } ] }, From 73926932ae43d8247e5cfb6d65cdd52a4818d26f Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:20:28 -0800 Subject: [PATCH 17/22] Rename processing_results keys to satisfy statelint --- apps/check-processing-time/src/check_processing_time.py | 2 +- apps/step-function.json.j2 | 4 ++-- apps/upload-log/src/upload_log.py | 2 +- tests/test_check_processing_time.py | 4 ++-- tests/test_upload_log.py | 8 ++++---- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py index 5d1d16b92..7cc979b7a 100644 --- a/apps/check-processing-time/src/check_processing_time.py +++ b/apps/check-processing-time/src/check_processing_time.py @@ -19,5 +19,5 @@ def get_time_from_result(result: dict) -> float: def lambda_handler(event, context) -> list[float]: results_dict = event['processing_results'] - results = [results_dict[i] for i in sorted(results_dict.keys())] + results = [results_dict[key] for key in sorted(results_dict.keys())] return list(map(get_time_from_result, results)) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index e7fc83c55..88f8bbf97 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -116,7 +116,7 @@ "Attempts": 3 } }, - "ResultPath": "$['results']['processing_results']['s{{ loop.index0 }}']", + "ResultPath": "$.results.processing_results.step-{{ loop.index0 }}", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$['results']['processing_results']['s{{ loop.index0 }}']" + "ResultPath": "$.results.processing_results.step-{{ loop.index0 }}" } ] }, diff --git a/apps/upload-log/src/upload_log.py b/apps/upload-log/src/upload_log.py index 058c5935a..a994208f7 100644 --- a/apps/upload-log/src/upload_log.py +++ b/apps/upload-log/src/upload_log.py @@ -60,7 +60,7 @@ def write_log_to_s3(bucket, prefix, content): def lambda_handler(event, context): # TODO handle all results, not just the last one results_dict = event['processing_results'] - result = results_dict[str(len(results_dict) - 1)] + result = results_dict[f'step-{len(results_dict) - 1}'] log_content = None diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index d2d75efa8..0f5aa5f84 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -63,13 +63,13 @@ def test_get_time_from_result_failed(): def test_lambda_handler(): event = { 'processing_results': { - '0': { + 'step-0': { 'Attempts': [ {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} ] }, - '1': { + 'step-1': { 'Error': 'States.TaskFailed', 'Cause': '{"Attempts": [' '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, ' diff --git a/tests/test_upload_log.py b/tests/test_upload_log.py index b6c3204b4..8c007d370 100644 --- a/tests/test_upload_log.py +++ b/tests/test_upload_log.py @@ -117,7 +117,7 @@ def test_lambda_handler(mock_get_log_content: MagicMock, mock_write_log_to_s3: M event = { 'prefix': 'test-prefix', 'log_group': 'test-log-group', - 'processing_results': {'0': {'Container': {'LogStreamName': 'test-log-stream'}}} + 'processing_results': {'step-0': {'Container': {'LogStreamName': 'test-log-stream'}}} } upload_log.lambda_handler(event, None) @@ -133,7 +133,7 @@ def test_lambda_handler_no_log_stream(mock_write_log_to_s3: MagicMock): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - '0': + 'step-0': { 'Error': '', 'Cause': '{"Container": {},' @@ -162,7 +162,7 @@ def mock_get_log_events(**kwargs): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - '0': { + 'step-0': { 'Error': '', 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' '"Status": "FAILED",' @@ -199,7 +199,7 @@ def mock_get_log_events(**kwargs): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - '0': { + 'step-0': { 'Error': '', 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' '"Status": "FAILED",' From 34cc370bf61cc61c0bb34f04d661979c5a996166 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:24:57 -0800 Subject: [PATCH 18/22] Please just make statelint shut up!! --- apps/step-function.json.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index 88f8bbf97..22658d096 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -116,7 +116,7 @@ "Attempts": 3 } }, - "ResultPath": "$.results.processing_results.step-{{ loop.index0 }}", + "ResultPath": "$['results']['processing_results']['step-{{ loop.index0 }}']", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$.results.processing_results.step-{{ loop.index0 }}" + "ResultPath": "$['results']['processing_results']['step-{{ loop.index0 }}']" } ] }, From 44b71406e47bf8ef786f5d41818a573c8c5f793e Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:28:36 -0800 Subject: [PATCH 19/22] please --- apps/step-function.json.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index 22658d096..35110a691 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -116,7 +116,7 @@ "Attempts": 3 } }, - "ResultPath": "$['results']['processing_results']['step-{{ loop.index0 }}']", + "ResultPath": "$['results']['processing_results']['step_{{ loop.index0 }}']", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$['results']['processing_results']['step-{{ loop.index0 }}']" + "ResultPath": "$['results']['processing_results']['step_{{ loop.index0 }}']" } ] }, From a41c05cdcc5e59b4908aa06659a357c891f22f96 Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:32:07 -0800 Subject: [PATCH 20/22] hopefully this is also acceptable --- apps/step-function.json.j2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index 35110a691..54831ba8c 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -116,7 +116,7 @@ "Attempts": 3 } }, - "ResultPath": "$['results']['processing_results']['step_{{ loop.index0 }}']", + "ResultPath": "$.results.processing_results.step_{{ loop.index0 }}", "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}", "Retry": [ { @@ -139,7 +139,7 @@ "States.ALL" ], "Next": "PROCESSING_FAILED", - "ResultPath": "$['results']['processing_results']['step_{{ loop.index0 }}']" + "ResultPath": "$.results.processing_results.step_{{ loop.index0 }}" } ] }, From ab94885a3f6acea1b408fb84f163b82f3c5db40c Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Thu, 8 Sep 2022 17:34:38 -0800 Subject: [PATCH 21/22] Use the updated processing_results key naming --- apps/upload-log/src/upload_log.py | 2 +- tests/test_check_processing_time.py | 4 ++-- tests/test_upload_log.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/upload-log/src/upload_log.py b/apps/upload-log/src/upload_log.py index a994208f7..aaf0b3e22 100644 --- a/apps/upload-log/src/upload_log.py +++ b/apps/upload-log/src/upload_log.py @@ -60,7 +60,7 @@ def write_log_to_s3(bucket, prefix, content): def lambda_handler(event, context): # TODO handle all results, not just the last one results_dict = event['processing_results'] - result = results_dict[f'step-{len(results_dict) - 1}'] + result = results_dict[f'step_{len(results_dict) - 1}'] log_content = None diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index 0f5aa5f84..3c2f7b662 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -63,13 +63,13 @@ def test_get_time_from_result_failed(): def test_lambda_handler(): event = { 'processing_results': { - 'step-0': { + 'step_0': { 'Attempts': [ {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} ] }, - 'step-1': { + 'step_1': { 'Error': 'States.TaskFailed', 'Cause': '{"Attempts": [' '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, ' diff --git a/tests/test_upload_log.py b/tests/test_upload_log.py index 8c007d370..33eb41165 100644 --- a/tests/test_upload_log.py +++ b/tests/test_upload_log.py @@ -117,7 +117,7 @@ def test_lambda_handler(mock_get_log_content: MagicMock, mock_write_log_to_s3: M event = { 'prefix': 'test-prefix', 'log_group': 'test-log-group', - 'processing_results': {'step-0': {'Container': {'LogStreamName': 'test-log-stream'}}} + 'processing_results': {'step_0': {'Container': {'LogStreamName': 'test-log-stream'}}} } upload_log.lambda_handler(event, None) @@ -133,7 +133,7 @@ def test_lambda_handler_no_log_stream(mock_write_log_to_s3: MagicMock): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - 'step-0': + 'step_0': { 'Error': '', 'Cause': '{"Container": {},' @@ -162,7 +162,7 @@ def mock_get_log_events(**kwargs): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - 'step-0': { + 'step_0': { 'Error': '', 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' '"Status": "FAILED",' @@ -199,7 +199,7 @@ def mock_get_log_events(**kwargs): 'prefix': 'test-prefix', 'log_group': 'test-log-group', 'processing_results': { - 'step-0': { + 'step_0': { 'Error': '', 'Cause': '{"Container": {"LogStreamName": "test-log-stream"},' '"Status": "FAILED",' From 3c077d39dd0625b44d762b67a5acdedf54914c1e Mon Sep 17 00:00:00 2001 From: Jake Herrmann Date: Fri, 9 Sep 2022 14:33:54 -0800 Subject: [PATCH 22/22] Improve readability --- apps/upload-log/src/upload_log.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/upload-log/src/upload_log.py b/apps/upload-log/src/upload_log.py index aaf0b3e22..284641093 100644 --- a/apps/upload-log/src/upload_log.py +++ b/apps/upload-log/src/upload_log.py @@ -60,7 +60,7 @@ def write_log_to_s3(bucket, prefix, content): def lambda_handler(event, context): # TODO handle all results, not just the last one results_dict = event['processing_results'] - result = results_dict[f'step_{len(results_dict) - 1}'] + result = results_dict[max(results_dict.keys())] log_content = None