From 0769fce732cb1bb8444ff552f65417dc7a7a93bb Mon Sep 17 00:00:00 2001 From: Dave Peck Date: Sun, 26 Nov 2023 20:35:00 -0500 Subject: [PATCH] Tooling to work with raw FEC filings, slim down large datasets, and identify likely identical voters on record (#1) * fix zipcode stuff * Clean up zip data. * Support nicknames for voters * Load and index raw FEC committee data. * Testing the ETL pipeline in detail. Committed to a schema. * Rename individual -> contributions * Working on contribution tests * Test contribution summaries * Test contributions, mostly complete. * FEC command line tool. * Fix contributions, maybe * Committee lookup * Fix nicknames a bit * Process apple contact book backups. --- fec.py | 167 ++++++++ munge.py | 293 ------------- nicknames.py | 75 ---- scripts/test.sh | 2 +- server/data/__init__.py | 11 + server/data/contacts/__init__.py | 39 ++ server/data/contacts/abbu.py | 83 ++++ server/{ => data}/fec/__init__.py | 0 server/data/fec/committees.py | 274 ++++++++++++ server/data/fec/contributions.py | 565 +++++++++++++++++++++++++ server/data/fec/test_committees.py | 168 ++++++++ server/data/fec/test_contributions.py | 529 +++++++++++++++++++++++ server/{data.py => data/manager.py} | 6 +- server/{ => data}/names/__init__.py | 0 server/data/names/nicknames.py | 259 ++++++++++++ server/data/names/test_nicknames.py | 199 +++++++++ server/{ => data}/usps/__init__.py | 0 server/data/usps/city_state.py | 7 + server/data/usps/metros.py | 143 +++++++ server/data/usps/test_metros.py | 17 + server/{ => data}/usps/test_zipcode.py | 16 +- server/{ => data}/usps/zipcode.py | 58 ++- server/utils/test_validations.py | 91 ++++ server/utils/validations.py | 97 ++++- 24 files changed, 2700 insertions(+), 399 deletions(-) create mode 100755 fec.py delete mode 100755 munge.py delete mode 100755 nicknames.py create mode 100644 server/data/__init__.py create mode 100644 server/data/contacts/__init__.py create mode 100644 server/data/contacts/abbu.py rename server/{ => 
data}/fec/__init__.py (100%) create mode 100644 server/data/fec/committees.py create mode 100644 server/data/fec/contributions.py create mode 100644 server/data/fec/test_committees.py create mode 100644 server/data/fec/test_contributions.py rename server/{data.py => data/manager.py} (70%) rename server/{ => data}/names/__init__.py (100%) create mode 100644 server/data/names/nicknames.py create mode 100644 server/data/names/test_nicknames.py rename server/{ => data}/usps/__init__.py (100%) create mode 100644 server/data/usps/city_state.py create mode 100644 server/data/usps/metros.py create mode 100644 server/data/usps/test_metros.py rename server/{ => data}/usps/test_zipcode.py (57%) rename server/{ => data}/usps/zipcode.py (67%) diff --git a/fec.py b/fec.py new file mode 100755 index 0000000..5b14a37 --- /dev/null +++ b/fec.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# ruff: noqa: E501 + +import json + +import click + +from server.data.contacts import Contact, IContactProvider, SimpleContactProvider +from server.data.contacts.abbu import DirectoryABBUManager, ZipABBUManager +from server.data.fec.committees import CommitteeManager +from server.data.fec.contributions import ( + ContributionsManager, + ContributionSummariesManager, + FuzzyIdentifier, +) +from server.data.manager import DataManager +from server.data.names.nicknames import MessyNicknamesManager, NicknamesManager + + +@click.group() +def fec(): + """Work with FEC data.""" + pass + + +@fec.group() +def names(): + """Work with names data.""" + pass + + +@names.command() +@click.option( + "--data", + type=click.Path(exists=True), + help="Path to data dir.", + required=False, + default=None, +) +def clean(data: str | None = None): + """Clean raw names data.""" + data_manager = DataManager(data) if data is not None else DataManager.default() + messy_names_manager = MessyNicknamesManager.from_data_manager(data_manager) + nicknames_manager = messy_names_manager.nicknames_manager + 
nicknames_manager.to_jsonl_data_manager(data_manager) + + +@fec.group() +def committees(): + """Work with FEC committees data.""" + pass + + +@committees.command(name="lookup") +@click.argument("committee_id") +@click.option( + "--data", + type=click.Path(exists=True), + help="Path to data dir.", + required=False, + default=None, +) +def committee_lookup(committee_id: str, data: str | None = None): + """Search FEC committees data.""" + data_manager = DataManager(data) if data is not None else DataManager.default() + committees_manager = CommitteeManager.from_csv_data_manager(data_manager) + committee = committees_manager.get_committee(committee_id) + if committee is None: + print("No matching committee.") + else: + print(json.dumps(committee.to_data(), indent=2)) + + +@fec.group() +def contributions(): + """Work with FEC contributions data.""" + pass + + +@contributions.command() +@click.option( + "--data", + type=click.Path(exists=True), + help="Path to data dir.", + required=False, + default=None, +) +def summarize(data: str | None = None): + """Summarize raw FEC individual contribution data.""" + data_manager = DataManager(data) if data is not None else DataManager.default() + contributions_manager = ContributionsManager.from_data_manager(data_manager) + summaries_manager = contributions_manager.contribution_summaries_manager + summaries_manager.to_jsonl_data_manager(data_manager) + + +@contributions.command() +@click.argument("first_name", required=False, default=None) +@click.argument("last_name", required=False, default=None) +@click.argument("zip_code", required=False, default=None) +@click.option( + "-c", + "--contact-dir", + type=click.Path(exists=True, dir_okay=True, file_okay=False), + help="Path to a `.abbu` contacts dir.", + required=False, + default=None, +) +@click.option( + "-z", + "--contact-zip", + type=click.Path(exists=True, dir_okay=False, file_okay=True), + help="Path to a `.abbu` contacts zip file.", + required=False, + default=None, +) 
+@click.option( + "--data", + type=click.Path(exists=True), + help="Path to data dir.", + required=False, + default=None, +) +def search( + first_name: str | None = None, + last_name: str | None = None, + zip_code: str | None = None, + data: str | None = None, + contact_dir: str | None = None, + contact_zip: str | None = None, +): + """Search summarized FEC contributions data.""" + data_manager = DataManager(data) if data is not None else DataManager.default() + nicknames_manager = NicknamesManager.from_data_manager(data_manager) + summaries_manager = ContributionSummariesManager.from_data_manager(data_manager) + + contact_provider: IContactProvider | None = None + + if contact_dir is not None: + contact_provider = DirectoryABBUManager(contact_dir) + elif contact_zip is not None: + contact_provider = ZipABBUManager(contact_zip) + elif first_name and last_name and zip_code: + singleton = Contact(first_name, last_name, zip_code) + contact_provider = SimpleContactProvider([singleton]) + + if contact_provider is None: + raise click.UsageError( + "You must provide a contact dir, zip file, or explicit name & zip." 
+ ) + + for contact in contact_provider.get_contacts(): + fuzzy_id = FuzzyIdentifier( + contact.last, + contact.first, + contact.zip_code, + get_nickname_index=nicknames_manager, + ).fuzzy_id + summary = summaries_manager.get_summary(fuzzy_id) + print(f"--> {contact.first} {contact.last} {contact.zip_code}") + if summary is None: + print("{}") + else: + print(json.dumps(summary.to_data(), indent=2)) + + +if __name__ == "__main__": + fec() diff --git a/munge.py b/munge.py deleted file mode 100755 index 1bb062b..0000000 --- a/munge.py +++ /dev/null @@ -1,293 +0,0 @@ -#!/usr/bin/env python3 -# ruff: noqa: E501 - -import datetime -import json -import typing as t -from dataclasses import dataclass -from decimal import Decimal - -import click -from tqdm import tqdm - -# See https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/ -type TransactionPGICode = t.Literal[ - "P", # Primary - "G", # General - "O", # Other - "C", # Convention - "R", # Runoff - "S", # Special - "E", # Recount -] - - -# See https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/ -type EntityTypeCode = t.Literal[ - "CAN", # Candidate - "CCM", # Candidate Committee - "COM", # Committee - "IND", # Individual (a person) - "ORG", # Organization (not a committee and not a person) - "PAC", # Political Action Committee - "PTY", # Party Organization -] - - -class Nicknames(t.TypedDict): - """A dictionary of nicknames, keyed by the canonical name.""" - - names: list[list[str]] - indexes: dict[str, int] - - -@dataclass(frozen=True) -class Committee: - name: str - party: str # Republican, Democrat, or Independent - - -@dataclass -class MergedContributions: - total: Decimal - by_party: dict[str, Decimal] - by_committee: dict[str, tuple[str, str, Decimal]] - - @classmethod - def empty(cls) -> "MergedContributions": - """Create an empty MergedContributions object.""" - return cls(total=Decimal(0), by_party={}, by_committee={}) - - def add( - self, 
contribution: "Contribution", committees: dict[str, Committee] - ) -> None: - """Add a contribution to this object.""" - self.total += Decimal(contribution.transaction_amount) - if contribution.committee_id in self.by_committee: - committee_name, party, amount = self.by_committee[contribution.committee_id] - amount += Decimal(contribution.transaction_amount) - self.by_committee[contribution.committee_id] = ( - committee_name, - party, - amount, - ) - else: - self.by_committee[contribution.committee_id] = ( - committees[contribution.committee_id].name, - committees[contribution.committee_id].party, - Decimal(contribution.transaction_amount), - ) - party_total = self.by_party.get( - committees[contribution.committee_id].party, Decimal(0) - ) - party_total += Decimal(contribution.transaction_amount) - self.by_party[committees[contribution.committee_id].party] = party_total - - -type ContributorID = tuple[str, str, str] - - -@dataclass(frozen=True) -class Contribution: - """ - A single row in an FEC invididual contributions dataset. - - See https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/ - """ - - committee_id: str # The FEC ID of the recipient committee (e.g. C00100005) - amendment_indicator: str # Whether the contribution is an amendment (e.g. N) - report_type: str # The type of report (e.g. Q2 -- see https://www.fec.gov/campaign-finance-data/report-type-code-descriptions/) - transaction_pgi: str # Type and cycle of election (e.g. P2018) - image_number: str # May be 11-digit or 18-digit format. (e.g. 201810170912345678) - transaction_type: str # The type of transaction (see https://www.fec.gov/campaign-finance-data/transaction-type-code-descriptions/) - entity_type: EntityTypeCode # The type of entity (e.g. IND) - name: str # The contributor's name (e.g. "SMITH, JOHN A") - city: str # The contributor's city (e.g. "NEW YORK") - state: str # The contributor's state (e.g. "NY") - zip_code: str # The contributor's ZIP code + 4 (e.g. 
"100212021") - employer: str # The contributor's employer (e.g. "SELF-EMPLOYED") - occupation: str # The contributor's occupation (e.g. "WRITER") - transaction_date_str: str # The date of the transaction (e.g. "20180630") - transaction_amount: str # The amount of the transaction (e.g. 1000.00) - other_id: str # The FEC ID of the donor if it is a committee (e.g. C00100005) - transaction_id: str # Identifies a single long-running transaction (e.g. SA11A1A.8317) - file_number: str # Identifies the electronic or paper report (e.g. 1316462) - memo_code: str # 'X' indicates that the amount is not to be included in the itemization total (e.g. X) - memo_text: str # A description of the transaction (e.g. "CONTRIBUTION REFUND") - sub_id: str # A unique identifier for each itemization (e.g. 4020820181532341437) - - @property - def is_individual(self) -> bool: - """Whether the contributor is an individual.""" - return self.entity_type == "IND" - - @property - def transaction_pgi_code(self) -> TransactionPGICode: - """The variety of election (e.g. P for primary).""" - return t.cast(TransactionPGICode, self.transaction_pgi[0]) - - @property - def transaction_pgi_year(self) -> int: - """The year of the election (e.g. 
2020).""" - return int(self.transaction_pgi[1:]) - - @property - def transaction_date(self) -> datetime.date: - """The date of the transaction.""" - return datetime.datetime.strptime(self.transaction_date_str, "%Y%m%d").date() - - @property - def zip5(self) -> str: - """The first five digits of the contributor's ZIP code.""" - return self.zip_code[:5] - - @property - def normalized_last_name(self) -> str: - """The last name of the contributor, normalized.""" - return self.name.split(",")[0].strip().upper() - - @property - def normalized_first_name(self) -> str: - """The first name of the contributor, normalized.""" - try: - return self.name.split(",")[1].strip().split(" ")[0].strip().upper() - except IndexError: - return "UNKNOWN" - - def get_contributor_id(self, nicknames: Nicknames) -> ContributorID: - """Get a unique identifier for the contributor.""" - last_name = self.normalized_last_name - first_name = str( - nicknames["indexes"].get( - self.normalized_first_name, self.normalized_first_name - ) - ) - zip5 = self.zip5 - return (last_name, first_name, zip5) - - @classmethod - def from_line(cls, line: str) -> "Contribution": - """ - Create a Contribution from a line of text. 
- - See https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/ - """ - ( - committee_id, - amendment_indicator, - report_type, - transaction_pgi, - image_number, - transaction_type, - entity_type, - name, - city, - state, - zip_code, - employer, - occupation, - transaction_date_str, - transaction_amount, - other_id, - transaction_id, - file_number, - memo_code, - memo_text, - sub_id, - ) = line.split("|") - return cls( - committee_id=committee_id, - amendment_indicator=amendment_indicator, - report_type=report_type, - transaction_pgi=transaction_pgi, - image_number=image_number, - transaction_type=transaction_type, - entity_type=t.cast(EntityTypeCode, entity_type), - name=name, - city=city, - state=state, - zip_code=zip_code, - employer=employer, - occupation=occupation, - transaction_date_str=transaction_date_str, - transaction_amount=transaction_amount, - other_id=other_id, - transaction_id=transaction_id, - file_number=file_number, - memo_code=memo_code, - memo_text=memo_text, - sub_id=sub_id, - ) - - -@click.command() -@click.argument("fec_path", type=click.Path(exists=True, dir_okay=False)) -@click.argument("nicks_path", type=click.Path(exists=True, dir_okay=False)) -@click.argument("committees_path", type=click.Path(exists=True, dir_okay=False)) -def munge(fec_path: str, nicks_path: str, committees_path: str): - """ - Munge some FEC data into a more compact form. - - Specifically, we attempt to take the 70M+ rows of individual - contributions data and reduce it to a more compact form. We seek - to identify 'unique' donors based on their name and geography. Because - people use common nicknames, we also use a nicknames file to unify - those names. 
- """ - print("Loading nicknames...", file=click.get_text_stream("stderr")) - with open(nicks_path) as nicks_file: - nicknames = t.cast(Nicknames, json.load(nicks_file)) - - print("Loading committees...", file=click.get_text_stream("stderr")) - with open(committees_path) as committees_file: - committees: dict[str, Committee] = {} - for line in committees_file: - split = line.split("|") - committees[split[0]] = Committee(name=split[1], party=split[10]) - - for key, value in list(committees.items())[:5]: - print(f"{key}: {value}", file=click.get_text_stream("stderr")) - - print("Munging FEC data...", file=click.get_text_stream("stderr")) - contributors: dict[ContributorID, MergedContributions] = {} - with open(fec_path) as fec_file: - for line in tqdm( - fec_file, desc="Munging FEC data", total=70_659_611, unit="row" - ): - contribution = Contribution.from_line(line) - if not contribution.is_individual: - continue - contributor_id = contribution.get_contributor_id(nicknames) - print(contributor_id, file=click.get_text_stream("stderr")) - if contributor_id in contributors: - contributors[contributor_id].add(contribution, committees) - else: - contributors[contributor_id] = MergedContributions.empty() - contributors[contributor_id].add(contribution, committees) - - for key, value in contributors.items(): - str_key = f"{key[0]}-{key[1]}-{key[2]}" - total = str(value.total) - party_json_safe = { - party: str(amount) for party, amount in value.by_party.items() - } - breakdown_json_safe = { - committee_id: [committee_name, party, str(amount)] - for committee_id, ( - committee_name, - party, - amount, - ) in value.by_committee.items() - } - jsonable = { - "id": str_key, - "total": total, - "party": party_json_safe, - "breakdown": breakdown_json_safe, - } - print(json.dumps(jsonable)) - - -if __name__ == "__main__": - munge() diff --git a/nicknames.py b/nicknames.py deleted file mode 100755 index 19cf973..0000000 --- a/nicknames.py +++ /dev/null @@ -1,75 +0,0 @@ 
-#!/usr/bin/env python3 - -import json -import pathlib - -import click - - -@click.command() -@click.argument("input", type=click.Path(exists=True, dir_okay=False)) -def main(input: str): - """ - Read a messy nicknames data file. Create a single set of unique - matched names per line and write to stdout. - """ - input_path = pathlib.Path(input).resolve() - matching_names: list[set[str]] = [] - with input_path.open("rt") as input_file: - for line in input_file: - # Remove all commas - line = line.replace(",", "") - # Remove all slashes - line = line.replace("/", "") - # Remove parens, open and close - line = line.replace("(", "").replace(")", "") - # Break the line into a list of names -- split on any - # arbitrary number of spaces - names = line.split() - # Remove any empty strings - names = [stripped for name in names if (stripped := name.strip())] - # Remove any strings that don't start with a capital letter - names = [name for name in names if name[0].isupper()] - # Make a set of capitalized names - names_set = {name.upper() for name in names} - # Print it - matching_names.append(names_set) - - # Continuously merge sets that have overlapping names, until no - # more merges are possible - while True: - index = 0 - merged = False - while index < len(matching_names): - index2 = index + 1 - while index2 < len(matching_names): - if matching_names[index] & matching_names[index2]: - matching_names[index] |= matching_names[index2] - del matching_names[index2] - merged = True - else: - index2 += 1 - index += 1 - if not merged: - break - - name_to_index = {} - for index, names_set in enumerate(matching_names): - for name in names_set: - assert name not in name_to_index - name_to_index[name] = index - - # For each set in matching name, convert it to a sorted list - matching_names_list = [sorted(names) for names in matching_names] - - # Reorder name_to_index so that it's alphabetical by name - name_to_index = dict(sorted(name_to_index.items(), key=lambda x: x[0])) - - # 
Dump a final datastructure to stdout - print( - json.dumps({"names": matching_names_list, "indexes": name_to_index}, indent=2) - ) - - -if __name__ == "__main__": - main() diff --git a/scripts/test.sh b/scripts/test.sh index 82b42b3..accf27f 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -1,4 +1,4 @@ #!/bin/sh -pre-commit run --all +pre-commit run --all-files python -m unittest discover -s server diff --git a/server/data/__init__.py b/server/data/__init__.py new file mode 100644 index 0000000..43bffa7 --- /dev/null +++ b/server/data/__init__.py @@ -0,0 +1,11 @@ +"""Tools for working with all raw data files.""" + + +# CONSIDER: the FEC publishes what amounts to a relational dataset, and I +# originally considered just dumping stuff into a massive SQLite database. +# But then I got hooked on summarizing, and building fuzzy identifiers, and +# the code took a different form. In retrospect, the existence of IGetNicknameIndex +# and IGetCommittee just screams "dude, you shoulda used SQLAlchemy and done +# some ETL on the inbound side to slim it down". +# +# So this comment asks me to revisit this, and consider it a TODO. diff --git a/server/data/contacts/__init__.py b/server/data/contacts/__init__.py new file mode 100644 index 0000000..3604a39 --- /dev/null +++ b/server/data/contacts/__init__.py @@ -0,0 +1,39 @@ +"""Tools for working with contacts lists.""" + +import typing as t +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Contact: + """A contact in the address book.""" + + first: str + last: str + zip_code: str # Either 5 or 9 digits + + @property + def zip5(self) -> str: + """Returns the first 5 digits of the zip code.""" + return self.zip_code[:5] + + +class IContactProvider(t.Protocol): + """Defines a simple protocol for getting critical contact information.""" + + def get_contacts(self) -> t.Iterable[Contact]: + """Return an iterator of contacts.""" + ... 
+ + +class SimpleContactProvider: + """A simple IContactProvider implementation.""" + + _contacts: list[Contact] + + def __init__(self, contacts: t.Iterable[Contact]): + self._contacts = list(contacts) + + def get_contacts(self) -> t.Iterable[Contact]: + """Return an iterator of contacts.""" + return iter(self._contacts) diff --git a/server/data/contacts/abbu.py b/server/data/contacts/abbu.py new file mode 100644 index 0000000..47ea7d2 --- /dev/null +++ b/server/data/contacts/abbu.py @@ -0,0 +1,83 @@ +import abc +import pathlib +import plistlib +import typing as t +import zipfile + +from server.utils.validations import validate_extant_dir, validate_extant_file + +from . import Contact + + +class ABBUManagerBase(abc.ABC): + """ + An abstract IContactProvider (see __init__.py) that supports Apple's Address Book + Backup format. We offer two implementations: one for a directory and one for a + zip file. + """ + + @abc.abstractmethod + def get_abpersons(self) -> t.Iterable[t.IO[bytes]]: + """Return an iterator of abpersons.""" + ... 
+ + def get_contacts(self) -> t.Iterable[Contact]: + """Return an iterator of contacts.""" + for abperson in self.get_abpersons(): + maybe_abperson = self._parse_abperson(abperson) + if maybe_abperson: + yield maybe_abperson + + def _parse_abperson(self, abperson: t.IO[bytes]) -> Contact | None: + """Parse an abperson file into a Contact.""" + try: + plist_data = plistlib.load(abperson) + first = plist_data["First"].title() + last = plist_data["Last"].title() + # use the preferred zip code if it exists + zip_code = plist_data["Address"]["values"][0]["ZIP"].replace("-", "") + except Exception: + return None + if len(zip_code) not in {5, 9}: + return None + return Contact(first, last, zip_code) + + +class DirectoryABBUManager(ABBUManagerBase): + """An ABBUManager that expects its contents to be in a local directory.""" + + _path: pathlib.Path + + def __init__(self, path: str | pathlib.Path): + """Initialize a new instance of the DirectoryABBUManager class.""" + self._path = validate_extant_dir(pathlib.Path(path)) + + def get_abpersons(self) -> t.Iterable[t.IO[bytes]]: + """Return an iterator of abpersons.""" + for path in self._path.glob("**/Sources/**/*ABPerson.abcdp"): + yield path.open("rb") + + +class ZipABBUManager(ABBUManagerBase): + """ + An IContactProvider (see __init__.py) that supports Apple's Address Book Backup + format. We can be handed a path to an `abbu` directory *or* a path to a single + zip file that *is* an `abbu` directory. 
+ """ + + _path: pathlib.Path + + def __init__(self, path: str | pathlib.Path): + """Initialize a new instance of the ZipAddressBookBackupManager class.""" + self._path = validate_extant_file(pathlib.Path(path)) + + def get_abpersons(self) -> t.Iterable[t.IO[bytes]]: + """Return an iterator of abpersons.""" + with zipfile.ZipFile(self._path) as zip_file: + for info in zip_file.infolist(): + if ( + info.filename.endswith("ABPerson.abcdp") + and "Sources" in info.filename + and "_MACOSX" not in info.filename + ): + yield zip_file.open(info) diff --git a/server/fec/__init__.py b/server/data/fec/__init__.py similarity index 100% rename from server/fec/__init__.py rename to server/data/fec/__init__.py diff --git a/server/data/fec/committees.py b/server/data/fec/committees.py new file mode 100644 index 0000000..99a3e37 --- /dev/null +++ b/server/data/fec/committees.py @@ -0,0 +1,274 @@ +""" +Support reading FEC committee master file content. + +You can download per-election-cycle committee master files from: +https://www.fec.gov/data/browse-data/?tab=bulk-data + +The schema for the committee master file is available at: +https://www.fec.gov/campaign-finance-data/committee-master-file-description/ +""" +import csv +import json +import pathlib +import typing as t +from dataclasses import dataclass + +from server.data.manager import DataManager +from server.utils import validations as v + + +class CommitteeTypeCode: + """ + Committee type codes. 
+ + See: + https://www.fec.gov/campaign-finance-data/committee-type-code-descriptions/ + """ + + COMMUNICATION_COST = "C" + DELEGATE_COMMITTEE = "D" + ELECTIONEERING_COMMUNICATION = "E" + HOUSE = "H" + INDEPEDENT_PERSON_OR_GROUP = "I" + PAC_NONQUALIFIED = "N" + INDEPEDENT_SUPER_PAC = "O" + PRESIDENTIAL = "P" + PAC_QUALIFIED = "Q" + SENATE = "S" + SINGLE_CANDIDATE_INDEPENDENT_EXPENDITURE = "U" + HYBRID_PAC_NONQUALIFIED = "V" + HYBRID_PAC_QUALIFIED = "W" + PARTY_NONQUALIFIED = "X" + PARTY_QUALIFIED = "Y" + NATIONAL_PARTY_NONFEDERAL = "Z" + + @classmethod + def name_for_code(cls, code: str) -> str | None: + """Return the name for the given committee type code.""" + for attr in dir(CommitteeTypeCode): + if not attr.startswith("__"): + if getattr(CommitteeTypeCode, attr) == code: + return attr.replace("_", " ").title() + return None + + +class CommitteeColumns: + """ + Column indices for the committee master file. + + See: + https://www.fec.gov/campaign-finance-data/committee-master-file-description/ + """ + + ID = 0 # CMTE_ID + NAME = 1 # CMTE_NM + TREASURER_NAME = 2 # TRES_NM + STREET_1 = 3 # CMTE_ST1 + STREET_2 = 4 # CMTE_ST2 + CITY = 5 # CMTE_CITY + STATE = 6 # CMTE_ST + ZIP_CODE = 7 # CMTE_ZIP + DESIGNATION = 8 # CMTE_DSGN + TYPE = 9 # CMTE_TP + PARTY = 10 # CMTE_PTY_AFFILIATION + ORG_TYPE = 11 # ORG_TP + CONNECTED_ORG_NAME = 12 # CONNECTED_ORG_NM + CANDIDATE_ID = 13 # CAND_ID + + +class Party: + """ + Political party codes. 
+ + For an (incredibly) exhaustive list, see: + https://www.fec.gov/campaign-finance-data/party-code-descriptions/ + """ + + REPUBLICAN = "REP" + DEMOCRAT = "DEM" + INDEPENDENT = "IND" + LIBERTARIAN = "LIB" + GREEN = "GRE" + UNKNOWN = "UNK" # We specifically ignore this/convert to None + + @classmethod + def name_for_code(cls, code: str) -> str | None: + """Return the name for the given party code.""" + for attr in dir(Party): + if not attr.startswith("__"): + if getattr(Party, attr) == code: + return attr.title() + return None + + +@dataclass(frozen=True) +class Committee: + """Our simplification of the committee record.""" + + id: str + name: str + party: str | None + candidate_id: str | None + + @classmethod + def from_committee_row(cls, row: t.Sequence[str]) -> "Committee": + """Create a committee from a row of the committee master file.""" + data = { + "id": row[CommitteeColumns.ID].strip(), + "name": row[CommitteeColumns.NAME].strip(), + } + party = row[CommitteeColumns.PARTY].strip().upper() + if party and party != Party.UNKNOWN: + data["party"] = party + candidate_id = row[CommitteeColumns.CANDIDATE_ID].strip() + if candidate_id: + data["candidate_id"] = candidate_id + return cls.from_data(data) + + @classmethod + def from_data(cls, value: t.Any) -> "Committee": + """Create a committee from arbitrary data, or raise an exception.""" + data = v.validate_dict(value) + return cls( + id=v.get_str(data, "id"), + name=v.get_str(data, "name"), + party=v.get_optional_str(data, "party"), + candidate_id=v.get_optional_str(data, "candidate_id"), + ) + + def to_data(self) -> dict: + """Return a dict representation of the committee.""" + data = { + "id": self.id, + "name": self.name, + } + if self.party is not None: + data["party"] = self.party + if self.candidate_id is not None: + data["candidate_id"] = self.candidate_id + return data + + +class IGetCommittee(t.Protocol): + """Interface for getting a committee.""" + + def get_committee(self, id: str) -> Committee | 
None: + """Get the committee with the given id, or None.""" + ... + + +class MockGetCommittee(IGetCommittee): + """A mock implementation of IGetCommittee.""" + + _id_to_committee: dict[str, Committee] + + def __init__(self, committees: t.Sequence[Committee]) -> None: + """Create a mock implementation.""" + self._id_to_committee = {committee.id: committee for committee in committees} + + def get_committee(self, id: str) -> Committee | None: + """Get the committee with the given id, or None.""" + return self._id_to_committee.get(id) + + +class CommitteeManager: + """Manages a collection of committees.""" + + _committees: list[Committee] + _id_to_committee: dict[str, Committee] | None + + def __init__(self, committees: t.Iterable[Committee]) -> None: + """Create a committee manager.""" + self._committees = list(committees) + self._id_to_committee = None + + @classmethod + def from_csv_io(cls, io: t.TextIO) -> "CommitteeManager": + """Create a committee manager from a CSV file.""" + reader = csv.reader(io, delimiter="|") + return cls(Committee.from_committee_row(row) for row in reader) + + @classmethod + def from_csv_path(cls, path: pathlib.Path) -> "CommitteeManager": + """Create a committee manager from a CSV file.""" + path = v.validate_extant_file(path) + with path.open() as file: + return cls.from_csv_io(file) + + @classmethod + def from_csv_data_manager( + cls, data_manager: "DataManager", year: int = 2020 + ) -> "CommitteeManager": + """Create a committee manager from a data manager.""" + return cls.from_csv_path(data_manager.path / "fec" / f"committees-{year}.txt") + + @classmethod + def from_jsonl_io(cls, io: t.TextIO) -> "CommitteeManager": + """Create a committee manager from a json-lines file.""" + return cls(Committee.from_data(json.loads(line)) for line in io) + + @classmethod + def from_jsonl_path(cls, path: pathlib.Path) -> "CommitteeManager": + """Create a committee manager from a json-lines file.""" + path = v.validate_extant_file(path) + with 
path.open() as file: + return cls.from_jsonl_io(file) + + @classmethod + def from_jsonl_data_manager( + cls, data_manager: "DataManager", year: int = 2020 + ) -> "CommitteeManager": + """Create a committee manager from a data manager.""" + return cls.from_jsonl_path( + data_manager.path / "fec" / f"committees-{year}.jsonl" + ) + + def to_data_lines(self) -> t.Iterable[dict]: + """Convert to a list of json-serializable objects.""" + return (committee.to_data() for committee in self._committees) + + def to_jsonl_io(self, io: t.TextIO) -> None: + """Write to a json file.""" + for data_line in self.to_data_lines(): + io.write(json.dumps(data_line)) + io.write("\n") + + def to_jsonl_path(self, path: pathlib.Path) -> None: + """Write to a json file.""" + with path.open("wt") as output_file: + self.to_jsonl_io(output_file) + + def to_jsonl_data_manager( + self, data_manager: "DataManager", year: int = 2020 + ) -> None: + """Write to a json file.""" + self.to_jsonl_path(data_manager.path / "fec" / f"committees-{year}.jsonl") + + def _index_committees(self) -> None: + """Index the committees by id.""" + assert self._id_to_committee is None + self._id_to_committee = {} + for committee in self._committees: + assert committee.id not in self._id_to_committee + self._id_to_committee[committee.id] = committee + + def _index_committees_if_needed(self) -> None: + """Index the committees by id if needed.""" + if self._id_to_committee is None: + self._index_committees() + + @property + def committees(self) -> t.Sequence[Committee]: + """Get the list of committees.""" + return self._committees + + @property + def id_to_committee(self) -> t.Mapping[str, Committee]: + """Get the mapping from id to committee.""" + self._index_committees_if_needed() + assert self._id_to_committee is not None + return self._id_to_committee + + def get_committee(self, id: str) -> Committee | None: + """Get the committee with the given id, or None.""" + return self.id_to_committee.get(id) diff --git 
"""
Support reading FEC individual contribution master file content, and
converting it into several derived forms.

You can download per-election-cycle individual contribution master files from:
https://www.fec.gov/data/browse-data/?tab=bulk-data

The schema for the individual contribution master file is available at:
https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/
"""
import json
import pathlib
import typing as t
from dataclasses import dataclass
from decimal import Decimal

