Skip to content

Commit

Permalink
Fix some type errors regarding mypy check.
Browse files Browse the repository at this point in the history
  • Loading branch information
YueeeeeLi committed Mar 25, 2024
1 parent 41814e3 commit 7087733
Showing 1 changed file with 60 additions and 49 deletions.
109 changes: 60 additions & 49 deletions src/nird/road.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from collections import defaultdict
from functools import partial
import numpy as np
import numpy.typing as npt
import pandas as pd
import geopandas as gpd # type: ignore
import geopandas as gpd

import igraph # type: ignore
import igraph

import nird.constants as cons
from nird.utils import get_flow_on_edges
Expand All @@ -23,7 +24,7 @@ def select_partial_roads(
road_links: gpd.GeoDataFrame,
road_nodes: gpd.GeoDataFrame,
col_name: str,
list_of_values: list,
list_of_values: list[str],
) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
"""Extract major roads
Expand All @@ -45,17 +46,17 @@ def select_partial_roads(
selected_nodes: gpd.GeoDataFrame
The selected types of road nodes.
"""
selected_links = []
selected_links_dfs = []
# road links selection
for ci in list_of_values:
selected_links.append(road_links[road_links[col_name] == ci])
selected_links_dfs.append(road_links[road_links[col_name] == ci])

selected_links = pd.concat(selected_links, ignore_index=True)
selected_links = pd.concat(selected_links_dfs, ignore_index=True)
selected_links = gpd.GeoDataFrame(selected_links, geometry="geometry")

selected_links["e_id"] = selected_links.id
selected_links["from_id"] = selected_links.start_node
selected_links["to_id"] = selected_links.end_node
selected_links["e_id"] = selected_links["id"]
selected_links["from_id"] = selected_links["start_node"]
selected_links["to_id"] = selected_links["end_node"]

