Skip to content

Commit

Permalink
It works, but dang is it slow...
Browse files Browse the repository at this point in the history
  • Loading branch information
davepeck committed Dec 14, 2023
1 parent 538c274 commit 486a7e4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 24 deletions.
2 changes: 1 addition & 1 deletion fec.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ def contributions():
@contributions.command()
@click.argument("first_name", required=False, default=None)
@click.argument("last_name", required=False, default=None)
@click.argument("zip_code", required=False, default=None)
@click.argument("city", required=False, default=None)
@click.argument("state", required=False, default=None)
@click.argument("zip_code", required=False, default=None)
@click.option(
"-c",
"--contact-dir",
Expand Down
33 changes: 18 additions & 15 deletions server/data/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import pydantic as p

from server.data.fec_types import (
ContributionColumns,
EntityTypeCode,
Party,
)
Expand Down Expand Up @@ -79,7 +78,7 @@ class CommitteeTable(Table[Committee]):
"""Tools for querying the BigQuery committee master file."""

def __init__(self, client: BQClient, year: str | datetime.date):
super().__init__(client, f"cm{get_yy(year)}")
super().__init__(client, f"bigquery-public-data.fec.cm{get_yy(year)}")

def get_model_instance(self, row: t.Any) -> Committee:
"""Create a committee from a row of the committee master file."""
Expand All @@ -92,7 +91,7 @@ def get_model_instance(self, row: t.Any) -> Committee:

def for_name_stmt(self, name: str) -> Statement:
"""Return a select statement for committees matching the given criteria."""
return self.all_stmt().where("cmte_nm", "LIKE", name.upper())
return self.all_stmt().where("cmte_nm", "LIKE", f"%{name.upper()}%")

def for_name(self, name: str) -> t.Iterable[Committee]:
"""Return a query for committees matching the given criteria."""
Expand All @@ -114,7 +113,7 @@ class Contribution(p.BaseModel, frozen=True):
class ContributionTable(Table[Contribution]):
def __init__(self, client: BQClient, year: str | datetime.date):
self._committee_table = CommitteeTable(client, year)
super().__init__(client, f"indiv{get_yy(year)}")
super().__init__(client, f"bigquery-public-data.fec.indiv{get_yy(year)}")

def all_stmt(self) -> Statement:
"""
Expand All @@ -125,8 +124,8 @@ def all_stmt(self) -> Statement:
super()
.all_stmt()
.where("entity_tp", "=", "IND")
.where("amount", ">", 0)
.join(self._committee_table.name, "cmte_id", "cmte_id")
.where("transaction_amt", ">", 0)
.join(self._committee_table.name, "indiv20.cmte_id = cm20.cmte_id")
)

def for_last_zip_firsts_stmt(
Expand Down Expand Up @@ -174,28 +173,32 @@ def for_last_city_state_firsts(

def get_model_instance(self, row: t.Any) -> Contribution | None:
"""Insert a contribution from a row of the contributions file."""
sub_id = row[ContributionColumns.SUB_ID].strip()
sub_id = (str(row.sub_id) or "").strip()
if not sub_id:
return None
committee_id = row[ContributionColumns.COMMITTEE_ID].strip()
committee_id = (row.cmte_id or "").strip()
if not committee_id:
return None
entity_type = row[ContributionColumns.ENTITY_TYPE].strip()
entity_type = (row.entity_tp or "").strip()
if entity_type != EntityTypeCode.INDIVIDUAL:
return None
name = row[ContributionColumns.NAME].strip()
name = (row.name or "").strip()
if "," not in name:
return None
city = row[ContributionColumns.CITY].strip()
city = (row.city or "").strip()
if not city:
return None
state = row[ContributionColumns.STATE].strip()
state = (row.state or "").strip()
if not state:
return None
zip_code = row[ContributionColumns.ZIP_CODE].strip()
zip_code = (row.zip_code or "").strip()
if len(zip_code) not in {5, 9}:
return None
amount = row[ContributionColumns.TRANSACTION_AMOUNT].strip()
amount_str = (str(row.transaction_amt) or "").strip()
try:
amount = Decimal(amount_str)
except Exception:
return None
committee = self._committee_table.get_model_instance(row)
if committee is None:
return None
Expand All @@ -206,5 +209,5 @@ def get_model_instance(self, row: t.Any) -> Contribution | None:
city=city,
state=state,
zip_code=zip_code,
amount=Decimal(amount),
amount=amount,
)
4 changes: 2 additions & 2 deletions server/data/summaries.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ def preferred_summary_for_contact(
try_contacts = [contact]
if contact.has_zip:
try_contacts.append(contact.without_zip())
summaries = []
summaries: list[ContributionSummary] = []
for try_contact in try_contacts:
summaries.extend(list(self._summaries_for_contact(try_contact)))
if not summaries:
return None
return max(summaries, key=lambda s: s.total_cents)
return max(summaries, key=lambda s: s.total)
33 changes: 27 additions & 6 deletions server/utils/bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,27 @@
type QueryParams = t.Mapping[str, QueryParamValue]


def default_table_alias(table_name: str, table_alias: str | None = None) -> str | None:
"""Return the default table alias if appropriate, and no explicit alias is given."""
if not table_alias and "." in table_name:
table_alias = table_name.split(".")[-1]
return table_alias


class Statement:
"""A BigQuery SQL statement builder."""

table_name: str
table_alias: str | None
select_columns: list[str]
filters: list[str]
joins: list[str]
params: dict[str, QueryParamValue]

def __init__(self, table_name: str):
def __init__(self, table_name: str, table_alias: str | None = None):
self.table_name = table_name
self.table_alias = default_table_alias(table_name, table_alias)
self.table_alias = table_alias
self.select_columns = []
self.filters = []
self.joins = []
Expand Down Expand Up @@ -53,9 +63,15 @@ def where(self, column, operator, value, param_name=None):
self.filters.append(condition.strip())
return self

def join(self, join_type, table_name, on_clause):
def join(
self, table_name, on_clause, join_type="INNER", table_alias: str | None = None
):
"""Add a JOIN clause to the query."""
self.joins.append(f"{join_type} JOIN {table_name} ON {on_clause}")
table_alias = default_table_alias(table_name, table_alias)
alias_clause = f" AS {table_alias}" if table_alias else ""
self.joins.append(
f"{join_type} JOIN `{table_name}`{alias_clause} ON {on_clause}"
)
return self

def build_query(self):
Expand All @@ -64,8 +80,10 @@ def build_query(self):
join_clause = " ".join(self.joins).strip()
where_clause = " AND ".join(self.filters) if self.filters else ""

alias_clause = f" AS {self.table_alias}" if self.table_alias else ""

query_parts = [
f"SELECT {select_clause} FROM '{self.table_name}'",
f"SELECT {select_clause} FROM `{self.table_name}`{alias_clause}",
]
if join_clause:
query_parts.append(join_clause)
Expand Down Expand Up @@ -109,6 +127,7 @@ class BQClient(bigquery.Client):
def execute(self, statement: Statement):
"""Execute the query."""
query = statement.build_query()
# print("EXECUTING: ", query, "WITH PARAMS: ", statement.params)
job_config = statement.build_query_job_config()
job = self.query(query, job_config=job_config)
return job.result()
Expand All @@ -119,10 +138,12 @@ class Table[ModelT](ABC):

client: BQClient
name: str
alias: str | None

def __init__(self, client: BQClient, name: str):
def __init__(self, client: BQClient, name: str, alias: str | None = None):
self.client = client
self.name = name
self.alias = default_table_alias(name, alias)

@abstractmethod
def get_model_instance(self, bq_row: t.Any) -> ModelT:
Expand All @@ -135,7 +156,7 @@ def execute(self, statement: Statement) -> t.Iterable[ModelT]:

def all_stmt(self) -> Statement:
"""Return the default statement."""
return Statement(self.name)
return Statement(self.name, self.alias)

def all(self) -> t.Iterable[ModelT]:
"""Return the default query."""
Expand Down

0 comments on commit 486a7e4

Please sign in to comment.