from server.data.manager import DataManager
from server.data.names.nicknames import IGetNicknameIndex, NicknamesManager
from server.utils import validations as v

from .committees import CommitteeManager, IGetCommittee


def split_name(name: str) -> tuple[str, str | None]:
    """
    Split a name into a last name and a first name.

    The name should be in the form LAST, FIRST. If there is no comma,
    the entire name is assumed to be the last name. An empty first-name
    portion (for instance "SMITH, ") is treated the same as a missing one.
    """
    parts = name.split(",")
    last_name = parts[0].strip()
    first_name = None
    if len(parts) > 1:
        # Keep only the leading word of the first-name portion; middle
        # names/initials are dropped so fuzzy matching is more forgiving.
        # `or None` normalizes an empty string to None.
        first_name = parts[1].strip().split(" ")[0].strip() or None
    return (last_name, first_name)


class FuzzyIdentifier:
    """A fuzzy identifier for a contributor."""

    last_name: str
    """The contributor's last name."""

    first_name: str | None
    """The contributor's first name, if known."""

    zip_code: str
    """The contributor's ZIP code, either 5 or 9 digits."""

    _get_nickname_index: IGetNicknameIndex
    _fuzzy_id: str | None

    def __init__(
        self,
        last_name: str,
        first_name: str | None,
        zip_code: str,
        *,
        get_nickname_index: IGetNicknameIndex,
    ):
        self.last_name = last_name
        self.first_name = first_name
        self.zip_code = zip_code
        self._get_nickname_index = get_nickname_index
        self._fuzzy_id = None

    @classmethod
    def from_name(
        cls, name: str, zip_code: str, *, get_nickname_index: IGetNicknameIndex
    ) -> str:
        """Return a fuzzy identifier from a LAST, FIRST style name."""
        last_name, first_name = split_name(name)
        return cls.from_last_first(
            last_name, first_name, zip_code, get_nickname_index=get_nickname_index
        )

    @classmethod
    def from_last_first(
        cls,
        last_name: str,
        first_name: str | None,
        zip_code: str,
        *,
        get_nickname_index: IGetNicknameIndex,
    ) -> str:
        """Return a fuzzy identifier from an already-split last & first name."""
        return cls(
            last_name, first_name, zip_code, get_nickname_index=get_nickname_index
        ).fuzzy_id

    def _nickname_index(self) -> int | None:
        """Return the nickname index for the first name."""
        if self.first_name is None:
            return None
        return self._get_nickname_index.get_index(self.first_name)

    @property
    def _first_nickname(self) -> str | None:
        """Return the first name, or its nickname-group index when known."""
        if self.first_name is None:
            return None
        index = self._nickname_index()
        return self.first_name if index is None else str(index)

    def _make_fuzzy_id(self) -> str:
        """Make the fuzzy ID (LAST-FIRST-ZIP5, uppercased)."""
        return f"{self.last_name}-{self._first_nickname}-{self.zip_code[:5]}".upper()

    def _make_fuzzy_id_if_needed(self) -> None:
        if self._fuzzy_id is None:
            self._fuzzy_id = self._make_fuzzy_id()

    @property
    def fuzzy_id(self) -> str:
        """Return the fuzzy ID, computing and caching it on first access."""
        self._make_fuzzy_id_if_needed()
        assert self._fuzzy_id is not None
        return self._fuzzy_id