# road nodes selection
sel_node_idx = list(
Expand Down Expand Up @@ -180,7 +181,9 @@ def cost_func(


def initial_speed_func(
road_type: str, form_of_road: str, free_flow_speed_dict: dict
road_type: str,
form_of_road: str,
free_flow_speed_dict: dict[str, float],
) -> Union[float, None]:
"""Append free-flow speed to each road segment
Expand Down Expand Up @@ -216,10 +219,10 @@ def speed_flow_func(
road_type: str,
isUrban: int,
vp: float,
free_flow_speed_dict: dict,
flow_breakpoint_dict: dict,
min_speed_cap: dict,
urban_speed_cap: dict,
free_flow_speed_dict: dict[str, float],
flow_breakpoint_dict: dict[str, int],
min_speed_cap: dict[str, float],
urban_speed_cap: dict[str, float],
) -> Union[float, None]:
"""Update the average flow rate of each road segment in terms of flow changes.
Expand Down Expand Up @@ -304,12 +307,14 @@ def speed_flow_func(
return None


def filter_less_than_one(arr: np.ndarray) -> np.ndarray:
def filter_less_than_one(arr: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""Convert values less than one to zero"""
return np.where(arr >= 1, arr, 0)
return np.where(arr >= 1, arr, 0.0)


def find_nearest_node(zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame) -> dict:
def find_nearest_node(
zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame
) -> dict[str, str]:
"""Find nearest network node for each admin centroid
Parameters
Expand Down Expand Up @@ -349,11 +354,11 @@ def find_nearest_node(zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame) ->

def od_interpret(
od_matrix: pd.DataFrame,
zone_to_node: dict,
zone_to_node: dict[str, str],
col_origin: str,
col_destination: str,
col_count: str,
) -> Tuple[list, dict, dict]:
) -> Tuple[list[str], dict[str, list[str]], dict[str, list[float]]]:
"""Generate a list of origins along with their associated destinations and outbound trips.
Parameters
Expand Down Expand Up @@ -383,8 +388,8 @@ def od_interpret(
supply_dict: dict[str, list[float]] = defaultdict(list)

for idx in tqdm(range(od_matrix.shape[0]), desc="Processing"):
from_zone = od_matrix.loc[idx, col_origin]
to_zone = od_matrix.loc[idx, col_destination]
from_zone: str = od_matrix.loc[idx, col_origin] # type: ignore
to_zone: str = od_matrix.loc[idx, col_destination] # type: ignore
count: float = od_matrix.loc[idx, col_count] # type: ignore
try:
from_node = zone_to_node[from_zone]
Expand All @@ -405,10 +410,10 @@ def od_interpret(


def create_igraph_network(
node_name_to_index: dict,
node_name_to_index: dict[str, int],
road_links: gpd.GeoDataFrame,
road_nodes: gpd.GeoDataFrame,
initialSpeeds: dict,
initialSpeeds: dict[str, float],
) -> igraph.Graph:
"""Network creation using igraph.
Expand Down Expand Up @@ -483,8 +488,8 @@ def create_igraph_network(

def initialise_igraph_network(
road_links: gpd.GeoDataFrame,
initial_capacity_dict: dict,
initial_speed_dict: dict,
initial_capacity_dict: dict[str, int],
initial_speed_dict: dict[str, float],
col_road_classification: str,
) -> gpd.GeoDataFrame:
"""Road Network Initialisation
Expand Down Expand Up @@ -529,9 +534,9 @@ def initialise_igraph_network(

def update_od_matrix(
temp_flow_matrix: pd.DataFrame,
supply_dict: dict,
destination_dict: dict,
) -> Tuple[list, dict, dict, float]:
supply_dict: dict[str, list[int]],
destination_dict: dict[str, list[str]],
) -> Tuple[list[str], dict[str, list[int]], dict[str, list[str]], float]:
"""Drop the origin-destination pairs with no accessible link or unallocated trips
Parameters
Expand Down Expand Up @@ -588,10 +593,10 @@ def update_od_matrix(

def update_network_structure(
network: igraph.Graph,
length_dict: dict,
speed_dict: dict,
length_dict: dict[str, float],
speed_dict: dict[str, float],
temp_edge_flow: pd.DataFrame,
) -> Tuple[igraph.Graph, dict]:
) -> Tuple[igraph.Graph, dict[int, str]]:
"""Update the network structure (links and nodes)
Parameters
Expand Down Expand Up @@ -656,17 +661,17 @@ def update_network_structure(
def network_flow_model(
network: igraph.Graph,
road_links: gpd.GeoDataFrame,
node_name_to_index: dict,
edge_index_to_name: dict,
list_of_origins: list,
supply_dict: dict,
destination_dict: dict,
free_flow_speed_dict: dict,
flow_breakpoint_dict: dict,
min_speed_cap: dict,
urban_speed_cap: dict,
node_name_to_index: dict[str, int],
edge_index_to_name: dict[int, str],
list_of_origins: list[str],
supply_dict: dict[str, list[int]],
destination_dict: dict[str, list[str]],
free_flow_speed_dict: dict[str, float],
flow_breakpoint_dict: dict[str, int],
min_speed_cap: dict[str, float],
urban_speed_cap: dict[str, float],
col_eid: str,
) -> Tuple[dict, dict, dict]:
) -> Tuple[dict[str, float], dict[str, int], dict[str, int]]:
"""Network flow model
Parameters
Expand Down Expand Up @@ -719,19 +724,25 @@ def network_flow_model(
print(f"The initial number of destinations: {number_of_destinations}")

# road link properties
edge_cbtype_dict = road_links.set_index(col_eid)["combined_label"].to_dict()
edge_isUrban_dict = road_links.set_index(col_eid)["urban"].to_dict()
edge_length_dict = (
edge_cbtype_dict: dict[str, str] = road_links.set_index(col_eid)[
"combined_label"
].to_dict()
edge_isUrban_dict: dict[str, int] = road_links.set_index(col_eid)["urban"].to_dict()
edge_length_dict: dict[str, float] = (
road_links.set_index(col_eid)["geometry"].length * cons.CONV_METER_TO_MILE
).to_dict()
acc_flow_dict = road_links.set_index(col_eid)["acc_flow"].to_dict()
acc_capacity_dict = road_links.set_index(col_eid)["acc_capacity"].to_dict()
acc_speed_dict = road_links.set_index(col_eid)["ave_flow_rate"].to_dict()
acc_flow_dict: dict[str, int] = road_links.set_index(col_eid)["acc_flow"].to_dict()
acc_capacity_dict: dict[str, int] = road_links.set_index(col_eid)[
"acc_capacity"
].to_dict()
acc_speed_dict: dict[str, float] = road_links.set_index(col_eid)[
"ave_flow_rate"
].to_dict()

# starts
iter_flag = 1
total_non_allocated_flow = 0
while total_remain > 0:
total_non_allocated_flow = 0.0
while total_remain > 0.0:
print(f"No.{iter_flag} iteration starts:")
list_of_spath = []
# find the shortest path for each origin-destination pair
Expand Down

0 comments on commit 7087733

Please sign in to comment.