diff --git a/_shared_utils/setup.py b/_shared_utils/setup.py
index 00c6f6f40..686f5f105 100644
--- a/_shared_utils/setup.py
+++ b/_shared_utils/setup.py
@@ -4,7 +4,7 @@
 setup(
     name="shared_utils",
     packages=find_packages(),
-    version="2.5",
+    version="2.5.1",
     description="Shared utility functions for data analyses",
     author="Cal-ITP",
     license="Apache",
diff --git a/_shared_utils/shared_utils/gtfs_utils_v2.py b/_shared_utils/shared_utils/gtfs_utils_v2.py
index 3e08d5701..4929d362c 100644
--- a/_shared_utils/shared_utils/gtfs_utils_v2.py
+++ b/_shared_utils/shared_utils/gtfs_utils_v2.py
@@ -503,3 +503,23 @@ def get_stop_times(
     )
 
     return stop_times
+
+
+def filter_to_public_schedule_gtfs_dataset_keys(get_df: bool = False) -> list:
+    """
+    Return a list of schedule_gtfs_dataset_keys that have
+    private_dataset == None.
+    private_dataset holds the values True or None, never False.
+    """
+    dim_gtfs_datasets = schedule_rt_utils.filter_dim_gtfs_datasets(
+        keep_cols=["key", "name", "private_dataset"],
+        custom_filtering={
+            "type": ["schedule"],
+        },
+        get_df=True,
+    ) >> filter(_.private_dataset != True)
+
+    if get_df:
+        return dim_gtfs_datasets
+    else:
+        return dim_gtfs_datasets.gtfs_dataset_key.unique().tolist()
diff --git a/_shared_utils/shared_utils/publish_utils.py b/_shared_utils/shared_utils/publish_utils.py
index 098d29238..deb9e0697 100644
--- a/_shared_utils/shared_utils/publish_utils.py
+++ b/_shared_utils/shared_utils/publish_utils.py
@@ -3,6 +3,7 @@
 from typing import Union
 
 import gcsfs
+import pandas as pd
 
 fs = gcsfs.GCSFileSystem()
 PUBLIC_BUCKET = "gs://calitp-publish-data-analysis/"
@@ -47,3 +48,14 @@ def if_exists_then_delete(filepath: str):
         fs.rm(filepath)
 
     return
+
+
+def exclude_private_datasets(
+    df: pd.DataFrame,
+    col: str = "schedule_gtfs_dataset_key",
+    public_gtfs_dataset_keys: list = [],
+) -> pd.DataFrame:
+    """
+    Filter out private datasets, keeping only rows whose col value is in public_gtfs_dataset_keys.
+    """
+    return df[df[col].isin(public_gtfs_dataset_keys)].reset_index(drop=True)
diff --git a/gtfs_digest/merge_data.py b/gtfs_digest/merge_data.py
index 71e5125db..c0648b89a 100644
--- a/gtfs_digest/merge_data.py
+++ b/gtfs_digest/merge_data.py
@@ -8,6 +8,7 @@
 from calitp_data_analysis import utils
 from segment_speed_utils import gtfs_schedule_wrangling, time_series_utils
+from shared_utils import gtfs_utils_v2, publish_utils
 from update_vars import GTFS_DATA_DICT, SEGMENT_GCS, RT_SCHED_GCS, SCHED_GCS
 
 route_time_cols = ["schedule_gtfs_dataset_key",
@@ -220,16 +221,21 @@ def set_primary_typology(df: pd.DataFrame) -> pd.DataFrame:
 
     return df3
 
+
 if __name__ == "__main__":
 
     from shared_utils import rt_dates
 
-    analysis_date_list = (rt_dates.y2024_dates + rt_dates.y2023_dates
-    )
+    analysis_date_list = (
+        rt_dates.y2024_dates + rt_dates.y2023_dates
+    )
 
     DIGEST_RT_SCHED = GTFS_DATA_DICT.digest_tables.route_schedule_vp
     DIGEST_SEGMENT_SPEEDS = GTFS_DATA_DICT.digest_tables.route_segment_speeds
 
+    # These are public schedule_gtfs_dataset_keys
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
     # Get cardinal direction for each route
     df_sched = concatenate_schedule_by_route_direction(analysis_date_list)
@@ -274,11 +280,15 @@ def set_primary_typology(df: pd.DataFrame) -> pd.DataFrame:
         df_crosswalk,
         on = ["schedule_gtfs_dataset_key", "name", "service_date"],
         how = "left"
+    ).pipe(
+        # Find the most common cardinal direction
+        gtfs_schedule_wrangling.top_cardinal_direction
+    ).pipe(
+        # Drop any private datasets before exporting
+        publish_utils.exclude_private_datasets,
+        public_gtfs_dataset_keys = public_feeds
     )
-    
-    # Find the most common cardinal direction
-    df = gtfs_schedule_wrangling.top_cardinal_direction(df)
-    
+
     integrify = [
         "n_scheduled_trips", "n_vp_trips",
         "minutes_atleast1_vp", "minutes_atleast2_vp",
@@ -304,7 +314,9 @@ def set_primary_typology(df: pd.DataFrame) -> pd.DataFrame:
         primary_typology,
         on = route_time_cols,
         how = "left"
-    )
+    ).pipe(
+        publish_utils.exclude_private_datasets,
+        public_gtfs_dataset_keys = public_feeds)
 
     utils.geoparquet_gcs_export(
         segment_speeds2,
diff --git a/gtfs_digest/merge_operator_data.py b/gtfs_digest/merge_operator_data.py
index 6f580a500..04efad49a 100644
--- a/gtfs_digest/merge_operator_data.py
+++ b/gtfs_digest/merge_operator_data.py
@@ -8,6 +8,7 @@
 from calitp_data_analysis import utils
 from segment_speed_utils import time_series_utils
+from shared_utils import gtfs_utils_v2, publish_utils
 from merge_data import merge_in_standardized_route_names
 from update_vars import GTFS_DATA_DICT, SCHED_GCS, RT_SCHED_GCS
 
@@ -100,6 +101,7 @@ def operator_category_counts_by_date() -> pd.DataFrame:
 
     return operator_category_counts
 
+
 if __name__ == "__main__":
 
     from shared_utils import rt_dates
@@ -111,6 +113,8 @@ def operator_category_counts_by_date() -> pd.DataFrame:
     SCHED_RT_CATEGORY = GTFS_DATA_DICT.digest_tables.operator_sched_rt
     CROSSWALK = GTFS_DATA_DICT.schedule_tables.gtfs_key_crosswalk
 
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
     # Concat operator profiles
     df = concatenate_operator_stats(analysis_date_list)
 
@@ -141,14 +145,20 @@ def operator_category_counts_by_date() -> pd.DataFrame:
     # Merge
     merge_cols = ["schedule_gtfs_dataset_key", "service_date"]
 
-    op_profiles_df1 = pd.merge(df,
-                               crosswalk_df,
-                               on = merge_cols,
-                               how = "left")
+    op_profiles_df1 = pd.merge(
+        df,
+        crosswalk_df,
+        on = merge_cols,
+        how = "left"
+    )
 
     # Drop duplicates created after merging
     op_profiles_df2 = (op_profiles_df1
-                       .drop_duplicates(subset = list(op_profiles_df1.columns))
+                       .pipe(
+                           publish_utils.exclude_private_datasets,
+                           col = "schedule_gtfs_dataset_key",
+                           public_gtfs_dataset_keys = public_feeds
+                       ).drop_duplicates(subset = list(op_profiles_df1.columns))
                        .reset_index(drop = True))
 
     op_profiles_df2.to_parquet(
@@ -157,7 +167,13 @@ def operator_category_counts_by_date() -> pd.DataFrame:
 
     gdf = concatenate_operator_routes(
         analysis_date_list
-    ).pipe(merge_in_standardized_route_names)
+    ).pipe(
+        merge_in_standardized_route_names
+    ).pipe(
+        publish_utils.exclude_private_datasets,
+        col = "schedule_gtfs_dataset_key",
+        public_gtfs_dataset_keys = public_feeds
+    )
 
     utils.geoparquet_gcs_export(
         gdf,
@@ -165,7 +181,12 @@ def operator_category_counts_by_date() -> pd.DataFrame:
         OPERATOR_ROUTE
     )
 
-    operator_category_counts = operator_category_counts_by_date()
+    operator_category_counts = operator_category_counts_by_date().pipe(
+        publish_utils.exclude_private_datasets,
+        col = "schedule_gtfs_dataset_key",
+        public_gtfs_dataset_keys = public_feeds
+    )
+
     operator_category_counts.to_parquet(
         f"{RT_SCHED_GCS}{SCHED_RT_CATEGORY}.parquet"
     )
diff --git a/gtfs_digest/merge_operator_service.py b/gtfs_digest/merge_operator_service.py
index 4574b3948..ac59ceb50 100644
--- a/gtfs_digest/merge_operator_service.py
+++ b/gtfs_digest/merge_operator_service.py
@@ -1,12 +1,3 @@
-import pandas as pd
-import numpy as np
-from segment_speed_utils import helpers, time_series_utils, gtfs_schedule_wrangling
-from segment_speed_utils.project_vars import (COMPILED_CACHED_VIEWS, RT_SCHED_GCS, SCHED_GCS)
-
-from shared_utils import catalog_utils, rt_dates
-
-GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data")
-
 """
 Finding the total number of scheduled service hours for an
 operator across its routes for a full week. The data is
@@ -14,6 +5,16 @@
 Grain is operator-service_date-route
 """
+import pandas as pd
+
+from segment_speed_utils import (gtfs_schedule_wrangling, helpers,
+                                 time_series_utils)
+from segment_speed_utils.project_vars import (
+    COMPILED_CACHED_VIEWS, weeks_available)
+from shared_utils import gtfs_utils_v2, publish_utils, rt_dates
+from update_vars import GTFS_DATA_DICT, RT_SCHED_GCS
+
+
 def concatenate_trips(
     date_list: list,
 ) -> pd.DataFrame:
@@ -44,29 +45,6 @@ def concatenate_trips(
     return df
 
 
-def get_day_type(date):
-    """
-    Function to return the day type (e.g., Monday, Tuesday, etc.) from a datetime object.
-    """
-    days_of_week = ["Monday",
-                    "Tuesday",
-                    "Wednesday",
-                    "Thursday",
-                    "Friday",
-                    "Saturday",
-                    "Sunday"]
-    return days_of_week[date.weekday()]
-
-def weekday_or_weekend(row):
-    """
-    Tag if a day is a weekday or Saturday/Sunday
-    """
-    if row.day_type == "Sunday":
-        return "Sunday"
-    if row.day_type == "Saturday":
-        return "Saturday"
-    else:
-        return "Weekday"
 
 def total_service_hours(date_list: list) -> pd.DataFrame:
     """
@@ -76,67 +54,81 @@ def total_service_hours(date_list: list) -> pd.DataFrame:
     # Combine all the days' data for a week.
     df = concatenate_trips(date_list)
 
-    # Find day type aka Monday, Tuesday, Wednesday based on service date.
-    df['day_type'] = df['service_date'].apply(get_day_type)
-
-    # Tag if the day is a weekday, Saturday, or Sunday.
-    df["weekday_weekend"] = df.apply(weekday_or_weekend, axis=1)
+    WEEKDAY_DICT = {
+        **{k: "Weekday" for k in ["Monday", "Tuesday", "Wednesday",
+                                  "Thursday", "Friday"]},
+        "Saturday": "Saturday",
+        "Sunday": "Sunday"
+    }
 
-    # df = gtfs_schedule_wrangling.add_weekday_weekend_column(df)
-
-    # Find the minimum departure hour.
-    df["departure_hour"] = df.trip_first_departure_datetime_pacific.dt.hour
+    # Find day type (Monday, Tuesday, etc.), departure hour, month_year, and weekday_weekend
+    df = df.assign(
+        day_type = df.service_date.dt.day_name(),
+        departure_hour = df.trip_first_departure_datetime_pacific.dt.hour.astype("Int64"),
+        # format month_year as YYYY-MM (e.g., 2024-04 for April 2024)
+        month_year = (df.service_date.dt.year.astype(str) +
+                      "-" + df.service_date.dt.month.astype(str).str.zfill(2)),
+    ).pipe(
+        gtfs_schedule_wrangling.add_weekday_weekend_column, WEEKDAY_DICT
+    )
 
-    # Delete out the specific day, leave only month & year.
-    df["month"] = df.service_date.astype(str).str.slice(stop=7)
-
-    # Total up service hours by weekday, Sunday, and Saturday.
+    # Total up hourly service hours by weekday, Saturday, and Sunday.
     df2 = (
         df.groupby(["name",
-                    "month",
+                    "month_year",
                     "weekday_weekend",
                     "departure_hour"])
-        .agg(
-            {
-                "service_hours": "sum",
-            }
-        )
+        .agg({"service_hours": "sum"})
         .reset_index()
     )
 
-    # For weekday hours, divide by 5.
-    df2["weekday_service_hours"] = df2.service_hours/5
+    # Weekday hours are divided by 5; Saturday/Sunday hours stay intact.
+    df2 = df2.assign(
+        daily_service_hours = df2.apply(
+            lambda x: round(x.service_hours / 5, 2)
+            if x.weekday_weekend == "Weekday"
+            else round(x.service_hours, 2), axis=1
+        ),
+        service_hours = df2.service_hours.round(2),
+    )
 
-    # Rename projects.
-    df2 = df2.rename(columns = {'service_hours':'weekend_service_hours'})
     return df2
 
-def total_service_hours_all_months() -> pd.DataFrame:
+
+def total_service_hours_all_months(week_list: list[list]) -> pd.DataFrame:
     """
     Find service hours for a full week for one operator
     and for the months we have a full week's worth of data downloaded.
-    As of 5/2024, we have April 2023 and October 2023.
-    """
-    # Grab the dataframes with a full week's worth of data.
-    apr_23week = rt_dates.get_week(month="apr2023", exclude_wed=False)
-    oct_23week = rt_dates.get_week(month="oct2023", exclude_wed=False)
-    apr_24week = rt_dates.get_week(month="apr2024", exclude_wed=False)
-
-    # Sum up total service_hours
-    apr_23df = total_service_hours(apr_23week)
-    oct_23df = total_service_hours(oct_23week)
-    apr_24df = total_service_hours(apr_24week)
+    As of 5/2024, we have April 2023, October 2023, and April 2024.
+    """
+    public_datasets = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys(
+        get_df=True
+    )
+    public_feeds = public_datasets.gtfs_dataset_name.unique().tolist()
 
     # Combine everything
-    all_df = pd.concat([apr_23df, oct_23df, apr_24df])
-
+    all_df = pd.concat(
+        [total_service_hours(one_week) for one_week in week_list]
+    ).pipe(
+        publish_utils.exclude_private_datasets,
+        col = "name",
+        public_gtfs_dataset_keys = public_feeds
+    )
+
     return all_df
 
 
 if __name__ == "__main__":
 
-    # Save service hours.
-    SERVICE_EXPORT = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.scheduled_service_hours}.parquet"
-    service_hours = total_service_hours_all_months()
-    service_hours.to_parquet(SERVICE_EXPORT)
+    print(f"Aggregating for dates: {weeks_available}")
+
+    # Save service hours
+    SERVICE_EXPORT = GTFS_DATA_DICT.digest_tables.scheduled_service_hours
+
+    service_hours = total_service_hours_all_months(weeks_available)
+
+    service_hours.to_parquet(
+        f"{RT_SCHED_GCS}{SERVICE_EXPORT}.parquet"
+    )
\ No newline at end of file
diff --git a/high_quality_transit_areas/D1_assemble_hqta_points.py b/high_quality_transit_areas/D1_assemble_hqta_points.py
index 177bf9bf7..4226bedb5 100644
--- a/high_quality_transit_areas/D1_assemble_hqta_points.py
+++ b/high_quality_transit_areas/D1_assemble_hqta_points.py
@@ -20,6 +20,7 @@
 from A1_rail_ferry_brt_stops import clip_to_ca, get_rail_ferry_brt_extract
 from calitp_data_analysis import geography_utils, utils
 from segment_speed_utils import helpers
+from shared_utils import gtfs_utils_v2
 from update_vars import analysis_date, GCS_FILE_PATH, PROJECT_CRS
 
 catalog = intake.open_catalog("*.yml")
@@ -147,7 +148,7 @@ def get_agency_info(df: pd.DataFrame, date: str) -> pd.DataFrame:
         "organization_source_record_id": "org_id"
     })[["schedule_gtfs_dataset_key", "agency", "org_id", "base64_url"]]
-    
+
     return crosswalk
 
 
@@ -211,6 +212,13 @@ def add_agency_names_hqta_details(
 
 
 def final_processing(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
+    """
+    Final steps for getting the dataset ready for the Geoportal.
+    Subset to columns, drop duplicates, sort for readability,
+    and always project into WGS84.
+    """
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
     keep_cols = [
         "agency_primary",
         "hqta_type", "stop_id", "route_id",
@@ -222,12 +230,14 @@ def final_processing(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
         "geometry"
     ]
 
-    gdf2 = (gdf.reindex(columns = keep_cols)
-            .drop_duplicates(
-                subset=["agency_primary", "hqta_type", "stop_id", "route_id"])
-            .sort_values(["agency_primary", "hqta_type", "stop_id"])
-            .reset_index(drop=True)
-            .to_crs(geography_utils.WGS84)
+    gdf2 = (
+        gdf[gdf.schedule_gtfs_dataset_key.isin(public_feeds)]
+        .reindex(columns = keep_cols)
+        .drop_duplicates(
+            subset=["agency_primary", "hqta_type", "stop_id", "route_id"])
+        .sort_values(["agency_primary", "hqta_type", "stop_id"])
+        .reset_index(drop=True)
+        .to_crs(geography_utils.WGS84)
     )
 
     return gdf2
diff --git a/high_quality_transit_areas/D2_assemble_hqta_polygons.py b/high_quality_transit_areas/D2_assemble_hqta_polygons.py
index 47d62ebc5..7d68be922 100644
--- a/high_quality_transit_areas/D2_assemble_hqta_polygons.py
+++ b/high_quality_transit_areas/D2_assemble_hqta_polygons.py
@@ -15,6 +15,7 @@
 import D1_assemble_hqta_points as assemble_hqta_points
 from calitp_data_analysis import utils, geography_utils
 from D1_assemble_hqta_points import (EXPORT_PATH, add_route_info)
+from shared_utils import gtfs_utils_v2
 from update_vars import GCS_FILE_PATH, analysis_date, PROJECT_CRS
 
 catalog = intake.open_catalog("*.yml")
@@ -108,6 +109,7 @@ def final_processing(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
     Drop extra columns, get sorting done.
     Used to drop bad stops, but these all look ok.
     """
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
     keep_cols = [
         "agency_primary",
         "agency_secondary",
@@ -118,7 +120,8 @@ def final_processing(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
     ]
 
     # Drop bad stops, subset columns
-    gdf2 = (gdf[keep_cols]
+    gdf2 = (
+        gdf[gdf.schedule_gtfs_dataset_key.isin(public_feeds)][keep_cols]
             .drop_duplicates()
             .sort_values(["hqta_type", "agency_primary",
                           "agency_secondary",
diff --git a/open_data/create_routes_data.py b/open_data/create_routes_data.py
index 6d7961536..9b4da38b0 100644
--- a/open_data/create_routes_data.py
+++ b/open_data/create_routes_data.py
@@ -2,9 +2,6 @@
 Create routes file with identifiers including
 route_id, route_name, operator name.
 """
-import os
-os.environ['USE_PYGEOS'] = '0'
-
 import geopandas as gpd
 import pandas as pd
 
@@ -12,7 +9,7 @@
 import prep_traffic_ops
 
 from calitp_data_analysis import utils, geography_utils
-from shared_utils import portfolio_utils
+from shared_utils import gtfs_utils_v2, portfolio_utils
 from segment_speed_utils import helpers
 from update_vars import analysis_date, TRAFFIC_OPS_GCS
 
@@ -90,6 +87,8 @@ def finalize_export_df(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
     """
     Suppress certain columns used in our internal modeling for export.
     """
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
     # Change column order
     route_cols = [
         'organization_source_record_id', 'organization_name',
@@ -98,9 +97,10 @@ def finalize_export_df(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
     agency_ids = ['base64_url']
 
     col_order = route_cols + shape_cols + agency_ids + ['geometry']
-    df2 = (df[col_order]
+    df2 = (df[df.schedule_gtfs_dataset_key.isin(public_feeds)][col_order]
            .reindex(columns = col_order)
            .rename(columns = prep_traffic_ops.RENAME_COLS)
+           .reset_index(drop=True)
          )
 
     return df2
diff --git a/open_data/create_stops_data.py b/open_data/create_stops_data.py
index 7795f0056..0a4fccea1 100644
--- a/open_data/create_stops_data.py
+++ b/open_data/create_stops_data.py
@@ -2,10 +2,6 @@
 Create stops file with identifiers including
 route_id, route_name, agency_id, agency_name.
 """
-import os
-os.environ['USE_PYGEOS'] = '0'
-
-import dask.dataframe as dd
 import geopandas as gpd
 import pandas as pd
 
@@ -13,7 +9,7 @@
 import prep_traffic_ops
 
 from calitp_data_analysis import utils, geography_utils
-from shared_utils import schedule_rt_utils
+from shared_utils import gtfs_utils_v2, schedule_rt_utils
 from segment_speed_utils import helpers
 from update_vars import analysis_date, TRAFFIC_OPS_GCS
 
@@ -21,7 +17,7 @@
 def attach_route_info_to_stops(
     stops: gpd.GeoDataFrame,
     trips: pd.DataFrame,
-    stop_times: dd.DataFrame
+    stop_times: pd.DataFrame
 ) -> gpd.GeoDataFrame:
     """
     Attach all the various route information (route_id, route_type)
@@ -35,7 +31,7 @@ def attach_route_info_to_stops(
     ]
 
     stops_with_route_info = (
-        dd.merge(
+        pd.merge(
            stop_times,
            trips[trip_cols],
            on = ["feed_key", "trip_id"]
@@ -43,9 +39,9 @@ def attach_route_info_to_stops(
                           "route_id", "route_type"])
        .drop(columns = "trip_id")
        .reset_index(drop=True)
-    ).compute()
+    )
 
-    stops_with_geom = dd.merge(
+    stops_with_geom = pd.merge(
         stops,
         stops_with_route_info,
         on = ["feed_key", "stop_id"],
@@ -69,6 +65,8 @@ def finalize_export_df(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
     """
     Suppress certain columns used in our internal modeling for export.
     """
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
     # Change column order
     route_cols = [
         'organization_source_record_id', 'organization_name',
@@ -78,9 +76,10 @@ def finalize_export_df(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
 
     col_order = route_cols + stop_cols + agency_ids + ['geometry']
 
-    df2 = (df[col_order]
+    df2 = (df[df.schedule_gtfs_dataset_key.isin(public_feeds)][col_order]
            .reindex(columns = col_order)
            .rename(columns = prep_traffic_ops.RENAME_COLS)
+           .reset_index(drop=True)
          )
 
     return df2
@@ -105,7 +104,8 @@ def create_stops_file_for_export(date: str) -> gpd.GeoDataFrame:
 
     stop_times = helpers.import_scheduled_stop_times(
         date,
-        columns = prep_traffic_ops.keep_stop_time_cols
+        columns = prep_traffic_ops.keep_stop_time_cols,
+        get_pandas = True
     )
 
     stops_assembled = attach_route_info_to_stops(stops, trips, stop_times)
diff --git a/rt_segment_speeds/scripts/publish_open_data.py b/rt_segment_speeds/scripts/publish_open_data.py
index 84efbf6c8..5f285d777 100644
--- a/rt_segment_speeds/scripts/publish_open_data.py
+++ b/rt_segment_speeds/scripts/publish_open_data.py
@@ -7,6 +7,7 @@
 from pathlib import Path
 
 from calitp_data_analysis import utils
+from shared_utils import gtfs_utils_v2
 from update_vars import GTFS_DATA_DICT, SEGMENT_GCS
 
 
@@ -16,6 +17,8 @@ def stage_open_data_exports(analysis_date: str):
     export them to a stable GCS URL so we can always
     read it in open_data/catalog.yml.
     """
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
     datasets = [
         GTFS_DATA_DICT.stop_segments.route_dir_single_segment,
         #GTFS_DATA_DICT.speedmap_segments.route_dir_single_segment,
@@ -24,7 +27,8 @@ def stage_open_data_exports(analysis_date: str):
 
     for d in datasets:
         gdf = gpd.read_parquet(
-            f"{SEGMENT_GCS}{d}_{analysis_date}.parquet"
+            f"{SEGMENT_GCS}{d}_{analysis_date}.parquet",
+            filters = [[("schedule_gtfs_dataset_key", "in", public_feeds)]]
         )
 
         utils.geoparquet_gcs_export(
diff --git a/rt_segment_speeds/scripts/publish_public_gcs.py b/rt_segment_speeds/scripts/publish_public_gcs.py
index fb1c32c9c..8b361b0e8 100644
--- a/rt_segment_speeds/scripts/publish_public_gcs.py
+++ b/rt_segment_speeds/scripts/publish_public_gcs.py
@@ -8,7 +8,7 @@
 from pathlib import Path
 
 from calitp_data_analysis import utils
-from shared_utils import rt_dates
+from shared_utils import rt_dates, gtfs_utils_v2
 from update_vars import GTFS_DATA_DICT, SEGMENT_GCS, PUBLIC_GCS
 
 if __name__ == "__main__":
@@ -19,11 +19,16 @@
         GTFS_DATA_DICT.speedmap_segments.route_dir_single_segment,
     ]
 
+    public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
     for d in datasets:
 
         start = datetime.datetime.now()
 
-        df = gpd.read_parquet(f"{SEGMENT_GCS}{d}_{analysis_date}.parquet")
+        df = gpd.read_parquet(
+            f"{SEGMENT_GCS}{d}_{analysis_date}.parquet",
+            filters = [[("schedule_gtfs_dataset_key", "in", public_feeds)]]
+        )
 
         utils.geoparquet_gcs_export(
             df,
diff --git a/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py b/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py
index 9a43a4073..5c467cb2a 100644
--- a/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py
+++ b/rt_segment_speeds/segment_speed_utils/gtfs_schedule_wrangling.py
@@ -109,9 +109,13 @@ def add_peak_offpeak_column(df: pd.DataFrame) -> pd.DataFrame:
     return df
 
 
-def add_weekday_weekend_column(df: pd.DataFrame) -> pd.DataFrame:
+def add_weekday_weekend_column(df: pd.DataFrame, category_dict: dict = time_helpers.WEEKDAY_DICT) -> pd.DataFrame:
+    """
+    Add a single weekday_weekend column based on a service date's day_name.
+    day_name gives values like Monday, Tuesday, etc.
+    """
     df = df.assign(
-        weekday_weekend = df.service_date.dt.day_name().map(time_helpers.WEEKDAY_DICT)
+        weekday_weekend = df.service_date.dt.day_name().map(category_dict)
    )
     return df
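
Taken together, the two new helpers are meant to be used as a pair: build the allow-list of public schedule feeds once with filter_to_public_schedule_gtfs_dataset_keys(), then filter any export dataframe against it with publish_utils.exclude_private_datasets(). The sketch below is illustrative only and is not part of any file in this diff; the dataframe df and the parquet path are hypothetical placeholders.

# Illustrative usage of the helpers added in this PR (not part of the diff above).
# Assumes a pandas DataFrame with a schedule_gtfs_dataset_key column;
# the GCS path is a placeholder.
import pandas as pd

from shared_utils import gtfs_utils_v2, publish_utils

# Build the allow-list of public schedule feeds once.
public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

df = pd.read_parquet("gs://example-bucket/example_table.parquet")  # hypothetical path

# Drop rows belonging to private datasets before exporting.
df_public = publish_utils.exclude_private_datasets(
    df,
    col="schedule_gtfs_dataset_key",
    public_gtfs_dataset_keys=public_feeds,
)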