diff --git a/CHANGELOG.md b/CHANGELOG.md
index cf79dac9b..7c3e2a609 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [2.21.0]
+### Added
+- Added `processing_times` field to the Job schema in the API in order to support jobs with multiple processing steps.
+### Removed
+- Removed `processing_time_in_seconds` field from the Job schema.
+
 ## [2.20.0]
 ### Added
 - New `RIVER_WIDTH` job type.
diff --git a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2 b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2
index ded2e606b..64e2e413a 100644
--- a/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2
+++ b/apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2
@@ -434,8 +434,8 @@ components:
         expiration_time:
           $ref: "#/components/schemas/datetime"
           nullable: true
-        processing_time_in_seconds:
-          $ref: "#/components/schemas/processing_time_in_seconds"
+        processing_times:
+          $ref: "#/components/schemas/processing_times"
         priority:
           $ref: "#/components/schemas/priority"
@@ -543,11 +543,20 @@ components:
       minimum: 0
       maximum: 9999
 
+    processing_times:
+      description: >
+        A list of run times for the job's processing steps in the order that they were executed. For example,
+        a job comprised of a single processing step would yield a list containing one processing time, while a job
+        comprised of three processing steps would yield a list containing three processing times. An empty list
+        represents a failure to calculate processing times.
+      type: array
+      items:
+        $ref: '#/components/schemas/processing_time_in_seconds'
+
     processing_time_in_seconds:
       description: >
-        Run time in seconds for the final processing attempt (regardless of whether it succeeded). A value of zero
-        indicates that the job failed before reaching the processing step or that there was an error in calculating
-        processing time.
+        Run time in seconds for a particular processing step's final attempt (regardless of whether it succeeded).
+        A value of zero indicates that there were no attempts.
       type: number
       minimum: 0
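For illustration, here is a minimal sketch of how a job payload changes under the new schema. The job ID, status, and timing values below are hypothetical and not taken from a real job:

```python
# Hypothetical job payloads illustrating the 2.21.0 schema change.
# All field values below are invented for illustration.

old_job = {
    'job_id': '27836b79-0000-0000-0000-000000000000',  # hypothetical
    'status_code': 'SUCCEEDED',
    'processing_time_in_seconds': 1915.5,  # removed from the Job schema
}

new_job = {
    'job_id': '27836b79-0000-0000-0000-000000000000',  # hypothetical
    'status_code': 'SUCCEEDED',
    # One entry per processing step, in execution order; an empty list
    # means the processing times could not be calculated.
    'processing_times': [1915.5, 843.2],
}
```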
diff --git a/apps/check-processing-time/src/check_processing_time.py b/apps/check-processing-time/src/check_processing_time.py
index b5e653f13..7cc979b7a 100644
--- a/apps/check-processing-time/src/check_processing_time.py
+++ b/apps/check-processing-time/src/check_processing_time.py
@@ -1,18 +1,23 @@
 import json
 
 
-def get_time_from_attempts(attempts):
+def get_time_from_attempts(attempts: list[dict]) -> float:
     if len(attempts) == 0:
-        raise ValueError('no Batch job attempts')
+        return 0
     attempts.sort(key=lambda attempt: attempt['StoppedAt'])
     final_attempt = attempts[-1]
     return (final_attempt['StoppedAt'] - final_attempt['StartedAt']) / 1000
 
 
-def lambda_handler(event, context):
-    results = event['processing_results']
-    if 'Attempts' in results:
-        attempts = results['Attempts']
+def get_time_from_result(result: dict) -> float:
+    if 'Attempts' in result:
+        attempts = result['Attempts']
     else:
-        attempts = json.loads(results['Cause'])['Attempts']
+        attempts = json.loads(result['Cause'])['Attempts']
     return get_time_from_attempts(attempts)
+
+
+def lambda_handler(event, context) -> list[float]:
+    results_dict = event['processing_results']
+    results = [results_dict[key] for key in sorted(results_dict.keys())]
+    return list(map(get_time_from_result, results))
diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2
index 8d29bf4e1..54831ba8c 100644
--- a/apps/step-function.json.j2
+++ b/apps/step-function.json.j2
@@ -4,11 +4,12 @@
     "SET_DEFAULT_RESULTS": {
       "Type": "Pass",
       "Result": {
-        "processing_time_in_seconds": 0,
+        "processing_times": [],
         "get_files": {
           "logs": [],
           "expiration_time": null
-        }
+        },
+        "processing_failed": false
       },
       "ResultPath": "$.results",
       "Next": "EXECUTION_STARTED"
@@ -115,7 +116,7 @@
           "Attempts": 3
         }
       },
-      "ResultPath": "$.results.processing_results",
+      "ResultPath": "$.results.processing_results.step_{{ loop.index0 }}",
       "Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}",
       "Retry": [
         {
@@ -137,13 +138,19 @@
           "ErrorEquals": [
             "States.ALL"
          ],
-          "Next": "UPLOAD_LOG",
-          "ResultPath": "$.results.processing_results"
+          "Next": "PROCESSING_FAILED",
+          "ResultPath": "$.results.processing_results.step_{{ loop.index0 }}"
         }
       ]
     },
    {% endfor %}
    {% endfor %}
+    "PROCESSING_FAILED": {
+      "Type": "Pass",
+      "Result": true,
+      "ResultPath": "$.results.processing_failed",
+      "Next": "UPLOAD_LOG"
+    },
     "UPLOAD_LOG":{
       "Type": "Task",
       "Resource": "${UploadLogLambdaArn}",
@@ -218,17 +225,17 @@
             "States.ALL"
           ],
           "Next": "JOB_FAILED",
-          "ResultPath": "$.results.processing_time_in_seconds_error"
+          "ResultPath": "$.results.check_processing_time_error"
        }
      ],
-      "ResultPath": "$.results.processing_time_in_seconds",
+      "ResultPath": "$.results.processing_times",
       "Next": "CHECK_STATUS"
     },
     "CHECK_STATUS": {
       "Type" : "Choice",
       "Choices": [{
-        "Variable": "$.results.processing_results.Error",
-        "IsPresent": true,
+        "Variable": "$.results.processing_failed",
+        "BooleanEquals": true,
         "Next": "JOB_FAILED"
       }],
       "Default": "JOB_SUCCEEDED"
@@ -244,7 +251,7 @@
         "thumbnail_images.$": "$.results.get_files.thumbnail_images",
         "logs.$": "$.results.get_files.logs",
         "expiration_time.$": "$.results.get_files.expiration_time",
-        "processing_time_in_seconds.$": "$.results.processing_time_in_seconds"
+        "processing_times.$": "$.results.processing_times"
       },
       "Retry": [
         {
@@ -265,7 +272,7 @@
         "status_code": "FAILED",
         "logs.$": "$.results.get_files.logs",
         "expiration_time.$": "$.results.get_files.expiration_time",
-        "processing_time_in_seconds.$": "$.results.processing_time_in_seconds"
+        "processing_times.$": "$.results.processing_times"
       },
       "Retry": [
         {
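Taken together, these two changes define a new contract: each Batch step writes its result under `$.results.processing_results.step_<index>`, and the check-processing-time Lambda flattens that map into an ordered list of times. A minimal sketch of that contract, mirroring the updated tests; the timestamps are hypothetical epoch milliseconds:

```python
import check_processing_time

# Step results are namespaced by the step function's
# "ResultPath": "$.results.processing_results.step_{{ loop.index0 }}".
event = {
    'processing_results': {
        'step_0': {'Attempts': [
            {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 2800},
        ]},
        'step_1': {'Attempts': [
            {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700},
        ]},
    }
}

# (2800 - 500) / 1000 == 2.3 and (8700 - 3000) / 1000 == 5.7
assert check_processing_time.lambda_handler(event, None) == [2.3, 5.7]
```

Note that `sorted()` orders the `step_N` keys lexicographically, which matches numeric order only while there are at most ten steps (`step_0` through `step_9`).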
"$.results.processing_time_in_seconds" + "processing_times.$": "$.results.processing_times" }, "Retry": [ { diff --git a/apps/upload-log/src/upload_log.py b/apps/upload-log/src/upload_log.py index 541bcdcec..284641093 100644 --- a/apps/upload-log/src/upload_log.py +++ b/apps/upload-log/src/upload_log.py @@ -10,10 +10,10 @@ S3 = boto3.client('s3') -def get_log_stream(processing_results: dict) -> Optional[str]: - if 'Error' in processing_results: - processing_results = json.loads(processing_results['Cause']) - return processing_results['Container'].get('LogStreamName') +def get_log_stream(result: dict) -> Optional[str]: + if 'Error' in result: + result = json.loads(result['Cause']) + return result['Container'].get('LogStreamName') def get_log_content(log_group, log_stream): @@ -58,9 +58,13 @@ def write_log_to_s3(bucket, prefix, content): def lambda_handler(event, context): + # TODO handle all results, not just the last one + results_dict = event['processing_results'] + result = results_dict[max(results_dict.keys())] + log_content = None - log_stream = get_log_stream(event['processing_results']) + log_stream = get_log_stream(result) if log_stream is not None: try: log_content = get_log_content(event['log_group'], log_stream) @@ -69,7 +73,7 @@ def lambda_handler(event, context): raise if log_content is None: - assert 'Error' in event['processing_results'] - log_content = get_log_content_from_failed_attempts(json.loads(event['processing_results']['Cause'])) + assert 'Error' in result + log_content = get_log_content_from_failed_attempts(json.loads(result['Cause'])) write_log_to_s3(environ['BUCKET'], event['prefix'], log_content) diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py index ae202a67e..3c2f7b662 100644 --- a/tests/test_check_processing_time.py +++ b/tests/test_check_processing_time.py @@ -1,12 +1,9 @@ -import pytest - import check_processing_time def test_single_attempt(): attempts = [{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 2800}] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 2.3 + assert check_processing_time.get_time_from_attempts(attempts) == 2.3 def test_multiple_attempts(): @@ -14,8 +11,7 @@ def test_multiple_attempts(): {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}, {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700} ] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 5.7 + assert check_processing_time.get_time_from_attempts(attempts) == 5.7 def test_unsorted_attempts(): @@ -25,8 +21,7 @@ def test_unsorted_attempts(): {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}, {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000} ] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 5.7 + assert check_processing_time.get_time_from_attempts(attempts) == 5.7 def test_missing_start_time(): @@ -37,37 +32,50 @@ def test_missing_start_time(): {'Container': {}, 'StatusReason': '', 'StoppedAt': 8700}, {'Container': {}, 'StartedAt': 12000, 'StatusReason': '', 'StoppedAt': 15200} ] - result = check_processing_time.get_time_from_attempts(attempts) - assert result == 3.2 + assert check_processing_time.get_time_from_attempts(attempts) == 3.2 def test_no_attempts(): - with pytest.raises(ValueError, match='no Batch job attempts'): - check_processing_time.get_time_from_attempts([]) + assert 
diff --git a/tests/test_check_processing_time.py b/tests/test_check_processing_time.py
index ae202a67e..3c2f7b662 100644
--- a/tests/test_check_processing_time.py
+++ b/tests/test_check_processing_time.py
@@ -1,12 +1,9 @@
-import pytest
-
 import check_processing_time
 
 
 def test_single_attempt():
     attempts = [{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 2800}]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 2.3
+    assert check_processing_time.get_time_from_attempts(attempts) == 2.3
 
 
 def test_multiple_attempts():
@@ -14,8 +11,7 @@ def test_multiple_attempts():
         {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
         {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}
     ]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 5.7
+    assert check_processing_time.get_time_from_attempts(attempts) == 5.7
 
 
 def test_unsorted_attempts():
@@ -25,8 +21,7 @@ def test_unsorted_attempts():
         {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700},
         {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}
     ]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 5.7
+    assert check_processing_time.get_time_from_attempts(attempts) == 5.7
 
 
 def test_missing_start_time():
@@ -37,37 +32,50 @@ def test_missing_start_time():
         {'Container': {}, 'StatusReason': '', 'StoppedAt': 8700},
         {'Container': {}, 'StartedAt': 12000, 'StatusReason': '', 'StoppedAt': 15200}
     ]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 3.2
+    assert check_processing_time.get_time_from_attempts(attempts) == 3.2
 
 
 def test_no_attempts():
-    with pytest.raises(ValueError, match='no Batch job attempts'):
-        check_processing_time.get_time_from_attempts([])
+    assert check_processing_time.get_time_from_attempts([]) == 0
 
 
-def test_lambda_handler_with_normal_results():
-    event = {
-        'processing_results': {
-            'Attempts': [
-                {'Container': {}, 'StartedAt': 1644609403693, 'StatusReason': '', 'StoppedAt': 1644609919331},
-                {'Container': {}, 'StartedAt': 1644610107570, 'StatusReason': '', 'StoppedAt': 1644611472015}
-            ]
-        }
+def test_get_time_from_result():
+    result = {
+        'Attempts': [
+            {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
+            {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}
+        ]
+    }
+    assert check_processing_time.get_time_from_result(result) == 5.7
+
+
+def test_get_time_from_result_failed():
+    result = {
+        'Error': 'States.TaskFailed',
+        'Cause': '{"Attempts": ['
+                 '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, '
+                 '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, '
+                 '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}'
     }
-    response = check_processing_time.lambda_handler(event, None)
-    assert response == (1644611472015 - 1644610107570) / 1000
+    assert check_processing_time.get_time_from_result(result) == 6.4
 
 
-def test_lambda_handler_with_failed_results():
+def test_lambda_handler():
     event = {
         'processing_results': {
-            'Error': 'States.TaskFailed',
-            'Cause': '{"Attempts": ['
-                     '{"Container": {}, "StartedAt": 1643834765893, "StatusReason": "", "StoppedAt": 1643834766455}, '
-                     '{"Container": {}, "StartedAt": 1643834888866, "StatusReason": "", "StoppedAt": 1643834889448}, '
-                     '{"Container": {}, "StartedAt": 1643834907858, "StatusReason": "", "StoppedAt": 1643834908466}]}'
+            'step_0': {
+                'Attempts': [
+                    {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
+                    {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}
+                ]
+            },
+            'step_1': {
+                'Error': 'States.TaskFailed',
+                'Cause': '{"Attempts": ['
+                         '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, '
+                         '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, '
+                         '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}'
+            },
        }
    }
-    response = check_processing_time.lambda_handler(event, None)
-    assert response == (1643834908466 - 1643834907858) / 1000
+    assert check_processing_time.lambda_handler(event, None) == [5.7, 6.4]
diff --git a/tests/test_upload_log.py b/tests/test_upload_log.py
index aa59226a2..33eb41165 100644
--- a/tests/test_upload_log.py
+++ b/tests/test_upload_log.py
@@ -22,24 +22,24 @@ def s3_stubber():
 
 
 def test_get_log_stream():
-    processing_results = {
+    result = {
         'Container': {
             'LogStreamName': 'mySucceededLogStream',
         },
     }
-    assert upload_log.get_log_stream(processing_results) == 'mySucceededLogStream'
+    assert upload_log.get_log_stream(result) == 'mySucceededLogStream'
 
-    processing_results = {
+    result = {
         'Error': 'States.TaskFailed',
         'Cause': '{"Container": {"LogStreamName": "myFailedLogStream"}}',
     }
-    assert upload_log.get_log_stream(processing_results) == 'myFailedLogStream'
+    assert upload_log.get_log_stream(result) == 'myFailedLogStream'
 
-    processing_results = {
+    result = {
         'Error': 'States.TaskFailed',
         'Cause': '{"Container": {}}',
     }
-    assert upload_log.get_log_stream(processing_results) is None
+    assert upload_log.get_log_stream(result) is None
 
 
 def test_get_log_content(cloudwatch_stubber):
@@ -117,7 +117,7 @@ def test_lambda_handler(mock_get_log_content: MagicMock, mock_write_log_to_s3: M
     event = {
         'prefix': 'test-prefix',
         'log_group': 'test-log-group',
-        'processing_results': {'Container': {'LogStreamName': 'test-log-stream'}}
+        'processing_results': {'step_0': {'Container': {'LogStreamName': 'test-log-stream'}}}
     }
 
     upload_log.lambda_handler(event, None)
@@ -133,11 +133,14 @@ def test_lambda_handler_no_log_stream(mock_write_log_to_s3: MagicMock):
         'prefix': 'test-prefix',
         'log_group': 'test-log-group',
         'processing_results': {
-            'Error': '',
-            'Cause': '{"Container": {},'
-                     '"Status": "FAILED",'
-                     '"StatusReason": "foo reason",'
-                     '"Attempts": []}'
+            'step_0':
+                {
+                    'Error': '',
+                    'Cause': '{"Container": {},'
+                             '"Status": "FAILED",'
+                             '"StatusReason": "foo reason",'
+                             '"Attempts": []}'
+                }
         }
     }
 
@@ -159,14 +162,16 @@ def mock_get_log_events(**kwargs):
         'prefix': 'test-prefix',
         'log_group': 'test-log-group',
         'processing_results': {
-            'Error': '',
-            'Cause': '{"Container": {"LogStreamName": "test-log-stream"},'
-                     '"Status": "FAILED",'
-                     '"StatusReason": "Task failed to start",'
-                     '"Attempts": ['
-                     '{"Container": {"Reason": "error message 1"}},'
-                     '{"Container": {"Reason": "error message 2"}},'
-                     '{"Container": {"Reason": "error message 3"}}]}'
+            'step_0': {
+                'Error': '',
+                'Cause': '{"Container": {"LogStreamName": "test-log-stream"},'
+                         '"Status": "FAILED",'
+                         '"StatusReason": "Task failed to start",'
+                         '"Attempts": ['
+                         '{"Container": {"Reason": "error message 1"}},'
+                         '{"Container": {"Reason": "error message 2"}},'
+                         '{"Container": {"Reason": "error message 3"}}]}'
+            }
         }
     }
 
@@ -194,14 +199,16 @@ def mock_get_log_events(**kwargs):
         'prefix': 'test-prefix',
         'log_group': 'test-log-group',
         'processing_results': {
-            'Error': '',
-            'Cause': '{"Container": {"LogStreamName": "test-log-stream"},'
-                     '"Status": "FAILED",'
-                     '"StatusReason": "Task failed to start",'
-                     '"Attempts": ['
-                     '{"Container": {"Reason": "error message 1"}},'
-                     '{"Container": {"Reason": "error message 2"}},'
-                     '{"Container": {"Reason": "error message 3"}}]}'
+            'step_0': {
+                'Error': '',
+                'Cause': '{"Container": {"LogStreamName": "test-log-stream"},'
+                         '"Status": "FAILED",'
+                         '"StatusReason": "Task failed to start",'
+                         '"Attempts": ['
+                         '{"Container": {"Reason": "error message 1"}},'
+                         '{"Container": {"Reason": "error message 2"}},'
+                         '{"Container": {"Reason": "error message 3"}}]}'
+            }
         }
    }