class ContributionColumns:
    """
    Column indices for the individual contribution master file.

    See:
    https://www.fec.gov/campaign-finance-data/contributions-individuals-file-description/
    """

    COMMITTEE_ID = 0  # Filer identification number (CMTE_ID)
    AMENDMENT_INDICATOR = 1  # AMNDT_IND
    REPORT_TYPE = 2  # RPT_TP
    PRIMARY_GENERAL_INDICATOR = 3  # TRANSACTION_PGI
    IMAGE_NUMBER = 4  # IMAGE_NUM
    TRANSACTION_TYPE = 5  # TRANSACTION_TP
    ENTITY_TYPE = 6  # ENTITY_TP (see EntityTypeCode)
    NAME = 7  # NAME (of the contributor, typically in LAST, FIRST format)
    CITY = 8  # CITY
    STATE = 9  # STATE
    ZIP_CODE = 10  # ZIP_CODE (usually 5 or 9 digits, but there are lots of odd ones)
    EMPLOYER = 11  # EMPLOYER
    OCCUPATION = 12  # OCCUPATION
    TRANSACTION_DATE = 13  # TRANSACTION_DT (MMDDYYYY)
    TRANSACTION_AMOUNT = 14  # TRANSACTION_AMT (in dollars, NUMBER(14, 2))
    OTHER_ID = 15  # OTHER_ID (for non-individual contributions)
    TRANSACTION_ID = 16  # TRAN_ID
    FILE_NUMBER = 17  # FILE_NUM
    MEMO_CODE = 18  # MEMO_CD
    MEMO_TEXT = 19  # MEMO_TEXT
    SUB_ID = 20  # SUB_ID (FEC record ID)


class EntityTypeCode:
    """FEC entity type codes (ENTITY_TP column)."""

    CANDIDATE = "CAN"
    CANDIDATE_COMMITTEE = "CCM"
    COMMITTEE = "COM"
    INDIVIDUAL = "IND"
    ORGANIZATION = "ORG"
    PAC = "PAC"
    PARTY_ORGANIZATION = "PTY"

    @classmethod
    def name_for_code(cls, code: str) -> str | None:
        """Return the human-readable name for the given entity type code."""
        for attr in dir(EntityTypeCode):
            if not attr.startswith("__"):
                if getattr(EntityTypeCode, attr) == code:
                    return attr.replace("_", " ").title()
        return None


@dataclass(frozen=True)
class Contribution:
    """Our simplification of an individual contribution."""

    id: str  # The FEC record ID (SUB_ID)
    committee_id: str  # The committee ID (CMTE_ID) contributed to
    name: str  # The contributor's name (NAME)
    city: str  # The contributor's city (CITY)
    state: str  # The contributor's state (STATE)
    zip_code: str  # The contributor's ZIP code (ZIP_CODE) -- 5 or 9 digits
    amount: Decimal

    @property
    def zip5(self) -> str:
        """Return the 5-digit ZIP code."""
        return self.zip_code[:5]

    @classmethod
    def from_contribution_row(cls, row: t.Sequence[str]) -> t.Optional["Contribution"]:
        """
        Create an individual contribution from a row of the committee master file.

        Return None if the contribution is not an individual contribution, or if
        required fields are missing or invalid.
        """
        sub_id = row[ContributionColumns.SUB_ID].strip()
        if not sub_id:
            return None
        committee_id = row[ContributionColumns.COMMITTEE_ID].strip()
        if not committee_id:
            return None
        entity_type = row[ContributionColumns.ENTITY_TYPE].strip()
        if entity_type != EntityTypeCode.INDIVIDUAL:
            return None
        name = row[ContributionColumns.NAME].strip()
        if "," not in name:
            # We only keep names we can split into LAST, FIRST.
            return None
        city = row[ContributionColumns.CITY].strip()
        if not city:
            return None
        state = row[ContributionColumns.STATE].strip()
        if not state:
            return None
        zip_code = row[ContributionColumns.ZIP_CODE].strip()
        if len(zip_code) not in {5, 9}:
            return None
        amount_str = row[ContributionColumns.TRANSACTION_AMOUNT].strip()
        try:
            amount = Decimal(amount_str)
        except Exception:
            return None
        return cls(
            id=sub_id,
            committee_id=committee_id,
            name=name,
            city=city,
            state=state,
            zip_code=zip_code,
            amount=amount,
        )

    @classmethod
    def from_data(cls, value: t.Any) -> "Contribution":
        """Create an individual contribution from arbitrary data, or raise."""
        data = v.validate_dict(value)
        return cls(
            id=v.get_str(data, "id"),
            committee_id=v.get_str(data, "committee_id"),
            name=v.get_str(data, "name"),
            city=v.get_str(data, "city"),
            state=v.get_str(data, "state"),
            zip_code=v.get_str(data, "zip_code"),
            amount=v.get_convert_decimal(data, "amount"),
        )

    def to_data(self) -> dict:
        """Return the contribution as a dictionary."""
        return {
            "id": self.id,
            "committee_id": self.committee_id,
            "name": self.name,
            "city": self.city,
            "state": self.state,
            "zip_code": self.zip_code,
            # Decimal serializes as a string to avoid float rounding.
            "amount": str(self.amount),
        }


