Skip to content

Commit

Permalink
Updates existing clean-stage models to use the macro.
Browse files Browse the repository at this point in the history
  • Loading branch information
MattTriano committed Dec 9, 2024
1 parent 00f2804 commit be6e66f
Show file tree
Hide file tree
Showing 36 changed files with 356 additions and 1,209 deletions.
2 changes: 1 addition & 1 deletion airflow/dags/cook_county/update_chicago_traffic_crashes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.decorators import dag

from tasks.socrata_tasks import update_socrata_table
from sources.tables import CHICAGO_TRAFFIC_CRASHES_LINES as SOCRATA_TABLE
from sources.tables import CHICAGO_TRAFFIC_CRASHES as SOCRATA_TABLE

task_logger = logging.getLogger("airflow.task")

Expand Down
56 changes: 28 additions & 28 deletions airflow/dags/sources/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@
CHICAGO_311_SERVICE_REQUESTS = SocrataTable(
table_id="v6vf-nfxy",
table_name="chicago_311_service_requests",
schedule="52 3 * * 4",
schedule="52 2 1,15 * 4",
)

CHICAGO_AFFORDABLE_RENTAL_HOUSING = SocrataTable(
table_id="s6ha-ppgi",
table_name="chicago_affordable_rental_housing",
schedule="40 2 * * 1",
schedule="40 2 1 */2 0",
)

CHICAGO_BIKE_PATHS = SocrataTable(
table_id="3w5d-sru8",
table_name="chicago_bike_paths",
schedule="0 22 10 8 *",
schedule="0 22 10 */3 *",
)

CHICAGO_BUSINESS_LICENSES = SocrataTable(
table_id="r5kz-chrr",
table_name="chicago_business_licenses",
schedule="51 4 * * 3",
schedule="51 4 1,15 * *",
)

