Skip to content

Commit

Permalink
Merge pull request #54 from mitre/last_minute_fixes
Browse files Browse the repository at this point in the history
Additional fixes based on partner testing
  • Loading branch information
dehall authored Mar 1, 2023
2 parents 26bd083 + 3313560 commit a4935df
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 59 deletions.
27 changes: 2 additions & 25 deletions data_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import re
import time
from datetime import date, datetime
from datetime import datetime

import pandas as pd

Expand Down Expand Up @@ -56,16 +56,8 @@ def analyze(data, source):
"missing": int(dob_col.isna().sum()),
}

expected_min_dob = "1997-01-01"
# roughly, 19 years old at the start of CODI observation period (2016)
# expected_max_dob is trickier because we don't know when the data was populated
# TODO: add another command line arg?

if source == "csv":
if "-" in notnull_dobs[0]:
out_of_range = notnull_dobs[notnull_dobs < expected_min_dob]
stats["dob"]["count_earlier_dob_than_expected"] = len(out_of_range)
else:
if "-" not in notnull_dobs.iloc[0]:
# the date format will either be YYYY-MM-DD or YYMMDD
# we'll assume it's consistent across a single file

Expand All @@ -81,21 +73,6 @@ def analyze(data, source):
) # str-ify the Timestamps again
stats["dob"]["max_parsed"] = str(parsed_dobs.max())

expected_min_dob = pd.to_datetime(expected_min_dob, format="%Y-%m-%d")

out_of_range = parsed_dobs[parsed_dobs < expected_min_dob]
stats["dob"]["count_earlier_dob_than_expected"] = len(out_of_range)

else:
# different DBs may return different data types for the DOB column
if type(notnull_dobs[0]) is date:
expected_min_dob = date.fromisoformat(expected_min_dob)
# else
# assume it's a string in ISO format, no change should be needed

out_of_range = notnull_dobs[notnull_dobs < expected_min_dob]
stats["dob"]["count_earlier_dob_than_expected"] = len(out_of_range)

sex_col = case_insensitive_lookup(data, "sex", source)
stats["sex"] = top_N(sex_col)

Expand Down
12 changes: 10 additions & 2 deletions extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ def clean_zip(household_zip):


def clean_dob_fromstr(dob_str, date_format):
    """Normalize a date-of-birth string to ISO ``YYYY-MM-DD`` form.

    ``dob_str`` is parsed with ``time.strptime`` using ``date_format``
    and re-rendered via ``time.strftime``. An empty or falsy value
    (e.g. ``""`` or ``None``) is passed through as the empty string so
    that records with a missing DOB survive extraction.
    """
    if dob_str:
        # Parse first, then re-render, so a malformed value raises a
        # clear ValueError from strptime rather than emitting bad output.
        parsed = strptime(dob_str, date_format)
        return strftime("%Y-%m-%d", parsed)
    return ""


Expand Down Expand Up @@ -230,11 +232,17 @@ def handle_row(row, report, version):
output_row.append(clean_string(family_name))

dob = case_insensitive_lookup(row, "DOB", version)
output_row.append(dob.strftime("%Y-%m-%d"))
if dob:
output_row.append(dob.strftime("%Y-%m-%d"))
else:
output_row.append("")

sex = case_insensitive_lookup(row, "sex", version)
validate(report, "sex", sex)
output_row.append(sex.strip())
if sex:
output_row.append(sex.strip())
else:
output_row.append("")

phone_number = case_insensitive_lookup(row, "phone", version)
validate(report, "phone_number", phone_number)
Expand Down
124 changes: 92 additions & 32 deletions utils/data_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def add_parser_db_args(parser):
default=V2,
choices=[V1, V2],
help="Version of the CODI Data Model schema to use. "
f'Valid options are "{V1}" or "{V2}"',
f'Valid options are "{V1}" or "{V2}". Default is "{V2}"',
)

parser.add_argument(
Expand All @@ -79,8 +79,29 @@ def add_parser_db_args(parser):
"--schema_name",
dest="v2_schema",
default="cdm",
help="Name of the database schema containing the CODI DEMOGRAPHIC"
" and PRIVATE_DEMOGRAPHIC tables",
help="Name of the database schema containing the PRIVATE_DEMOGRAPHIC"
" and PRIVATE_ADDRESS_HISTORY tables in a v2 database. "
"Default is 'cdm'",
)
parser.add_argument(
"--address_selection",
dest="v2_address_selection",
choices=["full", "preferred", "single"],
default="full",
help="Determines the approach for selecting a single address per PATID"
' from PRIVATE_ADDRESS_HISTORY. Options: Use "single" if '
"the data is already guaranteed to only contain one address per PATID."
' Use "preferred" if the database is guaranteed to only contain one '
"address with address_preferred='Y' per PATID. "
'Use "full" if the database may contain multiple preferred addresses'
" for different dates/types/use. This option will select "
"the most recent preferred address by start date."
"Default if not specified is 'full'",
)
parser.add_argument(
"--debug_query",
action="store_true",
help="Aids in debugging by printing out the actual DB query being run",
)


