update refs for speeds
tiffanychu90 committed Sep 11, 2024
1 parent cd065ff, commit 01f2382
Showing 5 changed files with 28 additions and 27 deletions.
gtfs_digest/merge_data.py (22 changes: 6 additions & 16 deletions)
@@ -8,7 +8,7 @@

from calitp_data_analysis import utils
from segment_speed_utils import gtfs_schedule_wrangling, time_series_utils
-from shared_utils import gtfs_utils_v2
+from shared_utils import gtfs_utils_v2, publish_utils
from update_vars import GTFS_DATA_DICT, SEGMENT_GCS, RT_SCHED_GCS, SCHED_GCS

route_time_cols = ["schedule_gtfs_dataset_key",
@@ -222,19 +222,6 @@ def set_primary_typology(df: pd.DataFrame) -> pd.DataFrame:
return df3


-def exclude_private_datasets(
-    df: pd.DataFrame,
-    col: str = "schedule_gtfs_dataset_key",
-    public_gtfs_dataset_keys: list = [],
-) -> pd.DataFrame:
-    """
-    Filter out private datasets.
-    """
-    return df[
-        df[col].isin(public_gtfs_dataset_keys)
-    ].reset_index(drop=True)
-
-
if __name__ == "__main__":

from shared_utils import rt_dates
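
The helper deleted above has not disappeared from the codebase: judging by the new import, it presumably now lives in shared_utils/publish_utils.py and is called as publish_utils.exclude_private_datasets in the hunks below. A minimal sketch of that relocated function, assuming it keeps the exact signature and body removed here:

import pandas as pd

def exclude_private_datasets(
    df: pd.DataFrame,
    col: str = "schedule_gtfs_dataset_key",
    public_gtfs_dataset_keys: list = [],
) -> pd.DataFrame:
    """
    Filter out private datasets, keeping only rows whose key
    appears in the public allow-list.
    """
    return df[
        df[col].isin(public_gtfs_dataset_keys)
    ].reset_index(drop=True)
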
@@ -298,7 +285,8 @@ def exclude_private_datasets(
gtfs_schedule_wrangling.top_cardinal_direction
).pipe(
# Drop any private datasets before exporting
-exclude_private_datasets, public_gtfs_dataset_keys= public_feeds
+publish_utils.exclude_private_datasets,
+public_gtfs_dataset_keys= public_feeds
)

integrify = [
@@ -326,7 +314,9 @@ def exclude_private_datasets(
primary_typology,
on = route_time_cols,
how = "left"
-).pipe(exclude_private_datasets, public_gtfs_dataset_keys= public_feeds)
+).pipe(
+    publish_utils.exclude_private_datasets,
+    public_gtfs_dataset_keys= public_feeds)

utils.geoparquet_gcs_export(
segment_speeds2,
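
Both call sites in this file now chain the filter with DataFrame.pipe, which hands the DataFrame to the function as its first positional argument and forwards the remaining keyword arguments unchanged. A small sketch of that equivalence, using toy data and assuming shared_utils.publish_utils is importable in your environment:

import pandas as pd
from shared_utils import publish_utils

df = pd.DataFrame({
    "schedule_gtfs_dataset_key": ["key_a", "key_b"],
    "n_trips": [10, 20],
})
public_feeds = ["key_a"]  # hypothetical allow-list

# .pipe() passes df as the first argument; the keyword argument is forwarded as-is.
piped = df.pipe(
    publish_utils.exclude_private_datasets,
    public_gtfs_dataset_keys=public_feeds,
)
direct = publish_utils.exclude_private_datasets(
    df, public_gtfs_dataset_keys=public_feeds
)
assert piped.equals(direct)
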
gtfs_digest/merge_operator_data.py (9 changes: 5 additions & 4 deletions)
@@ -8,7 +8,8 @@

from calitp_data_analysis import utils
from segment_speed_utils import time_series_utils
-from merge_data import merge_in_standardized_route_names, exclude_private_datasets
+from shared_utils import publish_utils
+from merge_data import merge_in_standardized_route_names
from update_vars import GTFS_DATA_DICT, SCHED_GCS, RT_SCHED_GCS

sort_cols = ["schedule_gtfs_dataset_key", "service_date"]
@@ -154,7 +155,7 @@ def operator_category_counts_by_date() -> pd.DataFrame:
# Drop duplicates created after merging
op_profiles_df2 = (op_profiles_df1
.pipe(
-exclude_private_datasets,
+publish_utils.exclude_private_datasets,
col = "schedule_gtfs_dataset_key",
public_gtfs_dataset_keys = public_feeds
).drop_duplicates(subset = list(op_profiles_df1.columns))
@@ -169,7 +170,7 @@ def operator_category_counts_by_date() -> pd.DataFrame:
).pipe(
merge_in_standardized_route_names
).pipe(
-exclude_private_datasets,
+publish_utils.exclude_private_datasets,
col = "schedule_gtfs_dataset_key",
public_gtfs_dataset_keys = public_feeds
)
@@ -181,7 +182,7 @@
)

operator_category_counts = operator_category_counts_by_date().pipe(
-exclude_private_datasets,
+publish_utils.exclude_private_datasets,
col = "schedule_gtfs_dataset_key",
public_gtfs_dataset_keys = public_feeds
)
gtfs_digest/merge_operator_service.py (9 changes: 5 additions & 4 deletions)
@@ -7,12 +7,11 @@
"""
import pandas as pd

-from merge_data import exclude_private_datasets
from segment_speed_utils import (gtfs_schedule_wrangling, helpers,
time_series_utils)
from segment_speed_utils.project_vars import (
COMPILED_CACHED_VIEWS, weeks_available)
-from shared_utils import gtfs_utils_v2, rt_dates
+from shared_utils import gtfs_utils_v2, publish_utils, rt_dates
from update_vars import GTFS_DATA_DICT, RT_SCHED_GCS


@@ -103,14 +102,16 @@ def total_service_hours_all_months(week_list: list[list]) -> pd.DataFrame:
and for the months we have a full week's worth of data downloaded.
As of 5/2024, we have April 2023, October 2023, and April 2024.
"""
-public_datasets = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys(get_df=True)
+public_datasets = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys(
+    get_df=True
+)
public_feeds = public_datasets.gtfs_dataset_name.unique().tolist()

# Combine everything
all_df = pd.concat(
[total_service_hours(one_week) for one_week in week_list]
).pipe(
-exclude_private_datasets,
+publish_utils.exclude_private_datasets,
col = "name",
public_gtfs_dataset_keys = public_feeds
)
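
This file filters on col = "name" rather than the default key column, since its frame is keyed by operator name. The diff also shows the two ways this commit calls gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys: with get_df=True it returns a DataFrame carrying a gtfs_dataset_name column, while the publish scripts below call it with no arguments and use the result directly as the allow-list for a parquet read filter. Both call shapes, as they appear in this commit (the helper's internals are not shown in the diff and are assumed to look up the public feed list):

from shared_utils import gtfs_utils_v2

# DataFrame form: used when the downstream filter is on operator name.
public_datasets = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys(
    get_df=True
)
public_names = public_datasets.gtfs_dataset_name.unique().tolist()

# Default form: used below as the allow-list of schedule_gtfs_dataset_key values.
public_feed_keys = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
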
rt_segment_speeds/scripts/publish_open_data.py (6 changes: 5 additions & 1 deletion)
@@ -7,6 +7,7 @@
from pathlib import Path

from calitp_data_analysis import utils
+from shared_utils import gtfs_utils_v2
from update_vars import GTFS_DATA_DICT, SEGMENT_GCS


@@ -16,6 +17,8 @@ def stage_open_data_exports(analysis_date: str):
export them to a stable GCS URL so we can always
read it in open_data/catalog.yml.
"""
+public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
datasets = [
GTFS_DATA_DICT.stop_segments.route_dir_single_segment,
#GTFS_DATA_DICT.speedmap_segments.route_dir_single_segment,
@@ -24,7 +27,8 @@

for d in datasets:
gdf = gpd.read_parquet(
f"{SEGMENT_GCS}{d}_{analysis_date}.parquet"
f"{SEGMENT_GCS}{d}_{analysis_date}.parquet",
filters = [[("schedule_gtfs_dataset_key", "in", public_feeds)]]
)

utils.geoparquet_gcs_export(
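
The filters argument added here is forwarded by gpd.read_parquet to pyarrow and uses pyarrow's list-of-lists (disjunctive normal form) syntax: the outer list ORs groups together and each inner list ANDs its tuples, so [[("schedule_gtfs_dataset_key", "in", public_feeds)]] is a single-condition predicate. A sketch of the same read-time filter with plain pyarrow, using a hypothetical local path and allow-list:

import pyarrow.parquet as pq

public_feeds = ["key_a", "key_b"]  # hypothetical allow-list of public keys

# Rows whose schedule_gtfs_dataset_key is not in the allow-list are
# skipped at read time, so private records never reach memory.
table = pq.read_table(
    "route_dir_segments.parquet",  # hypothetical path
    filters=[[("schedule_gtfs_dataset_key", "in", public_feeds)]],
)
df = table.to_pandas()
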
rt_segment_speeds/scripts/publish_public_gcs.py (9 changes: 7 additions & 2 deletions)
@@ -8,7 +8,7 @@
from pathlib import Path

from calitp_data_analysis import utils
-from shared_utils import rt_dates
+from shared_utils import rt_dates, gtfs_utils_v2
from update_vars import GTFS_DATA_DICT, SEGMENT_GCS, PUBLIC_GCS

if __name__ == "__main__":
@@ -19,11 +19,16 @@
GTFS_DATA_DICT.speedmap_segments.route_dir_single_segment,
]

+public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()
+
for d in datasets:

start = datetime.datetime.now()

-df = gpd.read_parquet(f"{SEGMENT_GCS}{d}_{analysis_date}.parquet")
+df = gpd.read_parquet(
+    f"{SEGMENT_GCS}{d}_{analysis_date}.parquet",
+    filters = [[("schedule_gtfs_dataset_key", "in", public_feeds)]]
+)

utils.geoparquet_gcs_export(
df,
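
A quick, hypothetical sanity check (not part of this commit) that the read-time filter in both publish scripts behaved as intended, run inside the loop after the read:

# Every surviving row should carry a key from the public allow-list.
assert df.schedule_gtfs_dataset_key.isin(public_feeds).all(), (
    "private schedule_gtfs_dataset_key values leaked past the filter"
)
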