CHICAGO_CITY_BOUNDARY = SocrataTable(
Expand All @@ -39,43 +39,43 @@
CHICAGO_CRIMES = SocrataTable(
table_id="ijzp-q8t2",
table_name="chicago_crimes",
schedule="10 4 * * *",
schedule="10 4 1,15 * *",
)

CHICAGO_CTA_TRAIN_LINES = SocrataTable(
table_id="53r7-y88m",
table_name="chicago_cta_train_lines",
schedule="12 5 5 * *",
schedule="20 5 1 * *",
)

CHICAGO_CTA_TRAIN_STATIONS = SocrataTable(
table_id="8pix-ypme",
table_name="chicago_cta_train_stations",
schedule="0 5 5 * *",
schedule="25 5 1 * *",
)

CHICAGO_CTA_BUS_STOPS = SocrataTable(
table_id="hvnx-qtky",
table_name="chicago_cta_bus_stops",
schedule="30 5 5 * *",
schedule="30 5 1 * *",
)

CHICAGO_DIVVY_STATIONS = SocrataTable(
table_id="bbyy-e7gq",
table_name="chicago_divvy_stations",
schedule="30 5 * * 1",
schedule="35 5 1 * 1",
)

CHICAGO_FOOD_INSPECTIONS = SocrataTable(
table_id="4ijn-s7e5",
table_name="chicago_food_inspections",
schedule="30 8,20 * * *",
schedule="30 20 1,15 * *",
)

CHICAGO_HOMICIDES_AND_SHOOTING_VICTIMIZATIONS = SocrataTable(
table_id="gumc-mgzr",
table_name="chicago_homicide_and_shooting_victimizations",
schedule="30 12 * * 1",
schedule="30 12 * * 0",
)

CHICAGO_MURALS = SocrataTable(
Expand Down Expand Up @@ -117,25 +117,25 @@
CHICAGO_RED_LIGHT_CAMERA_VIOLATIONS = SocrataTable(
table_id="spqx-js37",
table_name="chicago_red_light_camera_violations",
schedule="35 05 * * *",
schedule="40 5 * * 0",
)

CHICAGO_RELOCATED_VEHICLES = SocrataTable(
table_id="5k2z-suxx",
table_name="chicago_relocated_vehicles",
schedule="35 11,23 * * *",
schedule="35 7 */3 * *",
)

CHICAGO_SIDEWALK_CAFE_PERMITS = SocrataTable(
table_id="nxj5-ix6z",
table_name="chicago_sidewalk_cafe_permits",
schedule="50 3 * * *",
schedule="50 3 1,15 * *",
)

CHICAGO_SHOTSPOTTER_ALERTS = SocrataTable(
table_id="3h7q-7mdb",
table_name="chicago_shotspotter_alerts",
schedule="20 6,18 * * *",
schedule=None,
)

CHICAGO_STREET_CENTER_LINES = SocrataTable(
Expand All @@ -147,13 +147,13 @@
CHICAGO_TOWED_VEHICLES = SocrataTable(
table_id="ygr5-vcbg",
table_name="chicago_towed_vehicles",
schedule="15 11,23 * * *",
schedule="15 7 */3 * *",
)

CHICAGO_TRAFFIC_CRASHES_LINES = SocrataTable(
CHICAGO_TRAFFIC_CRASHES = SocrataTable(
table_id="85ca-t3if",
table_name="chicago_traffic_crashes",
schedule="20 3,15 * * *",
schedule="20 3 1,5 * *",
)

CHICAGO_VACANT_AND_ABANDONED_BUILDINGS = SocrataTable(
Expand All @@ -165,55 +165,55 @@
COOK_COUNTY_ADDRESS_POINTS = SocrataTable(
table_id="78yw-iddh",
table_name="cook_county_address_points",
schedule="40 5 * * 2",
schedule="45 5 1 * *",
)

COOK_COUNTY_PARCEL_ASSESSMENT_APPEALS = SocrataTable(
table_id="7pny-nedm",
table_name="cook_county_parcel_assessment_appeals",
schedule="0 13 * * 6",
schedule="0 13 * * 0",
)

COOK_COUNTY_NEIGHBORHOOD_BOUNDARIES = SocrataTable(
table_id="wyzt-dzf8",
table_name="cook_county_neighborhood_boundaries",
schedule="0 4 3 3 *",
schedule="0 4 7 */3 *",
)

COOK_COUNTY_PARCEL_ADDRESSES = SocrataTable(
table_id="3723-97qp",
table_name="cook_county_parcel_addresses",
schedule="10 7 4 * *",
schedule="10 7 7 * *",
)

COOK_COUNTY_PARCEL_LOCATIONS = SocrataTable(
table_id="c49d-89sn",
table_name="cook_county_parcel_locations",
schedule="0 7 4 * *",
schedule="40 6 7 * *",
)

COOK_COUNTY_PARCEL_SALES = SocrataTable(
table_id="wvhk-k5uv",
table_name="cook_county_parcel_sales",
schedule="0 6 4 * *",
schedule="0 6 7 * *",
)

COOK_COUNTY_PARCEL_VALUE_ASSESSMENTS = SocrataTable(
table_id="uzyt-m557",
table_name="cook_county_parcel_value_assessments",
schedule="0 3 4 * *",
schedule="0 3 7 * *",
)

COOK_COUNTY_MULTIFAM_PARCEL_IMPROVEMENTS = SocrataTable(
table_id="x54s-btds",
table_name="cook_county_multifam_parcel_improvements",
schedule="20 3 * * 4",
schedule="20 3 7 * *",
)

COOK_COUNTY_CONDO_PARCEL_IMPROVEMENTS = SocrataTable(
table_id="3r7i-mrz4",
table_name="cook_county_condo_parcel_improvements",
schedule="20 3 * * 3",
schedule="40 3 7 * 3",
)

COOK_COUNTY_SAO_CASE_INTAKE_DATA = SocrataTable(
Expand All @@ -239,5 +239,5 @@
NYC_PARCEL_SALES = SocrataTable(
table_id="usep-8jbt",
table_name="nyc_parcel_sales",
schedule="20 2 * * 3",
schedule="20 2 10 * 3",
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{ config(materialized='view') }}
{% set dataset_name = "chicago_affordable_rental_housing" %}
{% set ck_cols = ["address", "management_company", "units", "property_type"] %}
{% set record_id = "housing_development_id" %}
{% set base_cols = [
Expand All @@ -7,42 +7,14 @@
"y_coordinate", "latitude", "longitude", "geometry", "source_data_updated",
"ingestion_check_time"
] %}
{% set updated_at_col = "source_data_updated" %}

-- selects all records from the standardized view of this data
WITH std_data AS (
SELECT *
FROM {{ ref('chicago_affordable_rental_housing_standardized') }}
),
{% set query = generate_clean_stage_incremental_dedupe_query(
dataset_name=dataset_name,
record_id=record_id,
ck_cols=ck_cols,
base_cols=base_cols,
updated_at_col=updated_at_col
) %}

-- keeps the most recently updated version of each record
std_records_numbered_latest_first AS (
SELECT *,
row_number() over(partition by {{record_id}} ORDER BY source_data_updated DESC) as rn
FROM std_data
),
most_current_records AS (
SELECT *
FROM std_records_numbered_latest_first
WHERE rn = 1
),

-- selects the source_data_updated (ie the date of publication) value from each record's
-- first ingestion into the local data warehouse
std_records_numbered_earliest_first AS (
SELECT *,
row_number() over(partition by {{record_id}} ORDER BY source_data_updated ASC) as rn
FROM std_data
),
records_first_ingested_pub_date AS (
SELECT {{record_id}}, source_data_updated AS first_ingested_pub_date
FROM std_records_numbered_earliest_first
WHERE rn = 1
)

SELECT
{% for bc in base_cols %}mcr.{{ bc }},{% endfor %}
fi.first_ingested_pub_date
FROM most_current_records AS mcr
LEFT JOIN records_first_ingested_pub_date AS fi
ON mcr.{{ record_id }} = fi.{{ record_id }}
ORDER BY {% for ck in ck_cols %}mcr.{{ ck }} DESC, {% endfor %} mcr.source_data_updated DESC
{{ query }}
48 changes: 10 additions & 38 deletions airflow/dbt/models/clean/chicago_bike_paths_clean.sql
Original file line number Diff line number Diff line change
@@ -1,47 +1,19 @@
{{ config(materialized='view') }}
{% set dataset_name = "chicago_bike_paths" %}
{% set ck_cols = ["st_name", "f_street", "t_street", "br_ow_dir"] %}
{% set record_id = "bike_route_segment_id" %}
{% set base_cols = [
"bike_route_segment_id", "st_name", "f_street", "t_street", "street", "displayrou",
"oneway_dir", "contraflow", "br_ow_dir", "br_oneway", "mi_ctrline", "geometry",
"source_data_updated", "ingestion_check_time"
] %}
{% set updated_at_col = "source_data_updated" %}

-- selects all records from the standardized view of this data
WITH std_data AS (
SELECT *
FROM {{ ref('chicago_bike_paths_standardized') }}
),
{% set query = generate_clean_stage_incremental_dedupe_query(
dataset_name=dataset_name,
record_id=record_id,
ck_cols=ck_cols,
base_cols=base_cols,
updated_at_col=updated_at_col
) %}

-- keeps the most recently updated version of each record
std_records_numbered_latest_first AS (
SELECT *,
row_number() over(partition by {{record_id}} ORDER BY source_data_updated DESC) as rn
FROM std_data
),
most_current_records AS (
SELECT *
FROM std_records_numbered_latest_first
WHERE rn = 1
),

-- selects the source_data_updated (ie the date of publication) value from each record's
-- first ingestion into the local data warehouse
std_records_numbered_earliest_first AS (
SELECT *,
row_number() over(partition by {{record_id}} ORDER BY source_data_updated ASC) as rn
FROM std_data
),
records_first_ingested_pub_date AS (
SELECT {{record_id}}, source_data_updated AS first_ingested_pub_date
FROM std_records_numbered_earliest_first
WHERE rn = 1
)

SELECT
{% for bc in base_cols %}mcr.{{ bc }},{% endfor %}
fi.first_ingested_pub_date
FROM most_current_records AS mcr
LEFT JOIN records_first_ingested_pub_date AS fi
ON mcr.{{ record_id }} = fi.{{ record_id }}
ORDER BY {% for ck in ck_cols %}mcr.{{ ck }} DESC, {% endfor %} mcr.source_data_updated DESC
{{ query }}
48 changes: 10 additions & 38 deletions airflow/dbt/models/clean/chicago_city_boundary_clean.sql
Original file line number Diff line number Diff line change
@@ -1,46 +1,18 @@
{{ config(materialized='view') }}
{% set dataset_name = "chicago_city_boundary" %}
{% set ck_cols = ["objectid"] %}
{% set record_id = "objectid" %}
{% set base_cols = [
"objectid", "name", "shape_area", "shape_len", "geometry", "source_data_updated",
"ingestion_check_time"
] %}
{% set updated_at_col = "source_data_updated" %}

-- selects all records from the standardized view of this data
WITH std_data AS (
SELECT *
FROM {{ ref('chicago_city_boundary_standardized') }}
),
{% set query = generate_clean_stage_incremental_dedupe_query(
dataset_name=dataset_name,
record_id=record_id,
ck_cols=ck_cols,
base_cols=base_cols,
updated_at_col=updated_at_col
) %}

-- keeps the most recently updated version of each record
std_records_numbered_latest_first AS (
SELECT *,
row_number() over(partition by {{record_id}} ORDER BY source_data_updated DESC) as rn
FROM std_data
),
most_current_records AS (
SELECT *
FROM std_records_numbered_latest_first
WHERE rn = 1
),

-- selects the source_data_updated (ie the date of publication) value from each record's
-- first ingestion into the local data warehouse
std_records_numbered_earliest_first AS (
SELECT *,
row_number() over(partition by {{record_id}} ORDER BY source_data_updated ASC) as rn
FROM std_data
),
records_first_ingested_pub_date AS (
SELECT {{record_id}}, source_data_updated AS first_ingested_pub_date
FROM std_records_numbered_earliest_first
WHERE rn = 1
)

SELECT
{% for bc in base_cols %}mcr.{{ bc }},{% endfor %}
fi.first_ingested_pub_date
FROM most_current_records AS mcr
LEFT JOIN records_first_ingested_pub_date AS fi
ON mcr.{{ record_id }} = fi.{{ record_id }}
ORDER BY {% for ck in ck_cols %}mcr.{{ ck }} DESC, {% endfor %} mcr.source_data_updated DESC
{{ query }}
Loading

0 comments on commit be6e66f

Please sign in to comment.