Merge pull request #1223 from cal-itp/private-datasets
Exclude private datasets from 6 geoportal datasets, GTFS digest
tiffanychu90 authored Sep 11, 2024
2 parents 8b5e4cf + 01f2382 commit aaa8816
Showing 13 changed files with 199 additions and 116 deletions.
2 changes: 1 addition & 1 deletion _shared_utils/setup.py
@@ -4,7 +4,7 @@
setup(
name="shared_utils",
packages=find_packages(),
version="2.5",
version="2.5.1",
description="Shared utility functions for data analyses",
author="Cal-ITP",
license="Apache",
20 changes: 20 additions & 0 deletions _shared_utils/shared_utils/gtfs_utils_v2.py
@@ -503,3 +503,23 @@ def get_stop_times(
)

return stop_times


def filter_to_public_schedule_gtfs_dataset_keys(get_df: bool = False) -> list:
"""
Return a list of schedule_gtfs_dataset_keys whose
private_dataset value is None.
private_dataset holds either True or None (never False).
"""
dim_gtfs_datasets = schedule_rt_utils.filter_dim_gtfs_datasets(
keep_cols=["key", "name", "private_dataset"],
custom_filtering={
"type": ["schedule"],
},
get_df=True,
) >> filter(_.private_dataset != True)

if get_df:
return dim_gtfs_datasets
else:
return dim_gtfs_datasets.gtfs_dataset_key.unique().tolist()
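
A minimal usage sketch of the new helper, assuming shared_utils is importable in the analysis environment; downstream scripts pass the resulting key list into the publish filters:

from shared_utils import gtfs_utils_v2

# List of schedule_gtfs_dataset_keys whose feeds are public
public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

# Or keep the key / name / private_dataset dataframe instead of just the keys
public_datasets = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys(
    get_df=True
)
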
12 changes: 12 additions & 0 deletions _shared_utils/shared_utils/publish_utils.py
@@ -3,6 +3,7 @@
from typing import Union

import gcsfs
import pandas as pd

fs = gcsfs.GCSFileSystem()
PUBLIC_BUCKET = "gs://calitp-publish-data-analysis/"
@@ -47,3 +48,14 @@ def if_exists_then_delete(filepath: str):
fs.rm(filepath)

return


def exclude_private_datasets(
df: pd.DataFrame,
col: str = "schedule_gtfs_dataset_key",
public_gtfs_dataset_keys: list = [],
) -> pd.DataFrame:
"""
Keep only rows whose value in `col` appears in public_gtfs_dataset_keys, dropping private datasets.
"""
return df[df[col].isin(public_gtfs_dataset_keys)].reset_index(drop=True)
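
A short sketch of how the two new helpers compose; the dataframe and its values are hypothetical, shown only for illustration:

import pandas as pd

from shared_utils import gtfs_utils_v2, publish_utils

public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

# Hypothetical table keyed on schedule_gtfs_dataset_key
df = pd.DataFrame({
    "schedule_gtfs_dataset_key": ["key_a", "key_b"],
    "n_scheduled_trips": [12, 34],
})

# Rows whose key is not in the public list are dropped before publishing
public_df = df.pipe(
    publish_utils.exclude_private_datasets,
    col="schedule_gtfs_dataset_key",
    public_gtfs_dataset_keys=public_feeds,
)
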
26 changes: 19 additions & 7 deletions gtfs_digest/merge_data.py
@@ -8,6 +8,7 @@

from calitp_data_analysis import utils
from segment_speed_utils import gtfs_schedule_wrangling, time_series_utils
from shared_utils import gtfs_utils_v2, publish_utils
from update_vars import GTFS_DATA_DICT, SEGMENT_GCS, RT_SCHED_GCS, SCHED_GCS

route_time_cols = ["schedule_gtfs_dataset_key",
@@ -220,16 +221,21 @@ def set_primary_typology(df: pd.DataFrame) -> pd.DataFrame:

return df3


if __name__ == "__main__":

from shared_utils import rt_dates

analysis_date_list = (rt_dates.y2024_dates + rt_dates.y2023_dates
)
analysis_date_list = (
rt_dates.y2024_dates + rt_dates.y2023_dates
)

DIGEST_RT_SCHED = GTFS_DATA_DICT.digest_tables.route_schedule_vp
DIGEST_SEGMENT_SPEEDS = GTFS_DATA_DICT.digest_tables.route_segment_speeds

# These are public schedule_gtfs_dataset_keys
public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

# Get cardinal direction for each route
df_sched = concatenate_schedule_by_route_direction(analysis_date_list)

@@ -274,11 +280,15 @@ def set_primary_typology(df: pd.DataFrame) -> pd.DataFrame:
df_crosswalk,
on = ["schedule_gtfs_dataset_key", "name", "service_date"],
how = "left"
).pipe(
# Find the most common cardinal direction
gtfs_schedule_wrangling.top_cardinal_direction
).pipe(
# Drop any private datasets before exporting
publish_utils.exclude_private_datasets,
public_gtfs_dataset_keys= public_feeds
)

# Find the most common cardinal direction
df = gtfs_schedule_wrangling.top_cardinal_direction(df)


integrify = [
"n_scheduled_trips", "n_vp_trips",
"minutes_atleast1_vp", "minutes_atleast2_vp",
@@ -304,7 +314,9 @@ def set_primary_typology(df: pd.DataFrame) -> pd.DataFrame:
primary_typology,
on = route_time_cols,
how = "left"
)
).pipe(
publish_utils.exclude_private_datasets,
public_gtfs_dataset_keys= public_feeds)

utils.geoparquet_gcs_export(
segment_speeds2,
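
The digest scripts apply the filter as the last step of a .pipe chain right before exporting; a simplified sketch of that pattern, with a hypothetical wrapper name and arguments:

import geopandas as gpd

from calitp_data_analysis import utils
from shared_utils import gtfs_utils_v2, publish_utils

public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

def export_public_only(gdf: gpd.GeoDataFrame, gcs_bucket: str, filename: str):
    """Drop private feeds, then export (mirrors the merge_data.py pattern)."""
    public_gdf = gdf.pipe(
        publish_utils.exclude_private_datasets,
        public_gtfs_dataset_keys=public_feeds,
    )
    utils.geoparquet_gcs_export(public_gdf, gcs_bucket, filename)
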
35 changes: 28 additions & 7 deletions gtfs_digest/merge_operator_data.py
@@ -8,6 +8,7 @@

from calitp_data_analysis import utils
from segment_speed_utils import time_series_utils
from shared_utils import gtfs_utils_v2, publish_utils
from merge_data import merge_in_standardized_route_names
from update_vars import GTFS_DATA_DICT, SCHED_GCS, RT_SCHED_GCS

@@ -100,6 +101,7 @@ def operator_category_counts_by_date() -> pd.DataFrame:

return operator_category_counts


if __name__ == "__main__":

from shared_utils import rt_dates
@@ -111,6 +113,8 @@ def operator_category_counts_by_date() -> pd.DataFrame:
SCHED_RT_CATEGORY = GTFS_DATA_DICT.digest_tables.operator_sched_rt
CROSSWALK = GTFS_DATA_DICT.schedule_tables.gtfs_key_crosswalk

public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

# Concat operator profiles
df = concatenate_operator_stats(analysis_date_list)

@@ -141,14 +145,20 @@ def operator_category_counts_by_date() -> pd.DataFrame:

# Merge
merge_cols = ["schedule_gtfs_dataset_key", "service_date"]
op_profiles_df1 = pd.merge(df,
crosswalk_df,
on = merge_cols,
how = "left")
op_profiles_df1 = pd.merge(
df,
crosswalk_df,
on = merge_cols,
how = "left"
)

# Drop duplicates created after merging
op_profiles_df2 = (op_profiles_df1
.drop_duplicates(subset = list(op_profiles_df1.columns))
.pipe(
publish_utils.exclude_private_datasets,
col = "schedule_gtfs_dataset_key",
public_gtfs_dataset_keys = public_feeds
).drop_duplicates(subset = list(op_profiles_df1.columns))
.reset_index(drop = True))

op_profiles_df2.to_parquet(
@@ -157,15 +167,26 @@ def operator_category_counts_by_date() -> pd.DataFrame:

gdf = concatenate_operator_routes(
analysis_date_list
).pipe(merge_in_standardized_route_names)
).pipe(
merge_in_standardized_route_names
).pipe(
publish_utils.exclude_private_datasets,
col = "schedule_gtfs_dataset_key",
public_gtfs_dataset_keys = public_feeds
)

utils.geoparquet_gcs_export(
gdf,
RT_SCHED_GCS,
OPERATOR_ROUTE
)

operator_category_counts = operator_category_counts_by_date()
operator_category_counts = operator_category_counts_by_date().pipe(
publish_utils.exclude_private_datasets,
col = "schedule_gtfs_dataset_key",
public_gtfs_dataset_keys = public_feeds
)

operator_category_counts.to_parquet(
f"{RT_SCHED_GCS}{SCHED_RT_CATEGORY}.parquet"
)
136 changes: 64 additions & 72 deletions gtfs_digest/merge_operator_service.py
@@ -1,19 +1,20 @@
import pandas as pd
import numpy as np
from segment_speed_utils import helpers, time_series_utils, gtfs_schedule_wrangling
from segment_speed_utils.project_vars import (COMPILED_CACHED_VIEWS, RT_SCHED_GCS, SCHED_GCS)

from shared_utils import catalog_utils, rt_dates

GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data")

"""
Find the total number of scheduled service hours for
an operator across its routes for a full week. The data is
downloaded twice a year (every half year).
Grain is operator-service_date-route.
"""
import pandas as pd

from segment_speed_utils import (gtfs_schedule_wrangling, helpers,
time_series_utils)
from segment_speed_utils.project_vars import (
COMPILED_CACHED_VIEWS, weeks_available)
from shared_utils import gtfs_utils_v2, publish_utils, rt_dates
from update_vars import GTFS_DATA_DICT, RT_SCHED_GCS


def concatenate_trips(
date_list: list,
) -> pd.DataFrame:
Expand Down Expand Up @@ -44,29 +45,6 @@ def concatenate_trips(

return df

def get_day_type(date):
"""
Function to return the day type (e.g., Monday, Tuesday, etc.) from a datetime object.
"""
days_of_week = ["Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
"Sunday"]
return days_of_week[date.weekday()]

def weekday_or_weekend(row):
"""
Tag if a day is a weekday or Saturday/Sunday
"""
if row.day_type == "Sunday":
return "Sunday"
if row.day_type == "Saturday":
return "Saturday"
else:
return "Weekday"

def total_service_hours(date_list: list) -> pd.DataFrame:
"""
@@ -76,67 +54,81 @@ def total_service_hours(date_list: list) -> pd.DataFrame:
# Combine all the days' data for a week.
df = concatenate_trips(date_list)

# Find day type aka Monday, Tuesday, Wednesday based on service date.
df['day_type'] = df['service_date'].apply(get_day_type)

# Tag if the day is a weekday, Saturday, or Sunday.
df["weekday_weekend"] = df.apply(weekday_or_weekend, axis=1)
WEEKDAY_DICT = {
**{k: "Weekday" for k in ["Monday", "Tuesday", "Wednesday",
"Thursday", "Friday"]},
"Saturday": "Saturday",
"Sunday": "Sunday"
}

# df = gtfs_schedule_wrangling.add_weekday_weekend_column(df)

# Find the minimum departure hour.
df["departure_hour"] = df.trip_first_departure_datetime_pacific.dt.hour
# Find day type (Monday, Tuesday, etc), departure hour, month_year, and weekday_weekend
df = df.assign(
day_type = df.service_date.dt.day_name(),
departure_hour = df.trip_first_departure_datetime_pacific.dt.hour.astype("Int64"),
        # format month_year as YYYY-MM, e.g. 2024-04 for April 2024
month_year = (df.service_date.dt.year.astype(str) +
"-" + df.service_date.dt.month.astype(str).str.zfill(2)),
).pipe(
gtfs_schedule_wrangling.add_weekday_weekend_column, WEEKDAY_DICT
)

# Delete out the specific day, leave only month & year.
df["month"] = df.service_date.astype(str).str.slice(stop=7)

# Total up service hours by weekday, Sunday, and Saturday.
# Total up hourly service hours by weekday, Sunday, and Saturday.
df2 = (
df.groupby(["name",
"month",
"month_year",
"weekday_weekend",
"departure_hour"])
.agg(
{
"service_hours": "sum",
}
)
.agg({"service_hours": "sum"})
.reset_index()
)

# For weekday hours, divide by 5.
df2["weekday_service_hours"] = df2.service_hours/5
# weekday hours should be divided by 5, while keeping sat/sun intact
df2 = df2.assign(
daily_service_hours = df2.apply(
lambda x: round(x.service_hours / 5, 2)
if x.weekday_weekend=="Weekday"
else round(x.service_hours, 2), axis=1
),
service_hours = df2.service_hours.round(2),
)

# Rename projects.
df2 = df2.rename(columns = {'service_hours':'weekend_service_hours'})
return df2
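
The day-type and weekday-averaging logic above can be illustrated with plain pandas; this sketch replaces the internal gtfs_schedule_wrangling.add_weekday_weekend_column helper with a simple .map over the same WEEKDAY_DICT and uses made-up trips, so it approximates the aggregation rather than reproducing it exactly:

import pandas as pd

trips = pd.DataFrame({
    "name": ["Operator A"] * 3,
    "service_date": pd.to_datetime(["2024-04-15", "2024-04-16", "2024-04-20"]),
    "trip_first_departure_datetime_pacific": pd.to_datetime(
        ["2024-04-15 06:10", "2024-04-16 07:45", "2024-04-20 09:05"]
    ),
    "service_hours": [1.5, 2.0, 3.25],
})

WEEKDAY_DICT = {
    **{k: "Weekday" for k in
       ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]},
    "Saturday": "Saturday",
    "Sunday": "Sunday",
}

trips = trips.assign(
    day_type=trips.service_date.dt.day_name(),
    departure_hour=trips.trip_first_departure_datetime_pacific.dt.hour.astype("Int64"),
    month_year=(trips.service_date.dt.year.astype(str) + "-"
                + trips.service_date.dt.month.astype(str).str.zfill(2)),
)
trips["weekday_weekend"] = trips.day_type.map(WEEKDAY_DICT)

hourly = (
    trips.groupby(["name", "month_year", "weekday_weekend", "departure_hour"])
    .agg({"service_hours": "sum"})
    .reset_index()
)

# Weekday totals cover 5 days, so divide by 5; Saturday/Sunday stay as-is
hourly["daily_service_hours"] = hourly.apply(
    lambda x: round(x.service_hours / 5, 2)
    if x.weekday_weekend == "Weekday" else round(x.service_hours, 2),
    axis=1,
)
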

def total_service_hours_all_months() -> pd.DataFrame:

def total_service_hours_all_months(week_list: list[list]) -> pd.DataFrame:
"""
Find service hours for a full week for one operator
and for the months we have a full week's worth of data downloaded.
As of 5/2024, we have April 2023 and October 2023.
"""
# Grab the dataframes with a full week's worth of data.
apr_23week = rt_dates.get_week(month="apr2023", exclude_wed=False)
oct_23week = rt_dates.get_week(month="oct2023", exclude_wed=False)
apr_24week = rt_dates.get_week(month="apr2024", exclude_wed=False)

# Sum up total service_hours
apr_23df = total_service_hours(apr_23week)
oct_23df = total_service_hours(oct_23week)
apr_24df = total_service_hours(apr_24week)
As of 5/2024, we have April 2023, October 2023, and April 2024.
"""
public_datasets = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys(
get_df=True
)
public_feeds = public_datasets.gtfs_dataset_name.unique().tolist()

# Combine everything
all_df = pd.concat([apr_23df, oct_23df, apr_24df])

all_df = pd.concat(
[total_service_hours(one_week) for one_week in week_list]
).pipe(
publish_utils.exclude_private_datasets,
col = "name",
public_gtfs_dataset_keys = public_feeds
)

return all_df
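
total_service_hours_all_months now takes the week list as an argument; weeks_available is imported from segment_speed_utils.project_vars and its exact contents are not shown in this diff, but judging from the removed code it is presumably equivalent to something like:

from shared_utils import rt_dates

# Assumed shape of weeks_available: one list of service dates per fully downloaded week
weeks_available = [
    rt_dates.get_week(month="apr2023", exclude_wed=False),
    rt_dates.get_week(month="oct2023", exclude_wed=False),
    rt_dates.get_week(month="apr2024", exclude_wed=False),
]

service_hours = total_service_hours_all_months(weeks_available)
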


if __name__ == "__main__":

# Save service hours.
SERVICE_EXPORT = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.scheduled_service_hours}.parquet"
service_hours = total_service_hours_all_months()
service_hours.to_parquet(SERVICE_EXPORT)
print(f"Aggregating for dates: {weeks_available}")

# Save service hours
SERVICE_EXPORT = GTFS_DATA_DICT.digest_tables.scheduled_service_hours

service_hours = total_service_hours_all_months(weeks_available)

service_hours.to_parquet(
f"{RT_SCHED_GCS}{SERVICE_EXPORT}.parquet"
)
