From 708773388d60977367d74600408a09e1bca42421 Mon Sep 17 00:00:00 2001 From: YUE LI Date: Mon, 25 Mar 2024 11:37:21 +0000 Subject: [PATCH] Fix some type errors regarding mypy check. --- src/nird/road.py | 109 ++++++++++++++++++++++++++--------------------- 1 file changed, 60 insertions(+), 49 deletions(-) diff --git a/src/nird/road.py b/src/nird/road.py index 2a5b41a..6550308 100644 --- a/src/nird/road.py +++ b/src/nird/road.py @@ -5,10 +5,11 @@ from collections import defaultdict from functools import partial import numpy as np +import numpy.typing as npt import pandas as pd -import geopandas as gpd # type: ignore +import geopandas as gpd -import igraph # type: ignore +import igraph import nird.constants as cons from nird.utils import get_flow_on_edges @@ -23,7 +24,7 @@ def select_partial_roads( road_links: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame, col_name: str, - list_of_values: list, + list_of_values: list[str], ) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: """Extract major roads @@ -45,17 +46,17 @@ def select_partial_roads( selected_nodes: gpd.GeoDataFrame The selected types of road nodes. """ - selected_links = [] + selected_links_dfs = [] # road links selection for ci in list_of_values: - selected_links.append(road_links[road_links[col_name] == ci]) + selected_links_dfs.append(road_links[road_links[col_name] == ci]) - selected_links = pd.concat(selected_links, ignore_index=True) + selected_links = pd.concat(selected_links_dfs, ignore_index=True) selected_links = gpd.GeoDataFrame(selected_links, geometry="geometry") - selected_links["e_id"] = selected_links.id - selected_links["from_id"] = selected_links.start_node - selected_links["to_id"] = selected_links.end_node + selected_links["e_id"] = selected_links["id"] + selected_links["from_id"] = selected_links["start_node"] + selected_links["to_id"] = selected_links["end_node"] # road nodes selection sel_node_idx = list( @@ -180,7 +181,9 @@ def cost_func( def initial_speed_func( - road_type: str, form_of_road: str, free_flow_speed_dict: dict + road_type: str, + form_of_road: str, + free_flow_speed_dict: dict[str, float], ) -> Union[float, None]: """Append free-flow speed to each road segment @@ -216,10 +219,10 @@ def speed_flow_func( road_type: str, isUrban: int, vp: float, - free_flow_speed_dict: dict, - flow_breakpoint_dict: dict, - min_speed_cap: dict, - urban_speed_cap: dict, + free_flow_speed_dict: dict[str, float], + flow_breakpoint_dict: dict[str, int], + min_speed_cap: dict[str, float], + urban_speed_cap: dict[str, float], ) -> Union[float, None]: """Update the average flow rate of each road segment in terms of flow changes. @@ -304,12 +307,14 @@ def speed_flow_func( return None -def filter_less_than_one(arr: np.ndarray) -> np.ndarray: +def filter_less_than_one(arr: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """Convert values less than one to zero""" - return np.where(arr >= 1, arr, 0) + return np.where(arr >= 1, arr, 0.0) -def find_nearest_node(zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame) -> dict: +def find_nearest_node( + zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame +) -> dict[str, str]: """Find nearest network node for each admin centroid Parameters @@ -349,11 +354,11 @@ def find_nearest_node(zones: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame) -> def od_interpret( od_matrix: pd.DataFrame, - zone_to_node: dict, + zone_to_node: dict[str, str], col_origin: str, col_destination: str, col_count: str, -) -> Tuple[list, dict, dict]: +) -> Tuple[list[str], dict[str, list[str]], dict[str, list[float]]]: """Generate a list of origins along with their associated destinations and outbound trips. Parameters @@ -383,8 +388,8 @@ def od_interpret( supply_dict: dict[str, list[float]] = defaultdict(list) for idx in tqdm(range(od_matrix.shape[0]), desc="Processing"): - from_zone = od_matrix.loc[idx, col_origin] - to_zone = od_matrix.loc[idx, col_destination] + from_zone: str = od_matrix.loc[idx, col_origin] # type: ignore + to_zone: str = od_matrix.loc[idx, col_destination] # type: ignore count: float = od_matrix.loc[idx, col_count] # type: ignore try: from_node = zone_to_node[from_zone] @@ -405,10 +410,10 @@ def od_interpret( def create_igraph_network( - node_name_to_index: dict, + node_name_to_index: dict[str, int], road_links: gpd.GeoDataFrame, road_nodes: gpd.GeoDataFrame, - initialSpeeds: dict, + initialSpeeds: dict[str, float], ) -> igraph.Graph: """Network creation using igraph. @@ -483,8 +488,8 @@ def create_igraph_network( def initialise_igraph_network( road_links: gpd.GeoDataFrame, - initial_capacity_dict: dict, - initial_speed_dict: dict, + initial_capacity_dict: dict[str, int], + initial_speed_dict: dict[str, float], col_road_classification: str, ) -> gpd.GeoDataFrame: """Road Network Initialisation @@ -529,9 +534,9 @@ def initialise_igraph_network( def update_od_matrix( temp_flow_matrix: pd.DataFrame, - supply_dict: dict, - destination_dict: dict, -) -> Tuple[list, dict, dict, float]: + supply_dict: dict[str, list[int]], + destination_dict: dict[str, list[str]], +) -> Tuple[list[str], dict[str, list[int]], dict[str, list[str]], float]: """Drop the origin-destination pairs with no accessible link or unallocated trips Parameters @@ -588,10 +593,10 @@ def update_od_matrix( def update_network_structure( network: igraph.Graph, - length_dict: dict, - speed_dict: dict, + length_dict: dict[str, float], + speed_dict: dict[str, float], temp_edge_flow: pd.DataFrame, -) -> Tuple[igraph.Graph, dict]: +) -> Tuple[igraph.Graph, dict[int, str]]: """Update the network structure (links and nodes) Parameters @@ -656,17 +661,17 @@ def update_network_structure( def network_flow_model( network: igraph.Graph, road_links: gpd.GeoDataFrame, - node_name_to_index: dict, - edge_index_to_name: dict, - list_of_origins: list, - supply_dict: dict, - destination_dict: dict, - free_flow_speed_dict: dict, - flow_breakpoint_dict: dict, - min_speed_cap: dict, - urban_speed_cap: dict, + node_name_to_index: dict[str, int], + edge_index_to_name: dict[int, str], + list_of_origins: list[str], + supply_dict: dict[str, list[int]], + destination_dict: dict[str, list[str]], + free_flow_speed_dict: dict[str, float], + flow_breakpoint_dict: dict[str, int], + min_speed_cap: dict[str, float], + urban_speed_cap: dict[str, float], col_eid: str, -) -> Tuple[dict, dict, dict]: +) -> Tuple[dict[str, float], dict[str, int], dict[str, int]]: """Network flow model Parameters @@ -719,19 +724,25 @@ def network_flow_model( print(f"The initial number of destinations: {number_of_destinations}") # road link properties - edge_cbtype_dict = road_links.set_index(col_eid)["combined_label"].to_dict() - edge_isUrban_dict = road_links.set_index(col_eid)["urban"].to_dict() - edge_length_dict = ( + edge_cbtype_dict: dict[str, str] = road_links.set_index(col_eid)[ + "combined_label" + ].to_dict() + edge_isUrban_dict: dict[str, int] = road_links.set_index(col_eid)["urban"].to_dict() + edge_length_dict: dict[str, float] = ( road_links.set_index(col_eid)["geometry"].length * cons.CONV_METER_TO_MILE ).to_dict() - acc_flow_dict = road_links.set_index(col_eid)["acc_flow"].to_dict() - acc_capacity_dict = road_links.set_index(col_eid)["acc_capacity"].to_dict() - acc_speed_dict = road_links.set_index(col_eid)["ave_flow_rate"].to_dict() + acc_flow_dict: dict[str, int] = road_links.set_index(col_eid)["acc_flow"].to_dict() + acc_capacity_dict: dict[str, int] = road_links.set_index(col_eid)[ + "acc_capacity" + ].to_dict() + acc_speed_dict: dict[str, float] = road_links.set_index(col_eid)[ + "ave_flow_rate" + ].to_dict() # starts iter_flag = 1 - total_non_allocated_flow = 0 - while total_remain > 0: + total_non_allocated_flow = 0.0 + while total_remain > 0.0: print(f"No.{iter_flag} iteration starts:") list_of_spath = [] # find the shortest path for each origin-destination pair