# Source code for utdf2gmns._utdf2gmns

# -*- coding:utf-8 -*-
##############################################################
# Created Date: Friday, January 27th 2023
# Contact Info: luoxiangyong01@gmail.com
# Author/Copyright: Mr. Xiangyong Luo
##############################################################

import os
import json
from pathlib import Path
import subprocess
import pandas as pd

# import utility functions from pyufunc
import pyufunc as pf

from utdf2gmns.util_lib.pkg_utils import time_unit_converter, time_str_to_seconds

# For deployment
from utdf2gmns.func_lib.utdf.geocoding_intersection import generate_intersection_coordinates
from utdf2gmns.func_lib.utdf.read_utdf import (generate_intersection_from_Links, read_UTDF)
from utdf2gmns.func_lib.utdf.cvt_utdf_lane_df_to_dict import cvt_lane_df_to_dict

from utdf2gmns.func_lib.gmns.geocoding_Nodes import update_node_from_one_intersection
from utdf2gmns.func_lib.gmns.geocoding_Links import (generate_links,
                                                     generate_links_polygon,
                                                     cvt_link_df_to_dict)
from utdf2gmns.func_lib.gmns.sigma_x_process_signal_intersection import cvt_utdf_to_signal_intersection

from utdf2gmns.func_lib.sumo.signal_intersections import parse_signal_control
from utdf2gmns.func_lib.sumo.update_sumo_signal_from_utdf import update_sumo_signal_from_utdf
from utdf2gmns.func_lib.sumo.remove_u_turn import remove_sumo_U_turn


# SUMO related functions
from utdf2gmns.func_lib.sumo.gmns2sumo import (generate_sumo_nod_xml,
                                               generate_sumo_edg_xml,
                                               generate_sumo_flow_xml,
                                               generate_sumo_connection_xml,
                                               generate_sumo_loop_detector_add_xml)


pd.options.mode.chained_assignment = None  # default='warn'


