Merge pull request #305 from deployment-gap-model-education-fund/dev
2024.01.23 `dev -> main` merge
bendnorman authored Jan 25, 2024
2 parents b1a3a15 + dded721 commit a16f4db
Showing 8 changed files with 89 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/dbcp/__init__.py
@@ -1,5 +1,6 @@
"""The Down Ballot Climate Project (DBCP) Project."""
import dbcp.data_mart # noqa: F401
import dbcp.data_mart.br_election_data # noqa: F401
import dbcp.data_mart.counties # noqa: F401
import dbcp.data_mart.projects # noqa: F401
import dbcp.etl # noqa: F401
6 changes: 2 additions & 4 deletions src/dbcp/data_mart/__init__.py
@@ -14,7 +14,7 @@
logger = logging.getLogger(__name__)


def create_data_marts(args):
def create_data_marts(args): # noqa: max-complexity=11
"""Collect and load all data mart tables to data warehouse."""
engine = dbcp.helpers.get_sql_engine()
data_marts = {}
@@ -61,9 +61,7 @@ def create_data_marts(args):
with engine.connect() as con:
for table in metadata.sorted_tables:
logger.info(f"Load {table.name} to postgres.")
df = enforce_dtypes(
data_marts[table.name], table.name, "data_mart", metadata
)
df = enforce_dtypes(data_marts[table.name], table.name, "data_mart")
df.to_sql(
name=table.name,
con=con,
2 changes: 1 addition & 1 deletion src/dbcp/data_mart/br_election_data.py
@@ -60,7 +60,7 @@ def _create_county_commission_elections_long(
]

# Aggregate
mode = lambda x: x.value_counts().index[0] # noqa: E731
mode = lambda x: x.value_counts(dropna=False).index[0] # noqa: E731

grp_fields = [
"election_id",
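The only functional change here is that the ad-hoc mode aggregation now counts missing values: `value_counts()` drops NaN by default, so with `dropna=False` a column whose most frequent entry is NaN will report NaN as its mode instead of the most common non-null value. A minimal sketch of the difference, on a made-up series:

import numpy as np
import pandas as pd

# Hypothetical column where NaN is the most frequent entry.
s = pd.Series(["R", "R", np.nan, np.nan, np.nan, "D"])

mode = lambda x: x.value_counts().index[0]  # ignores NaN -> "R"
mode_with_na = lambda x: x.value_counts(dropna=False).index[0]  # counts NaN -> nan

print(mode(s), mode_with_na(s))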
4 changes: 2 additions & 2 deletions src/dbcp/etl.py
@@ -175,7 +175,7 @@ def etl_energy_communities_by_county() -> dict[str, pd.DataFrame]:

def etl_ballot_ready() -> dict[str, pd.DataFrame]:
"""ETL Ballot Ready election data."""
source_uri = "gs://dgm-archive/ballot_ready/BallotReady_upcoming_races_with_counties_08_14_2023.csv"
source_uri = "gs://dgm-archive/ballot_ready/Climate Partners_Upcoming Races_All Tiers_20231013.csv"
raw_df = dbcp.extract.ballot_ready.extract(source_uri)
transformed = dbcp.transform.ballot_ready.transform(raw_df)
return transformed
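The Ballot Ready source now points at the October 2023 archive file. The extract step itself is not part of this diff; as a rough sketch, and assuming read access to the `dgm-archive` bucket plus `gcsfs` installed so pandas can read `gs://` URIs, it only needs something like the hypothetical helper below (the real `dbcp.extract.ballot_ready.extract` may do more):

import pandas as pd

def extract_ballot_ready(source_uri: str) -> pd.DataFrame:
    """Hypothetical extract step: read the raw Ballot Ready CSV from the archive bucket."""
    return pd.read_csv(source_uri)

raw_df = extract_ballot_ready(
    "gs://dgm-archive/ballot_ready/Climate Partners_Upcoming Races_All Tiers_20231013.csv"
)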
@@ -245,7 +245,7 @@ def etl(args):
for table in metadata.sorted_tables:
logger.info(f"Load {table.name} to postgres.")
df = enforce_dtypes(
transformed_dfs[table.name], table.name, "data_warehouse", metadata
transformed_dfs[table.name], table.name, "data_warehouse"
)
df.to_sql(
name=table.name,
31 changes: 24 additions & 7 deletions src/dbcp/helpers.py
@@ -12,6 +12,7 @@
import sqlalchemy as sa
from botocore import UNSIGNED
from botocore.config import Config
from google.cloud import bigquery
from tqdm import tqdm

import dbcp
@@ -158,22 +159,33 @@ def upload_schema_to_bigquery(schema: str, dev: bool = True) -> None:
loaded_tables[table_name] = pd.read_sql_table(
table_name, con, schema=schema
)
# Use dtypes that support pd.NA
loaded_tables[table_name] = loaded_tables[table_name].convert_dtypes()
loaded_tables[table_name] = enforce_dtypes(
loaded_tables[table_name], table_name, schema
)

# load to big query
credentials = _get_bigquery_credentials()
client = bigquery.Client(credentials=credentials, project=GCP_PROJECT_ID)

for table_name, df in loaded_tables.items():
full_table_name = f"{schema}{'_dev' if dev else ''}.{table_name}"
schema_environment = f"{schema}{'_dev' if dev else ''}"
full_table_name = f"{schema_environment}.{table_name}"
table_schema = get_bq_schema_from_metadata(table_name, schema, dev)
logger.info(f"Loading: {table_name}")

# Delete the table because pandas_gbq doesn't recreate the BQ
# table schema which leads to problems when we change the metadata.
table_id = f"{GCP_PROJECT_ID}.{schema_environment}.{table_name}"
client.delete_table(table_id, not_found_ok=True)

pandas_gbq.to_gbq(
df,
full_table_name,
project_id=GCP_PROJECT_ID,
if_exists="replace",
credentials=credentials,
table_schema=get_bq_schema_from_metadata(table_name, schema, dev),
table_schema=table_schema,
chunksize=5000,
)
logger.info(f"Finished: {full_table_name}")

@@ -217,10 +229,15 @@ def psql_insert_copy(table, conn, keys, data_iter):
}


def enforce_dtypes(
df: pd.DataFrame, table_name: str, schema: str, metadata: sa.sql.schema.MetaData
) -> pd.DataFrame:
def enforce_dtypes(df: pd.DataFrame, table_name: str, schema: str) -> pd.DataFrame:
"""Enforce datatypes specified in the dbcp.metadata.sqlalchemy schemas."""
schema_sa_metadata = {
"data_warehouse": dbcp.metadata.data_warehouse.metadata,
"data_mart": dbcp.metadata.data_mart.metadata,
}
metadata = schema_sa_metadata.get(schema, None)
if not metadata:
raise KeyError(f"Metadata for schema: {schema} does not exists.")
full_table_name = f"{schema}.{table_name}"
return df.astype(
{
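`enforce_dtypes` now resolves the SQLAlchemy `MetaData` from the schema name instead of accepting it as an argument, which is why the callers in `etl.py` and `data_mart/__init__.py` drop their fourth parameter. The astype body is collapsed in the diff above; the sketch below shows the overall pattern with an illustrative table definition and type-lookup table, not the project's actual metadata:

import pandas as pd
import sqlalchemy as sa

metadata = sa.MetaData()
counties = sa.Table(
    "counties",
    metadata,
    sa.Column("county_id_fips", sa.String),
    sa.Column("capacity_mw", sa.Float),
)

SA_TO_PD = {sa.String: "string", sa.Float: "float64", sa.Integer: "Int64", sa.Boolean: "boolean"}
SCHEMA_METADATA = {"data_mart": metadata}  # the real module maps both warehouse and mart schemas

def enforce_dtypes(df: pd.DataFrame, table_name: str, schema: str) -> pd.DataFrame:
    """Cast df columns to the dtypes implied by the sqlalchemy table definition."""
    md = SCHEMA_METADATA.get(schema)
    if md is None:
        raise KeyError(f"Metadata for schema: {schema} does not exist.")
    table = md.tables[table_name]
    dtypes = {col.name: SA_TO_PD[type(col.type)] for col in table.columns if col.name in df}
    return df.astype(dtypes)

typed = enforce_dtypes(
    pd.DataFrame({"county_id_fips": ["01001"], "capacity_mw": ["42.0"]}), "counties", "data_mart"
)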
8 changes: 6 additions & 2 deletions src/dbcp/metadata/data_mart.py
@@ -444,8 +444,12 @@
Column("number_of_seats", Integer, nullable=False),
Column("normalized_position_id", Integer, nullable=False),
Column("normalized_position_name", String, nullable=False),
Column("frequency", String, nullable=False),
Column("reference_year", Integer, nullable=False),
Column(
"frequency", String, nullable=True
), # Starting with the 2023-10-03 update there were a couple hundred nulls
Column(
"reference_year", String, nullable=True
), # Starting with the 2023-10-03 update there were a couple hundred nulls
Column("partisan_type", String),
Column("race_created_at", DateTime, nullable=False),
Column("race_updated_at", DateTime, nullable=False),
8 changes: 6 additions & 2 deletions src/dbcp/metadata/data_warehouse.py
@@ -1213,7 +1213,9 @@
metadata,
Column("position_id", Integer, nullable=False, primary_key=True),
Column("position_name", String, nullable=False),
Column("reference_year", Integer, nullable=False),
Column(
"reference_year", Integer, nullable=True
), # Starting with the 2023-10-03 update there were a couple hundred nulls
Column("sub_area_name", String),
Column("sub_area_value", String),
Column("sub_area_name_secondary", String),
@@ -1224,7 +1226,9 @@
Column("is_retention", Boolean, nullable=False),
Column("normalized_position_id", Integer, nullable=False),
Column("normalized_position_name", String, nullable=False),
Column("frequency", String, nullable=False),
Column(
"frequency", String, nullable=True
), # Starting with the 2023-10-03 update there were a couple hundred nulls
Column("partisan_type", String),
schema=schema,
)
66 changes: 47 additions & 19 deletions src/dbcp/transform/ballot_ready.py
@@ -1,10 +1,14 @@
"""Module for cleaning Ballot Ready data."""
import logging

import pandas as pd

from pudl.helpers import add_fips_ids

DATETIME_COLUMNS = ["race_created_at", "race_updated_at", "election_day"]

logger = logging.getLogger(__name__)


def _normalize_entities(ballot_ready: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""Normalize ballot ready data into elections, position and race entities.
@@ -38,8 +42,6 @@ def _normalize_entities(ballot_ready: pd.DataFrame) -> dict[str, pd.DataFrame]:
trns_dfs["br_elections"] = br_elections

# Positions
position_pk_fields = ["position_id"]

position_fields = [
"reference_year",
"position_id",
@@ -57,19 +59,39 @@ def _normalize_entities(ballot_ready: pd.DataFrame) -> dict[str, pd.DataFrame]:
"frequency",
"partisan_type",
]
# position_id == 156594 is the only position with two frequencies and reference_years
# Create a new index for it
new_index = ballot_ready.position_id.max() + 1
assert new_index not in ballot_ready.position_id
ballot_ready.loc[ballot_ready.race_id == 2020783, "position_id"] = new_index
# A small number of positions have multiple records where frequencies and reference_year
# differ. This is because of edge cases where a position has changed its election year
# or frequency due to redistricting or an election law. The old position_election_frequency
# and the future/current one are reflected in the data.

# check if position_id is unique
br_positions = ballot_ready.drop_duplicates(subset=position_fields)
is_duplicate_position = br_positions.position_id.duplicated(keep=False)
duplicate_positions = br_positions[is_duplicate_position].copy()

logger.info(f"Found {len(duplicate_positions)} duplicate positions.")
assert (
(ballot_ready.groupby(position_pk_fields)[position_fields].nunique() <= 1)
.all()
.all()
), "There is duplicate entity information in the positions dataframe."
br_positions = ballot_ready.drop_duplicates(subset=position_pk_fields)[
position_fields
].copy()
len(duplicate_positions) <= 52
), f"Found more duplicate positions than expected: {len(duplicate_positions)}"

# drop NAs in frequency and reference_year
duplicate_positions = duplicate_positions.dropna(
subset=["frequency", "reference_year"]
)
# select the position that has the most recent election day
latest_position_idx = duplicate_positions.groupby("position_id")[
"election_day"
].idxmax()
duplicate_positions = duplicate_positions.loc[latest_position_idx]

# merge with the non-duplicates
br_positions = pd.concat(
[duplicate_positions, br_positions[~is_duplicate_position]]
)[position_fields]

assert (
br_positions.position_id.is_unique
), "position_id is not unique. Deduplication did not work as expected."
trns_dfs["br_positions"] = br_positions

# Races
@@ -155,11 +177,12 @@ def _explode_counties(raw_ballot_ready: pd.DataFrame) -> pd.DataFrame:

# Valdez-Cordova Census Area was split into two areas in 2019
# https://www.census.gov/programs-surveys/geography/technical-documentation/county-changes/2010.html
# All elections are state and federal level so I will duplicate the races for the two new census areas
# It is reasonable to split State and Federal elections between the two areas.
# However, adding local elections to both counties is not appropriate. I'm going to do it
# anyway because there aren't any great options for accurately geocoding the local elections.
valdez = ballot_ready.query("county_id_fips == '02261'")
assert valdez.level.isin(
["state", "federal"]
).all(), "Found a local election in the Valdez-Cordova Census Area!"
if not valdez.level.isin(["state", "federal"]).all():
logger.info("Found a local election in the Valdez-Cordova Census Area!")

ballot_ready = ballot_ready[ballot_ready.county_id_fips != "02261"].copy()
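The rebuild of the Valdez-Cordova races under the successor census areas happens in the collapsed lines that construct `valdez_corrections_dfs`. A minimal sketch of the idea, assuming the successor FIPS codes are Chugach (02063) and Copper River (02066); the real construction may differ:

import pandas as pd

# Toy stand-in for the Valdez-Cordova slice of ballot_ready.
valdez = pd.DataFrame(
    {"race_id": [1, 2], "level": ["state", "federal"], "county_id_fips": ["02261", "02261"]}
)

valdez_corrections_dfs = []
for successor_fips in ("02063", "02066"):  # assumed successor census areas
    corrected = valdez.copy()
    corrected["county_id_fips"] = successor_fips
    valdez_corrections_dfs.append(corrected)

# Each race now appears once per successor area, ready to be concatenated back in.
print(pd.concat(valdez_corrections_dfs))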

@@ -178,10 +201,15 @@
ballot_ready = pd.concat(valdez_corrections_dfs + [ballot_ready])

# Drop unused columns
ballot_ready = ballot_ready.drop(columns=["position_description", "id"])
ballot_ready = ballot_ready.drop(columns=["position_description"])
ballot_ready = ballot_ready.rename(
columns={"county": "raw_county", "state": "raw_state"}
)

# Clean up boolean columns
bool_columns = [col for col in ballot_ready.columns if col.startswith("is_")]
for col in bool_columns:
ballot_ready[col] = ballot_ready[col].map({"t": True, "f": False})
return ballot_ready


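The new deduplication keeps, for each `position_id` that still has multiple candidate records after the NA drop, the record tied to the most recent `election_day` via `groupby(...).idxmax()`. A toy illustration of that selection step, with made-up values:

import pandas as pd

dupes = pd.DataFrame(
    {
        "position_id": [7, 7, 9],
        "frequency": ["every 2 years", "every 4 years", "every 4 years"],
        "election_day": pd.to_datetime(["2022-11-08", "2024-11-05", "2024-11-05"]),
    }
)

# Index of the row with the latest election_day within each position_id.
latest_idx = dupes.groupby("position_id")["election_day"].idxmax()
deduped = dupes.loc[latest_idx]

assert deduped.position_id.is_unique
print(deduped)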
