Skip to content

Commit

Permalink
Support area codes (1/2)
Browse files Browse the repository at this point in the history
  • Loading branch information
davepeck committed Nov 30, 2023
1 parent a16097b commit 4af5790
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 18 deletions.
56 changes: 56 additions & 0 deletions area_codes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python3
# ruff: noqa: E501
import typing as t

import click
import httpx
from bs4 import BeautifulSoup
from tqdm import tqdm


def _get(npa_id: str) -> str:
result = httpx.post(
"https://nationalnanpa.com/enas/displayNpaCityReport.do", data={"npaId": npa_id}
)
result.raise_for_status()
return result.text


def _table(npa_id: str):
html = _get(npa_id)
soup = BeautifulSoup(html, "html.parser")
return soup.find_all("table", attrs={"border": "1"})[0]


def _rows(npa_id: str):
return _table(npa_id).find_all("tr", attrs={"align": "CENTER"})[1:]


def _tuples(npa_id: str) -> t.Iterable[tuple[str, str, str]]:
for row in _rows(npa_id):
cells = row.find_all("td")
yield (cells[0].text.strip(), cells[1].text.strip(), cells[3].text.strip())


def _npa_ids() -> t.Iterable[str]:
with open("data/npa/npa_ids.txt") as f:
for line in f:
yield line.strip()


@click.command()
def download_area_codes():
"""Download details about area codes (aka npa_ids) from NAMPA."""
all_tuples = []
npa_ids = list(_npa_ids())
for npa_id in tqdm(npa_ids):
all_tuples.extend(_tuples(npa_id))

with open("data/npa/npa_details.csv", "w") as f:
f.write("area_code,city,state\n")
for area_code, city, state in all_tuples:
f.write(f"{area_code},{city},{state}\n")


if __name__ == "__main__":
download_area_codes()
55 changes: 54 additions & 1 deletion fec.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,52 @@ def fec():
pass


@fec.group()
def contacts():
"""Work with contacts data."""
pass


@contacts.command(name="list")
@click.option(
"-c",
"--contact-dir",
type=click.Path(exists=True, dir_okay=True, file_okay=False),
help="Path to a `.abbu` contacts dir.",
required=False,
default=None,
)
@click.option(
"-z",
"--contact-zip",
type=click.Path(exists=True, dir_okay=False, file_okay=True),
help="Path to a `.abbu` contacts zip file.",
required=False,
default=None,
)
def list_contacts(contact_dir: str | None = None, contact_zip: str | None = None):
"""List contacts."""
contact_provider: IContactProvider | None = None

if contact_dir is not None:
contact_provider = DirectoryABBUManager(contact_dir)
elif contact_zip is not None:
contact_provider = ZipABBUManager(contact_zip)

if contact_provider is None:
raise click.UsageError(
"You must provide a contact dir, zip file, or explicit name & zip."
)

seen_contacts = set()

for contact in contact_provider.get_contacts():
if contact.without_zip() in seen_contacts:
continue
seen_contacts.add(contact.without_zip())
print(json.dumps(contact.to_data()))


@fec.group()
def names():
"""Work with names data."""
Expand Down Expand Up @@ -197,7 +243,12 @@ def search(
contact_provider = ZipABBUManager(contact_zip)
elif first_name and last_name and city and state:
singleton = Contact(
first_name.upper(), last_name.upper(), city.upper(), state.upper(), zip_code
first_name.upper(),
last_name.upper(),
city.upper(),
state.upper(),
None,
zip_code,
)
contact_provider = SimpleContactProvider([singleton])

Expand All @@ -212,6 +263,8 @@ def search(
for contact in contact_provider.get_contacts():
if contact.without_zip() in seen_contacts:
continue
if contact.state is None:
continue
seen_contacts.add(contact.without_zip())
manager = state_to_manager.get(contact.state)
if manager is None:
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
beautifulsoup4~=4.12.2
black~=23.11.0
httpx[cli]~=0.25.1
pre-commit~=3.5.0
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ aiosqlite~=0.19.0
click~=8.1.7
httpx~=0.25.1
litestar[standard]~=2.3.2
phonenumbers~=8.13.26
pydantic-settings~=2.1.0
pydantic~=2.5.1
sqlalchemy~=2.0.23
11 changes: 8 additions & 3 deletions server/data/contacts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ class Contact:

first_name: str
last_name: str
city: str
state: str
city: str | None
state: str | None
phone: str | None
zip_code: str | None # Either 5 or 9 digits

@property
Expand All @@ -27,13 +28,17 @@ def to_data(self) -> dict[str, str]:
"city": self.city,
"state": self.state,
}
if self.phone:
data["phone"] = self.phone
if self.zip_code:
data["zip_code"] = self.zip_code
return data

def without_zip(self) -> "Contact":
"""Return a copy of the contact without the zip code."""
return Contact(self.first_name, self.last_name, self.city, self.state, None)
return Contact(
self.first_name, self.last_name, self.city, self.state, self.phone, None
)


class IContactProvider(t.Protocol):
Expand Down
29 changes: 18 additions & 11 deletions server/data/contacts/abbu.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,27 @@ def _parse_abperson(self, abperson: t.IO[bytes]) -> Contact | None:
first_name = plist_data["First"].upper()
last_name = plist_data["Last"].upper()
# use the preferred zip code if it exists
city = plist_data["Address"]["values"][0]["City"].upper()
state = plist_data["Address"]["values"][0]["State"].upper()
maybe_zip_code = (
plist_data["Address"]["values"][0].get("ZIP", "").replace("-", "")
)
if len(maybe_zip_code) in {5, 9}: # 5 or 9 digits
zip_code = maybe_zip_code
else:
try:
address_0 = plist_data["Address"]["values"][0]
except Exception:
address_0 = {}
city = address_0.get("City", "").upper() or None
state = address_0.get("State", "").upper() or None
if state and len(state) != 2:
state = None

zip_code = address_0.get("ZIP", "").replace("-", "") or None
if zip_code and len(zip_code) not in {5, 9}:
zip_code = None

try:
phone = plist_data["Phone"]["values"][0]
except Exception:
phone = None

except Exception:
return None
if len(state) != 2:
return None
return Contact(first_name, last_name, city, state, zip_code)
return Contact(first_name, last_name, city, state, phone, zip_code)


class DirectoryABBUManager(ABBUManagerBase):
Expand Down
4 changes: 2 additions & 2 deletions server/data/summaries.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ def _contact_stmt(self, contact: Contact, related_name_set: frozenset[str]):
if contact.zip5 is None:
return Contribution.for_last_city_state_firsts_stmt(
contact.last_name,
contact.city,
contact.state,
contact.city or "TODO: NOWHERE",
contact.state or "XX",
related_name_set,
)
else:
Expand Down
2 changes: 1 addition & 1 deletion server/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def summarize() -> dict:
data_manager = DataManager.default()
nicknames_manager = NicknamesManager.from_data_manager(data_manager)
engine = get_engine(data_manager, "WA")
contact = Contact("MICHAEL", "MATHIEU", "SEATTLE", "WA", None)
contact = Contact("MICHAEL", "MATHIEU", "SEATTLE", "WA", None, None)
summary_manager = ContributionSummaryManager(engine, nicknames_manager)
summary = summary_manager.preferred_summary_for_contact(contact)
return summary.to_data() if summary else {}
Expand Down

0 comments on commit 4af5790

Please sign in to comment.