Source code for utdf2gmns.func_lib.sumo.update_sumo_signal_from_utdf

'''
##############################################################
# Created Date: Monday, December 30th 2024
# Contact Info: luoxiangyong01@gmail.com
# Author/Copyright: Ms. Yiran Zhang
# Author/Copyright: Mr. Xiangyong Luo
##############################################################
'''

from utdf2gmns.func_lib.sumo.read_sumo import ReadSUMO
from utdf2gmns.func_lib.sumo.signal_intersections import parse_signal_control
from utdf2gmns.func_lib.sumo.signal_mapping import (direction_mapping,
                                                    build_linkDuration,
                                                    extract_dir_info,
                                                    create_SignalTimingPlan,
                                                    process_pedestrian_crossing)
from utdf2gmns.func_lib.utdf.read_utdf import read_UTDF


[docs] def update_sumo_signal_from_utdf(sumo_net_xml: str, utdf_dict_or_fname: dict | str, verbose: bool = False) -> bool: """update sumo signal (.net.xml) from UTDF signal information Args: sumo_net_xml (str): the path of sumo network xml file utdf_dict_or_fname (dict | str): the UTDF dictionary or the path of UTDF csv file verbose (bool): whether to print the process. Defaults to False. Example: >>> import utdf2gmns as ug >>> sumo_net_xml = "your sumo network xml file" >>> utdf_dict_or_fname = "your utdf file, in csv format" >>> ug.update_sumo_signal_from_utdf(sumo_net_xml, utdf_dict_or_fname, verbose=True) Returns: bool: whether the generation is successful """ # Check if utdf_dict_or_fname is a dictionary or a file name if isinstance(utdf_dict_or_fname, dict): utdf_dict = utdf_dict_or_fname elif isinstance(utdf_dict_or_fname, str): utdf_dict = read_UTDF(utdf_dict_or_fname) else: raise TypeError("utdf_dict_or_fname must be a dictionary or a file name") # check if sumo_net_xml ends with .net.xml if not sumo_net_xml.endswith(".net.xml"): raise ValueError("sumo_net_xml must end with .net.xml") # get signal intersection ids from UTDF signalized_int_ids = list(set(utdf_dict.get("Timeplans")["INTID"].tolist())) # read sumo network sumo_net = ReadSUMO(sumo_net_xml) # parse signal intersection information utdf_signal = {} for int_id in signalized_int_ids: # in this case sumo id equal to UTDF signal intersection id int_id = str(int_id) utdf_signal[int_id] = parse_signal_control(df_phase=utdf_dict.get("Phases"), df_lane=utdf_dict.get("Lanes"), int_id=int_id) UTDF_DIRS = set(extract_dir_info(utdf_signal[int_id])) traffic_directions = set(map(lambda s: s[0:2], UTDF_DIRS)) if verbose: print(f"\nIntersection id: {int_id} \nDirections: {UTDF_DIRS}\n") unique_inbound_edges = [] for connection_index in sumo_net.sumo_signal_info[int_id].keys(): sumo_movement = sumo_net.sumo_signal_info[int_id][connection_index] if ":" not in sumo_movement["fromEdge"] and sumo_movement["fromEdge"] not in unique_inbound_edges: unique_inbound_edges.append(sumo_movement["fromEdge"]) if len(unique_inbound_edges) != len(traffic_directions): if verbose: print(f" :UTDF node {int_id} does not have the same " f"number of inbounds with SUMO {int_id}") flag, inbound_direction_mapping = direction_mapping(sumo_net, int_id, unique_inbound_edges, traffic_directions, verbose=verbose) if not flag: if verbose: print(f" :UTDF node {int_id} map inbounds with SUMO {int_id} failed") for connection_index in sumo_net.sumo_signal_info[int_id].keys(): sumo_movement = sumo_net.sumo_signal_info[int_id][connection_index] if ":" not in sumo_movement["fromEdge"]: if sumo_movement["fromEdge"] not in inbound_direction_mapping: if verbose: print(f" :UTDF node {int_id} match inbound failed") break if sumo_movement["dir"] == "s": synchro_dir = "T" else: synchro_dir = sumo_movement["dir"].upper() sumo_movement["dir"] = inbound_direction_mapping[sumo_movement["fromEdge"]] + synchro_dir else: process_pedestrian_crossing(int_id, sumo_net, sumo_movement, UTDF_DIRS) control_type = {"0": "static", "1": "actuated", "2": "actuated", "3": "actuated"} valid_ids = {} valid = 0 for int_id in signalized_int_ids: if verbose: print(f" :processing signal @ id: {int_id}") ret = create_SignalTimingPlan( utdf_signal[int_id], sumo_net.sumo_signal_info[int_id], verbose=verbose) linkDur = build_linkDuration( utdf_signal[int_id], sumo_net.sumo_signal_info[int_id]) if ret: for i in sumo_net.sumo_signal_info[int_id]: if verbose: print(f" :{i} {sumo_net.sumo_signal_info[int_id][i]}") timeplans = utdf_dict.get("Timeplans") types = str(control_type[list(timeplans['DATA'][(timeplans['INTID'] == str(int_id)) & ( timeplans['RECORDNAME'] == 'Control Type')])[0]]) offsets = str(list(timeplans['DATA'][(timeplans['INTID'] == str(int_id)) & ( timeplans['RECORDNAME'] == 'Offset')])[0]) sumo_net.replace_tl_logic_xml(int_id, ret, linkDur, types, int(float(offsets))) valid_ids[int_id] = int_id valid += 1 print(f" :Total signal intersections: {len(signalized_int_ids)}" f", valid intersections: {valid}\n") # update sumo.net.xml sumo_net.write_xml() return True