"""
Config Helper Tools
===================
This module contains a series of functions that transform the parsed excel document into SUMO conpatible XML.
Each function corresponds to ONE of the SUMO configuration files
"""
import re
import ts_core.config.bindings.sumocfg as sumocfg
from ts_core.config.config_exceptions import *
import ts_core.config.bindings.nodes_xml as nodes
import ts_core.config.bindings.edges_xml as edges
import ts_core.config.bindings.additional_xml as additional
from xml.dom import minidom
import numpy as np
from pyxb import BIND
_lanes_fields = [
'L_lanes',
'R_lanes',
'S_lanes',
'RS_lanes',
'LS_lanes'
]
def _sind(x, places):
return round(np.sin( x * np.pi / 180), places)
def _cosd(x, places):
"""
Helper function for doing degree
Parameters
----------
x
places
Returns
-------
"""
return round(np.cos(x * np.pi / 180), places)
[docs]def mk_add(intersections: dict, branches: dict):
"""
Generate the additional attributes XML file.
This file handles, among other things, traffic detectors.
Lane Area Detectors
-------------------
So far the only implemented option is a set of lane area detectors on
all lanes for each branch. For each branch the start and end points of the detector
are defined in the Excel configuration as the distance from the intersection.
Parameters
----------
intersections
branches
Returns
-------
"""
_branch_validation = [
'id',
'L_lanes',
'R_lanes',
'S_lanes',
'RS_lanes',
'LS_lanes',
'priority',
'sensor_start',
'sensor_end'
]
ad1 = additional.additionalType()
for branch in branches:
numlanes = sum([branches[branch][l] for l in _lanes_fields])
if not all(field in branches[branch] for field in _branch_validation):
raise ParsedSchemaError("All of the fields in the list {} must be in each branch".format(str(_branch_validation)))
for lane in range(numlanes):
tmp = additional.e2DetectorType(
id="Area" + str(branches[branch]['id']) + "i" + "_" + str(lane),
lane=str(branches[branch]['id']) + "i" + "_" + str(lane),
pos=str(-branches[branch]['sensor_end']),
endPos=str(-branches[branch]['sensor_start']),
freq="10",
file="test_area.xml"
)
ad1.laneAreaDetector.append(tmp)
xmlstr = minidom.parseString(ad1.toxml("utf-8", element_name='additional').decode('utf-8')).toprettyxml(indent=" ")
return xmlstr
[docs]def mk_edge(intersections: dict, branches: dict):
"""
Helper to make the edg.xml file
Parameters
----------
intersections
branches
Returns
-------
"""
edge = edges.edges()
_branch_validation = [
'id',
'L_lanes',
'R_lanes',
'S_lanes',
'RS_lanes',
'LS_lanes',
'priority'
]
for branch in branches:
numlanes = sum([branches[branch][l] for l in _lanes_fields])
# Make sure each branch has all the keys that are about to be called
if not all(field in branches[branch] for field in _branch_validation):
raise ParsedSchemaError("All of the fields in the list {} must be in each branch".format(str(_branch_validation)))
inbound_edge = edges.edgeType(
id=str(branches[branch]['id'])+"i",
priority=branches[branch]['priority'],
from_=str(branches[branch]['id']),
to=0,
numLanes=numlanes,
speed=str(branches[branch]['speed'])
)
outbound_edge = edges.edgeType(
id=str(branches[branch]['id'])+"o",
priority=branches[branch]['priority'],
to=str(branches[branch]['id']),
from_=0,
numLanes=numlanes,
speed=str(branches[branch]['speed'])
)
edge.append(inbound_edge)
edge.append(outbound_edge)
xmlstr = minidom.parseString(edge.toxml("utf-8").decode('utf-8')).toprettyxml(indent=" ")
return xmlstr
[docs]def mk_node(intersections: dict, branches: dict):
"""
Helper to make the nod.xml file
Parameters
----------
intersections
branches
Returns
-------
"""
n = nodes.nodes()
_branch_validation = [
'id',
'type',
'theta',
'Length',
]
_intersection_validation = [
'id',
'type'
]
branch_metadata = {}
# Iterate intersections
# NOTE: Will only generate the intersection with id == 0 until multiple intersection
# support is added.
for inter in intersections:
if not all(field in intersections[inter] for field in _intersection_validation):
raise ParsedSchemaError("All of the fields in the list {} must be in each branch".format(
str(_branch_validation))
)
if intersections[inter]['id'] != 0:
continue
n.append(nodes.nodeType(id=intersections[inter]['id'], x=0, y=0, type=intersections[inter]['type']))
for branch in branches:
# Make sure each branch has all the keys that are about to be called
if not all(field in branches[branch] for field in _branch_validation):
raise ParsedSchemaError("All of the fields in the list {} must be in each branch".format(str(_branch_validation)))
# branch_metadata[branches[branch]['id']] = {}
try:
n.append(nodes.nodeType(id=branches[branch]['id'],
x=branches[branch]['Length']*_cosd(branches[branch]['theta'], 2),
y=branches[branch]['Length']*_sind(branches[branch]['theta'], 2),
type=str.lower(branches[branch]['type'])))
except Exception as e:
print(branches[branch])
print(e)
xmlstr = minidom.parseString(n.toxml("utf-8").decode('utf-8')).toprettyxml(indent=" ")
return xmlstr
[docs]def mk_sumocfg(parsed_data: dict):
"""
Helper function to process the SUMOCFG key of the parsed Excel config file
into XML.
Parameters
----------
parsed_data : dictionary
The "SUMOCFG" key of the parsed Excel file.
Returns
-------
"""
underscore = re.compile(r'_')
cfg = sumocfg.configuration()
_implemented = {'input', 'time', 'gui_only'}
# Identify if any of the params passed are not implemented
# by this function.
_not_implemented = set(parsed_data.keys()) - _implemented
if len(_not_implemented) > 0:
raise ParsedConfigNotImplementedError
for tag in _implemented:
try:
parsed_data[tag]
except KeyError:
continue
_input = getattr(sumocfg, tag+'Type')()
for attr, value in parsed_data[tag].items():
attr_type = _input.__getattribute__("_"+tag+"Type__"+attr).__dict__['_ElementDeclaration__elementBinding']
_input.__setattr__(attr, attr_type(value_=value))
cfg.append(_input)
xmlstr = minidom.parseString(cfg.toxml("utf-8").decode('utf-8')).toprettyxml(indent=" ")
return xmlstr