class UTDF2GMNS:
    """UTDF2GMNS performs the data conversion from UTDF to different formats.

    The class includes functions such as:
        - geocode_utdf_intersections: geocode intersections
        - create_signal_control: signalize intersections
        - create_gmns_links: create network from UTDF data by combining
          Nodes, Links, Lanes, and Phases
        - utdf_to_gmns: convert UTDF data to GMNS data and save to the output directory
        - utdf_to_sumo: convert UTDF data to SUMO data and save to the output directory
        - and more...
    """

    def __init__(self, utdf_filename: str, region_name: str = "", *, verbose: bool = False):
        """Initialize UTDF2GMNS class with UTDF file and region name.

        The UTDF file is loaded immediately as part of the initialization.

        Args:
            utdf_filename (str): the path to the UTDF file.
            region_name (str): the metropolitan region/place the utdf file represent.
                Defaults to "".
            verbose (bool): whether to printout processing message. Defaults to False.

        Raises:
            FileNotFoundError: if the UTDF file does not exist.
        """
        print("Initializing UTDF2GMNS...")

        self._utdf_filename = pf.path2linux(os.path.abspath(utdf_filename))
        self._utdf_region_name = region_name
        self._verbose = verbose

        # region_name is only needed when geocoding from an address, so warn instead of failing
        if not region_name:
            print(" :region_name not provided, "
                  "it is recommended to specify the region name that the UTDF.csv file represents.")

        # load UTDF data from the file in the initialization
        self.__load_utdf()

    def __load_utdf(self) -> bool:
        """Load UTDF file and generate dataframes for Networks, Nodes, Links,
        Lanes, Timeplans, and Phases.

        Sets ``network_settings``, ``network_unit``, ``network_int_ids`` and
        ``network_int_ids_signalized`` on the instance.

        Raises:
            FileNotFoundError: if the UTDF file does not exist.

        Returns:
            bool: True once the file has been loaded.
        """
        if not os.path.exists(self._utdf_filename):
            raise FileNotFoundError(f"UTDF file {self._utdf_filename} not found!")

        # read UTDF file and create dataframes
        utdf_dict_data = read_UTDF(self._utdf_filename)

        # Map each network setting name (RECORDNAME) to its value (DATA).
        # Pairing the two columns positionally does not depend on the index labels.
        df_network = utdf_dict_data.get("Network")
        self.network_settings = dict(zip(df_network["RECORDNAME"], df_network["DATA"]))

        # a "Metric" setting of 0 selects imperial units, anything else metric units
        is_imperial = str(self.network_settings.get("Metric")) == "0"
        self.network_unit = "feet, mph" if is_imperial else "meters, km/h"

        # assign to instance variable
        self._utdf_dict = utdf_dict_data

        node_ids = set(self._utdf_dict.get("Nodes")["INTID"].tolist())
        signalized_ids = set(self._utdf_dict.get("Timeplans")["INTID"].tolist())
        self.network_int_ids = [str(int_id) for int_id in node_ids]
        self.network_int_ids_signalized = [str(int_id) for int_id in signalized_ids]

        # initialize the instance variables
        self._is_geocoding_intersections = False

        print(f" :Total number of intersections in the UTDF file: {len(self.network_int_ids)}")
        return True

    @staticmethod
    def _run_command(command: list, cwd: str) -> subprocess.CompletedProcess:
        """Run an external command and capture its text output.

        A command that cannot be launched at all (executable missing, not
        executable, ...) is reported as a failed run (non-zero return code,
        the OS error text in ``stderr``) instead of raising, so that callers
        can treat "not installed" and "ran but failed" the same way and move
        on to their fallback.

        Args:
            command (list): the executable followed by its arguments.
            cwd (str): working directory for the command.

        Returns:
            subprocess.CompletedProcess: the finished (or synthesized) result.
        """
        try:
            return subprocess.run(command, cwd=cwd, capture_output=True, text=True)
        except OSError as err:
            # 127 follows the shell convention for "command not found"
            return subprocess.CompletedProcess(args=command, returncode=127,
                                               stdout="", stderr=str(err))

    def geocode_utdf_intersections(self, *,
                                   single_intersection_coord: dict = None,
                                   dist_threshold: float = 0.01) -> bool:
        """Geocode intersections.

        Firstly, geocode one intersection from given single intersection coordinate.
        Then, according to the Nodes information, calculate all intersections
        based on relative coordinates.

        Args:
            single_intersection_coord (dict): a single intersection coordinates,
                defaults to None. If not provided, geocoding one intersection from address.
                Sample data: {"INTID": "1", "x_coord": -114.568, "y_coord": 35.155}
            dist_threshold (float): distance threshold for geocoding intersections,
                defaults to 0.01. Unit: km
                only used when single_intersection_coord is not provided.

        Note:
            - single_intersection_coord should follow the format:
              {"INTID": "1", "x_coord": -114.568, "y_coord": 35.155}
            - INTID must be a string; x_coord and y_coord must be numbers
              (integers are accepted and converted to float).
            - if single_intersection_coord is not provided, geocode intersections
              from address (region_name must be provided from input).

        Raises:
            ValueError: Single coordinate should have INTID, x_coord, and y_coord keys!
            ValueError: INTID should be a string!
            ValueError: x_coord should be a float!
            ValueError: y_coord should be a float!
            ValueError: the given INTID is not in the UTDF Nodes!
            Exception: No valid intersection is geo-coded!
            Exception: neither single_intersection_coord nor region_name is available.

        Returns:
            bool: whether the geocoding intersections is successful.
        """
        print("\nGeocoding UTDF intersections...")

        if single_intersection_coord:
            # validate the user-provided coordinate before using it
            if not {"INTID", "x_coord", "y_coord"}.issubset(single_intersection_coord.keys()):
                raise ValueError("Single coordinate should have INTID, x_coord, and y_coord keys!")

            if not isinstance(single_intersection_coord.get("INTID"), str):
                raise ValueError("INTID should be a string!")

            for axis in ("x_coord", "y_coord"):
                value = single_intersection_coord.get(axis)
                # bool is a subclass of int, but True/False is never a valid coordinate
                if isinstance(value, bool) or not isinstance(value, (int, float)):
                    raise ValueError(f"{axis} should be a float!")

            # check if id is in the Nodes
            int_id = int(single_intersection_coord.get("INTID"))
            if str(int_id) not in self.network_int_ids:
                raise ValueError(f"single intersection: {int_id} not in the UTDF Nodes!")

            # hand floats downstream even when the caller supplied integers
            single_intersection = {
                **single_intersection_coord,
                "x_coord": float(single_intersection_coord["x_coord"]),
                "y_coord": float(single_intersection_coord["y_coord"]),
            }

        elif self._utdf_region_name:
            # generate intersections with name for coordinations
            df_utdf_intersection = generate_intersection_from_Links(
                self._utdf_dict.get("Links"), self._utdf_region_name)

            # geocoding one intersection from address, with threshold (default 0.01) km
            single_intersection = generate_intersection_coordinates(
                df_utdf_intersection, dist_threshold=dist_threshold, geocode_one=True)

            # check if the single_intersection is empty
            if single_intersection["INTID"] is None:
                raise Exception(
                    "\n No valid intersection is geo-coded!"
                    " Please change dist_threshold or provide single_intersection_coord manually.")

        else:
            raise Exception(
                "\nCould not geocode intersections, two ways to solve this issue: \n"
                " 1. provide region_name when initializing UTDF2GMNS class; \n"
                " 2. provide single_intersection_coord manually"
                " while running geocode_utdf_intersections()."
            )

        # update Nodes from single_intersection
        node_dict = update_node_from_one_intersection(single_intersection,
                                                      self._utdf_dict.get("Nodes"),
                                                      self.network_unit)
        self.network_nodes = node_dict
        self._utdf_dict["network_nodes"] = node_dict
        self._is_geocoding_intersections = True
        return True

    def create_signal_control(self) -> bool:
        """Signalize intersections.

        1. get signal intersection id from phase
        2. parse signal control from UTDF data and create signal control for each intersection
        3. assign signal control to network_signal_control, a dictionary as internal variable

        Returns:
            bool: True once network_signal_control has been assigned.
        """
        df_phase = self._utdf_dict.get("Phases")
        df_lane = self._utdf_dict.get("Lanes")

        # one entry per distinct intersection id found in the Phases table
        self.network_signal_control = {
            int_id: parse_signal_control(df_phase, df_lane, int_id)
            for int_id in df_phase["INTID"].unique()
        }
        return True

    def create_gmns_links(self, *, default_width: float = 12, is_link_polygon: bool = False) -> bool:
        """Create network from UTDF data by combining Nodes, Links, Lanes, and Phases.

        Requires ``network_nodes``, which is set by geocode_utdf_intersections().

        Args:
            default_width (float): width of the link used when the network settings
                have no "DefWidth" entry, defaults to 12.
            is_link_polygon (bool): whether to create link polygon (bbox), defaults to False.

        Returns:
            bool: whether the network is created successfully.
        """
        width = self.network_settings.get("DefWidth", default_width)

        # both builders take the same arguments; only the generated geometry differs
        build_links = generate_links_polygon if is_link_polygon else generate_links
        links_dict = build_links(self._utdf_dict.get("Links"),
                                 self.network_nodes,
                                 width,
                                 self.network_unit)

        self.network_links = links_dict
        print(f" :Total number of edges generated: {len(links_dict)}")
        return True

    @pf.func_running_time
    def utdf_to_gmns_signal_ints(self, *, output_dir: str = "") -> bool:
        """Empower Sigma-X engine to generate each signal intersection with visualization.

        Args:
            output_dir (str): currently unused by this method; kept for
                interface compatibility. Defaults to "".

        Returns:
            bool: True once the Sigma-X conversion call has returned.
        """
        print("\nRunning Sigma-X engine... \n")

        # print out approximate time for processing (rough estimate: 3.5 s per intersection)
        signal_count = len(self.network_int_ids_signalized)
        total_seconds = 3.5 * signal_count
        print(" :Processing each signal intersection, please wait...")
        print(f" :Total time for {signal_count} intersections"
              f" might be: {time_unit_converter(total_seconds, 's', 'm', False):.2f} minutes...")

        cvt_utdf_to_signal_intersection(self._utdf_filename, verbose=self._verbose)
        return True

    def utdf_to_gmns(self, *, output_dir: str = "", incl_utdf: bool = True,
                     is_link_polygon: bool = False) -> bool:
        """Convert UTDF data to GMNS data and save to the output directory.

        Args:
            output_dir (str): output directory to save the GMNS data. Defaults to "",
                in which case a "utdf_to_gmns" folder next to the UTDF file is used.
                The directory is created if it does not exist.
            incl_utdf (bool): whether to save the UTDF data to the output directory,
                defaults to True.
            is_link_polygon (bool): whether to create link polygon, defaults to False.

        Note:
            - the UTDF data includes Nodes, Networks, Timeplans, Links, Lanes, and Phases.
            - the GMNS data includes node.csv, link.csv and signal.json.

        Raises:
            Exception: intersections have not been geocoded yet.

        Returns:
            bool: whether the conversion is successful.
        """
        print("\nConverting UTDF to GMNS...")

        utdf_dir = Path(self._utdf_filename).parent.absolute()
        gmns_output_dir = output_dir or os.path.join(utdf_dir, "utdf_to_gmns")
        gmns_output_dir = pf.path2linux(gmns_output_dir)  # convert to universal path format

        # create the output directory if it does not exist
        os.makedirs(gmns_output_dir, exist_ok=True)

        # Create node and link data if not exist
        if not hasattr(self, "network_nodes"):
            raise Exception("Please geocode intersections first: net.geocode_utdf_intersections()")

        if not hasattr(self, "network_links"):
            self.create_gmns_links(is_link_polygon=is_link_polygon)

        if not hasattr(self, "network_signal_control"):
            self.create_signal_control()

        # Save the GMNS data to the output directory
        pd.DataFrame(self.network_nodes.values()).to_csv(
            os.path.join(gmns_output_dir, "node.csv"), index=False)
        pd.DataFrame(self.network_links.values()).to_csv(
            os.path.join(gmns_output_dir, "link.csv"), index=False)

        # json only accepts str/int/float/bool/None keys; numpy integers (what
        # Series.unique() yields for an integer column) are rejected, so
        # stringify the intersection ids, which is what JSON stores anyway.
        signal_control = {str(int_id): control
                          for int_id, control in self.network_signal_control.items()}
        with open(os.path.join(gmns_output_dir, "signal.json"), "w", encoding="utf-8") as f:
            json.dump(signal_control, f)

        # save the UTDF data to the output directory
        if incl_utdf:
            utdf_tables = {
                "Nodes": "utdf_nodes.csv",
                "Network": "utdf_network.csv",
                "Timeplans": "utdf_timeplans.csv",
                "Links": "utdf_links.csv",
                "Lanes": "utdf_lanes.csv",
                "Phases": "utdf_phases.csv",
            }
            for table_name, csv_name in utdf_tables.items():
                self._utdf_dict.get(table_name).to_csv(
                    os.path.join(gmns_output_dir, csv_name), index=False)

        print(f" :Successfully saved GMNS(csv) data to \n {gmns_output_dir}.")
        return True

    def utdf_to_sumo(self, *, output_dir: str = "", sim_name: str = "",
                     show_warning_message: bool = False,
                     disable_U_turn: bool = True,
                     sim_start_time: int = 0,
                     sim_duration: int = 3600  # 1 hour
                     ) -> bool:
        """Convert UTDF to SUMO and save networks to the output directory.

        Args:
            output_dir (str): the output directory for the generated sumo files.
                Defaults to "", in which case a "utdf_to_sumo" folder next to the
                UTDF file is used.
            sim_name (str): name the generated sumo files. Defaults to "".
                If not provided, the name is "utdf_to_sumo".
            show_warning_message (bool): whether to show warning message during
                the net processing. Defaults to False.
            disable_U_turn (bool): whether to remove U-turns in the SUMO network.
                Defaults to True.
            sim_start_time (int): the start time of the simulation in seconds.
                Only used when the network settings have no "ScenarioTime";
                otherwise the start time is taken from there. Defaults to 0.
            sim_duration (int): the duration of the simulation in seconds.
                Defaults to 3600 seconds (1 hour).

        Raises:
            Exception: intersections have not been geocoded yet.

        Returns:
            bool: whether the conversion is successful. False when the SUMO
                network or the route file could not be generated.
        """
        import sys  # local import: only needed to locate the running interpreter

        print("\nConverting UTDF to SUMO using GMNS standard...")

        utdf_dir = Path(self._utdf_filename).parent.absolute()
        sumo_output_dir = output_dir or os.path.join(utdf_dir, "utdf_to_sumo")
        sumo_output_dir = pf.path2linux(sumo_output_dir)

        # create the output directory if it does not exist
        os.makedirs(sumo_output_dir, exist_ok=True)

        # network nodes are a prerequisite for every generated file
        if not hasattr(self, "network_nodes"):
            raise Exception("Please geocode intersections first: net.geocode_utdf_intersections()")

        xml_name = sim_name or "utdf_to_sumo"
        engine_dir = Path(__file__).parent / "engine"

        # create SUMO .nod.xml file
        output_node_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.nod.xml"))
        generate_sumo_nod_xml(self._utdf_dict, output_node_file)
        print(f" :generated SUMO node xml file: {xml_name}.nod.xml")

        # create SUMO .edg.xml file
        output_edge_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.edg.xml"))
        generate_sumo_edg_xml(self._utdf_dict, self.network_unit, output_edge_file)
        print(f" :generated SUMO edge xml file: {xml_name}.edg.xml")

        # Create SUMO .con.xml file
        output_con_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.con.xml"))
        generate_sumo_connection_xml(self._utdf_dict, output_con_file)
        print(f" :generated SUMO connection xml file: {xml_name}.con.xml")

        # Create SUMO loop detector in .add.xml file
        output_add_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.add.xml"))
        generate_sumo_loop_detector_add_xml(self._utdf_dict,
                                            self.network_unit,
                                            detector_type="E1",
                                            add_fname=output_add_file,
                                            sim_output_fname="")
        print(f" :generated SUMO loop detector xml file: {xml_name}.add.xml")

        # convert .nod.xml, .edg.xml and .con.xml files to .net.xml file
        output_net_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.net.xml"))
        # warnings are suppressed unless the caller asked to see them
        suppress_warnings = "false" if show_warning_message else "true"
        netconvert_args = [f"--node-files={output_node_file}",
                           f"--edge-files={output_edge_file}",
                           f"--connection-files={output_con_file}",
                           f"--output-file={output_net_file}",
                           f"--no-warnings={suppress_warnings}",
                           "--proj.utm"]
        try:
            # first try the netconvert found on the PATH
            result = self._run_command(["netconvert", *netconvert_args], sumo_output_dir)

            if result.returncode != 0:
                # A non-zero return code means the command failed or could not be
                # started, e.g. SUMO is not installed or SUMO_HOME is not set up
                # correctly. Retry with the netconvert shipped in the package's
                # engine directory.
                nc_filename = pf.path2linux(engine_dir / "netconvert.exe")
                result = self._run_command([nc_filename, *netconvert_args], sumo_output_dir)

                if result.returncode != 0:
                    print(" :SUMO netconvert from nod.xml, edg.xml to net.xml failed!")
                    print(f" :{result.stderr}")
                    return False

            print(f" :Successfully generated SUMO network to \n {sumo_output_dir}.")

            if show_warning_message and result.stderr:
                print("Warning message in generating SUMO network:")
                print(f"{result.stderr}")

        except Exception as e:
            print(f" :Error in generating SUMO network: {e}")
            return False

        # update SUMO signal in .net.xml file
        update_sumo_signal_from_utdf(output_net_file, self._utdf_dict, verbose=self._verbose)
        print(f" :Successfully updated SUMO signal xml to \n {sumo_output_dir}.")

        # create SUMO .flow.xml file
        output_flow_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.flow.xml"))

        # the scenario time stored in the UTDF file takes precedence over sim_start_time
        if begin := self.network_settings.get("ScenarioTime"):
            begin_time = time_str_to_seconds(begin, verbose=False)
        else:
            begin_time = sim_start_time
        end_time = begin_time + sim_duration

        generate_sumo_flow_xml(self._utdf_dict, output_flow_file, begin=begin_time, end=end_time)

        # create .rou.xml file
        output_rou_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.rou.xml"))
        try:
            print("\n :Generating SUMO .rou.xml file from UTDF lanes...")
            jtrrouter_fname = pf.path2linux(engine_dir / "jtrrouter.exe")
            result = self._run_command([jtrrouter_fname,
                                        f"--route-files={output_flow_file}",
                                        f"--net-file={output_net_file}",
                                        "--accept-all-destinations=false",
                                        f"--output-file={output_rou_file}",
                                        "--remove-loops=true",
                                        "--no-internal-links=false",
                                        "--no-warnings=true"],
                                       sumo_output_dir)

            if result.returncode != 0:
                print(f" :{result.stderr}")

                # jtrrouter failed or could not be started: fall back to the
                # randomTrips.py script under the func_lib directory
                print(" :Generating SUMO .rou.xml file with random trips...")
                path_random_trips = pf.path2linux(
                    Path(__file__).parent / "func_lib" / "sumo" / "randomTrips.py")

                # use the interpreter running this code rather than whatever
                # "python" happens to resolve to on the PATH
                python_exe = sys.executable or "python"
                result = self._run_command([python_exe, path_random_trips,
                                            "-n", output_net_file,
                                            "-r", output_rou_file],
                                           sumo_output_dir)

                if result.returncode != 0:
                    print(" :SUMO create .flow.xml failed!")
                    print(f" :{result.stderr}")
                    return False
                print(f" :Successfully generated default flow file to \n {sumo_output_dir}.")
        except Exception as e:
            print(f" :Error in generating SUMO route file: {e}")
            return False

        # remove U-turns in the SUMO network
        if disable_U_turn:
            print()
            remove_sumo_U_turn(output_net_file)
            print()

        # create .sumocfg file for the generated network
        sumo_cfg_file = pf.path2linux(os.path.join(sumo_output_dir, f"{xml_name}.sumocfg"))

        cfg_str = (
            f'<?xml version="1.0" encoding="UTF-8"?>\n'
            f'\n'
            f'<configuration>\n'
            f' <input>\n'
            f' <net-file value="{xml_name}.net.xml"/>\n'
            f' <route-files value="{xml_name}.rou.xml"/>\n'
            f' <additional-files value="{xml_name}.add.xml"/>\n'
            f' </input>\n'
            f' <output>\n'
            f' <edgedata-output value="EdgeData.xml"/>\n'
            f' </output>\n'
            f' <time>\n'
            f' <begin value="{begin_time}"/>\n'
            f' <end value="{end_time}"/>\n'
            f' <step-length value="1"/>\n'
            f' </time>\n'
            f'</configuration>\n'
        )

        # the XML declaration announces UTF-8, so write the file in that encoding
        with open(sumo_cfg_file, "w", encoding="utf-8") as f:
            f.write(cfg_str)
        print(f" :Successfully generated SUMO configuration file to \n {sumo_output_dir}.")

        return True