Merge pull request #1224 from cal-itp/pems-daytype
Match PEMS stations with SHN postmiles
tiffanychu90 authored Sep 16, 2024
2 parents aaa8816 + 81ebba0 commit d2f526c
Showing 12 changed files with 1,803 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -12,11 +12,12 @@ repos:
rev: 6.0.0
hooks:
- id: flake8
args: ["--ignore=E501,W503,F403,F405,E711,E712,E231,E702"]
args: ["--ignore=E501,W503,F403,F405,E711,E712,E231,E702,E203"]
# E711: comparison to None should be 'if cond is not None:' (siuba filtering requires we use != None and not is not)
# E712: comparison to True should be 'if cond is True:' or 'if cond:' (siuba filtering requires we use == True)
# E231: missing whitespace after colon (we don't want white space when setting gs://)
# E702: multiple statements on one line (semicolon)
# E203: whitespace before ':', this rule conflicts with black's slice formatting
types:
- python
files: _shared_utils
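For context, a minimal sketch of the kind of code that triggers the newly ignored E203 check (variable names are made up): black formats slices with complex bounds using spaces around the colon, which flake8 reports as "whitespace before ':'".

```python
ham = list(range(10))
lower, offset, upper = 1, 2, 8

# black's preferred slice formatting; flake8 flags the space before ':' as E203
piece = ham[lower + offset : upper + offset]
print(piece)
```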
2 changes: 1 addition & 1 deletion _shared_utils/setup.py
@@ -4,7 +4,7 @@
setup(
name="shared_utils",
packages=find_packages(),
version="2.5.1",
version="2.6",
description="Shared utility functions for data analyses",
author="Cal-ITP",
license="Apache",
2 changes: 2 additions & 0 deletions _shared_utils/shared_utils/__init__.py
@@ -1,4 +1,5 @@
from . import (
arcgis_query,
catalog_utils,
dask_utils,
gtfs_utils_v2,
@@ -10,6 +11,7 @@
)

__all__ = [
"arcgis_query",
"catalog_utils",
"dask_utils",
"gtfs_utils_v2",
156 changes: 156 additions & 0 deletions _shared_utils/shared_utils/arcgis_query.py
@@ -0,0 +1,156 @@
"""
Query beyond the 2,000 rows ESRI gives.
https://gis.stackexchange.com/questions/266897/how-to-get-around-the-1000-objectids-limit-on-arcgis-server
"""
import urllib.parse

import geopandas as gpd
import numpy as np
import pandas as pd
import requests


def query_arcgis_feature_server(url_feature_server=""):
"""
This function downloads all of the features available on a given ArcGIS
feature server. The function is written to bypass the limitations imposed
by the online service, such as only returning up to 1,000 or 2,000 features
at a time.
Parameters
----------
url_feature_server : string
String containing the URL of the service API you want to query. It should
end in a forward slash and look something like this:
'https://services.arcgis.com/P3ePLMYs2RVChkJx/arcgis/rest/services/USA_Counties/FeatureServer/0/'
Returns
-------
geodata_final : gpd.GeoDataFrame
This is a GeoDataFrame that contains all of the features from the
Feature Server. After calling this function, the `geodata_final` object
can be used to store the data on disk in several different formats
including, but not limited to, Shapefile (.shp), GeoJSON (.geojson),
GeoPackage (.gpkg), or PostGIS.
See https://geopandas.org/en/stable/docs/user_guide/io.html#writing-spatial-data
for more details.
"""
if url_feature_server == "":
geodata_final = gpd.GeoDataFrame()
return geodata_final

# Fixing last character in case the URL provided didn't end in a
# forward slash
if url_feature_server[-1] != "/":
url_feature_server = url_feature_server + "/"

# Getting the layer definitions. This contains important info such as the
# name of the column used as feature_ids/object_ids, among other things.
layer_def = requests.get(url_feature_server + "?f=pjson").json()

# The `objectIdField` is the column name used for the
# feature_ids/object_ids
fid_colname = layer_def["objectIdField"]

# The `maxRecordCount` tells us the maximum number of records this REST
# API service can return at once. The code below is written such that we
# perform multiple calls to the API, each one being short enough never to
# go beyond this limit.
record_count_max = layer_def["maxRecordCount"]

# Part of the URL that specifically requests only the object IDs
url_query_get_ids = f"query?f=geojson&returnIdsOnly=true" f"&where={fid_colname}+is+not+null"

url_comb = url_feature_server + url_query_get_ids

# Getting all the object IDs
service_request = requests.get(url_comb)
all_objectids = np.sort(service_request.json()["properties"]["objectIds"])

# This variable will store all the parts of the multiple queries. These
# parts will, at the end, be concatenated into one large GeoDataFrame.
geodata_parts = []

# This part of the query is fixed and never actually changes
url_query_fixed = "query?f=geojson&outFields=*&where="

# Identifying the largest query size allowed per request. This will dictate
# how many queries will need to be made. We start the search at
# the max record count, but that generates errors sometimes - the query
# might time out because it's too big. If the test query times out, we try
# shrinking the query size until the test query goes through without
# generating a time-out error.
block_size = min(record_count_max, len(all_objectids))
worked = False
while not worked:
# Moving the "cursors" to their appropriate locations
id_start = all_objectids[0]
id_end = all_objectids[block_size - 1]

readable_query_string = f"{fid_colname}>={id_start} " f"and {fid_colname}<={id_end}"

url_query_variable = urllib.parse.quote(readable_query_string)

url_comb = url_feature_server + url_query_fixed + url_query_variable

url_get = requests.get(url_comb)

if "error" in url_get.json():
block_size = int(block_size / 2) + 1
else:
geodata_part = gpd.read_file(url_get.text)

geodata_parts.append(geodata_part.copy())
worked = True

# Performing the actual query to the API multiple times. This skips the
# first few rows/features in the data because those rows were already
# captured in the query performed in the code chunk above.
for i in range(block_size, len(all_objectids), block_size):
# Moving the "cursors" to their appropriate locations and finding the
# limits of each block
sub_list = all_objectids[i : i + block_size]
id_start = sub_list[0]
id_end = sub_list[-1]

readable_query_string = f"{fid_colname}>={id_start} " f"and {fid_colname}<={id_end}"

# Encoding from readable text to URL
url_query_variable = urllib.parse.quote(readable_query_string)

# Constructing the full request URL
url_comb = url_feature_server + url_query_fixed + url_query_variable

# Actually performing the query and storing its results in a
# GeoDataFrame
geodata_part = gpd.read_file(url_comb, driver="GeoJSON")

# Appending the result to `geodata_parts`
if geodata_part.shape[0] > 0:
geodata_parts.append(geodata_part)

# Concatenating all of the query parts into one large GeoDataFrame
geodata_final = pd.concat(geodata_parts, ignore_index=True).sort_values(by=fid_colname).reset_index(drop=True)

# Checking if any object ID is missing
ids_queried = set(geodata_final[fid_colname])
for i, this_id in enumerate(all_objectids):
if this_id not in ids_queried:
print("WARNING! The following ObjectID is missing from the final " f"GeoDataFrame: ObjectID={this_id}")
pass

# Checking if any object ID is included twice
geodata_temp = geodata_final[[fid_colname]].copy()
geodata_temp["temp"] = 1
geodata_temp = geodata_temp.groupby(fid_colname).agg({"temp": "sum"}).reset_index()
geodata_temp = geodata_temp.loc[geodata_temp["temp"] > 1].copy()
for i, this_id in enumerate(geodata_temp[fid_colname].values):
n_times = geodata_temp["temp"].values[i]
print(
"WARNING! The following ObjectID is included multiple times in"
f"the final GeoDataFrame: ObjectID={this_id}\tOccurrences={n_times}"
)

return geodata_final
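A minimal usage sketch of the new helper (the URL below is the SHN postmiles FeatureServer used later in this PR; everything else is illustrative):

```python
from shared_utils.arcgis_query import query_arcgis_feature_server

URL = (
    "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/"
    "CHhighway/SHN_Postmiles_Tenth/FeatureServer/0/"
)

# Pages through the feature server in blocks of maxRecordCount,
# so the result is not capped at 1,000-2,000 rows.
gdf = query_arcgis_feature_server(URL)
print(gdf.shape)
```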
94 changes: 89 additions & 5 deletions _shared_utils/shared_utils/shared_data.py
@@ -3,7 +3,10 @@
"""
import geopandas as gpd
import pandas as pd
import shapely
from calitp_data_analysis import geography_utils, utils
from calitp_data_analysis.sql import to_snakecase
from shared_utils.arcgis_query import query_arcgis_feature_server

GCS_FILE_PATH = "gs://calitp-analytics-data/data-analyses/shared_data/"

@@ -47,20 +50,21 @@ def make_county_centroids():
ca_row2 = pd.DataFrame.from_dict(ca_row, orient="index").T
gdf2 = gdf.append(ca_row2).reset_index(drop=True)

print("County centroids dataset created")

# Save as parquet, because lat/lon held in list, not point geometry anymore
gdf2.to_parquet(f"{GCS_FILE_PATH}ca_county_centroids.parquet")

print("County centroids exported to GCS")

return


def make_clean_state_highway_network():
"""
Create State Highway Network dataset.
"""
HIGHWAY_URL = "https://opendata.arcgis.com/datasets/" "77f2d7ba94e040a78bfbe36feb6279da_0.geojson"
gdf = gpd.read_file(HIGHWAY_URL)
URL = "https://opendata.arcgis.com/datasets/" "77f2d7ba94e040a78bfbe36feb6279da_0.geojson"

gdf = gpd.read_file(URL)

keep_cols = ["Route", "County", "District", "RouteType", "Direction", "geometry"]

@@ -78,8 +82,88 @@ def make_clean_state_highway_network():
utils.geoparquet_gcs_export(gdf2, GCS_FILE_PATH, "state_highway_network")


# Run functions to create these datasets...store in GCS
def export_shn_postmiles():
"""
Create State Highway Network postmiles dataset.
These are points....maybe we can somehow create line segments?
"""
URL = "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/" "CHhighway/SHN_Postmiles_Tenth/" "FeatureServer/0/"

gdf = query_arcgis_feature_server(URL)

gdf2 = to_snakecase(gdf).drop(columns="objectid")

utils.geoparquet_gcs_export(gdf2, GCS_FILE_PATH, "state_highway_network_postmiles")

return


def draw_line_between_points(gdf: gpd.GeoDataFrame, group_cols: list) -> gpd.GeoDataFrame:
"""
Use the current postmile as the
starting geometry / segment beginning
and the subsequent postmile (based on odometer)
as the ending geometry / segment end.
Segment goes from current to next postmile.
"""
# Grab the subsequent point geometry
# We can drop whenever the last point is missing within
# a group. If we have 3 points, we can draw 2 lines.
gdf = gdf.assign(end_geometry=(gdf.groupby(group_cols, group_keys=False).geometry.shift(-1))).dropna(
subset="end_geometry"
)

# Construct linestring with 2 point coordinates
gdf = (
gdf.assign(
line_geometry=gdf.apply(lambda x: shapely.LineString([x.geometry, x.end_geometry]), axis=1).set_crs(
geography_utils.WGS84
)
)
.drop(columns=["geometry", "end_geometry"])
.rename(columns={"line_geometry": "geometry"})
)

return gdf


def create_postmile_segments(group_cols: list) -> gpd.GeoDataFrame:
"""
Take the SHN postmiles gdf, group by highway / odometer
and convert the points into lines.
We'll lose the last postmile for each highway-direction.
Segment goes from current postmile point to subsequent postmile point.
"""
gdf = gpd.read_parquet(
f"{GCS_FILE_PATH}state_highway_network_postmiles.parquet",
columns=["route", "direction", "odometer", "geometry"],
)

# If there are duplicates with highway-direction and odometer
# (where pm or other column differs slightly),
# we'll drop duplicates and cut as long a segment as we can.
# There may be differences in postmile (relative to county start)
# and odometer (relative to line's origin).
gdf2 = (
gdf.sort_values(group_cols + ["odometer"])
.drop_duplicates(subset=group_cols + ["odometer"])
.reset_index(drop=True)
)

gdf3 = draw_line_between_points(gdf2, group_cols)

utils.geoparquet_gcs_export(gdf3, GCS_FILE_PATH, "state_highway_network_postmile_segments")

return


if __name__ == "__main__":
# Run functions to create these datasets...store in GCS

make_county_centroids()

make_clean_state_highway_network()
export_shn_postmiles()

create_postmile_segments(["route", "direction"])
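A small self-contained sketch of the point-to-segment logic in `draw_line_between_points`, run on made-up postmile points (toy data, not the production GCS file):

```python
import geopandas as gpd
import shapely

# Three toy postmile points along one route-direction, sorted by odometer
points = gpd.GeoDataFrame(
    {
        "route": [1, 1, 1],
        "direction": ["N", "N", "N"],
        "odometer": [0.0, 0.1, 0.2],
        "geometry": [shapely.Point(0, 0), shapely.Point(0, 1), shapely.Point(0, 2)],
    },
    crs="EPSG:4326",
)

# Shift geometry within each group to grab the subsequent postmile,
# then connect current -> next with a 2-point LineString.
# The last postmile in each group has no "next" point and drops out.
points["end_geometry"] = points.groupby(["route", "direction"]).geometry.shift(-1)
segments = points.dropna(subset=["end_geometry"]).copy()
segments["geometry"] = [
    shapely.LineString([start, end])
    for start, end in zip(segments.geometry, segments["end_geometry"])
]
segments = segments.drop(columns="end_geometry")

print(segments)  # 2 line segments from 3 points
```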
13 changes: 13 additions & 0 deletions _shared_utils/shared_utils/shared_data_catalog.yml
@@ -59,3 +59,16 @@ sources:
args:
# source: bus_service_increase/bus_service_utils/generate_calenviroscreen_lehd_data.py
urlpath: gs://calitp-analytics-data/data-analyses/bus_service_increase/calenviroscreen_lehd_by_tract.parquet
state_highway_network_postmiles:
driver: geoparquet
description: Caltrans State Highway Network postmiles (every 0.1 mile) with postmiles as point geometry.
args:
# source: https://gisdata-caltrans.opendata.arcgis.com/datasets/c22341fec9c74c6b9488ee4da23dd967_0/about
# hitting url directly would limit us to 2,000 rows
urlpath: gs://calitp-analytics-data/data-analyses/shared_data/state_highway_network_postmiles.parquet
state_highway_network_postmile_segments:
driver: geoparquet
description: Caltrans State Highway Network postmile segments (postmiles converted to line segments)
args:
# source: shared_utils/shared_data.py
urlpath: gs://calitp-analytics-data/data-analyses/shared_data/state_highway_network_postmile_segments.parquet
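A hedged example of consuming the new catalog entries; reading the geoparquet directly from the GCS paths above is the simplest route (requires GCS credentials and gcsfs):

```python
import geopandas as gpd

GCS = "gs://calitp-analytics-data/data-analyses/shared_data/"

postmiles = gpd.read_parquet(f"{GCS}state_highway_network_postmiles.parquet")
segments = gpd.read_parquet(f"{GCS}state_highway_network_postmile_segments.parquet")
```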