@dataclass
class ContributionSummary:
    """A rollup of all contributions attributed to a single fuzzy contributor."""

    fuzzy_id: str
    """
    A probably-unique identifier for the contributor.

    It should be possible to re-create this from `name` and `zip_code`. However,
    we do not store *all* `name`s that led to this summary record.
    """

    name: str
    """A non-fuzzy name for the contributor."""

    zip_code: str
    """The ZIP code of the contributor."""

    total: Decimal
    """The total amount contributed by the contributor."""

    by_party: dict[str | None, Decimal]
    """Total contributions by party. None is for contributions to unknown parties."""

    by_committee: dict[str, Decimal]
    """Total contributions by committee."""

    @classmethod
    def new(
        cls,
        fuzzy_id: str,
        contribution: Contribution,
        *,
        get_committee: IGetCommittee,
    ) -> "ContributionSummary":
        """Return a contribution summary seeded with a single contribution."""
        total = Decimal(contribution.amount)
        committee = get_committee.get_committee(contribution.committee_id)
        party = None if committee is None else committee.party
        by_party = {party: total}
        by_committee = {contribution.committee_id: total}
        return cls(
            fuzzy_id=fuzzy_id,
            name=contribution.name,
            zip_code=contribution.zip_code,
            total=total,
            by_party=by_party,
            by_committee=by_committee,
        )

    def add(self, contribution: Contribution, *, get_committee: IGetCommittee) -> None:
        """Add a single contribution to the summary."""
        self.total += Decimal(contribution.amount)
        committee = get_committee.get_committee(contribution.committee_id)
        party = None if committee is None else committee.party
        self.by_party[party] = self.by_party.get(party, Decimal(0)) + Decimal(
            contribution.amount
        )
        self.by_committee[contribution.committee_id] = self.by_committee.get(
            contribution.committee_id, Decimal(0)
        ) + Decimal(contribution.amount)

    @classmethod
    def from_data(cls, value: t.Any) -> "ContributionSummary":
        """Create a contribution summary from arbitrary data, or raise."""
        data = v.validate_dict(value)
        by_party_data = v.get_dict(data, "by_party")
        by_committee_data = v.get_dict(data, "by_committee")
        return cls(
            fuzzy_id=v.get_str(data, "fuzzy_id"),
            name=v.get_str(data, "name"),
            zip_code=v.get_str(data, "zip_code"),
            total=v.get_convert_decimal(data, "total"),
            by_party={
                (None if party == "null" else party): v.validate_convert_decimal(amount)
                for party, amount in by_party_data.items()
            },
            by_committee={
                committee: v.validate_convert_decimal(amount)
                for committee, amount in by_committee_data.items()
            },
        )

    def to_data(self) -> dict:
        """Return a dict representation of the contribution summary."""
        return {
            "fuzzy_id": self.fuzzy_id,
            "name": self.name,
            "zip_code": self.zip_code,
            "total": str(self.total),
            "by_party": {
                # Only a None party maps to the sentinel "null" key, so that
                # from_data() round-trips exactly (an empty-string party used
                # to collapse into "null" and decode back as None).
                ("null" if party is None else party): str(amount)
                for party, amount in self.by_party.items()
            },
            "by_committee": {
                committee: str(amount)
                for committee, amount in self.by_committee.items()
            },
        }


class ContributionsManager:
    """
    Tool for working with raw FEC individual contributions files.

    These are large files, even for a single election cycle. Be warned!
    """

    _contributions: list[Contribution]
    """The raw list of contributions."""

    _get_committee: IGetCommittee
    """A tool for getting committees."""

    _get_nickname_index: IGetNicknameIndex
    """A tool for getting nickname indices."""

    _contribution_summaries: dict[str, ContributionSummary] | None
    """A mapping from fuzzy IDs to contribution summaries."""

    def __init__(
        self,
        contributions: t.Iterable[Contribution],
        *,
        get_committee: IGetCommittee,
        get_nickname_index: IGetNicknameIndex,
    ) -> None:
        self._contributions = list(contributions)
        self._contribution_summaries = None
        self._get_committee = get_committee
        self._get_nickname_index = get_nickname_index

    @classmethod
    def from_csv_io(
        cls,
        io: t.TextIO,
        *,
        get_committee: IGetCommittee,
        get_nickname_index: IGetNicknameIndex,
    ) -> "ContributionsManager":
        """Create a contributions manager from a FEC individual contributions file."""
        # Turns out this is not simply a CSV with a pipe delimiter. I think it comes
        # down to escaping quotes, but I'm not sure. So we'll just split on pipes.
        rows = (row.strip().split("|") for row in io)
        contributions = (
            contribution
            for row in rows
            if (contribution := Contribution.from_contribution_row(row)) is not None
        )
        return cls(
            contributions,
            get_committee=get_committee,
            get_nickname_index=get_nickname_index,
        )

    @classmethod
    def from_path(
        cls,
        path: str | pathlib.Path,
        *,
        get_committee: IGetCommittee,
        get_nickname_index: IGetNicknameIndex,
    ) -> "ContributionsManager":
        """Create a contributions manager from a path."""
        path = v.validate_extant_file(pathlib.Path(path))
        with path.open("rt") as input_file:
            return cls.from_csv_io(
                input_file,
                get_committee=get_committee,
                get_nickname_index=get_nickname_index,
            )

    @classmethod
    def from_data_manager(
        cls, data_manager: DataManager, year: int = 2020
    ) -> "ContributionsManager":
        """Create a contributions manager from a data manager."""
        committee_manager = CommitteeManager.from_csv_data_manager(data_manager, year)
        nicknames_manager = NicknamesManager.from_data_manager(data_manager)
        return cls.from_path(
            data_manager.path / "fec" / f"individual-{year}.txt",
            get_committee=committee_manager,
            get_nickname_index=nicknames_manager,
        )

    @property
    def contributions(self) -> t.Sequence[Contribution]:
        """Return the contributions."""
        return self._contributions

    def _summarize_contributions(self) -> None:
        """Group all contributions by fuzzy ID and accumulate summaries."""
        assert self._contribution_summaries is None
        self._contribution_summaries = {}
        for contribution in self._contributions:
            fuzzy_id = FuzzyIdentifier.from_name(
                contribution.name,
                contribution.zip_code,
                get_nickname_index=self._get_nickname_index,
            )
            if fuzzy_id not in self._contribution_summaries:
                self._contribution_summaries[fuzzy_id] = ContributionSummary.new(
                    fuzzy_id,
                    contribution,
                    get_committee=self._get_committee,
                )
            else:
                self._contribution_summaries[fuzzy_id].add(
                    contribution, get_committee=self._get_committee
                )

    def _summarize_contributions_if_needed(self) -> None:
        if self._contribution_summaries is None:
            self._summarize_contributions()

    @property
    def contribution_summaries(self) -> t.Mapping[str, ContributionSummary]:
        """Return the contribution summaries, computing them on first access."""
        self._summarize_contributions_if_needed()
        assert self._contribution_summaries is not None
        return self._contribution_summaries

    @property
    def contribution_summaries_manager(self) -> "ContributionSummariesManager":
        """Get the affiliated contribution summaries manager."""
        return ContributionSummariesManager(self.contribution_summaries)


class ContributionSummariesManager:
    """
    Tool for working with summarized FEC individual contributions files.

    These are large files, even for a single election cycle. Be warned!
    """

    _contribution_summaries: dict[str, ContributionSummary]
    """A mapping from fuzzy IDs to contribution summaries."""

    def __init__(
        self, contribution_summaries: t.Mapping[str, ContributionSummary]
    ) -> None:
        self._contribution_summaries = dict(contribution_summaries)

    @classmethod
    def from_summaries(cls, contribution_summaries: t.Iterable[ContributionSummary]):
        """Create a contribution summaries manager from summaries."""
        return cls({summary.fuzzy_id: summary for summary in contribution_summaries})

    @classmethod
    def from_jsonl_io(cls, io: t.TextIO) -> "ContributionSummariesManager":
        """
        Read from a json lines file and create a manager.

        The file contains a single ContributionSummary record on each line.
        The `fuzzy_id` fields must be unique across the entire dataset.
        """
        summaries_data = (json.loads(line) for line in io)
        summaries = (ContributionSummary.from_data(data) for data in summaries_data)
        return cls({summary.fuzzy_id: summary for summary in summaries})

    @classmethod
    def from_path(cls, path: str | pathlib.Path) -> "ContributionSummariesManager":
        """Create a contribution summaries manager from a path."""
        path = v.validate_extant_file(pathlib.Path(path))
        with path.open("rt") as input_file:
            return cls.from_jsonl_io(input_file)

    @classmethod
    def from_data_manager(
        cls, data_manager: DataManager, year: int = 2020
    ) -> "ContributionSummariesManager":
        """Create a contribution summaries manager from a data manager."""
        return cls.from_path(
            data_manager.path / "fec" / f"contribution-summaries-{year}.jsonl",
        )

    def to_data_lines(self) -> t.Iterable[dict]:
        """Convert to a json-serializable object."""
        return (summary.to_data() for summary in self._contribution_summaries.values())

    def to_jsonl_io(self, io: t.TextIO) -> None:
        """Write to a json lines file."""
        for data_line in self.to_data_lines():
            io.write(json.dumps(data_line))
            io.write("\n")

    def to_jsonl_path(self, path: str | pathlib.Path) -> None:
        """Write to a json lines file."""
        path = pathlib.Path(path)
        with path.open("wt") as output_file:
            self.to_jsonl_io(output_file)

    def to_jsonl_data_manager(
        self, data_manager: DataManager, year: int = 2020
    ) -> None:
        """Write to a json lines file."""
        self.to_jsonl_path(
            data_manager.path / "fec" / f"contribution-summaries-{year}.jsonl"
        )

    @property
    def contribution_summaries(self) -> t.Mapping[str, ContributionSummary]:
        """Return the contribution summaries."""
        return self._contribution_summaries

    def get_summary(self, fuzzy_id: str) -> ContributionSummary | None:
        """Return a single contribution summary, if available."""
        return self._contribution_summaries.get(fuzzy_id)
# ruff: noqa: E501 D102

import io
import unittest

from server.utils.validations import ValidationError

from . import committees as c

# A small, real excerpt of an FEC committee master file (pipe-delimited).
RAW_CSV_DATA = """\
C00000059|HALLMARK CARDS PAC|SARAH MOE|2501 MCGEE|MD #500|KANSAS CITY|MO|64108|U|Q|UNK|M|C||
C00000422|AMERICAN MEDICAL ASSOCIATION POLITICAL ACTION COMMITTEE|WALKER, KEVIN MR.|25 MASSACHUSETTS AVE, NW|SUITE 600|WASHINGTON|DC|200017400|B|Q||M||DELAWARE MEDICAL PAC|
C00000489|D R I V E POLITICAL FUND CHAPTER 886|JERRY SIMS JR|3528 W RENO||OKLAHOMA CITY|OK|73107|U|N||Q|L||
C00000547|KANSAS MEDICAL SOCIETY POLITICAL ACTION COMMITTEE|JERRY SLAUGHTER|623 SW 10TH AVE||TOPEKA|KS|666121627|U|Q|UNK|Q|M|KANSAS MEDICAL SOCIETY|
C00000638|INDIANA STATE MEDICAL ASSOCIATION POLITICAL ACTION COMMITTEE|ACHENBACH, GRANT MR.|322 CANAL WALK, CANAL LEVEL||INDIANAPOLIS|IN|46202|U|Q||T|M||
C00000729|AMERICAN DENTAL ASSOCIATION POLITICAL ACTION COMMITTEE|BARNES, BRAD W DR.|1111 14TH STREET, NW|SUITE 1100|WASHINGTON|DC|200055627|B|Q|UNK|M|M|INDIANA DENTAL PAC|
C00000885|INTERNATIONAL UNION OF PAINTERS AND ALLIED TRADES POLITICAL ACTION TOGETHER POLITICAL COMMITTEE|GALIS, GEORGE|7234 PARKWAY DRIVE||HANOVER|MD|21076|B|Q|UNK|M|L|INTERNATIONAL UNION OF PAINTERS AND ALLIED TRADES|
C00000901|BUILD POLITICAL ACTION COMMITTEE OF THE NATIONAL ASSOCIATION OF HOME BUILDERS (BUILDPAC)|RAMAGE, EILEEN|1201 15TH STREET, NW||WASHINGTON|DC|20005|B|Q|UNK|M|T|NATIONAL ASSOCIATION OF HOME BUILDERS|
C00000935|DCCC|GUINN, LUCINDA|430 SOUTH CAPITOL STREET, SE|2ND FLOOR|WASHINGTON|DC|200034024|U|Y|DEM|M|||
C00000984|UNITED STATES TELECOM ASSOCIATION POLITICAL ACTION COMMITTEE (TELECOMPAC)|HEINER, BRANDON|601 NEW JERSEY AVE NW|STE 600|WASHINGTON|DC|20001|B|Q|UNK|M|T|UNITED STATES TELECOM ASSOCIATION|
"""


class CommitteeTypeCodeTestCase(unittest.TestCase):
    """Tests for CommitteeTypeCode.name_for_code."""

    def test_name_for_code(self):
        self.assertEqual(
            c.CommitteeTypeCode.name_for_code(c.CommitteeTypeCode.COMMUNICATION_COST),
            "Communication Cost",
        )

    def test_name_for_code_none(self):
        self.assertEqual(c.CommitteeTypeCode.name_for_code("NOPE"), None)


class PartyTestCase(unittest.TestCase):
    """Tests for Party.name_for_code."""

    def test_name_for_code(self):
        self.assertEqual(c.Party.name_for_code(c.Party.DEMOCRAT), "Democrat")

    def test_name_for_code_none(self):
        self.assertEqual(c.Party.name_for_code("NOPE"), None)


