diff --git a/server/data/fec/committees.py b/server/data/fec/committees.py new file mode 100644 index 0000000..73aaa1a --- /dev/null +++ b/server/data/fec/committees.py @@ -0,0 +1,245 @@ +""" +Support reading FEC committee master file content. + +You can download per-election-cycle committee master files from: +https://www.fec.gov/data/browse-data/?tab=bulk-data + +The schema for the committee master file is available at: +https://www.fec.gov/campaign-finance-data/committee-master-file-description/ +""" +import csv +import json +import pathlib +import typing as t +from dataclasses import dataclass + +from server.data.manager import DataManager +from server.utils import validations as v + + +class CommitteeTypeCode: + """ + Committee type codes. + + See: + https://www.fec.gov/campaign-finance-data/committee-type-code-descriptions/ + """ + + COMMUNICATION_COST = "C" + DELEGATE_COMMITTEE = "D" + ELECTIONEERING_COMMUNICATION = "E" + HOUSE = "H" + INDEPEDENT_PERSON_OR_GROUP = "I" + PAC_NONQUALIFIED = "N" + INDEPEDENT_SUPER_PAC = "O" + PRESIDENTIAL = "P" + PAC_QUALIFIED = "Q" + SENATE = "S" + SINGLE_CANDIDATE_INDEPENDENT_EXPENDITURE = "U" + HYBRID_PAC_NONQUALIFIED = "V" + HYBRID_PAC_QUALIFIED = "W" + PARTY_NONQUALIFIED = "X" + PARTY_QUALIFIED = "Y" + NATIONAL_PARTY_NONFEDERAL = "Z" + + @classmethod + def name_for_code(cls, code: str) -> str | None: + """Return the name for the given committee type code.""" + for attr in dir(CommitteeTypeCode): + if not attr.startswith("__"): + if getattr(CommitteeTypeCode, attr) == code: + return attr.replace("_", " ").title() + return None + + +class CommitteeColumns: + ID = 0 # CMTE_ID + NAME = 1 # CMTE_NM + TREASURER_NAME = 2 # TRES_NM + STREET_1 = 3 # CMTE_ST1 + STREET_2 = 4 # CMTE_ST2 + CITY = 5 # CMTE_CITY + STATE = 6 # CMTE_ST + ZIP_CODE = 7 # CMTE_ZIP + DESIGNATION = 8 # CMTE_DSGN + TYPE = 9 # CMTE_TP + PARTY = 10 # CMTE_PTY_AFFILIATION + ORG_TYPE = 11 # ORG_TP + CONNECTED_ORG_NAME = 12 # CONNECTED_ORG_NM + CANDIDATE_ID = 13 # 
CAND_ID + + +class Party: + """ + Political party codes. + + For an (incredibly) exhaustive list, see: + https://www.fec.gov/campaign-finance-data/party-code-descriptions/ + """ + + REPUBLICAN = "REP" + DEMOCRAT = "DEM" + INDEPENDENT = "IND" + LIBERTARIAN = "LIB" + GREEN = "GRE" + UNKNOWN = "UNK" # We specifically ignore this/convert to None + + @classmethod + def name_for_code(cls, code: str) -> str | None: + """Return the name for the given party code.""" + for attr in dir(Party): + if not attr.startswith("__"): + if getattr(Party, attr) == code: + return attr.title() + return None + + +@dataclass(frozen=True) +class Committee: + """Our simplification of the committee record.""" + + id: str + name: str + party: str | None + candidate_id: str | None + + @classmethod + def from_committee_row(cls, row: t.Sequence[str]) -> "Committee": + """Create a committee from a row of the committee master file.""" + data = { + "id": row[CommitteeColumns.ID].strip(), + "name": row[CommitteeColumns.NAME].strip(), + } + party = row[CommitteeColumns.PARTY].strip().upper() + if party and party != Party.UNKNOWN: + data["party"] = party + candidate_id = row[CommitteeColumns.CANDIDATE_ID].strip() + if candidate_id: + data["candidate_id"] = candidate_id + return cls.from_data(data) + + @classmethod + def from_data(cls, value: t.Any) -> "Committee": + """Create a committee from arbitrary data, or raise an exception.""" + data = v.validate_dict(value) + return cls( + id=v.get_str(data, "id"), + name=v.get_str(data, "name"), + party=v.get_optional_str(data, "party"), + candidate_id=v.get_optional_str(data, "candidate_id"), + ) + + def to_data(self) -> dict: + """Return a dict representation of the committee.""" + data = { + "id": self.id, + "name": self.name, + } + if self.party is not None: + data["party"] = self.party + if self.candidate_id is not None: + data["candidate_id"] = self.candidate_id + return data + + +class CommitteeManager: + """Manages a collection of committees.""" + + 
class CommitteeManager:
    """
    Manages a collection of committees.

    Committees are kept in the order given. An id -> committee index is
    built lazily the first time a lookup is made.
    """

    _committees: "list[Committee]"
    # None until the index is first needed.
    _id_to_committee: "dict[str, Committee] | None"

    def __init__(self, committees: "t.Iterable[Committee]") -> None:
        """Create a committee manager from any iterable of committees."""
        # Materialize immediately: callers pass generators that read from
        # files which are closed once the constructor returns.
        self._committees = list(committees)
        self._id_to_committee = None

    @classmethod
    def from_csv_io(cls, io: t.TextIO) -> "CommitteeManager":
        """
        Create a committee manager from pipe-delimited committee rows.

        Blank lines (which csv.reader yields as empty rows) are skipped.
        """
        reader = csv.reader(io, delimiter="|")
        return cls(Committee.from_committee_row(row) for row in reader if row)

    @classmethod
    def from_csv_path(cls, path: pathlib.Path) -> "CommitteeManager":
        """Create a committee manager from a pipe-delimited file on disk."""
        path = v.validate_extant_file(path)
        # The csv module asks for files to be opened with newline="".
        with path.open(newline="") as file:
            return cls.from_csv_io(file)

    @classmethod
    def from_csv_data_manager(
        cls, data_manager: "DataManager", year: int = 2020
    ) -> "CommitteeManager":
        """Create a committee manager from `fec/committees-{year}.csv`."""
        return cls.from_csv_path(data_manager.path / "fec" / f"committees-{year}.csv")

    @classmethod
    def from_jsonl_io(cls, io: t.TextIO) -> "CommitteeManager":
        """
        Create a committee manager from json-lines text.

        Each non-blank line must hold one JSON object accepted by
        Committee.from_data(); blank lines are skipped.
        """
        return cls(Committee.from_data(json.loads(line)) for line in io if line.strip())

    @classmethod
    def from_jsonl_path(cls, path: pathlib.Path) -> "CommitteeManager":
        """Create a committee manager from a json-lines file on disk."""
        path = v.validate_extant_file(path)
        with path.open() as file:
            return cls.from_jsonl_io(file)

    @classmethod
    def from_jsonl_data_manager(
        cls, data_manager: "DataManager", year: int = 2020
    ) -> "CommitteeManager":
        """Create a committee manager from `fec/committees-{year}.jsonl`."""
        return cls.from_jsonl_path(
            data_manager.path / "fec" / f"committees-{year}.jsonl"
        )

    def to_data_lines(self) -> t.Iterable[dict]:
        """Return an iterable of json-serializable dicts, one per committee."""
        return (committee.to_data() for committee in self._committees)

    def to_jsonl_io(self, io: t.TextIO) -> None:
        """Write the committees as json-lines: one JSON object per line."""
        for data_line in self.to_data_lines():
            io.write(json.dumps(data_line))
            io.write("\n")

    def to_jsonl_path(self, path: pathlib.Path) -> None:
        """Write the committees to a json-lines file, replacing its content."""
        with path.open("wt") as output_file:
            self.to_jsonl_io(output_file)

    def to_jsonl_data_manager(
        self, data_manager: "DataManager", year: int = 2020
    ) -> None:
        """Write the committees to `fec/committees-{year}.jsonl`."""
        self.to_jsonl_path(data_manager.path / "fec" / f"committees-{year}.jsonl")

    def _index_committees(self) -> None:
        """
        Index the committees by id.

        Raises ValueError if two committees share an id. The index is built
        in a local dict and only stored once complete, so a failure never
        leaves a partial index behind.
        """
        index = {}
        for committee in self._committees:
            if committee.id in index:
                raise ValueError(f"Duplicate committee id: {committee.id!r}")
            index[committee.id] = committee
        self._id_to_committee = index

    def _index_committees_if_needed(self) -> None:
        """Index the committees by id if that has not been done yet."""
        if self._id_to_committee is None:
            self._index_committees()

    @property
    def committees(self) -> "t.Sequence[Committee]":
        """Get the list of committees."""
        return self._committees

    @property
    def id_to_committee(self) -> "t.Mapping[str, Committee]":
        """Get the mapping from id to committee, building it on first use."""
        self._index_committees_if_needed()
        # Internal invariant (and type narrowing): the call above either
        # stored an index or raised.
        assert self._id_to_committee is not None
        return self._id_to_committee

    def get_committee(self, id: str) -> "Committee | None":
        """Get the committee with the given id, or None."""
        return self.id_to_committee.get(id)
