
Merge pull request #1172 from ASFHyP3/append-processing-results
Compute processing time for each Batch job
jtherrmann authored Sep 9, 2022
2 parents ec895ec + 3c077d3 commit b0500d8
Showing 7 changed files with 134 additions and 88 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## [2.21.0]
+### Added
+- Added `processing_times` field to the Job schema in the API to support jobs with multiple processing steps.
+### Removed
+- Removed `processing_time_in_seconds` field from the Job schema.

## [2.20.0]
### Added
- New `RIVER_WIDTH` job type.
19 changes: 14 additions & 5 deletions apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2
@@ -434,8 +434,8 @@ components:
        expiration_time:
          $ref: "#/components/schemas/datetime"
          nullable: true
-        processing_time_in_seconds:
-          $ref: "#/components/schemas/processing_time_in_seconds"
+        processing_times:
+          $ref: "#/components/schemas/processing_times"
        priority:
          $ref: "#/components/schemas/priority"

@@ -543,11 +543,20 @@ components:
      minimum: 0
      maximum: 9999

+    processing_times:
+      description: >
+        A list of run times for the job's processing steps, in the order in which they were executed. For example,
+        a job comprising a single processing step yields a list containing one processing time, while a job
+        comprising three processing steps yields a list containing three processing times. An empty list
+        indicates that the processing times could not be calculated.
+      type: array
+      items:
+        $ref: '#/components/schemas/processing_time_in_seconds'
+
    processing_time_in_seconds:
      description: >
-        Run time in seconds for the final processing attempt (regardless of whether it succeeded). A value of zero
-        indicates that the job failed before reaching the processing step or that there was an error in calculating
-        processing time.
+        Run time in seconds for a particular processing step's final attempt (regardless of whether it succeeded).
+        A value of zero indicates that there were no attempts.
      type: number
      minimum: 0

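To make the schema change concrete, here is a minimal sketch of how the new field might look on a job returned by the API — the payload is abbreviated and the values are hypothetical, not taken from this commit:

```python
# Hypothetical fragment of a job record under the new schema; field names
# come from the spec above, values are invented for illustration.
successful_job = {
    'status_code': 'SUCCEEDED',
    'processing_times': [2.3, 5.7],  # one entry per step, in execution order
}

failed_job = {
    'status_code': 'FAILED',
    'processing_times': [],  # empty list: times could not be calculated
}
```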
19 changes: 12 additions & 7 deletions apps/check-processing-time/src/check_processing_time.py
@@ -1,18 +1,23 @@
import json


-def get_time_from_attempts(attempts):
+def get_time_from_attempts(attempts: list[dict]) -> float:
    if len(attempts) == 0:
-        raise ValueError('no Batch job attempts')
+        return 0
    attempts.sort(key=lambda attempt: attempt['StoppedAt'])
    final_attempt = attempts[-1]
    return (final_attempt['StoppedAt'] - final_attempt['StartedAt']) / 1000


-def lambda_handler(event, context):
-    results = event['processing_results']
-    if 'Attempts' in results:
-        attempts = results['Attempts']
+def get_time_from_result(result: dict) -> float:
+    if 'Attempts' in result:
+        attempts = result['Attempts']
    else:
-        attempts = json.loads(results['Cause'])['Attempts']
+        attempts = json.loads(result['Cause'])['Attempts']
    return get_time_from_attempts(attempts)
+
+
+def lambda_handler(event, context) -> list[float]:
+    results_dict = event['processing_results']
+    results = [results_dict[key] for key in sorted(results_dict.keys())]
+    return list(map(get_time_from_result, results))
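As a sanity check on the reworked module, here is a small usage sketch (hypothetical event; the `step_0`/`step_1` key pattern matches the per-step `ResultPath` introduced in apps/step-function.json.j2 below):

```python
import check_processing_time

# Keys are sorted before mapping, so the returned list preserves step order.
event = {
    'processing_results': {
        'step_0': {'Attempts': [{'StartedAt': 500, 'StoppedAt': 2800}]},
        'step_1': {'Attempts': [{'StartedAt': 3000, 'StoppedAt': 8700}]},
    }
}
assert check_processing_time.lambda_handler(event, None) == [2.3, 5.7]

# A step with no attempts now contributes 0 instead of raising ValueError.
assert check_processing_time.get_time_from_attempts([]) == 0
```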
29 changes: 18 additions & 11 deletions apps/step-function.json.j2
@@ -4,11 +4,12 @@
"SET_DEFAULT_RESULTS": {
"Type": "Pass",
"Result": {
"processing_time_in_seconds": 0,
"processing_times": [],
"get_files": {
"logs": [],
"expiration_time": null
}
},
"processing_failed": false
},
"ResultPath": "$.results",
"Next": "EXECUTION_STARTED"
@@ -115,7 +116,7 @@
"Attempts": 3
}
},
"ResultPath": "$.results.processing_results",
"ResultPath": "$.results.processing_results.step_{{ loop.index0 }}",
"Next": "{% if not loop.last %}{{ loop.nextitem['name'] }}{% else %}GET_FILES{% endif %}",
"Retry": [
{
@@ -137,13 +138,19 @@
"ErrorEquals": [
"States.ALL"
],
"Next": "UPLOAD_LOG",
"ResultPath": "$.results.processing_results"
"Next": "PROCESSING_FAILED",
"ResultPath": "$.results.processing_results.step_{{ loop.index0 }}"
}
]
},
{% endfor %}
{% endfor %}
"PROCESSING_FAILED": {
"Type": "Pass",
"Result": true,
"ResultPath": "$.results.processing_failed",
"Next": "UPLOAD_LOG"
},
"UPLOAD_LOG":{
"Type": "Task",
"Resource": "${UploadLogLambdaArn}",
@@ -218,17 +225,17 @@
"States.ALL"
],
"Next": "JOB_FAILED",
"ResultPath": "$.results.processing_time_in_seconds_error"
"ResultPath": "$.results.check_processing_time_error"
}
],
"ResultPath": "$.results.processing_time_in_seconds",
"ResultPath": "$.results.processing_times",
"Next": "CHECK_STATUS"
},
"CHECK_STATUS": {
"Type" : "Choice",
"Choices": [{
"Variable": "$.results.processing_results.Error",
"IsPresent": true,
"Variable": "$.results.processing_failed",
"BooleanEquals": true,
"Next": "JOB_FAILED"
}],
"Default": "JOB_SUCCEEDED"
@@ -244,7 +251,7 @@
"thumbnail_images.$": "$.results.get_files.thumbnail_images",
"logs.$": "$.results.get_files.logs",
"expiration_time.$": "$.results.get_files.expiration_time",
"processing_time_in_seconds.$": "$.results.processing_time_in_seconds"
"processing_times.$": "$.results.processing_times"
},
"Retry": [
{
@@ -265,7 +272,7 @@
"status_code": "FAILED",
"logs.$": "$.results.get_files.logs",
"expiration_time.$": "$.results.get_files.expiration_time",
"processing_time_in_seconds.$": "$.results.processing_time_in_seconds"
"processing_times.$": "$.results.processing_times"
},
"Retry": [
{
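Putting the state-machine changes together, a sketch of what `$.results` might hold after a two-step job — the shape follows the `ResultPath`s above, but the values and the omitted Batch fields are hypothetical:

```python
# Hypothetical $.results for a job with two successful Batch steps. Each
# step's output lands under its own step_{N} key rather than overwriting a
# single processing_results value, so per-step times can be recovered.
results = {
    'processing_failed': False,  # set to True by PROCESSING_FAILED on error
    'processing_results': {
        'step_0': {'Attempts': [{'StartedAt': 500, 'StoppedAt': 2800}]},
        'step_1': {'Attempts': [{'StartedAt': 3000, 'StoppedAt': 8700}]},
    },
    'processing_times': [2.3, 5.7],  # written by the check-processing-time task
}
```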
18 changes: 11 additions & 7 deletions apps/upload-log/src/upload_log.py
@@ -10,10 +10,10 @@
S3 = boto3.client('s3')


-def get_log_stream(processing_results: dict) -> Optional[str]:
-    if 'Error' in processing_results:
-        processing_results = json.loads(processing_results['Cause'])
-    return processing_results['Container'].get('LogStreamName')
+def get_log_stream(result: dict) -> Optional[str]:
+    if 'Error' in result:
+        result = json.loads(result['Cause'])
+    return result['Container'].get('LogStreamName')


def get_log_content(log_group, log_stream):
@@ -58,9 +58,13 @@ def write_log_to_s3(bucket, prefix, content):


def lambda_handler(event, context):
+    # TODO handle all results, not just the last one
+    results_dict = event['processing_results']
+    result = results_dict[max(results_dict.keys())]
+
    log_content = None

-    log_stream = get_log_stream(event['processing_results'])
+    log_stream = get_log_stream(result)
    if log_stream is not None:
        try:
            log_content = get_log_content(event['log_group'], log_stream)
@@ -69,7 +73,7 @@ def lambda_handler(event, context):
raise

    if log_content is None:
-        assert 'Error' in event['processing_results']
-        log_content = get_log_content_from_failed_attempts(json.loads(event['processing_results']['Cause']))
+        assert 'Error' in result
+        log_content = get_log_content_from_failed_attempts(json.loads(result['Cause']))

write_log_to_s3(environ['BUCKET'], event['prefix'], log_content)
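Since `result = results_dict[max(results_dict.keys())]` relies on string ordering of the step keys, a brief sketch of what it selects (hypothetical data), including the limit of that ordering:

```python
# max() over the step keys picks the lexicographically greatest one, which
# is the final step whenever there are fewer than ten steps.
results_dict = {
    'step_0': {'Container': {'LogStreamName': 'stream-0'}},
    'step_1': {'Container': {'LogStreamName': 'stream-1'}},
}
assert max(results_dict.keys()) == 'step_1'

# With ten or more steps, string order diverges from numeric order:
assert max(['step_9', 'step_10']) == 'step_9'  # not 'step_10'
```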
68 changes: 38 additions & 30 deletions tests/test_check_processing_time.py
@@ -1,21 +1,17 @@
-import pytest
-
import check_processing_time


def test_single_attempt():
    attempts = [{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 2800}]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 2.3
+    assert check_processing_time.get_time_from_attempts(attempts) == 2.3


def test_multiple_attempts():
    attempts = [
        {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
        {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}
    ]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 5.7
+    assert check_processing_time.get_time_from_attempts(attempts) == 5.7


def test_unsorted_attempts():
@@ -25,8 +21,7 @@ def test_unsorted_attempts():
        {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700},
        {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}
    ]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 5.7
+    assert check_processing_time.get_time_from_attempts(attempts) == 5.7


def test_missing_start_time():
@@ -37,37 +32,50 @@
        {'Container': {}, 'StatusReason': '', 'StoppedAt': 8700},
        {'Container': {}, 'StartedAt': 12000, 'StatusReason': '', 'StoppedAt': 15200}
    ]
-    result = check_processing_time.get_time_from_attempts(attempts)
-    assert result == 3.2
+    assert check_processing_time.get_time_from_attempts(attempts) == 3.2


def test_no_attempts():
-    with pytest.raises(ValueError, match='no Batch job attempts'):
-        check_processing_time.get_time_from_attempts([])
+    assert check_processing_time.get_time_from_attempts([]) == 0


-def test_lambda_handler_with_normal_results():
-    event = {
-        'processing_results': {
-            'Attempts': [
-                {'Container': {}, 'StartedAt': 1644609403693, 'StatusReason': '', 'StoppedAt': 1644609919331},
-                {'Container': {}, 'StartedAt': 1644610107570, 'StatusReason': '', 'StoppedAt': 1644611472015}
-            ]
-        }
-    }
-    response = check_processing_time.lambda_handler(event, None)
-    assert response == (1644611472015 - 1644610107570) / 1000
+def test_get_time_from_result():
+    result = {
+        'Attempts': [
+            {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
+            {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}
+        ]
+    }
+    assert check_processing_time.get_time_from_result(result) == 5.7
+
+
+def test_get_time_from_result_failed():
+    result = {
+        'Error': 'States.TaskFailed',
+        'Cause': '{"Attempts": ['
+                 '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, '
+                 '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, '
+                 '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}'
+    }
+    assert check_processing_time.get_time_from_result(result) == 6.4


-def test_lambda_handler_with_failed_results():
+def test_lambda_handler():
    event = {
        'processing_results': {
-            'Error': 'States.TaskFailed',
-            'Cause': '{"Attempts": ['
-                     '{"Container": {}, "StartedAt": 1643834765893, "StatusReason": "", "StoppedAt": 1643834766455}, '
-                     '{"Container": {}, "StartedAt": 1643834888866, "StatusReason": "", "StoppedAt": 1643834889448}, '
-                     '{"Container": {}, "StartedAt": 1643834907858, "StatusReason": "", "StoppedAt": 1643834908466}]}'
+            'step_0': {
+                'Attempts': [
+                    {'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
+                    {'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}
+                ]
+            },
+            'step_1': {
+                'Error': 'States.TaskFailed',
+                'Cause': '{"Attempts": ['
+                         '{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, '
+                         '{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, '
+                         '{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}'
+            },
        }
    }
-    response = check_processing_time.lambda_handler(event, None)
-    assert response == (1643834908466 - 1643834907858) / 1000
+    assert check_processing_time.lambda_handler(event, None) == [5.7, 6.4]