class CommitteeTestCase(unittest.TestCase):
    """Tests for Committee serialization and row parsing."""

    def test_from_data_id_name(self):
        """Test that we can create a committee from minimal data."""
        data = {"id": "id", "name": "name"}
        committee = c.Committee.from_data(data)
        self.assertEqual(committee.id, "id")
        self.assertEqual(committee.name, "name")
        self.assertIsNone(committee.party)
        self.assertIsNone(committee.candidate_id)

    def test_from_data_all(self):
        """Test that we can create a committee from complete data."""
        data = {
            "id": "id",
            "name": "name",
            "party": "party",
            "candidate_id": "candidate_id",
        }
        committee = c.Committee.from_data(data)
        self.assertEqual(committee.id, "id")
        self.assertEqual(committee.name, "name")
        self.assertEqual(committee.party, "party")
        self.assertEqual(committee.candidate_id, "candidate_id")

    def test_from_data_invalid(self):
        """Test that invalid data raises a ValidationError."""
        data = {"id": "id", "name": "name", "party": 42, "candidate_id": None}
        with self.assertRaises(ValidationError):
            c.Committee.from_data(data)

    def test_to_data(self):
        """Test that a fully-populated committee serializes to data."""
        committee = c.Committee("id", "name", "party", "candidate_id")
        data = committee.to_data()
        self.assertEqual(data["id"], "id")
        self.assertEqual(data["name"], "name")
        self.assertEqual(data["party"], "party")
        self.assertEqual(data["candidate_id"], "candidate_id")

    def test_to_data_missing(self):
        """Test that None-valued optional fields are omitted from data."""
        committee = c.Committee("id", "name", None, None)
        data = committee.to_data()
        self.assertEqual(data["id"], "id")
        self.assertEqual(data["name"], "name")
        self.assertFalse("party" in data)
        self.assertFalse("candidate_id" in data)

    def test_from_committee_row(self):
        """Test that we can create a committee from a row."""
        row = [
            "C00000059",
            "HALLMARK CARDS PAC",
            "SARAH MOE",
            "2501 MCGEE",
            "MD #500",
            "KANSAS CITY",
            "MO",
            "64108",
            "U",
            "Q",
            "UNK",  # "unknown" party is normalized to None
            "M",
            "C",
            "CRUNK",
        ]
        committee = c.Committee.from_committee_row(row)
        self.assertEqual(committee.id, "C00000059")
        self.assertEqual(committee.name, "HALLMARK CARDS PAC")
        self.assertIsNone(committee.party)
        self.assertEqual(committee.candidate_id, "CRUNK")


class CommitteeManagerTestCase(unittest.TestCase):
    """Tests for CommitteeManager collection, indexing, and round-tripping."""

    def setUp(self):
        self.example_committees = [
            c.Committee("id1", "name1", "party1", "candidate_id1"),
            c.Committee("id2", "name2", "party2", "candidate_id2"),
            c.Committee("id3", "name3", None, None),
        ]

    def test_committees(self):
        """Test that the manager exposes the committees it was given."""
        manager = c.CommitteeManager(self.example_committees)
        self.assertEqual(len(manager.committees), len(self.example_committees))

    def test_id_to_committees(self):
        """Test the id -> committee index."""
        manager = c.CommitteeManager(self.example_committees)
        self.assertEqual(
            manager.id_to_committee,
            {
                "id1": self.example_committees[0],
                "id2": self.example_committees[1],
                "id3": self.example_committees[2],
            },
        )

    def test_get_committee(self):
        """Test committee lookup by id, including a miss."""
        manager = c.CommitteeManager(self.example_committees)
        self.assertEqual(manager.get_committee("id1"), self.example_committees[0])
        self.assertEqual(manager.get_committee("id2"), self.example_committees[1])
        self.assertEqual(manager.get_committee("id3"), self.example_committees[2])
        self.assertIsNone(manager.get_committee("id4"))

    def test_jsonl_io(self):
        manager = c.CommitteeManager(self.example_committees)
        writable = io.StringIO()
        manager.to_jsonl_io(writable)
        readable = io.StringIO(writable.getvalue())
        manager2 = c.CommitteeManager.from_jsonl_io(readable)
        self.assertEqual(manager.committees, manager2.committees)

    def test_csv_io(self):
        readable = io.StringIO(RAW_CSV_DATA)
        manager = c.CommitteeManager.from_csv_io(readable)
        self.assertEqual(len(manager.committees), 10)
        committee = manager.get_committee("C00000059")
        self.assertIsNotNone(committee)
        assert committee is not None
        self.assertEqual(committee.id, "C00000059")
        self.assertEqual(committee.name, "HALLMARK CARDS PAC")
        self.assertIsNone(committee.party)
        self.assertIsNone(committee.candidate_id)
        self.assertIsNone(manager.get_committee("NOPE"))
# ruff: noqa: D102
import io
import unittest
from decimal import Decimal

from server.data.names.nicknames import MockGetNicknameIndex
from server.utils.validations import ValidationError

from . import contributions as cont
from .committees import Committee, MockGetCommittee, Party


class SplitNameTestCase(unittest.TestCase):
    """Tests for split_name."""

    def test_last_only(self):
        self.assertEqual(cont.split_name("Smith"), ("Smith", None))

    def test_last_comma_first(self):
        self.assertEqual(cont.split_name("Smith, John"), ("Smith", "John"))

    def test_stripping(self):
        self.assertEqual(cont.split_name("   Smith,   John   "), ("Smith", "John"))


class FuzzyIdentifierTestCase(unittest.TestCase):
    """Tests for FuzzyIdentifier, including nickname collapsing."""

    def setUp(self):
        self.get_nickname_index = MockGetNicknameIndex(
            [["Dave", "David", "Davey"], ["Matt", "Matthew"]]
        )

    def test_last_first_simple(self):
        self.assertEqual(
            cont.FuzzyIdentifier.from_last_first(
                "Smith", "John", "12345", get_nickname_index=self.get_nickname_index
            ),
            "SMITH-JOHN-12345",
        )

    def test_last_no_first_simple(self):
        self.assertEqual(
            cont.FuzzyIdentifier.from_last_first(
                "Smith", None, "12345", get_nickname_index=self.get_nickname_index
            ),
            "SMITH-NONE-12345",
        )

    def test_last_first_nickname(self):
        # "Davey" is in nickname group 0, so the ID uses the group index.
        self.assertEqual(
            cont.FuzzyIdentifier.from_last_first(
                "Smith",
                "Davey",
                "12345",
                get_nickname_index=self.get_nickname_index,
            ),
            "SMITH-0-12345",
        )


class ContributionTestCase(unittest.TestCase):
    """Tests for Contribution serialization and raw-row parsing."""

    def test_from_data_valid(self):
        contribution = cont.Contribution.from_data(
            {
                "id": "12345",
                "committee_id": "C12345",
                "name": "Smith, John",
                "city": "Seattle",
                "state": "WA",
                "zip_code": "98101",
                "amount": "10",
            }
        )
        self.assertEqual(contribution.id, "12345")
        self.assertEqual(contribution.committee_id, "C12345")
        self.assertEqual(contribution.name, "Smith, John")
        self.assertEqual(contribution.city, "Seattle")
        self.assertEqual(contribution.state, "WA")
        self.assertEqual(contribution.zip_code, "98101")
        self.assertEqual(contribution.amount, Decimal(10))

    def test_from_data_invalid(self):
        with self.assertRaises(ValidationError):
            cont.Contribution.from_data({})

    def test_to_data(self):
        contribution = cont.Contribution(
            id="12345",
            committee_id="C12345",
            name="Smith, John",
            city="Seattle",
            state="WA",
            zip_code="98101",
            amount=Decimal(10),
        )
        self.assertEqual(
            contribution.to_data(),
            {
                "id": "12345",
                "committee_id": "C12345",
                "name": "Smith, John",
                "city": "Seattle",
                "state": "WA",
                "zip_code": "98101",
                "amount": "10",
            },
        )

    def test_from_contribution_row_valid(self):
        contribution = cont.Contribution.from_contribution_row(
            [
                "C12345",
                "",
                "",
                "",
                "",
                "",
                cont.EntityTypeCode.INDIVIDUAL,
                "Smith, John",
                "Seattle",
                "WA",
                "98101",
                "",
                "",
                "",
                "10",
                "",
                "",
                "",
                "",
                "",
                "12345",
            ]
        )
        self.assertIsNotNone(contribution)
        assert contribution is not None
        self.assertEqual(contribution.id, "12345")
        self.assertEqual(contribution.committee_id, "C12345")
        self.assertEqual(contribution.name, "Smith, John")
        self.assertEqual(contribution.city, "Seattle")
        self.assertEqual(contribution.state, "WA")
        self.assertEqual(contribution.zip_code, "98101")
        self.assertEqual(contribution.amount, Decimal(10))

    def test_from_contribution_row_invalid(self):
        # Non-individual entity types (here, a candidate) are filtered out.
        contribution = cont.Contribution.from_contribution_row(
            [
                "C12345",
                "",
                "",
                "",
                "",
                "",
                cont.EntityTypeCode.CANDIDATE,
                "Smith, John",
                "Seattle",
                "WA",
                "98101",
                "",
                "",
                "",
                "10",
                "",
                "",
                "",
                "",
                "",
                "12345",
            ]
        )
        self.assertIsNone(contribution)


class ContributionSummaryTestCase(unittest.TestCase):
    """Tests for ContributionSummary accumulation and serialization."""

    def setUp(self):
        self.contribution_1 = cont.Contribution(
            id="12345",
            committee_id="C12345",
            name="Smith, John",
            city="Seattle",
            state="WA",
            zip_code="98101",
            amount=Decimal(10),
        )
        self.contribution_2 = cont.Contribution(
            id="12346",
            committee_id="C67890",
            name="Smith, John",
            city="Seattle",
            state="WA",
            zip_code="98101",
            amount=Decimal(20),
        )
        self.contribution_3 = cont.Contribution(
            id="12347",
            committee_id="CABCDE",
            name="Smith, John",
            city="Seattle",
            state="WA",
            zip_code="98101",
            amount=Decimal(50),
        )
        self.get_committee = MockGetCommittee(
            [
                Committee(
                    id="C12345",
                    name="Barney for America",
                    party=Party.DEMOCRAT,
                    candidate_id="CAN12345",
                ),
                Committee(
                    id="C67890",
                    name="Donald for Duck",
                    party=Party.DEMOCRAT,
                    candidate_id="CAN67890",
                ),
                Committee(
                    id="CABCDE",
                    name="Jupiter for Pluto",
                    party=Party.GREEN,
                    candidate_id="CANABCDE",
                ),
            ]
        )

    def test_new(self):
        summary = cont.ContributionSummary.new(
            "SMITH-JOHN-98101",
            self.contribution_1,
            get_committee=self.get_committee,
        )
        self.assertEqual(summary.fuzzy_id, "SMITH-JOHN-98101")
        self.assertEqual(summary.name, "Smith, John")
        self.assertEqual(summary.zip_code, "98101")
        self.assertEqual(summary.total, Decimal(10))
        self.assertEqual(len(summary.by_party), 1)
        self.assertEqual(summary.by_party.get(Party.DEMOCRAT), Decimal(10))
        self.assertTrue("C12345" in summary.by_committee)
        self.assertEqual(len(summary.by_committee), 1)
        self.assertEqual(summary.by_committee.get("C12345"), Decimal(10))

    def test_add(self):
        summary = cont.ContributionSummary.new(
            "SMITH-JOHN-98101",
            self.contribution_1,
            get_committee=self.get_committee,
        )
        summary.add(self.contribution_2, get_committee=self.get_committee)
        summary.add(self.contribution_3, get_committee=self.get_committee)
        self.assertEqual(summary.fuzzy_id, "SMITH-JOHN-98101")
        self.assertEqual(summary.name, "Smith, John")
        self.assertEqual(summary.zip_code, "98101")
        self.assertEqual(summary.total, Decimal(80))
        self.assertEqual(len(summary.by_party), 2)
        self.assertEqual(summary.by_party.get(Party.DEMOCRAT), Decimal(30))
        self.assertEqual(summary.by_party.get(Party.GREEN), Decimal(50))
        self.assertEqual(len(summary.by_committee), 3)
        self.assertEqual(summary.by_committee.get("C12345"), Decimal(10))
        self.assertEqual(summary.by_committee.get("C67890"), Decimal(20))
        self.assertEqual(summary.by_committee.get("CABCDE"), Decimal(50))

    def test_from_data_valid(self):
        data = {
            "fuzzy_id": "SMITH-JOHN-98101",
            "name": "Smith, John",
            "zip_code": "98101",
            "total": "80",
            "by_party": {Party.DEMOCRAT: "30", Party.GREEN: "50"},
            "by_committee": {"C12345": "10", "C67890": "20", "CABCDE": "50"},
        }
        summary = cont.ContributionSummary.from_data(data)
        self.assertEqual(summary.fuzzy_id, "SMITH-JOHN-98101")
        self.assertEqual(summary.name, "Smith, John")
        self.assertEqual(summary.zip_code, "98101")
        self.assertEqual(summary.total, Decimal(80))
        self.assertEqual(len(summary.by_party), 2)
        self.assertEqual(summary.by_party.get(Party.DEMOCRAT), Decimal(30))
        self.assertEqual(summary.by_party.get(Party.GREEN), Decimal(50))
        self.assertEqual(len(summary.by_committee), 3)
        self.assertEqual(summary.by_committee.get("C12345"), Decimal(10))
        self.assertEqual(summary.by_committee.get("C67890"), Decimal(20))
        self.assertEqual(summary.by_committee.get("CABCDE"), Decimal(50))

    def test_from_data_invalid(self):
        data = {
            "fuzzy_id": "SMITH-JOHN-98101",
        }
        with self.assertRaises(ValidationError):
            cont.ContributionSummary.from_data(data)

    def test_to_data(self):
        summary = cont.ContributionSummary.new(
            "SMITH-JOHN-98101",
            self.contribution_1,
            get_committee=self.get_committee,
        )
        summary.add(self.contribution_2, get_committee=self.get_committee)
        summary.add(self.contribution_3, get_committee=self.get_committee)
        data = summary.to_data()
        self.assertEqual(data["fuzzy_id"], "SMITH-JOHN-98101")
        self.assertEqual(data["name"], "Smith, John")
        self.assertEqual(data["zip_code"], "98101")
        self.assertEqual(data["total"], "80")
        self.assertEqual(len(data["by_party"]), 2)
        self.assertEqual(data["by_party"].get(Party.DEMOCRAT), "30")
        self.assertEqual(data["by_party"].get(Party.GREEN), "50")
        self.assertEqual(len(data["by_committee"]), 3)
        self.assertEqual(data["by_committee"].get("C12345"), "10")
        self.assertEqual(data["by_committee"].get("C67890"), "20")
        self.assertEqual(data["by_committee"].get("CABCDE"), "50")


