Source code for utdf2gmns.func_lib.utdf.read_utdf

# -*- coding:utf-8 -*-
##############################################################
# Created Date: Tuesday, January 17th 2023
# Contact Info: luoxiangyong01@gmail.com
# Author/Copyright: Mr. Xiangyong Luo
##############################################################

import os
import pandas as pd
from pyufunc import func_running_time

from utdf2gmns.util_lib.pkg_settings import utdf_categories, utdf_link_col_names

# avoid the warning of "A value is trying to be set on a copy of a slice from a DataFrame"
pd.options.mode.chained_assignment = None  # default='warn'


[docs] @func_running_time def read_UTDF(path_utdf: str) -> dict: """read the utdf.csv file and return a dictionary of dataframes Args: path_utdf (str): path to the utdf.csv file Example: >>> import utdf2gmns as ug >>> path_utdf = "your utdf file, in csv format" >>> utdf_dict = ug.read_UTDF(path_utdf) >>> utdf_dict.keys() dict_keys(['Network', 'Nodes', 'Links', 'Lanes', 'Timeplans', 'Phases', 'phase_timeplans']) Returns: dict: a dictionary of dataframes with keys are Network, Nodes, Links, Lanes, Timeplans, Phases """ # check if the file is a valid csv file if not isinstance(path_utdf, str): raise ValueError( f"path_utdf should be a string, but got {type(path_utdf)}. Please provide a valid path to the utdf.csv file.") if not os.path.isfile(path_utdf): raise FileNotFoundError( f"The file {path_utdf} does not exist. Please provide a valid path to the utdf.csv file.") if not path_utdf.endswith(".csv"): raise ValueError( f"The file {path_utdf} is not a CSV file. Please provide a valid CSV file for the UTDF data.") # read the utdf.csv file with open(path_utdf, "r", encoding='utf-8') as f: lines = f.readlines() # find the start index of each category, the index is the row contain column names categorical_data_beginning_index_dict = {} for i in range(len(lines)): if "Network" in lines[i] and utdf_categories["Network"] in lines[i + 1]: categorical_data_beginning_index_dict[i + 2] = "Network" elif "Nodes" in lines[i] and utdf_categories["Nodes"] in lines[i + 1]: categorical_data_beginning_index_dict[i + 2] = "Nodes" elif "Links" in lines[i] and utdf_categories["Links"] in lines[i + 1]: categorical_data_beginning_index_dict[i + 2] = "Links" elif "Lanes" in lines[i] and utdf_categories["Lanes"] in lines[i + 1]: categorical_data_beginning_index_dict[i + 2] = "Lanes" elif "Timeplans" in lines[i] and utdf_categories["Timeplans"] in lines[i + 1]: categorical_data_beginning_index_dict[i + 2] = "Timeplans" elif "Phases" in lines[i] and utdf_categories["Phases"] in lines[i + 1]: categorical_data_beginning_index_dict[i + 2] = "Phases" else: continue categorical_index_ordered = sorted( list(categorical_data_beginning_index_dict.keys())) # ascending order # prepare dataframe for each category utdf_dict_data = {} for j in range(len(categorical_index_ordered)): # get the category name from start_index_dict category_name = categorical_data_beginning_index_dict[categorical_index_ordered[j]] # if it's the last value in the list, then the end index is the end of the file if j == len(categorical_index_ordered) - 1: category_value = [k.split(",") for k in lines[categorical_index_ordered[j]:]] # if it's not the last value in the list, then the end index is the start index of the next category - 2 else: category_value = [ k.split(",") for k in lines[categorical_index_ordered[j]:categorical_index_ordered[j + 1] - 2]] # save data to dictionary utdf_dict_data[category_name] = pd.DataFrame( category_value[1:], columns=category_value[0]) # format utdf_lane data : remove unnecessary rows with None and column with '\n' # format each table in utdf_dict_data for table_name, df_table_name in utdf_dict_data.items(): try: # get the last column name from utdf_setting last_col_name = list(df_table_name.columns)[-1] # remove unnecessary rows / invalid rows with NaN if table_name != "Network": df_table_name = df_table_name[df_table_name["INTID"].notna()] df_table_name = df_table_name[df_table_name['INTID'].astype( str).str.isdigit()] else: df_table_name = df_table_name[df_table_name[last_col_name].notna( )] # clean the data / remove '\n' in the end of column SW df_table_name.loc[:, last_col_name] = df_table_name[last_col_name].map( lambda x: x.replace("\n", "")) df_table_name = df_table_name.rename( columns={last_col_name: last_col_name.replace("\n", "")}) # drop the column with empty string df_table_name = df_table_name.drop( columns=[""], axis=1) if "" in df_table_name.columns else df_table_name utdf_dict_data[table_name] = df_table_name except Exception as e: print(f"Could not format table: {table_name} for {e}") continue # update Timeplans table with three columns needed utdf_dict_data["Timeplans"] = utdf_dict_data["Timeplans"].iloc[:, 0:3] utdf_dict_data["phase_timeplans"] = spanning_phase_timeplans_data( utdf_dict_data) return utdf_dict_data
@func_running_time def generate_intersection_from_Links(df_link: pd.DataFrame, city_name: str) -> pd.DataFrame: """generate_intersection_data_from_utdf: convert utdf links to intersection Args: utdf_dict_data (dict): a dictionary include key of Links city_name (str): city name of the utdf data Returns: pd.DataFrame: a dataframe of intersection """ # update columns name df_link.columns = [ i.replace("\n", "") if "\n" in i else i for i in df_link.columns.tolist()] # remove unnecessary rows / invalid rows with NaN df_link = df_link[df_link["INTID"].notna()] # clean the data / remove '\n' in the end of column SW # df_link = df_link.rename(columns={"SW\n": "SW"}) # df_link["SW"] = df_link["SW"].map(lambda x: x.replace("\n", "")) # get the unique link id link_id = df_link["INTID"].unique().tolist() # generate link dictionary in format of: {link_id: {RECORDNAME:{columns:values}}} df_link_dict = {} for single_id in link_id: df_single_id = df_link[df_link["INTID"] == single_id] record_name = df_single_id["RECORDNAME"].unique().tolist() df_single_id_dict = {} for name in record_name: # convert one row of dataframe to dictionary df_single_id_name = df_single_id[df_single_id["RECORDNAME"] == name].to_dict("records")[ 0] df_single_id_dict[name] = df_single_id_name df_link_dict[single_id] = df_single_id_dict # prepare intersection dataframe sequenced_intersection_id = 0 link_list = [] # for each single link id for single_id in df_link_dict: # define a flag to indicate whether an intersection exists isIntersection = False direction_list = df_link_dict[single_id]["Name"] direction_name_list = [] # for link table, direction info store in column 3 to 10 for direction_id in range(3, 11): direction_name = direction_list.get( utdf_link_col_names.get(direction_id), "") if direction_name not in direction_name_list and direction_name != '' and direction_name != '\n': direction_name_list.append(direction_name) if len(direction_name_list) > 1: isIntersection = True # generate intersection name if it's an intersection intersection_name = "" if isIntersection: intersection_name = ' & '.join(direction_name_list) # get "INTID" intersection_id = direction_list[utdf_link_col_names.get(2)] link_list.append([intersection_name, city_name, intersection_id, "", sequenced_intersection_id]) sequenced_intersection_id += 1 intersection_column_name = ["intersection_name", "city_name", "synchro_INTID", "file_name", "intersection_id"] df_utdf_intersection = pd.DataFrame( link_list, columns=intersection_column_name) df_utdf_intersection["intersection_name"] = df_utdf_intersection["intersection_name"].map( lambda x: x.replace("\n", "")) return df_utdf_intersection # combine phase and timeplans data into one dataframe (intersection id based) def spanning_phase_timeplans_data(utdf_dict_data: dict, isSimpleCol: bool = True) -> pd.DataFrame: """spanning_phase_timeplans_data: combine phase and timeplans data into one dataframe (intersection id based) Args: utdf_dict_data (dict): a dictionary include key of Phases and Timeplans isSimpleCol (bool, optional): defaults to True, if True, keep columns between Start_D and End_D if _D exists in column name, else keep all columns Returns: pd.DataFrame: a dataframe of phase and timeplans """ df_phase = utdf_dict_data.get("Phases") df_timeplans = utdf_dict_data.get("Timeplans") # get unique intersection id intersection_id = df_phase["INTID"].unique().tolist() final_spanned_list = [] for INTID in intersection_id: # get utdf_phase dataframe by id df_phase_single_id = df_phase[df_phase["INTID"] == INTID].reset_index(drop=True) df_timeplans_single_id = df_timeplans[df_timeplans["INTID"] == INTID].reset_index( drop=True) df_phase_single_id_dict = df_phase_single_id.to_dict("list") df_timeplans_single_id_dict = df_timeplans_single_id.to_dict("list") # df phase new column name and data df_phase_col_name_new = [] df_phase_single_id_new = [] # df timeplans new column name and data df_timeplans_col_name_new = [] df_timeplans_single_id_new = [] # span df_phase_single_id_dict for col_name in df_phase_single_id_dict: if col_name not in ["RECORDNAME", "INTID"]: df_phase_col_name_new += [ f"{row_name}_{col_name}" for row_name in df_phase_single_id_dict.get("RECORDNAME")] df_phase_single_id_new += df_phase_single_id_dict.get(col_name) # span df_timeplans_single_id_dict df_timeplans_col_name_new = df_timeplans_single_id_dict.get( "RECORDNAME") df_timeplans_single_id_new = df_timeplans_single_id_dict.get("DATA") # generate final column name and data for single intersection id single_id_col_name_final = [ "INTID"] + df_phase_col_name_new + df_timeplans_col_name_new single_id_data_final = [INTID] + \ df_phase_single_id_new + df_timeplans_single_id_new final_spanned_list.append(pd.DataFrame( [single_id_data_final], columns=single_id_col_name_final)) # if isSimpleCol is True, then only keep the columns with Start_D and End_D if _D exists # if _D not exists in column name, then keep all columns if isSimpleCol: simple_col_list = [] for i in single_id_col_name_final: if "_D" in i: if ("Start" in i and "Local" not in i) or "End" in i: simple_col_list.append(i) else: simple_col_list.append(i) final_spanned_list = [df[simple_col_list] for df in final_spanned_list] return pd.concat(final_spanned_list, axis=0, ignore_index=True) return pd.concat(final_spanned_list, axis=0, ignore_index=True) def reformat_lane_dataframe(utdf_dict_data: dict) -> pd.DataFrame: """reformat the utdf_lane dataframe to a new dataframe Args: utdf_dict_data (dict): a dictionary include key of Lanes Returns: pd.DataFrame: a dataframe of lanes """ # get utdf_lane data df_lane = utdf_dict_data["Lanes"] # get unique intersection id intersection_id_list = list(df_lane["INTID"].unique()) # convert utdf_lane dataframe to dictionary lane_dict = [] for intersection_id in intersection_id_list: df_lane_intersection_id = df_lane[df_lane["INTID"] == intersection_id] df_lane_intersection_id.set_index("RECORDNAME", inplace=True) lane_dict.append(df_lane_intersection_id.to_dict("dict")) # reformat lane_dict to dataframe df_lane_formatted = pd.DataFrame(lane_dict) df_lane_formatted["INTID"] = intersection_id_list return df_lane_formatted