# AutoQC.py -- forked from IQuOD/AutoQC
from wodpy import wod
import glob, time
import numpy as np
import sys, os, data.ds
import util.main as main
import pandas
import psycopg2
from multiprocessing import Pool
import tempfile
def run(test, profiles, parameters):
'''
run <test> on a list of <profiles>, return an array summarizing when exceptions were raised
'''
qcResults = []
verbose = []
exec('from qctests import ' + test)
for profile in profiles:
exec('result = ' + test + '.test(profile, parameters)')
#demand tests returned bools:
for i in result:
assert isinstance(i, np.bool_), str(i) + ' in test result list is of type ' + str(type(i))
qcResults.append(np.any(result))
verbose.append(result)
return [qcResults, verbose]
########################################
# main
########################################
if len(sys.argv) > 3:
    # All three arguments (<database table> <from> <to>) are required. The
    # previous `> 2` check let a call without <to> run until sys.argv[3]
    # was read and then fail with an IndexError.

    # Local import: only this script section needs it.
    import importlib

    # NOTE(review): the table name is spliced into SQL text below because an
    # identifier cannot be bound as a query parameter; it must come from a
    # trusted caller.
    table = sys.argv[1]
    # Parse the row window up front so a non-numeric <from>/<to> fails before
    # any work is done, and so only integers reach the SQL text.
    firstRow = int(sys.argv[2])
    nRows = int(sys.argv[3]) - firstRow

    # Identify and import tests
    testNames = main.importQC('qctests')
    testNames.sort()
    print('{} quality control checks have been found'.format(len(testNames)))
    testNames = main.checkQCTestRequirements(testNames)
    print('{} quality control checks are able to be run:'.format(len(testNames)))
    for testName in testNames:
        print(' {}'.format(testName))

    # Parallel processing.
    print('\nPlease wait while QC is performed\n')

    # NOTE(review): process_row and the globals it reads (testNames,
    # parameterStore, table) are created inside this branch, so the workers
    # presumably rely on a fork-style start method -- confirm before running
    # on a spawn-based platform.
    def process_row(uid):
        '''run all tests on the indicated database row'''

        # extract profile
        profile = main.get_profile_from_db(uid)

        # Check that there are temperature data in the profile, otherwise skip.
        if profile.var_index() is None:
            return
        main.catchFlags(profile)
        # `== False` is an element-wise comparison on the mask, not a
        # truthiness test; skip the row when it matches nowhere.
        if np.sum(profile.t().mask == False) == 0:
            return

        # run tests, writing one column (the lower-cased test name) per check
        for test in testNames:
            result = run(test, [profile], parameterStore)
            query = "UPDATE " + table + " SET " + test.lower() + " = " + str(result[0][0]) + " WHERE uid = " + str(profile.uid()) + ";"
            main.dbinteract(query)

    # set up global parameter store
    parameterStore = {}
    for test in testNames:
        qcTest = importlib.import_module('qctests.' + test)
        try:
            qcTest.loadParameters(parameterStore)
        except Exception:
            # Best effort, as before: a check without a usable loadParameters
            # is reported and skipped. Unlike the old bare `except:`,
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            print('No parameters to load for {}'.format(test))

    # connect to database & fetch list of all uids
    query = 'SELECT uid FROM ' + table + ' ORDER BY uid OFFSET ' + str(firstRow) + ' LIMIT ' + str(nRows) + ';'
    uids = main.dbinteract(query)

    # launch async processes
    pool = Pool(processes=1)
    pending = []
    for row in uids:
        pending.append((row[0], pool.apply_async(process_row, (row[0],))))
    pool.close()
    pool.join()

    # apply_async only surfaces a worker's exception when its result is
    # fetched; report failures instead of dropping them silently.
    for uid, outcome in pending:
        try:
            outcome.get()
        except Exception as err:
            sys.stderr.write('QC failed for uid {}: {}\n'.format(uid, err))
else:
    print('Please add command line arguments to name your output file and set parallelization:')
    print('python AutoQC <database table> <from> <to>')
    print('will write qc results to <database table> in the database, and run the calculation on database rows starting at <from> and going to but not including <to>.')