# Sample rows in the pipe-delimited committee master file layout.
RAW_CSV_DATA = """\
C00000059|HALLMARK CARDS PAC|SARAH MOE|2501 MCGEE|MD #500|KANSAS CITY|MO|64108|U|Q|UNK|M|C||
C00000422|AMERICAN MEDICAL ASSOCIATION POLITICAL ACTION COMMITTEE|WALKER, KEVIN MR.|25 MASSACHUSETTS AVE, NW|SUITE 600|WASHINGTON|DC|200017400|B|Q||M||DELAWARE MEDICAL PAC|
C00000489|D R I V E POLITICAL FUND CHAPTER 886|JERRY SIMS JR|3528 W RENO||OKLAHOMA CITY|OK|73107|U|N||Q|L||
C00000547|KANSAS MEDICAL SOCIETY POLITICAL ACTION COMMITTEE|JERRY SLAUGHTER|623 SW 10TH AVE||TOPEKA|KS|666121627|U|Q|UNK|Q|M|KANSAS MEDICAL SOCIETY|
C00000638|INDIANA STATE MEDICAL ASSOCIATION POLITICAL ACTION COMMITTEE|ACHENBACH, GRANT MR.|322 CANAL WALK, CANAL LEVEL||INDIANAPOLIS|IN|46202|U|Q||T|M||
C00000729|AMERICAN DENTAL ASSOCIATION POLITICAL ACTION COMMITTEE|BARNES, BRAD W DR.|1111 14TH STREET, NW|SUITE 1100|WASHINGTON|DC|200055627|B|Q|UNK|M|M|INDIANA DENTAL PAC|
C00000885|INTERNATIONAL UNION OF PAINTERS AND ALLIED TRADES POLITICAL ACTION TOGETHER POLITICAL COMMITTEE|GALIS, GEORGE|7234 PARKWAY DRIVE||HANOVER|MD|21076|B|Q|UNK|M|L|INTERNATIONAL UNION OF PAINTERS AND ALLIED TRADES|
C00000901|BUILD POLITICAL ACTION COMMITTEE OF THE NATIONAL ASSOCIATION OF HOME BUILDERS (BUILDPAC)|RAMAGE, EILEEN|1201 15TH STREET, NW||WASHINGTON|DC|20005|B|Q|UNK|M|T|NATIONAL ASSOCIATION OF HOME BUILDERS|
C00000935|DCCC|GUINN, LUCINDA|430 SOUTH CAPITOL STREET, SE|2ND FLOOR|WASHINGTON|DC|200034024|U|Y|DEM|M|||
C00000984|UNITED STATES TELECOM ASSOCIATION POLITICAL ACTION COMMITTEE (TELECOMPAC)|HEINER, BRANDON|601 NEW JERSEY AVE NW|STE 600|WASHINGTON|DC|20001|B|Q|UNK|M|T|UNITED STATES TELECOM ASSOCIATION|
"""


