"""ETL script for raw Epi/HAI sequencing report pdf."""
import io
from datetime import datetime
from capepy.aws.glue import EtlJob
from pypdf import PdfReader
from tabula.io import read_pdf
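# EtlJob wraps the Glue job setup: as used below, it exposes the job
# parameters and gives us helpers for fetching the raw object, writing the
# clean result, and logging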
etl_job = EtlJob()

# NOTE: for now we'll take the alert object key and change out the file
# extension for the clean data (leaving all namespacing and such). this
# will probably need to change
clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".pdf", ".csv")

# the response should contain a StreamingBody object that needs to be
# converted to a file-like object to make the pdf libraries happy
f = io.BytesIO(etl_job.get_raw_file())

try:
    # get the report date from the 4th line of the pdf's first page
    reader = PdfReader(f)
    page = reader.pages[0]
    date_reported = page.extract_text().split("\n")[3].strip()
    # validate the expected MM/DD/YYYY format (raises ValueError if not)
    datetime.strptime(date_reported, "%m/%d/%Y")
except ValueError as err:
    err_message = (
        "ERROR - Could not properly read sequencing report date. "
        f"ETL will continue. {err}"
    )
    etl_job.logger.error(err_message)
    date_reported = ""

try:
    # rewind the buffer first: pypdf has likely left the stream position at
    # the end, and tabula reads the file-like object from wherever it sits
    f.seek(0)
    # get two tables from page 2 of the pdf
    tables = read_pdf(f, multiple_tables=True, pages=2)
    assert isinstance(tables, list)
    mlst_st = tables[0]
    genes = tables[1]
except (IndexError, KeyError) as err:
    err_message = (
        "ERROR - Could not properly read sequencing PDF tables. "
        f"ETL cannot continue. {err}"
    )
    etl_job.logger.error(err_message)
    # NOTE: need to properly handle exception stuff here, and we probably
    # want this going somewhere very visible (e.g. SNS topic or a
    # perpetual log as someone will need to be made aware)
    raise Exception(err_message)

# filter the columns we need and join the tables together
interim = mlst_st[["Accession_ID", "WGS_ID", "MLST_ST"]]
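# tabula parses the genes table transposed with an unnamed first column;
# use that column as the index and flip so each row is one sample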
genes_inter = genes.set_index("Unnamed: 0").T
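# keep only the antimicrobial resistance gene columns (NDM, KPC, IMP, OXA,
# VIM, CMY families)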
genes_interim = genes_inter.filter(regex="(NDM|KPC|IMP|OXA|VIM|CMY)", axis=1)
interim = interim.join(genes_interim, on="WGS_ID")
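# stamp every row with the report date pulled from page 1 (empty if the
# date couldn't be parsed)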
interim["Date Reported"] = date_reported

# write out the transformed data
with io.StringIO() as csv_buff:
    interim.to_csv(csv_buff, index=False)
    etl_job.write_clean_file(csv_buff.getvalue(), clean_obj_key)