Expand Down Expand Up @@ -171,6 +192,10 @@ def get_query(engine, version, args):
)

query = select([identifier])

if args.debug_query:
print(query)

return query
else:
# note there is also the `demographic` table, but
Expand All @@ -193,36 +218,71 @@ def get_query(engine, version, args):
schema=args.v2_schema,
)

addr_period_order = prv_address.columns.address_period_start.desc()

# Different SQL engines have different semantics for sorting DESC:
# Postgres and Oracle put nulls first, so we want NULLS LAST
# MSSQL puts nulls last, but doesn't support NULLS LAST
# so we use this hack to get NULLS LAST for all main dialects.
# For safety, in case other engines also don't support NULLS LAST,
# only apply it to the ones that we know it works on
# (vs not applying it to the ones we know it doesn't)

# TODO: test on MySQL - deferring since none of our partners use it now

# known engine dialect names are "mssql", "postgresql", and "oracle"
if engine.dialect.name in ["postgresql", "oracle"]:
addr_period_order = addr_period_order.nulls_last()

subquery = (
select(prv_address.columns.addressid)
.filter(prv_address.columns.patid == prv_demo.columns.patid)
.order_by(prv_address.columns.address_preferred.desc())
.order_by(addr_period_order)
.limit(1)
.correlate(prv_demo)
.scalar_subquery()
)
if args.v2_address_selection == "single":
# The user said their data is guaranteed to only have a single
# address per PATID. This simplifies the query to just
# join the tables together with no additional filters
query = select([prv_demo, prv_address]).filter(
prv_demo.columns.patid == prv_address.columns.patid
)
elif args.v2_address_selection == "preferred":
# The user said their data may have multiple addresses,
# but is guaranteed that only one per PATID will be preferred.
# This simplifies the query to just select ADDRESS_PREFERRED=Y
query = select([prv_demo, prv_address]).filter(
prv_demo.columns.patid == prv_address.columns.patid,
prv_address.columns.address_preferred == "Y",
)
else:
# The user indicated the data may have multiple preferreds,
# (or at least did not select one of the above options)
# so we select the most recent by date.
# The PCOR schema includes "type" (physical/postal/both/unknown)
# and "use" (home/work/temp/old/unknown) fields, and the hierarchy
# of the possible combination of those options is not well-defined.
# (eg, should we pick physical/work over a both-type/unknown-use?)
# For simplicity and performance we will just pick
# the first preferred address we find, sorting by date.
# Going forward, a better solution is likely to include all of
# an individual's addresses in PPRL, rather than more complex ways
# of picking a single one.

addr_period_order = prv_address.columns.address_period_start.desc()

# Different SQL engines have different semantics for sorting DESC:
# Postgres and Oracle put nulls first, so we want NULLS LAST
# MSSQL puts nulls last, but doesn't support NULLS LAST
# so we use this hack to get NULLS LAST for all main dialects.
# For safety, in case other engines also don't support NULLS LAST,
# only apply it to the ones that we know it works on
# (vs not applying it to the ones we know it doesn't)

# TODO: test on MySQL - deferring since none of our partners use it now

# known engine dialect names are "mssql", "postgresql", and "oracle"
if engine.dialect.name in ["postgresql", "oracle"]:
addr_period_order = addr_period_order.nulls_last()

subquery = (
select(prv_address.columns.addressid)
.filter(
prv_address.columns.patid == prv_demo.columns.patid,
prv_address.columns.address_preferred == "Y",
)
.order_by(prv_address.columns.address_preferred.desc())
.order_by(addr_period_order)
.limit(1)
.correlate(prv_demo)
.scalar_subquery()
)

query = select([prv_demo, prv_address]).filter(
prv_demo.columns.patid == prv_address.columns.patid,
prv_address.columns.addressid == subquery,
)
query = select([prv_demo, prv_address]).filter(
prv_demo.columns.patid == prv_address.columns.patid,
prv_address.columns.addressid == subquery,
)

if args.debug_query:
print(query)

return query

Expand Down

0 comments on commit a4935df

Please sign in to comment.