class CommitteeTypeCodeTestCase(unittest.TestCase):
    def test_name_for_code(self):
        self.assertEqual(
            c.CommitteeTypeCode.name_for_code(c.CommitteeTypeCode.COMMUNICATION_COST),
            "Communication Cost",
        )

    def test_name_for_code_none(self):
        self.assertIsNone(c.CommitteeTypeCode.name_for_code("NOPE"))


class PartyTestCase(unittest.TestCase):
    def test_name_for_code(self):
        self.assertEqual(c.Party.name_for_code(c.Party.DEMOCRAT), "Democrat")

    def test_name_for_code_none(self):
        self.assertIsNone(c.Party.name_for_code("NOPE"))


class CommitteeTestCase(unittest.TestCase):
    def test_from_data_id_name(self):
        """Test that only id and name are required by from_data."""
        data = {"id": "id", "name": "name"}
        committee = c.Committee.from_data(data)
        self.assertEqual(committee.id, "id")
        self.assertEqual(committee.name, "name")
        self.assertIsNone(committee.party)
        self.assertIsNone(committee.candidate_id)

    def test_from_data_all(self):
        """Test that from_data reads every field when all are present."""
        data = {
            "id": "id",
            "name": "name",
            "party": "party",
            "candidate_id": "candidate_id",
        }
        committee = c.Committee.from_data(data)
        self.assertEqual(committee.id, "id")
        self.assertEqual(committee.name, "name")
        self.assertEqual(committee.party, "party")
        self.assertEqual(committee.candidate_id, "candidate_id")

    def test_from_data_invalid(self):
        """Test that from_data rejects values that are not strings."""
        data = {"id": "id", "name": "name", "party": 42, "candidate_id": None}
        with self.assertRaises(ValidationError):
            c.Committee.from_data(data)

    def test_to_data(self):
        """Test that to_data emits every field that is set."""
        committee = c.Committee("id", "name", "party", "candidate_id")
        data = committee.to_data()
        self.assertEqual(data["id"], "id")
        self.assertEqual(data["name"], "name")
        self.assertEqual(data["party"], "party")
        self.assertEqual(data["candidate_id"], "candidate_id")

    def test_to_data_missing(self):
        """Test that to_data omits fields that are None."""
        committee = c.Committee("id", "name", None, None)
        data = committee.to_data()
        self.assertEqual(data["id"], "id")
        self.assertEqual(data["name"], "name")
        self.assertNotIn("party", data)
        self.assertNotIn("candidate_id", data)

    def test_from_committee_row(self):
        """Test that we can create a committee from a row."""
        # Build the row from the column constants instead of hard-coding
        # positions, so the test follows the layout the module declares.
        columns = c.CommitteeColumns
        row = [""] * (columns.CANDIDATE_ID + 1)
        row[columns.ID] = "C00000059"
        row[columns.NAME] = "HALLMARK CARDS PAC"
        row[columns.PARTY] = "UNK"
        row[columns.CANDIDATE_ID] = "CRUNK"
        committee = c.Committee.from_committee_row(row)
        self.assertEqual(committee.id, "C00000059")
        self.assertEqual(committee.name, "HALLMARK CARDS PAC")
        self.assertIsNone(committee.party)
        self.assertEqual(committee.candidate_id, "CRUNK")


