Skip to content

Commit

Permalink
Build custom sqlite database for fast fuzzy indexing of contribution …
Browse files Browse the repository at this point in the history
…summaries (#2)

* Use full USPS zipcode database as of November 1, 2023

* Let's see here...

* This is slow and I am sad.

* Add zip data to each state database; fix name cleaning bug (alas).

* Names tests.

* Bugfix duplicate zip entries.

* Update for fast action

* Very confused about glitches.

* Compute largest summaries for contact

* Preferred
  • Loading branch information
davepeck authored Nov 29, 2023
1 parent 0769fce commit 7c9812d
Show file tree
Hide file tree
Showing 23 changed files with 913 additions and 2,390 deletions.
155 changes: 108 additions & 47 deletions fec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
import json

import click
import sqlalchemy.orm as sao
from tqdm import tqdm

from server.data.contacts import Contact, IContactProvider, SimpleContactProvider
from server.data.contacts.abbu import DirectoryABBUManager, ZipABBUManager
from server.data.fec.committees import CommitteeManager
from server.data.fec.contributions import (
ContributionsManager,
ContributionSummariesManager,
FuzzyIdentifier,
)
from server.data.manager import DataManager
from server.data.names.nicknames import MessyNicknamesManager, NicknamesManager
from server.data.models import (
Committee,
Contribution,
ZipCode,
create_db_tables,
get_engine,
)
from server.data.nicknames import NicknamesManager
from server.data.summaries import ContributionSummaryManager


@click.group()
Expand All @@ -37,65 +41,116 @@ def names():
required=False,
default=None,
)
def clean(data: str | None = None):
"""Clean raw names data."""
@click.argument("name", required=True)
def related(name: str, data: str | None = None):
"""Show all related name sets."""
data_manager = DataManager(data) if data is not None else DataManager.default()
messy_names_manager = MessyNicknamesManager.from_data_manager(data_manager)
nicknames_manager = messy_names_manager.nicknames_manager
nicknames_manager.to_jsonl_data_manager(data_manager)
nicknames_manager = NicknamesManager.from_data_manager(data_manager)
for related_name_set in nicknames_manager.get_related_names(name):
print(json.dumps(list(related_name_set)))


@fec.group()
def committees():
"""Work with FEC committees data."""
def db():
"""Work with the database."""
pass


@committees.command(name="lookup")
@click.argument("committee_id")
@db.command()
@click.option(
"--data",
type=click.Path(exists=True),
help="Path to data dir.",
required=False,
default=None,
)
def committee_lookup(committee_id: str, data: str | None = None):
"""Search FEC committees data."""
def init(data: str | None = None):
"""Initialize the database."""
data_manager = DataManager(data) if data is not None else DataManager.default()
committees_manager = CommitteeManager.from_csv_data_manager(data_manager)
committee = committees_manager.get_committee(committee_id)
if committee is None:
print("No matching committee.")
else:
print(json.dumps(committee.to_data(), indent=2))

batch_size = 1_000
state_to_engine = {}
state_to_buffer = {}

def _process_buffer(state: str, contributions: list[Contribution]):
"""Add a batch of contributions to the database."""
# A batch is ready to be added to the database.
engine = state_to_engine.get(state)

# Create a new engine if we don't have one for this state, and create
# the database tables.
if engine is None:
engine = get_engine(data_manager, state)
state_to_engine[state] = engine
create_db_tables(engine)
with sao.Session(engine) as session, session.begin():
session.add_all(Committee.from_data_manager(data_manager))
session.add_all(ZipCode.from_data_manager(data_manager))

# Add the batch to the database.
with sao.Session(engine) as session, session.begin():
session.add_all(contributions)

print("Adding individual contributions...")
for contribution in tqdm(
Contribution.from_data_manager(data_manager),
unit="contribution",
total=70_659_611,
):
# Queue up contributions by state.
state = contribution.state
state_to_buffer.setdefault(state, []).append(contribution)
if len(state_to_buffer[state]) < batch_size:
continue

_process_buffer(state, state_to_buffer[state])
state_to_buffer[state] = []

# Add any remaining contributions.
print("Adding remaining contributions...")
for state, contributions in state_to_buffer.items():
if not contributions:
continue
_process_buffer(state, contributions)
state_to_buffer[state] = []

print("Done.")


@fec.group()
def contributions():
"""Work with FEC contributions data."""
def committees():
"""Work with FEC committees data."""
pass


@contributions.command()
@committees.command(name="search")
@click.argument("name")
@click.option(
"--data",
type=click.Path(exists=True),
help="Path to data dir.",
required=False,
default=None,
)
def summarize(data: str | None = None):
"""Summarize raw FEC individual contribution data."""
def committee_search(name: str, data: str | None = None):
"""Search FEC committees data."""
data_manager = DataManager(data) if data is not None else DataManager.default()
contributions_manager = ContributionsManager.from_data_manager(data_manager)
summaries_manager = contributions_manager.contribution_summaries_manager
summaries_manager.to_jsonl_data_manager(data_manager)
with sao.Session(get_engine(data_manager, "WA")) as session:
for committee in Committee.for_name(session, name):
print(json.dumps(committee.to_data(), indent=2))


