diff --git a/households.py b/households.py
index 7513754..b2bb11e 100644
--- a/households.py
+++ b/households.py
@@ -8,14 +8,13 @@
 import sys
 from datetime import datetime
 from pathlib import Path
-from random import shuffle
 from zipfile import ZipFile
 
 import pandas as pd
 
 from definitions import TIMESTAMP_FMT
 from derive_subkey import derive_subkey
-from households.matching import addr_parse, get_household_matches
+from households.matching import get_household_matches
 
 HEADERS = ["HOUSEHOLD_POSITION", "PII_POSITIONS"]
 HOUSEHOLD_PII_HEADERS = [
@@ -76,6 +75,10 @@ def parse_arguments():
         " Smaller numbers may result in out of memory errors. Larger numbers"
         " may increase runtime. Default is 4",
     )
+    parser.add_argument(
+        "--pairsfile",
+        help="Location of matching pairs file",
+    )
     parser.add_argument(
         "--debug",
         action="store_true",
@@ -108,53 +111,29 @@ def parse_source_file(source_file, debug=False):
     if debug:
         print(f"[{datetime.now()}] Start loading PII file")
 
-    # force all columns to be strings, even if they look numeric
-    df = pd.read_csv(source_file, dtype=str)
-
-    # break out the address into number, street, suffix, etc,
-    # so we can prefilter matches based on those
-    addr_cols = df.apply(
-        explode_address,
-        axis="columns",
-        result_type="expand",
+    # dtype=str means force all columns to be strings even if they look numeric
+    # keep_default_na keeps empty cells as empty string, not a NaN
+    # usecols means only read the given column names,
+    # aka don't read the columns that are never used here: given_name, DOB, sex
+    df = pd.read_csv(
+        source_file,
+        dtype=str,
+        keep_default_na=False,
+        usecols=[
+            "record_id",
+            "family_name",
+            "phone_number",
+            "household_street_address",
+            "household_zip",
+        ],
     )
-    df = pd.concat([df, addr_cols], axis="columns")
 
     if debug:
-        print(f"[{datetime.now()}] Done pre-processing PII file")
+        print(f"[{datetime.now()}] Done loading PII file")
 
     return df
 
 
-def explode_address(row):
-    # this addr_parse function is relatively slow so only run it once per row.
-    # by caching the exploded dict this way we ensure
-    # that we have it in the right form in all the right places its needed
-    parsed = addr_parse(row.household_street_address)
-    parsed["exploded_address"] = parsed.copy()
-    parsed["exploded_address"][
-        "household_street_address"
-    ] = row.household_street_address
-    return parsed
-
-
-def write_households_pii(output_rows, household_time):
-    shuffle(output_rows)
-    timestamp = household_time.strftime(TIMESTAMP_FMT)
-    hh_pii_path = Path("temp-data") / f"households_pii-{timestamp}.csv"
-    with open(
-        hh_pii_path,
-        "w",
-        newline="",
-        encoding="utf-8",
-    ) as house_csv:
-        print(f"Writing households PII to {hh_pii_path}")
-        writer = csv.writer(house_csv)
-        writer.writerow(HOUSEHOLD_PII_HEADERS)
-        for output_row in output_rows:
-            writer.writerow(output_row)
-
-
 # Simple breadth-first-search to turn a graph-like structure of pairs
 # into a list representing the ids in the household
 def bfs_traverse_matches(pos_to_pairs, position):
@@ -187,58 +166,99 @@ def get_default_pii_csv(dirname="temp-data"):
     return source_file
 
 
-def write_mapping_file(pos_pid_rows, hid_pat_id_rows, args):
+def write_pii_and_mapping_file(pos_pid_rows, hid_pat_id_rows, household_time, args):
     if args.sourcefile:
         source_file = Path(args.sourcefile)
     else:
         source_file = get_default_pii_csv()
         print(f"PII Source: {str(source_file)}")
     pii_lines = parse_source_file(source_file, args.debug)
-    output_rows = []
+
+    # pos_to_pairs is a dict of:
+    # (patient position) --> [matching pairs that include that patient]
+    # so it can be traversed sort of like a graph from any given patient
+    # note the key is patient position within the pii_lines dataframe
+    pos_to_pairs = get_household_matches(
+        pii_lines, args.split_factor, args.debug, args.pairsfile
+    )
+
     mapping_file = Path(args.mappingfile)
+
     n_households = 0
     with open(mapping_file, "w", newline="", encoding="utf-8") as csvfile:
-        writer = csv.writer(csvfile)
-        writer.writerow(HEADERS)
-        already_added = set()
-
-        # pos_to_pairs is a dict of:
-        # (patient position) --> [matching pairs that include that patient]
-        # so it can be traversed sort of like a graph from any given patient
-        # note the key is patient position within the pii_lines dataframe
-        pos_to_pairs = get_household_matches(pii_lines, args.split_factor, args.debug)
+        mapping_writer = csv.writer(csvfile)
+        mapping_writer.writerow(HEADERS)
 
         if args.debug:
            print(f"[{datetime.now()}] Assembling output file")
 
-        hclk_position = 0
-        # Match households
-        for position, line in pii_lines.iterrows():
-            if position in already_added:
-                continue
-            already_added.add(position)
-
-            if position in pos_to_pairs:
-                pat_clks = bfs_traverse_matches(pos_to_pairs, position)
-                pat_ids = list(map(lambda p: pii_lines.at[p, "record_id"], pat_clks))
-                already_added.update(pat_clks)
-            else:
-                pat_clks = [position]
-                pat_ids = [line[0]]
-
-            string_pat_clks = [str(int) for int in pat_clks]
-            pat_string = ",".join(string_pat_clks)
-            writer.writerow([hclk_position, pat_string])
-            n_households += 1
-            pos_pid_rows.append([hclk_position, line[0]])
-            for patid in pat_ids:
-                hid_pat_id_rows.append([hclk_position, patid])
-            # note pat_ids_str will be quoted by the csv writer if needed
-            pat_ids_str = ",".join(pat_ids)
-            output_row = [line[2], line[5], line[6], line[7], pat_ids_str]
-            hclk_position += 1
-            output_rows.append(output_row)
-    return output_rows, n_households
+        timestamp = household_time.strftime(TIMESTAMP_FMT)
+        hh_pii_path = Path("temp-data") / f"households_pii-{timestamp}.csv"
+        with open(
+            hh_pii_path,
+            "w",
+            newline="",
+            encoding="utf-8",
+        ) as hh_pii_csv:
+            print(f"Writing households PII to {hh_pii_path}")
+            pii_writer = csv.writer(hh_pii_csv)
+            pii_writer.writerow(HOUSEHOLD_PII_HEADERS)
+
+            pii_lines["written_to_file"] = False
+            hclk_position = 0
+            lines_processed = 0
+            five_percent = int(len(pii_lines) / 20)
+            # Match households
+            for position, line in pii_lines.sample(frac=1).iterrows():
+                # sample(frac=1) shuffles the entire dataframe
+                # note that "position" is the index and still relative to the original
+
+                lines_processed += 1
+
+                if args.debug and (lines_processed % five_percent) == 0:
+                    print(
+                        f"[{datetime.now()}] Processing pii lines"
+                        f" - {lines_processed}/{len(pii_lines)}"
+                    )
+
+                if line["written_to_file"]:
+                    continue
+                line["written_to_file"] = True
+
+                if position in pos_to_pairs:
+                    pat_positions = bfs_traverse_matches(pos_to_pairs, position)
+                    # map those row numbers to PATIDs
+                    pat_ids = list(
+                        map(lambda p: pii_lines.at[p, "record_id"], pat_positions)
+                    )
+                    # mark all these rows as written to file
+                    pii_lines.loc[pat_positions, ["written_to_file"]] = True
+                else:
+                    pat_positions = [position]
+                    pat_ids = [line[0]]
+
+                string_pat_positions = [str(p) for p in pat_positions]
+                pat_string = ",".join(string_pat_positions)
+                mapping_writer.writerow([hclk_position, pat_string])
+                n_households += 1
+
+                if args.testrun:
+                    pos_pid_rows.append([hclk_position, line[0]])
+                    for patid in pat_ids:
+                        hid_pat_id_rows.append([hclk_position, patid])
+
+                # note pat_ids_str will be quoted by the csv writer if needed
+                pat_ids_str = ",".join(pat_ids)
+                output_row = [
+                    line["family_name"],
+                    line["phone_number"],
+                    line["household_street_address"],
+                    line["household_zip"],
+                    pat_ids_str,
+                ]
+                hclk_position += 1
+                pii_writer.writerow(output_row)
+    return n_households
 
 
 def write_scoring_file(hid_pat_id_rows):
@@ -297,8 +317,9 @@ def infer_households(args, household_time):
     hid_pat_id_rows = []
     os.makedirs(Path("output") / "households", exist_ok=True)
     os.makedirs("temp-data", exist_ok=True)
-    output_rows, n_households = write_mapping_file(pos_pid_rows, hid_pat_id_rows, args)
-    write_households_pii(output_rows, household_time)
+    n_households = write_pii_and_mapping_file(
+        pos_pid_rows, hid_pat_id_rows, household_time, args
+    )
     if args.testrun:
         write_scoring_file(hid_pat_id_rows)
         write_hid_hh_pos_map(pos_pid_rows)
diff --git a/households/matching.py b/households/matching.py
index 4d0d90a..6ba6429 100644
--- a/households/matching.py
+++ b/households/matching.py
@@ -1,4 +1,7 @@
+import csv
+import gc
 from datetime import datetime
+from pathlib import Path
 
 import numpy as np
 import pandas as pd
@@ -7,7 +10,9 @@
 import usaddress
 from recordlinkage.base import BaseCompareFeature
 
-MATCH_THRESHOLD = 0.8
+from definitions import TIMESTAMP_FMT
+
+MATCH_THRESHOLD = 0.85
 FN_WEIGHT = 0.2
 PHONE_WEIGHT = 0.15
 ADDR_WEIGHT = 0.35
@@ -210,10 +215,13 @@ def address_distance(addr1, addr2):
     # with a 0.6 adjustment is better
     a1 = addr1["household_street_address"]
     a2 = addr2["household_street_address"]
-    score = max(
-        score,
-        textdistance.jaro_winkler(a1, a2) * (weight_number + weight_street_name) * 0.6,
-    ) + (secondary_score * weight_secondary)
+    if a1 and a2:
+        score = max(
+            score,
+            textdistance.jaro_winkler(a1, a2)
+            * (weight_number + weight_street_name)
+            * 0.6,
+        ) + (secondary_score * weight_secondary)
 
     return score
 
@@ -251,7 +259,95 @@ def comp_address_apply(x):
     return c
 
 
-def get_household_matches(pii_lines, split_factor=4, debug=False):
+def explode_address(row):
+    # this addr_parse function is relatively slow so only run it once per row.
+    # by caching the exploded dict this way we ensure
+    # that we have it in the right form in all the right places it's needed
+    parsed = addr_parse(row.household_street_address)
+    parsed["exploded_address"] = parsed.copy()
+    parsed["exploded_address"][
+        "household_street_address"
+    ] = row.household_street_address
+    return parsed
+
+
+def get_household_matches(pii_lines, split_factor=4, debug=False, pairsfile=None):
+    if pairsfile:
+        if debug:
+            print(f"[{datetime.now()}] Loading matching pairs file")
+
+        matching_pairs = pd.read_csv(pairsfile, index_col=[0, 1], header=None).index
+        gc.collect()
+
+        if debug:
+            print(f"[{datetime.now()}] Done loading matching pairs")
+
+    else:
+        # break out the address into number, street, suffix, etc,
+        # so we can prefilter matches based on those
+        addr_cols = pii_lines.apply(
+            explode_address,
+            axis="columns",
+            result_type="expand",
+        )
+        pii_lines_exploded = pd.concat([pii_lines, addr_cols], axis="columns")
+
+        if debug:
+            print(f"[{datetime.now()}] Done pre-processing PII file")
+
+        candidate_links = get_candidate_links(pii_lines_exploded, split_factor, debug)
+        gc.collect()
+
+        matching_pairs = get_matching_pairs(
+            pii_lines_exploded, candidate_links, split_factor, debug
+        )
+        del candidate_links
+        del pii_lines_exploded
+        gc.collect()
+
+        if debug:
+            timestamp = datetime.now().strftime(TIMESTAMP_FMT)
+            pairs_path = Path("temp-data") / f"households_pairs-{timestamp}.csv"
+            with open(
+                pairs_path,
+                "w",
+                newline="",
+                encoding="utf-8",
+            ) as pairs_csv:
+                print(f"[{datetime.now()}] Dumping matching pairs to file")
+                pairs_writer = csv.writer(pairs_csv)
+                for i in range(len(matching_pairs)):
+                    pairs_writer.writerow(matching_pairs[i])
+                print(f"[{datetime.now()}] Wrote matching pairs to {pairs_path}")
+
+    five_percent = int(len(matching_pairs) / 20)
+    pos_to_pairs = {}
+    # note: "for pair in matching_pairs:" had unexpectedly poor performance here
+    for i in range(len(matching_pairs)):
+        pair = matching_pairs[i]
+        if debug and (i % five_percent) == 0:
+            print(
+                f"[{datetime.now()}] Building dict of matching pairs "
+                f"- {i}/{len(matching_pairs)}"
+            )
+
+        if pair[0] in pos_to_pairs:
+            pos_to_pairs[pair[0]].append(pair)
+        else:
+            pos_to_pairs[pair[0]] = [pair]
+
+        if pair[1] in pos_to_pairs:
+            pos_to_pairs[pair[1]].append(pair)
+        else:
+            pos_to_pairs[pair[1]] = [pair]
+
+    if debug:
+        print(f"[{datetime.now()}] Done building dict")
+
+    return pos_to_pairs
+
+
+def get_candidate_links(pii_lines, split_factor=4, debug=False):
     # indexing step defines the pairs of records for comparison
     # indexer.full() does a full n^2 comparison, but we can do better
     indexer = recordlinkage.Index()
@@ -262,10 +358,11 @@ def get_household_matches(pii_lines, split_factor=4, debug=False):
 
     # (zip codes in a geographic area will be too similar)
     # but if data is dirty then blocks may discard typos
-    indexer.block(["household_zip", "street"])
+    indexer.block(["household_zip", "street", "number"])
    indexer.block(["household_zip", "family_name"])
 
     candidate_links = None
+
     # break up the dataframe into subframes,
     # and iterate over every pair of subframes.
     # we improve performance somewhat by only comparing looking forward,
@@ -292,26 +389,35 @@ def get_household_matches(pii_lines, split_factor=4, debug=False):
                     f"[{subset_B.index.min()}..{subset_B.index.max()}]"
                 )
 
+            # note pairs_subset and candidate_links are MultiIndexes
             pairs_subset = indexer.index(subset_A, subset_B)
 
+            # now we have to remove duplicate and invalid pairs
+            # e.g. (1, 2) and (2, 1) should not both be in the list
+            # and (1, 1) should not be in the list
+            # the simple approach is just take the items where a < b
+            # unfortunately we have to loop it through a dataframe.
+            # this is done on the subset rather than the entire list
+            # to make sure we don't potentially duplicate a massive list
+            # and crash with OOM
+            pairs_subset = pairs_subset.to_frame()
+            pairs_subset = pairs_subset[pairs_subset[0] < pairs_subset[1]]
+            pairs_subset = pd.MultiIndex.from_frame(pairs_subset)
+
             if candidate_links is None:
                 candidate_links = pairs_subset
             else:
                 candidate_links = candidate_links.append(pairs_subset)
 
-    # now we have to remove duplicate and invalid pairs
-    # e.g. (1, 2) and (2, 1) should not both be in the list
-    # and (1, 1) should not be in the list
-    # the simple approach is just take the items where a < b
-
-    # unfortunately we have to loop it through a dataframe to drop items
-    clf = candidate_links.to_frame()
-    clf = clf[clf[0] < clf[1]]
-    candidate_links = pd.MultiIndex.from_frame(clf)
+            gc.collect()
 
     if debug:
         print(f"[{datetime.now()}] Found {len(candidate_links)} candidate pairs")
 
+    return candidate_links
+
+
+def get_matching_pairs(pii_lines, candidate_links, split_factor, debug):
     # Comparison step performs the defined comparison algorithms
     # against the candidate pairs
     compare_cl = recordlinkage.Compare()
@@ -339,7 +445,7 @@ def get_household_matches(pii_lines, split_factor=4, debug=False):
     if debug:
         print(f"[{datetime.now()}] Starting detailed comparison of indexed pairs")
 
-    matching_pairs = []
+    matching_pairs = None
     # we know that we could support len(subset_A) in memory above,
     # so use the same amount here
     len_subset_A = int(len(pii_lines) / split_factor)
@@ -372,23 +478,24 @@ def get_household_matches(pii_lines, split_factor=4, debug=False):
 
         # filter the matches down based on the cumulative score
         matches = features[features.sum(axis=1) > MATCH_THRESHOLD]
-        matching_pairs.extend(list(matches.index))
+        if matching_pairs is None:
+            matching_pairs = matches.index
+        else:
+            matching_pairs = matching_pairs.append(matches.index)
         # matching pairs are bi-directional and not duplicated,
         # ex if (1,9) is in the list then (9,1) won't be
 
-    if debug:
-        print(f"[{datetime.now()}] Found {len(matching_pairs)} pairs")
+        if debug:
+            print(f"[{datetime.now()}] {len(matching_pairs)} matching pairs so far")
 
-    pos_to_pairs = {}
-    for pair in matching_pairs:
-        if pair[0] in pos_to_pairs:
-            pos_to_pairs[pair[0]].append(pair)
-        else:
-            pos_to_pairs[pair[0]] = [pair]
+        del features
+        del matches
+        gc.collect()
 
-        if pair[1] in pos_to_pairs:
-            pos_to_pairs[pair[1]].append(pair)
-        else:
-            pos_to_pairs[pair[1]] = [pair]
+    # drop exploded address because it's not used past this point
+    pii_lines.drop(columns=["exploded_address"], inplace=True)
 
-    return pos_to_pairs
+    if debug:
+        print(f"[{datetime.now()}] Found {len(matching_pairs)} matching pairs")
+
+    return matching_pairs
diff --git a/requirements.txt b/requirements.txt
index 557063e..e5f93b4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,7 @@ clkhash>=0.16.0
 psycopg2>=2.8.3
 anonlink-client==0.1.5
 ijson>=3.1.2
-textdistance>=4.2.1
+textdistance[extras]>=4.5.0
 usaddress>=0.5.10
 pylint>=2.4.2
 tqdm>=4.36.1
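
Note on the new pairs-file round trip (a sketch, not part of the patch): when --debug is set, get_household_matches dumps every matching pair to temp-data/households_pairs-<timestamp>.csv, one "pos_a,pos_b" row per pair, and the new --pairsfile argument reloads such a file so the expensive indexing and comparison steps can be skipped on a rerun. A minimal illustration of that round trip, using an invented file name and made-up row positions:

    import csv

    import pandas as pd

    pairs_path = "temp-data/households_pairs-example.csv"  # hypothetical dump location

    # writing: one CSV row per matching pair of dataframe row positions,
    # mirroring the --debug dump loop in get_household_matches
    with open(pairs_path, "w", newline="", encoding="utf-8") as pairs_csv:
        pairs_writer = csv.writer(pairs_csv)
        for pair in [(3, 17), (4, 9)]:
            pairs_writer.writerow(pair)

    # reading: what --pairsfile does, rebuilding a pandas MultiIndex of pairs
    matching_pairs = pd.read_csv(pairs_path, index_col=[0, 1], header=None).index
    print(list(matching_pairs))  # [(3, 17), (4, 9)]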