class CommitteeManagerTestCase(unittest.TestCase):
    def setUp(self):
        self.example_committees = [
            c.Committee("id1", "name1", "party1", "candidate_id1"),
            c.Committee("id2", "name2", "party2", "candidate_id2"),
            c.Committee("id3", "name3", None, None),
        ]

    def test_committees(self):
        """Test that the manager exposes every committee it was given."""
        manager = c.CommitteeManager(self.example_committees)
        self.assertEqual(len(manager.committees), len(self.example_committees))

    def test_id_to_committees(self):
        """Test that committees are indexed by id."""
        manager = c.CommitteeManager(self.example_committees)
        self.assertEqual(
            manager.id_to_committee,
            {
                "id1": self.example_committees[0],
                "id2": self.example_committees[1],
                "id3": self.example_committees[2],
            },
        )

    def test_get_committee(self):
        """Test lookup by id, including an id that is not present."""
        manager = c.CommitteeManager(self.example_committees)
        self.assertEqual(manager.get_committee("id1"), self.example_committees[0])
        self.assertEqual(manager.get_committee("id2"), self.example_committees[1])
        self.assertEqual(manager.get_committee("id3"), self.example_committees[2])
        self.assertIsNone(manager.get_committee("id4"))

    def test_jsonl_io(self):
        """Test that writing json-lines and reading it back round-trips."""
        manager = c.CommitteeManager(self.example_committees)
        writable = io.StringIO()
        manager.to_jsonl_io(writable)
        readable = io.StringIO(writable.getvalue())
        manager2 = c.CommitteeManager.from_jsonl_io(readable)
        self.assertEqual(manager.committees, manager2.committees)

    def test_csv_io(self):
        """Test that the sample master-file rows load and can be looked up."""
        readable = io.StringIO(RAW_CSV_DATA)
        manager = c.CommitteeManager.from_csv_io(readable)
        self.assertEqual(len(manager.committees), 10)
        committee = manager.get_committee("C00000059")
        self.assertIsNotNone(committee)
        assert committee is not None  # narrows the type for checkers
        self.assertEqual(committee.id, "C00000059")
        self.assertEqual(committee.name, "HALLMARK CARDS PAC")
        self.assertIsNone(committee.party)
        self.assertIsNone(committee.candidate_id)
        self.assertIsNone(manager.get_committee("NOPE"))