@fec.group()
def contributions():
"""Work with FEC contributions data."""
pass


@contributions.command()
@click.argument("first_name", required=False, default=None)
@click.argument("last_name", required=False, default=None)
@click.argument("city", required=False, default=None)
@click.argument("state", required=False, default=None)
@click.argument("zip_code", required=False, default=None)
@click.option(
"-c",
Expand Down Expand Up @@ -124,43 +179,49 @@ def search(
first_name: str | None = None,
last_name: str | None = None,
zip_code: str | None = None,
city: str | None = None,
state: str | None = None,
data: str | None = None,
contact_dir: str | None = None,
contact_zip: str | None = None,
):
"""Search summarized FEC contributions data."""
data_manager = DataManager(data) if data is not None else DataManager.default()
nicknames_manager = NicknamesManager.from_data_manager(data_manager)
summaries_manager = ContributionSummariesManager.from_data_manager(data_manager)

contact_provider: IContactProvider | None = None

if contact_dir is not None:
contact_provider = DirectoryABBUManager(contact_dir)
elif contact_zip is not None:
contact_provider = ZipABBUManager(contact_zip)
elif first_name and last_name and zip_code:
singleton = Contact(first_name, last_name, zip_code)
elif first_name and last_name and city and state:
singleton = Contact(
first_name.upper(), last_name.upper(), city.upper(), state.upper(), zip_code
)
contact_provider = SimpleContactProvider([singleton])

if contact_provider is None:
raise click.UsageError(
"You must provide a contact dir, zip file, or explicit name & zip."
)

state_to_manager = {}

for contact in contact_provider.get_contacts():
fuzzy_id = FuzzyIdentifier(
contact.last,
contact.first,
contact.zip_code,
get_nickname_index=nicknames_manager,
).fuzzy_id
summary = summaries_manager.get_summary(fuzzy_id)
print(f"--> {contact.first} {contact.last} {contact.zip_code}")
if summary is None:
print("{}")
else:
print(json.dumps(summary.to_data(), indent=2))
manager = state_to_manager.get(contact.state)
if manager is None:
manager = ContributionSummaryManager(
get_engine(data_manager, contact.state),
nicknames_manager,
)
state_to_manager[contact.state] = manager
summary = manager.preferred_summary_for_contact(contact)
result = {
"contact": contact.to_data(),
"summary": summary.to_data() if summary else None,
}
print(json.dumps(result))


if __name__ == "__main__":
Expand Down
26 changes: 20 additions & 6 deletions server/data/contacts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,28 @@
class Contact:
"""A contact in the address book."""

first: str
last: str
zip_code: str # Either 5 or 9 digits
first_name: str
last_name: str
city: str
state: str
zip_code: str | None # Either 5 or 9 digits

@property
def zip5(self) -> str:
"""Returns the first 5 digits of the zip code."""
return self.zip_code[:5]
def zip5(self) -> str | None:
"""Returns the first 5 digits of the zip code, if any."""
return self.zip_code[:5] if self.zip_code else None

def to_data(self) -> dict[str, str]:
"""Return a dictionary representation of the contact."""
data = {
"first_name": self.first_name,
"last_name": self.last_name,
"city": self.city,
"state": self.state,
}
if self.zip_code:
data["zip_code"] = self.zip_code
return data


class IContactProvider(t.Protocol):
Expand Down
18 changes: 13 additions & 5 deletions server/data/contacts/abbu.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ def _parse_abperson(self, abperson: t.IO[bytes]) -> Contact | None:
"""Parse an abperson file into a Contact."""
try:
plist_data = plistlib.load(abperson)
first = plist_data["First"].title()
last = plist_data["Last"].title()
first_name = plist_data["First"].upper()
last_name = plist_data["Last"].upper()
# use the preferred zip code if it exists
zip_code = plist_data["Address"]["values"][0]["ZIP"].replace("-", "")
city = plist_data["Address"]["values"][0]["City"].upper()
state = plist_data["Address"]["values"][0]["State"].upper()
maybe_zip_code = (
plist_data["Address"]["values"][0].get("ZIP", "").replace("-", "")
)
if len(maybe_zip_code) in {5, 9}: # 5 or 9 digits
zip_code = maybe_zip_code
else:
zip_code = None
except Exception:
return None
if len(zip_code) not in {5, 9}:
if len(state) != 2:
return None
return Contact(first, last, zip_code)
return Contact(first_name, last_name, city, state, zip_code)


class DirectoryABBUManager(ABBUManagerBase):
Expand Down
1 change: 0 additions & 1 deletion server/data/fec/__init__.py

This file was deleted.

Loading

0 comments on commit 7c9812d

Please sign in to comment.