class ContributionsManagerTestCase(unittest.TestCase):
    """Tests for ContributionsManager parsing and summarization."""

    def setUp(self):
        self.contribution_1 = cont.Contribution(
            id="12345",
            committee_id="C12345",
            name="Smith, John",
            city="Seattle",
            state="WA",
            zip_code="98101",
            amount=Decimal(10),
        )
        self.contribution_2 = cont.Contribution(
            id="12346",
            committee_id="C67890",
            name="Smith, John",
            city="Seattle",
            state="WA",
            zip_code="98101",
            amount=Decimal(20),
        )
        self.contribution_3 = cont.Contribution(
            id="12347",
            committee_id="CABCDE",
            name="Smith, John",
            city="Seattle",
            state="WA",
            zip_code="98101",
            amount=Decimal(50),
        )
        self.contributions = [
            self.contribution_1,
            self.contribution_2,
            self.contribution_3,
        ]
        self.get_committee = MockGetCommittee(
            [
                Committee(
                    id="C12345",
                    name="Barney for America",
                    party=Party.DEMOCRAT,
                    candidate_id="CAN12345",
                ),
                Committee(
                    id="C67890",
                    name="Donald for Duck",
                    party=Party.DEMOCRAT,
                    candidate_id="CAN67890",
                ),
                Committee(
                    id="CABCDE",
                    name="Jupiter for Pluto",
                    party=Party.GREEN,
                    candidate_id="CANABCDE",
                ),
            ]
        )
        self.get_nickname_index = MockGetNicknameIndex(
            [["Dave", "David", "Davey"], ["Matt", "Matthew"]]
        )

    def test_contributions(self):
        manager = cont.ContributionsManager(
            self.contributions,
            get_committee=self.get_committee,
            get_nickname_index=self.get_nickname_index,
        )
        self.assertEqual(len(manager.contributions), 3)

    def test_from_csv_io(self):
        # Only the IND row should survive parsing; the COM row is filtered.
        contribution_1 = """C12345||||||IND|Smith, John|Seattle|WA|98101||||10||||||12345"""  # noqa: E501
        contribution_2 = """C12345||||||COM|Smith, John|Seattle|WA|98101||||10||||||12345"""  # noqa: E501
        csv_io = io.StringIO("\n".join([contribution_1, contribution_2]))
        manager = cont.ContributionsManager.from_csv_io(
            csv_io,
            get_committee=self.get_committee,
            get_nickname_index=self.get_nickname_index,
        )
        self.assertEqual(len(manager.contributions), 1)
        self.assertEqual(manager.contributions[0].id, "12345")

    def test_contribution_summaries(self):
        manager = cont.ContributionsManager(
            self.contributions,
            get_committee=self.get_committee,
            get_nickname_index=self.get_nickname_index,
        )
        self.assertEqual(len(manager.contribution_summaries), 1)
        self.assertEqual(manager.contribution_summaries["SMITH-JOHN-98101"].total, 80)

    def test_contribution_summaries_manager(self):
        manager = cont.ContributionsManager(
            self.contributions,
            get_committee=self.get_committee,
            get_nickname_index=self.get_nickname_index,
        )
        summaries_manager = manager.contribution_summaries_manager
        self.assertEqual(len(summaries_manager.contribution_summaries), 1)