class DictValidationTestCase(TestCase):
    """Tests for is_dict() and validate_dict()."""

    def test_is_dict_true(self):
        self.assertTrue(v.is_dict({"foo": "bar"}))

    def test_is_dict_false(self):
        self.assertFalse(v.is_dict(42))

    def test_validate_dict(self):
        self.assertEqual(v.validate_dict({"foo": "bar"}), {"foo": "bar"})

    def test_validate_dict_raises(self):
        with self.assertRaises(v.ValidationError):
            v.validate_dict(42)


class DictContentValidationTestCase(TestCase):
    """Tests for get_str(), get_optional_str() and get_str_or_none()."""

    def test_get_str_true(self):
        self.assertEqual(v.get_str({"foo": "bar"}, "foo"), "bar")

    def test_get_str_false_key_not_found(self):
        with self.assertRaises(v.ValidationError):
            v.get_str({"foo": "bar"}, "baz")

    def test_get_str_false_value_not_str(self):
        with self.assertRaises(v.ValidationError):
            v.get_str({"foo": 42}, "foo")

    def test_get_optional_str_true(self):
        # A missing key is allowed and reads as None.
        self.assertEqual(v.get_optional_str({"foo": "bar"}, "foo"), "bar")
        self.assertIsNone(v.get_optional_str({}, "foo"))

    def test_get_optional_str_false_value_not_str(self):
        with self.assertRaises(v.ValidationError):
            v.get_optional_str({"foo": 42}, "foo")

    def test_get_str_or_none_true(self):
        # The key must be present, but its value may be None.
        self.assertEqual(v.get_str_or_none({"foo": "bar"}, "foo"), "bar")
        self.assertIsNone(v.get_str_or_none({"foo": None}, "foo"))

    def test_get_str_or_none_false_key_not_found(self):
        with self.assertRaises(v.ValidationError):
            v.get_str_or_none({"foo": "bar"}, "baz")

    def test_get_str_or_none_false_value_not_str(self):
        with self.assertRaises(v.ValidationError):
            v.get_str_or_none({"foo": 42}, "foo")
+++ b/server/utils/validations.py @@ -7,6 +7,11 @@ class ValidationError(Exception): pass +# +# Basic type validations +# + + def is_str(value: object) -> bool: """Return True if the value is a string.""" return isinstance(value, str) @@ -31,6 +36,58 @@ def validate_str_or_none(value: object) -> str | None: raise ValidationError(f"Expected a string or None, got {value}") +def is_dict(value: object) -> bool: + """Return True if the value is a dict.""" + return isinstance(value, dict) + + +def validate_dict(value: object) -> dict: + """Return the value if it is a dict, otherwise raise an exception.""" + if isinstance(value, dict): + return value + raise ValidationError(f"Expected a dict, got {value}") + + +# +# Dict content validations +# + + +def get_str(d: dict, key: str) -> str: + """ + Return the value for `key` in `d` if it is a string, + otherwise raise an exception. + """ + if key not in d: + raise ValidationError(f"Key '{key}' not found in {d}") + return validate_str(d[key]) + + +def get_optional_str(d: dict, key: str) -> str | None: + """ + Return the value for `key` in `d` if it is a string, + otherwise raise an exception. + """ + if key not in d: + return None + return validate_str(d[key]) + + +def get_str_or_none(d: dict, key: str) -> str | None: + """ + Return the value for `key` in `d` if it is a string or None, + otherwise raise an exception. + """ + if key not in d: + raise ValidationError(f"Key '{key}' not found in {d}") + return validate_str_or_none(d[key]) + + +# +# Path validations +# + + def is_extant_dir(path: pathlib.Path) -> bool: """Return True if the path exists and is a directory.""" path = path.resolve()