candidate_id="CANABCDE", + ) + ] + ), + ) + self.summaries = [self.summary_1, self.summary_2] + self.indexed_summaries = { + "SMITH-JOHN-98101": self.summary_1, + "PECK-1-98101": self.summary_2, + } + + def test_contribution_summaries(self): + manager = cont.ContributionSummariesManager(self.indexed_summaries) + self.assertEqual(len(manager.contribution_summaries), 2) + self.assertEqual(manager.contribution_summaries["SMITH-JOHN-98101"].total, 30) + self.assertEqual(manager.contribution_summaries["PECK-1-98101"].total, 50) + + def test_from_summaries(self): + manager = cont.ContributionSummariesManager.from_summaries(self.summaries) + self.assertEqual(len(manager.contribution_summaries), 2) + + def test_from_jsonl_io(self): + json_lines = """\ +{"fuzzy_id": "SMITH-JOHN-98101", "name": "Smith, John", "zip_code": "98101", "total": "30", "by_party": {"DEMOCRAT": "30"}, "by_committee": {"C12345": "30"}} +{"fuzzy_id": "PECK-1-98101", "name": "Peck, Dave", "zip_code": "98101", "total": "50", "by_party": {"GREEN": "50"}, "by_committee": {"CABCDE": "50"}} +""" # noqa: E501 + jsonl_io = io.StringIO(json_lines) + manager = cont.ContributionSummariesManager.from_jsonl_io(jsonl_io) + self.assertEqual(len(manager.contribution_summaries), 2) + self.assertEqual(manager.contribution_summaries["SMITH-JOHN-98101"].total, 30) + self.assertEqual(manager.contribution_summaries["PECK-1-98101"].total, 50) + + def test_to_data_lines(self): + self.manager = cont.ContributionSummariesManager(self.indexed_summaries) + data_lines = list(self.manager.to_data_lines()) + self.assertEqual(len(data_lines), 2) + + def test_to_jsonl_io(self): + self.manager = cont.ContributionSummariesManager(self.indexed_summaries) + jsonl_io = io.StringIO() + self.manager.to_jsonl_io(jsonl_io) + jsonl_io.seek(0) + json_lines = jsonl_io.read() + self.assertEqual(len(json_lines.split("\n")), 3) + + def test_get_summary(self): + self.manager = cont.ContributionSummariesManager(self.indexed_summaries) + summary = 
self.manager.get_summary("SMITH-JOHN-98101") + self.assertIsNotNone(summary) + assert summary is not None + self.assertEqual(summary.total, 30) + self.assertEqual(summary.by_party.get(Party.DEMOCRAT), 30) + self.assertEqual(summary.by_committee.get("C12345"), 10) + self.assertEqual(summary.by_committee.get("C67890"), 20) + + def test_get_summary_none(self): + self.manager = cont.ContributionSummariesManager(self.indexed_summaries) + summary = self.manager.get_summary("SMITH-JOHN-98102") + self.assertIsNone(summary) diff --git a/server/data.py b/server/data/manager.py similarity index 70% rename from server/data.py rename to server/data/manager.py index 2dccf5a..4ecc659 100644 --- a/server/data.py +++ b/server/data/manager.py @@ -6,8 +6,8 @@ class DataManager: """Top-level manager of all content in the data/ directory.""" - def __init__(self, path: pathlib.Path) -> None: - self._path = validate_extant_dir(path) + def __init__(self, path: str | pathlib.Path) -> None: + self._path = validate_extant_dir(pathlib.Path(path)) self._zip_code_manager = None @property @@ -18,4 +18,4 @@ def path(self) -> pathlib.Path: @classmethod def default(cls) -> "DataManager": """Return a DataManager with the default data/ directory.""" - return cls(pathlib.Path(__file__).parent.parent / "data") + return cls(pathlib.Path(__file__).parent.parent.parent / "data") diff --git a/server/names/__init__.py b/server/data/names/__init__.py similarity index 100% rename from server/names/__init__.py rename to server/data/names/__init__.py diff --git a/server/data/names/nicknames.py b/server/data/names/nicknames.py new file mode 100644 index 0000000..3839993 --- /dev/null +++ b/server/data/names/nicknames.py @@ -0,0 +1,259 @@ +"""Tools for working with nicknames.""" +import json +import pathlib +import typing as t + +from server.data.manager import DataManager +from server.utils.validations import validate_extant_file + + +class MessyNicknamesManager: + """ + Tools for working with a 'messy' 
nicknames file. + + The primary operation of this manager is to both clean and merge the names, + and to provide a mapping from each name to a unique identifier. + """ + + _messy_names: list[frozenset[str]] + """ + A list of sets of related names. A given name may appear in multiple + sets. + """ + + _names: list[frozenset[str]] | None + """A list of sets of related names. A given name will only appear in one set.""" + + def __init__(self, messy_names: t.Sequence[frozenset[str]]): + self._messy_names = list(messy_names) + self._names = None + + @classmethod + def from_messy_io(cls, io: t.TextIO) -> "MessyNicknamesManager": + """ + Create a manager from a file-like object. + + The assumed format: on each line there is a list of related names. + These are probably separated by spaces, although they may also be separated + by `/` and `,` characters. There may be any number of spaces between the + names, and there may be leading and trailing spaces. The names will always + start with a capital letter; they _may_ contain dots (`A.B.`) and + apostrophes (`O'Neil`). It's possible that a given name appears on multiple + lines. 
+ """ + names: list[frozenset[str]] = [] + for line in io: + # Remove all commas, slashes, parens + line = ( + line.replace(",", " ") + .replace("/", "") + .replace("(", "") + .replace(")", "") + ) + # Break the line into a list of names -- split on any + # arbitrary number of spaces + maybe_names = line.split() + # Remove any empty strings + maybe_names = [ + stripped for name in maybe_names if (stripped := name.strip()) + ] + # Remove any strings that don't start with a capital letter + maybe_names = [name for name in maybe_names if name[0].isupper()] + # Make a set of capitalized names + names_set = {name.title() for name in maybe_names} + # Add it if it's not empty + if names_set: + names.append(frozenset(names_set)) + return cls(names) + + @classmethod + def from_path(cls, path: str | pathlib.Path) -> "MessyNicknamesManager": + """Create a manager from a path.""" + path = validate_extant_file(pathlib.Path(path)) + with path.open("rt") as input_file: + return cls.from_messy_io(input_file) + + @classmethod + def from_data_manager(cls, data_manager: DataManager) -> "MessyNicknamesManager": + """Create a manager from a data manager.""" + return cls.from_path(data_manager.path / "names" / "messy.txt") + + def _merge_names(self) -> None: + """Merge the names.""" + # Continuously merge sets that have overlapping names, until no + # more merges are possible + names = list(self.messy_names) + while True: + index = 0 + merged = False + while index < len(names): + index2 = index + 1 + while index2 < len(names): + if names[index] & names[index2]: + names[index] |= names[index2] + del names[index2] + merged = True + else: + index2 += 1 + index += 1 + if not merged: + break + self._names = names + + def _merge_names_if_needed(self) -> None: + """Merge the names if they haven't been merged yet.""" + if self._names is None: + self._merge_names() + + @property + def messy_names(self) -> t.Sequence[frozenset[str]]: + """Get the list of names.""" + return self._messy_names + + 
@property + def names(self) -> t.Sequence[frozenset[str]]: + """Get the list of merged names.""" + self._merge_names_if_needed() + assert self._names is not None + return self._names + + @property + def nicknames_manager(self) -> "NicknamesManager": + """Get the nicknames manager.""" + return NicknamesManager(self.names) + + +class IGetNicknameIndex(t.Protocol): + """A protocol for getting the index of a nickname.""" + + def get_index(self, name: str) -> int | None: + """Get the index of a nickname.""" + ... + + +class MockGetNicknameIndex(IGetNicknameIndex): + """A simple implementation of IGetNicknameIndex useful for tests.""" + + _name_to_index: dict[str, int] + + def __init__(self, names: t.Sequence[t.Iterable[str]]) -> None: + self._name_to_index = {} + for index, names_set in enumerate(names): + for name in names_set: + self._name_to_index[name] = index + + def get_index(self, name: str) -> int | None: + """Return the index for a given nickname.""" + return self._name_to_index.get(name) + + +class NicknamesManager: + """ + Tool for working with a 'clean' nicknames file. + + This is basically just the merged/indexed version of the messy nicknames + file. + """ + + _names: list[frozenset[str]] + """A list of sets of related names. A given name will only appear in one set.""" + + _name_to_index: dict[str, int] | None = None + """A mapping from each name to the (merged) index of the set it appears in.""" + + def __init__( + self, + names: t.Iterable[frozenset[str]], + ): + self._names = list(names) + self._name_to_index = None + + @classmethod + def from_jsonl_io(cls, io: t.TextIO) -> "NicknamesManager": + """ + Read from a json file and create a manager. + + The file is a json-lines file, where each line is a list of names. + No name will appear more than once in the file. 
+ """ + names = (frozenset(json.loads(line)) for line in io) + return cls(names) + + @classmethod + def from_path(cls, path: str | pathlib.Path) -> "NicknamesManager": + """Create a manager from a path.""" + path = validate_extant_file(pathlib.Path(path)) + with path.open("rt") as input_file: + return cls.from_jsonl_io(input_file) + + @classmethod + def from_data_manager(cls, data_manager: DataManager) -> "NicknamesManager": + """Create a manager from a data manager.""" + return cls.from_path(data_manager.path / "names" / "nicknames.jsonl") + + def to_data_lines(self) -> t.Iterable[list[str]]: + """Convert to a json-serializable object.""" + return (list(names) for names in self.names) + + def to_jsonl_io(self, io: t.TextIO) -> None: + """Write to a json file.""" + for data_line in self.to_data_lines(): + io.write(json.dumps(data_line)) + io.write("\n") + + def to_jsonl_path(self, path: str | pathlib.Path) -> None: + """Write to a json file.""" + path = pathlib.Path(path) + with path.open("wt") as output_file: + self.to_jsonl_io(output_file) + + def to_jsonl_data_manager(self, data_manager: DataManager) -> None: + """Write to a json file.""" + self.to_jsonl_path(data_manager.path / "names" / "nicknames.jsonl") + + def _index_names(self) -> None: + """Index the merged names.""" + self._name_to_index = {} + for index, names_set in enumerate(self.names): + for name in names_set: + if name in self._name_to_index: + raise ValueError(f"Name {name} appears in multiple sets") + self._name_to_index[name] = index + + def _index_names_if_needed(self) -> None: + """Index the merged names if they haven't been indexed yet.""" + if self._name_to_index is None: + self._index_names() + + @property + def names(self) -> t.Sequence[frozenset[str]]: + """Get the list of merged names.""" + return self._names + + @property + def name_to_index(self) -> t.Mapping[str, int]: + """Get the mapping from name to index.""" + self._index_names_if_needed() + assert self._name_to_index is not None 
+ return self._name_to_index + + def get_index(self, name: str) -> int | None: + """Get the index of a name.""" + return self.name_to_index.get(name.title()) + + def get_names_for_index(self, index: int) -> frozenset[str]: + """Get the names associated with an index.""" + if index < 0 or index >= len(self._names): + return frozenset() + return self.names[index] + + def get_related_names(self, name: str) -> frozenset[str]: + """ + Get the set of related names for a name. + + The set will include the name itself. + """ + index = self.get_index(name) + if index is None: + return frozenset() + return self.get_names_for_index(index) diff --git a/server/data/names/test_nicknames.py b/server/data/names/test_nicknames.py new file mode 100644 index 0000000..f333c66 --- /dev/null +++ b/server/data/names/test_nicknames.py @@ -0,0 +1,199 @@ +# ruff: noqa: D102 +import io +import unittest + +from . import nicknames as n + + +class MessyNicknamesTestCase(unittest.TestCase): + def test_from_messy_io(self) -> None: + messy_io = io.StringIO( + """Dave David, Davey, Davie Rob\n""" + """John Jack, Johnny, Jonathan\n""" + """Bob Bobby, Rob, Robert\n""" + """\n""" + """Matt // Matthew, Matty, Mat, Rob\n""" + ) + manager = n.MessyNicknamesManager.from_messy_io(messy_io) + self.assertEqual( + manager.messy_names, + [ + frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), + frozenset(["John", "Jack", "Johnny", "Jonathan"]), + frozenset(["Bob", "Bobby", "Rob", "Robert"]), + frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), + ], + ) + + def test_messy_names(self) -> None: + manager = n.MessyNicknamesManager( + [ + frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), + frozenset(["John", "Jack", "Johnny", "Jonathan"]), + frozenset(["Bob", "Bobby", "Rob", "Robert"]), + frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), + ], + ) + self.assertEqual( + manager.messy_names, + [ + frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), + frozenset(["John", "Jack", "Johnny", 
"Jonathan"]), + frozenset(["Bob", "Bobby", "Rob", "Robert"]), + frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), + ], + ) + + def test_names(self) -> None: + """Validate that the names are merged.""" + manager = n.MessyNicknamesManager( + [ + frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), + frozenset(["John", "Jack", "Johnny", "Jonathan"]), + frozenset(["Bob", "Bobby", "Rob", "Robert"]), + frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), + ], + ) + self.assertEqual( + manager.names, + [ + frozenset( + [ + "Dave", + "David", + "Davey", + "Davie", + "Bob", + "Bobby", + "Rob", + "Robert", + "Matt", + "Matthew", + "Matty", + "Mat", + ] + ), + frozenset(["John", "Jack", "Johnny", "Jonathan"]), + ], + ) + + def test_nicknames_manager(self) -> None: + manager = n.MessyNicknamesManager( + [ + frozenset(["Dave", "David", "Davey", "Davie", "Rob"]), + frozenset(["John", "Jack", "Johnny", "Jonathan"]), + frozenset(["Bob", "Bobby", "Rob", "Robert"]), + frozenset(["Matt", "Matthew", "Matty", "Mat", "Rob"]), + ], + ) + nicknames_manager = manager.nicknames_manager + self.assertEqual( + nicknames_manager.names, + [ + frozenset( + [ + "Dave", + "David", + "Davey", + "Davie", + "Bob", + "Bobby", + "Rob", + "Robert", + "Matt", + "Matthew", + "Matty", + "Mat", + ] + ), + frozenset(["John", "Jack", "Johnny", "Jonathan"]), + ], + ) + + +class NicknamesManagerTestCase(unittest.TestCase): + def test_from_jsonl_io(self) -> None: + jsonl_io = io.StringIO("""["A", "B"]\n["C", "D"]\n["E", "F"]\n""") + manager = n.NicknamesManager.from_jsonl_io(jsonl_io) + self.assertEqual( + manager.names, + [ + frozenset(["A", "B"]), + frozenset(["C", "D"]), + frozenset(["E", "F"]), + ], + ) + + def test_names(self) -> None: + manager = n.NicknamesManager( + [ + frozenset(["A", "B"]), + frozenset(["C", "D"]), + frozenset(["E", "F"]), + ], + ) + self.assertEqual( + manager.names, + [ + frozenset(["A", "B"]), + frozenset(["C", "D"]), + frozenset(["E", "F"]), + ], + ) + + def 
test_name_to_index(self) -> None: + manager = n.NicknamesManager( + [ + frozenset(["A", "B"]), + frozenset(["C", "D"]), + frozenset(["E", "F"]), + ], + ) + self.assertEqual( + manager.name_to_index, {"A": 0, "B": 0, "C": 1, "D": 1, "E": 2, "F": 2} + ) + + def test_get_index(self) -> None: + manager = n.NicknamesManager( + [ + frozenset(["A", "B"]), + frozenset(["C", "D"]), + frozenset(["E", "F"]), + ], + ) + self.assertEqual(manager.get_index("A"), 0) + self.assertEqual(manager.get_index("B"), 0) + self.assertEqual(manager.get_index("C"), 1) + self.assertEqual(manager.get_index("D"), 1) + self.assertEqual(manager.get_index("E"), 2) + self.assertEqual(manager.get_index("F"), 2) + self.assertIsNone(manager.get_index("G")) + + def test_get_names_for_index(self) -> None: + manager = n.NicknamesManager( + [ + frozenset(["A", "B"]), + frozenset(["C", "D"]), + frozenset(["E", "F"]), + ], + ) + self.assertEqual(manager.get_names_for_index(0), frozenset({"A", "B"})) + self.assertEqual(manager.get_names_for_index(1), frozenset({"C", "D"})) + self.assertEqual(manager.get_names_for_index(2), frozenset({"E", "F"})) + self.assertEqual(manager.get_names_for_index(3), frozenset()) + + def test_get_related_names(self) -> None: + manager = n.NicknamesManager( + [ + frozenset(["A", "B"]), + frozenset(["C", "D"]), + frozenset(["E", "F"]), + ], + ) + self.assertEqual(manager.get_related_names("A"), frozenset({"A", "B"})) + self.assertEqual(manager.get_related_names("B"), frozenset({"A", "B"})) + self.assertEqual(manager.get_related_names("C"), frozenset({"C", "D"})) + self.assertEqual(manager.get_related_names("D"), frozenset({"C", "D"})) + self.assertEqual(manager.get_related_names("E"), frozenset({"E", "F"})) + self.assertEqual(manager.get_related_names("F"), frozenset({"E", "F"})) + self.assertEqual(manager.get_related_names("G"), frozenset()) diff --git a/server/usps/__init__.py b/server/data/usps/__init__.py similarity index 100% rename from server/usps/__init__.py rename to 
server/data/usps/__init__.py diff --git a/server/data/usps/city_state.py b/server/data/usps/city_state.py new file mode 100644 index 0000000..195ee94 --- /dev/null +++ b/server/data/usps/city_state.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass + + +@dataclass(frozen=True) +class CityState: + city: str + state: str diff --git a/server/data/usps/metros.py b/server/data/usps/metros.py new file mode 100644 index 0000000..3d90b55 --- /dev/null +++ b/server/data/usps/metros.py @@ -0,0 +1,143 @@ +""" +Preferred mappings from city name to city and state, if we are unable +to find alternative info for a contact. +""" + +from .city_state import CityState + +_METROS: dict[str, CityState] = { + "New York": CityState("New York", "NY"), + "Newark": CityState("Newark", "NJ"), + "Jersey City": CityState("Jersey City", "NJ"), + "Los Angeles": CityState("Los Angeles", "CA"), + "Long Beach": CityState("Long Beach", "CA"), + "Anaheim": CityState("Anaheim", "CA"), + "Chicago": CityState("Chicago", "IL"), + "Naperville": CityState("Naperville", "IL"), + "Elgin": CityState("Elgin", "IL"), + "Dallas": CityState("Dallas", "TX"), + "Fort Worth": CityState("Fort Worth", "TX"), + # "Arlington": CityState("Arlington", "TX"), + "Houston": CityState("Houston", "TX"), + "The Woodlands": CityState("The Woodlands", "TX"), + "Sugar Land": CityState("Sugar Land", "TX"), + "Washington": CityState("Washington", "DC"), + # "Arlington": CityState("Arlington", "VA"), + "Alexandria": CityState("Alexandria", "VA"), + "Philadelphia": CityState("Philadelphia", "PA"), + "Camden": CityState("Camden", "NJ"), + "Wilmington": CityState("Wilmington", "DE"), + "Atlanta": CityState("Atlanta", "GA"), + "Sandy Springs": CityState("Sandy Springs", "GA"), + "Alpharetta": CityState("Alpharetta", "GA"), + "Miami": CityState("Miami", "FL"), + "Fort Lauderdale": CityState("Fort Lauderdale", "FL"), + "West Palm Beach": CityState("West Palm Beach", "FL"), + "Pompano Beach": CityState("Pompano Beach", "FL"), + 
"Phoenix": CityState("Phoenix", "AZ"), + "Mesa": CityState("Mesa", "AZ"), + "Chandler": CityState("Chandler", "AZ"), + "Boston": CityState("Boston", "MA"), + "Cambridge": CityState("Cambridge", "MA"), + "Newton": CityState("Newton", "MA"), + "Riverside": CityState("Riverside", "CA"), + "San Bernardino": CityState("San Bernardino", "CA"), + "Ontario": CityState("Ontario", "CA"), + "San Francisco": CityState("San Francisco", "CA"), + "Oakland": CityState("Oakland", "CA"), + "Berkeley": CityState("Berkeley", "CA"), + "Detroit": CityState("Detroit", "MI"), + "Warren": CityState("Warren", "MI"), + "Dearborn": CityState("Dearborn", "MI"), + "Seattle": CityState("Seattle", "WA"), + "Tacoma": CityState("Tacoma", "WA"), + "Bellevue": CityState("Bellevue", "WA"), + "Minneapolis": CityState("Minneapolis", "MN"), + "St. Paul": CityState("St. Paul", "MN"), + "Bloomington": CityState("Bloomington", "MN"), + "Tampa": CityState("Tampa", "FL"), + "St. Petersburg": CityState("St. Petersburg", "FL"), + "Clearwater": CityState("Clearwater", "FL"), + "San Diego": CityState("San Diego", "CA"), + "Chula Vista": CityState("Chula Vista", "CA"), + "Carlsbad": CityState("Carlsbad", "CA"), + "Denver": CityState("Denver", "CO"), + "Aurora": CityState("Aurora", "CO"), + "Lakewood": CityState("Lakewood", "CO"), + "Baltimore": CityState("Baltimore", "MD"), + "Columbia": CityState("Columbia", "MD"), + "Towson": CityState("Towson", "MD"), + "St. Louis": CityState("St. 
Louis", "MO"), + "Orlando": CityState("Orlando", "FL"), + "Kissimmee": CityState("Kissimmee", "FL"), + "Sanford": CityState("Sanford", "FL"), + "Charlotte": CityState("Charlotte", "NC"), + "Concord": CityState("Concord", "NC"), + "Gastonia": CityState("Gastonia", "NC"), + "San Antonio": CityState("San Antonio", "TX"), + "New Braunfels": CityState("New Braunfels", "TX"), + "Portland": CityState("Portland", "OR"), + "Vancouver": CityState("Vancouver", "WA"), + "Hillsboro": CityState("Hillsboro", "OR"), + "Austin": CityState("Austin", "TX"), + "Round Rock": CityState("Round Rock", "TX"), + "Georgetown": CityState("Georgetown", "TX"), + "Sacramento": CityState("Sacramento", "CA"), + "Roseville": CityState("Roseville", "CA"), + "Folsom": CityState("Folsom", "CA"), + "Pittsburgh": CityState("Pittsburgh", "PA"), + "Las Vegas": CityState("Las Vegas", "NV"), + "Henderson": CityState("Henderson", "NV"), + "Paradise": CityState("Paradise", "NV"), + "Cincinnati": CityState("Cincinnati", "OH"), + # "Kansas City": CityState("Kansas City", "MO"), + "Columbus": CityState("Columbus", "OH"), + "Indianapolis": CityState("Indianapolis", "IN"), + "Carmel": CityState("Carmel", "IN"), + "Anderson": CityState("Anderson", "IN"), + "Cleveland": CityState("Cleveland", "OH"), + "Elyria": CityState("Elyria", "OH"), + "Nashville": CityState("Nashville", "TN"), + "Davidson": CityState("Davidson", "TN"), + "Murfreesboro": CityState("Murfreesboro", "TN"), + "Franklin": CityState("Franklin", "TN"), + "San Jose": CityState("San Jose", "CA"), + "Sunnyvale": CityState("Sunnyvale", "CA"), + "Santa Clara": CityState("Santa Clara", "CA"), + "Virginia Beach": CityState("Virginia Beach", "VA"), + "Norfolk": CityState("Norfolk", "VA"), + "Newport News": CityState("Newport News", "VA"), + "Jacksonville": CityState("Jacksonville", "FL"), + "Providence": CityState("Providence", "RI"), + "Warwick": CityState("Warwick", "RI"), + "Milwaukee": CityState("Milwaukee", "WI"), + "Waukesha": CityState("Waukesha", 
"WI"), + "Raleigh": CityState("Raleigh", "NC"), + "Durham": CityState("Durham", "NC"), + "Cary": CityState("Cary", "NC"), + "Oklahoma City": CityState("Oklahoma City", "OK"), + "Richmond": CityState("Richmond", "VA"), + "Memphis": CityState("Memphis", "TN"), + "Louisville": CityState("Louisville", "KY"), + "Salt Lake City": CityState("Salt Lake City", "UT"), + "New Orleans": CityState("New Orleans", "LA"), + "Baton Rouge": CityState("Baton Rouge", "LA"), + "Metairie": CityState("Metairie", "LA"), + "Hartford": CityState("Hartford", "CT"), + "East Hartford": CityState("East Hartford", "CT"), + "Middletown": CityState("Middletown", "CT"), + "Buffalo": CityState("Buffalo", "NY"), + "Cheektowaga": CityState("Cheektowaga", "NY"), + "Birmingham": CityState("Birmingham", "AL"), + "Hoover": CityState("Hoover", "AL"), +} + + +class MajorMetros: + """Simple tool to look up preferred city and state for a given city name.""" + + @classmethod + def for_city(cls, city: str) -> CityState | None: + """Return the preferred city and state for the given city name.""" + cs = _METROS.get(city.title()) + return CityState(cs.city.upper(), cs.state.upper()) if cs else None diff --git a/server/data/usps/test_metros.py b/server/data/usps/test_metros.py new file mode 100644 index 0000000..775c091 --- /dev/null +++ b/server/data/usps/test_metros.py @@ -0,0 +1,17 @@ +# ruff: noqa: D102 +from unittest import TestCase + +from .city_state import CityState +from .metros import MajorMetros + + +class ForCityTestCase(TestCase): + def test_seattle(self): + self.assertEqual(MajorMetros.for_city("Seattle"), CityState("SEATTLE", "WA")) + + def test_case_inesensitive(self): + self.assertEqual(MajorMetros.for_city("seattle"), CityState("SEATTLE", "WA")) + self.assertEqual(MajorMetros.for_city("SEATTLE"), CityState("SEATTLE", "WA")) + + def test_nothing(self): + self.assertIsNone(MajorMetros.for_city("Nothing")) diff --git a/server/usps/test_zipcode.py b/server/data/usps/test_zipcode.py similarity index 
57% rename from server/usps/test_zipcode.py rename to server/data/usps/test_zipcode.py index 1345a50..2e2af96 100644 --- a/server/usps/test_zipcode.py +++ b/server/data/usps/test_zipcode.py @@ -19,7 +19,7 @@ class ZipCodeManagerTestCase(TestCase): def setUp(self): self.data = io.StringIO(FAKE_CSV_DATA) - self.zip_code_manager = z.ZipCodeManager(self.data) + self.zip_code_manager = z.ZipCodeManager.from_csv_io(self.data) self.new_york = z.CityState("NEW YORK", "NY") self.seattle = z.CityState("SEATTLE", "WA") @@ -35,3 +35,17 @@ def test_zip5_to_city(self): self.assertEqual(len(self.zip_code_manager.zip5_to_city), 6) self.assertEqual(self.zip_code_manager.zip5_to_city["12345"], self.new_york) self.assertEqual(self.zip_code_manager.zip5_to_city["98101"], self.seattle) + + def test_get_zip_codes(self): + self.assertEqual(len(self.zip_code_manager.get_zip_codes(self.new_york)), 1) + self.assertEqual(len(self.zip_code_manager.get_zip_codes(self.seattle)), 5) + self.assertEqual(len(self.zip_code_manager.get_zip_codes("seattle")), 5) + self.assertEqual(len(self.zip_code_manager.get_zip_codes("nowhere")), 0) + + def test_get_city_state(self): + self.assertEqual(self.zip_code_manager.get_city_state("12345"), self.new_york) + self.assertEqual(self.zip_code_manager.get_city_state("98101"), self.seattle) + + def test_get_city_state_not_found(self): + self.assertIsNone(self.zip_code_manager.get_city_state("00000")) + self.assertIsNone(self.zip_code_manager.get_city_state("99999")) diff --git a/server/usps/zipcode.py b/server/data/usps/zipcode.py similarity index 67% rename from server/usps/zipcode.py rename to server/data/usps/zipcode.py index a902d95..da61533 100644 --- a/server/usps/zipcode.py +++ b/server/data/usps/zipcode.py @@ -3,14 +3,11 @@ import typing as t from dataclasses import dataclass -from server.data import DataManager +from server.data.manager import DataManager from server.utils.validations import validate_extant_file - -@dataclass(frozen=True) -class 
CityState: - city: str - state: str +from .city_state import CityState +from .metros import MajorMetros @dataclass(frozen=True) @@ -31,34 +28,37 @@ class ZipCodeManager: _city_to_zip_codes: dict[CityState, set[ZipCode]] | None _zip5_to_city: dict[str, CityState] | None - def __init__(self, data: t.TextIO) -> None: - self._load_zip_codes(data) + def __init__(self, zip_codes: t.Sequence[ZipCode]) -> None: + self._zip_codes = list(zip_codes) self._city_to_zip_codes = None self._zip5_to_city = None + @classmethod + def from_csv_io(cls, io: t.TextIO) -> "ZipCodeManager": + """Return a ZipCodeManager with the given io stream.""" + zip_codes = [] + reader = csv.DictReader(io) + for row in reader: + zip_code = ZipCode( + zip5=row["PHYSICAL ZIP"], + city=row["PHYSICAL CITY"].upper().strip(), + state=row["PHYSICAL STATE"].upper().strip(), + ) + zip_codes.append(zip_code) + return cls(zip_codes) + @classmethod def from_path(cls, path: str | pathlib.Path) -> "ZipCodeManager": """Return a ZipCodeManager with the given path.""" path = validate_extant_file(pathlib.Path(path)) with open(path) as f: - return cls(f) + return cls.from_csv_io(f) @classmethod def from_data_manager(cls, data_manager: DataManager) -> "ZipCodeManager": """Return a ZipCodeManager with the same path as the given DataManager.""" return cls.from_path(data_manager.path / "usps" / "unique-zips.csv") - def _load_zip_codes(self, data: t.TextIO) -> None: - self._zip_codes = [] - reader = csv.DictReader(data) - for row in reader: - zip_code = ZipCode( - zip5=row["PHYSICAL ZIP"], - city=row["PHYSICAL CITY"], - state=row["PHYSICAL STATE"], - ) - self._zip_codes.append(zip_code) - def _index_cities(self) -> None: assert self._city_to_zip_codes is None self._city_to_zip_codes = {} @@ -83,12 +83,12 @@ def _index_zip5s_if_needed(self) -> None: self._index_zip5s() @property - def zip_codes(self) -> list[ZipCode]: + def zip_codes(self) -> t.Sequence[ZipCode]: """Return a list of all unique ZIP codes.""" return 
self._zip_codes @property - def city_to_zip_codes(self) -> dict[CityState, set[ZipCode]]: + def city_to_zip_codes(self) -> t.Mapping[CityState, set[ZipCode]]: """ Return a dict mapping each city to a set of all unique ZIP codes in that city. @@ -98,8 +98,20 @@ def city_to_zip_codes(self) -> dict[CityState, set[ZipCode]]: return self._city_to_zip_codes @property - def zip5_to_city(self) -> dict[str, CityState]: + def zip5_to_city(self) -> t.Mapping[str, CityState]: """Return a dict mapping each ZIP5 to the city and state it belongs to.""" self._index_zip5s_if_needed() assert self._zip5_to_city is not None return self._zip5_to_city + + def get_zip_codes(self, city: str | CityState | None) -> set[ZipCode]: + """Return a set of all unique ZIP codes in the given city.""" + if isinstance(city, str): + city = MajorMetros.for_city(city) + if city is None: + return set() + return self.city_to_zip_codes.get(city, set()) + + def get_city_state(self, zip5: str) -> CityState | None: + """Return the city and state for the given ZIP5.""" + return self.zip5_to_city.get(zip5) diff --git a/server/utils/test_validations.py b/server/utils/test_validations.py index 4a60ee4..39a25bd 100644 --- a/server/utils/test_validations.py +++ b/server/utils/test_validations.py @@ -1,6 +1,7 @@ # ruff: noqa: D102 import pathlib import tempfile +from decimal import Decimal from unittest import TestCase from . 
class DictValidationTestCase(TestCase):
    """Exercise is_dict / validate_dict."""

    def test_is_dict_true(self):
        self.assertTrue(v.is_dict({"foo": "bar"}))

    def test_is_dict_false(self):
        self.assertFalse(v.is_dict(42))

    def test_validate_dict(self):
        self.assertEqual(v.validate_dict({"foo": "bar"}), {"foo": "bar"})

    def test_validate_dict_raises(self):
        with self.assertRaises(v.ValidationError):
            v.validate_dict(42)


class DecimalValidationTestCase(TestCase):
    """Exercise validate_convert_decimal."""

    def test_validate_convert_decimal_str(self):
        self.assertEqual(v.validate_convert_decimal("42"), Decimal("42"))

    def test_validate_convert_decimal_decimal(self):
        self.assertEqual(v.validate_convert_decimal(Decimal("42")), Decimal("42"))

    def test_validate_convert_decimal_int(self):
        self.assertEqual(v.validate_convert_decimal(42), Decimal("42"))

    def test_validate_convert_decimal_invalid(self):
        with self.assertRaises(v.ValidationError):
            v.validate_convert_decimal("foo")


class DictContentValidationTestCase(TestCase):
    """Exercise the get_* dict-content helpers."""

    def test_get_str_true(self):
        self.assertEqual(v.get_str({"foo": "bar"}, "foo"), "bar")

    def test_get_str_false_key_not_found(self):
        with self.assertRaises(v.ValidationError):
            v.get_str({"foo": "bar"}, "baz")

    def test_get_str_false_value_not_str(self):
        with self.assertRaises(v.ValidationError):
            v.get_str({"foo": 42}, "foo")

    def test_get_optional_str_true(self):
        self.assertEqual(v.get_optional_str({"foo": "bar"}, "foo"), "bar")
        self.assertEqual(v.get_optional_str({}, "foo"), None)

    def test_get_optional_str_false_value_not_str(self):
        with self.assertRaises(v.ValidationError):
            v.get_optional_str({"foo": 42}, "foo")

    def test_get_str_or_none_true(self):
        self.assertEqual(v.get_str_or_none({"foo": "bar"}, "foo"), "bar")
        self.assertEqual(v.get_str_or_none({"foo": None}, "foo"), None)

    def test_get_str_or_none_false_key_not_found(self):
        with self.assertRaises(v.ValidationError):
            v.get_str_or_none({"foo": "bar"}, "baz")

    def test_get_str_or_none_false_value_not_str(self):
        with self.assertRaises(v.ValidationError):
            v.get_str_or_none({"foo": 42}, "foo")

    def test_get_convert_decimal_true_str(self):
        self.assertEqual(v.get_convert_decimal({"foo": "42"}, "foo"), Decimal("42"))

    def test_get_convert_decimal_true_decimal(self):
        self.assertEqual(
            v.get_convert_decimal({"foo": Decimal("42")}, "foo"), Decimal("42")
        )

    # Renamed from test_get_convert_decimal_false_inalid (typo).
    def test_get_convert_decimal_false_invalid(self):
        with self.assertRaises(v.ValidationError):
            v.get_convert_decimal({"foo": "wakka"}, "foo")

    def test_get_convert_decimal_false_key_not_found(self):
        with self.assertRaises(v.ValidationError):
            v.get_convert_decimal({"foo": "42"}, "baz")

    def test_get_dict_true(self):
        self.assertEqual(v.get_dict({"foo": {"bar": "baz"}}, "foo"), {"bar": "baz"})

    def test_get_dict_false_key_not_found(self):
        with self.assertRaises(v.ValidationError):
            v.get_dict({"foo": {"bar": "baz"}}, "baz")

    def test_get_dict_false_value_not_dict(self):
        with self.assertRaises(v.ValidationError):
            v.get_dict({"foo": 42}, "foo")
class ValidationError(Exception): @@ -7,23 +13,36 @@ class ValidationError(Exception): pass -def is_str(value: object) -> bool: +# +# Basic type validations +# + + +def is_str(value: t.Any) -> bool: """Return True if the value is a string.""" return isinstance(value, str) -def validate_str(value: object) -> str: +def validate_str(value: t.Any) -> str: """Return the value if it is a string, otherwise raise an exception.""" if isinstance(value, str): return value raise ValidationError(f"Expected a string, got {value}") -def is_str_or_none(value: object) -> bool: +def is_str_or_none(value: t.Any) -> bool: """Return True if the value is a string or None.""" return value is None or isinstance(value, str) +def validate_convert_decimal(value: t.Any) -> Decimal: + """Validate a string or decimal, converting the string to a decimal.""" + try: + return Decimal(value) + except Exception: + raise ValidationError(f"Expected convertible to decimal, got {value}") from None + + def validate_str_or_none(value: object) -> str | None: """Return the value if it is a string or None, otherwise raise an exception.""" if value is None or isinstance(value, str): @@ -31,6 +50,78 @@ def validate_str_or_none(value: object) -> str | None: raise ValidationError(f"Expected a string or None, got {value}") +def is_dict(value: object) -> bool: + """Return True if the value is a dict.""" + return isinstance(value, dict) + + +def validate_dict(value: object) -> dict: + """Return the value if it is a dict, otherwise raise an exception.""" + if isinstance(value, dict): + return value + raise ValidationError(f"Expected a dict, got {value}") + + +# +# Dict content validations +# + + +def get_str(d: dict, key: str) -> str: + """ + Return the value for `key` in `d` if it is a string, + otherwise raise an exception. 
+ """ + if key not in d: + raise ValidationError(f"Key '{key}' not found in {d}") + return validate_str(d[key]) + + +def get_optional_str(d: dict, key: str) -> str | None: + """ + Return the value for `key` in `d` if it is a string, + otherwise raise an exception. + """ + if key not in d: + return None + return validate_str(d[key]) + + +def get_str_or_none(d: dict, key: str) -> str | None: + """ + Return the value for `key` in `d` if it is a string or None, + otherwise raise an exception. + """ + if key not in d: + raise ValidationError(f"Key '{key}' not found in {d}") + return validate_str_or_none(d[key]) + + +def get_convert_decimal(d: dict, key: str) -> Decimal: + """ + Return the value for `key` in `d` if it is a string or decimal, + otherwise raise an exception. + """ + if key not in d: + raise ValidationError(f"Key '{key}' not found in {d}") + return validate_convert_decimal(d[key]) + + +def get_dict(d: dict, key: str) -> dict: + """ + Return the value for `key` in `d` if it is a `dict`, otherwise + raise an exception. + """ + if key not in d: + raise ValidationError(f"Key '{key}' not found in {d}") + return validate_dict(d[key]) + + +# +# Path validations +# + + def is_extant_dir(path: pathlib.Path) -> bool: """Return True if the path exists and is a directory.""" path = path.resolve()