"""
.. _ts_runner_cli:
``ts_runner`` Overview
----------------------
``ts_runner`` manages the startup and execution of the SUMO runtime, along with data collection, traffic light control, and optimization.
Software Structure and Implementation
-------------------------------------
Light Control
-------------
``ts_runner`` will accept an argument that specifies the file and class name to import at startup. Messing with this
part of Python is tricky business.
Optimization
------------
``ts_runner`` looks for a file named ``optimizer_example`` to run.
Inside the ``optimizer_example`` file, an ``OptimizerExample`` class is expected to exist with a function called ``train``, which is called after every simulation tick.
Output
------
Output from ``ts_runner`` is directed to ``<sumo_project_dir>/output/<YYYY_MM_DD_HH_MM_SS_dddddd>/``, which
contains:
``sutripinfo.xml``
``YYYY_MM_DD_HH_MM_SS_dddddd_ts_runner.log`` (Not yet implemented, writes to current dir)
"""
import argparse
from argparse import RawTextHelpFormatter
from ts_core.utils.argparse_utils import FullPaths, is_dir, is_file
from ts_core.execution.traci_loop import run_loop
from ts_core.utils.ts_logging import setup_logging
import logging
setup_logging()
logger = logging.getLogger(__name__)
import sys
import time
import re
import datetime as dt
import os, shutil
import random
import traci
import sumolib
from six import string_types, integer_types
# Help text passed to argparse for the ``--overwrite_output`` flag.
# The leading and trailing newlines are part of the value.
overwrite_output_help = (
    "\n"
    "Overwrite the output directory. This will remove all previous runs and regenerate the folder.\n"
)
def run_loop():
    """
    Drive the TraCI simulation to completion and report how many ticks it took.

    The simulation is stepped until two things are both true: the expected
    vehicle count reported by TraCI has exceeded 5 at least once, and it has
    since dropped to 1 or fewer. Afterwards the TraCI connection is closed and
    the tick count is printed.

    Parameters
    ----------

    Returns
    -------
    None
    """
    # NOTE: a per-tick optimizer hook
    # (ts_core.optimization.optimizer_example.OptimizerExample.train(tick))
    # is currently disabled; this is where it would be invoked.
    elapsed_ticks = 0
    traffic_seen = False  # latched once more than 5 vehicles have been expected

    while True:
        # The count is queried before every step, exactly once, regardless of the latch.
        remaining = traci.simulation.getMinExpectedNumber()
        if remaining <= 1 and traffic_seen:
            break

        traci.simulationStep()

        # Without the latch the loop would stop immediately on an initially empty network.
        if traci.simulation.getMinExpectedNumber() > 5:
            traffic_seen = True

        elapsed_ticks += 1

    traci.close()
    sys.stdout.flush()
    print()
    print("Simulation Took %s ticks" % elapsed_ticks)
def make_phase(light_array, default_time, min_time=None, max_time=None):
    """
    Build a TraCI traffic-light phase.

    Parameters
    ----------
    light_array: String containing the light pattern for this phase (e.g. "GGrrGGrr")
    default_time: Number of ticks that this phase should be active
    min_time: Minimum number of ticks that this phase should be active.
        Falls back to ``default_time`` when omitted or larger than ``default_time``
    max_time: Maximum number of ticks that this phase should be active.
        Falls back to ``default_time`` when omitted or smaller than ``default_time``

    Returns
    -------
    Traci phase object with specified data
    """
    # All durations are scaled by 1000 before being handed to TraCI.
    # TODO: confirm the unit TraCI expects here (presumably milliseconds).
    scale = 1000

    # Keep the window consistent: lower bound <= default <= upper bound.
    lower = default_time if (min_time is None or min_time > default_time) else min_time
    upper = default_time if (max_time is None or max_time < default_time) else max_time

    return traci.trafficlight.Phase(default_time * scale, lower * scale, upper * scale, light_array)
def make_and_set_program(program_name, phases, starting_phase=0, tl_type=0, traffic_light_ids=()):
    """
    Build a traffic-light program and optionally install it on one or more lights.

    Parameters
    ----------
    program_name: Name for this program. Use this when setting the programs
    phases: List/tuple of Traci phase objects denoting the various phases for this program
    starting_phase: Integer: When this program is set, this phase ID will be active first
    tl_type: ---DOES NOT WORK. ALWAYS STATIC--- 0: static, 1: delay_based, 2: actuated
    traffic_light_ids: ID(s) of the traffic lights to set to this program.
        Accepts a string, an integer, or an iterable of mixed strings/integers.
        With the default empty tuple no traffic light is touched.

    Returns
    -------
    Traci Program object
    """
    program = traci.trafficlight.Logic(program_name, tl_type, 0, starting_phase, phases)

    if traffic_light_ids != ():
        # A bare string or integer names a single light; wrap it so the loop is uniform.
        is_single_id = isinstance(traffic_light_ids, (string_types, integer_types))
        targets = [traffic_light_ids] if is_single_id else traffic_light_ids

        for target in targets:
            # IDs are passed to TraCI as strings, so integers are converted here.
            traci.trafficlight.setCompleteRedYellowGreenDefinition(str(target), program)

    return program
def main():
    """
    Initializes sumo to run using given files
    Starts either command-line or GUI SUMO binary based on command-line arguments
    Orchestrates the running of the simulation, training the optimizer, and controlling the traffic lights

    Returns
    -------
    Nothing
    """
    # NOTE: the docstring above doubles as the argparse description
    # (``description=main.__doc__``), so it is kept user-facing.

    # --- Argument definitions ---
    parser = argparse.ArgumentParser(formatter_class=RawTextHelpFormatter, description=main.__doc__)
    parser.add_argument('sumo_config_dir', action=FullPaths, type=is_dir,
                        help='Specify the target directory from which to load config files')
    # Passing --gui selects the GUI binary; without it the headless binary is used.
    parser.add_argument('--gui', action='store_true', help='Toggle the use of the SUMO GUI')
    parser.add_argument('--overwrite_output', action='store_true', help=overwrite_output_help)
    args = parser.parse_args()

    # Timestamp of the run start, with every non-alphanumeric run collapsed to "_"
    # so it can be used as a directory name.
    run_start_dt = dt.datetime.now()
    run_start_dt_str = re.sub('[^A-Za-z0-9]+', '_', run_start_dt.isoformat())

    output_root = os.path.join(args.sumo_config_dir, "output")
    output_dir = os.path.join(output_root, run_start_dt_str)

    # --- Locate the SUMO configuration ---
    # Done before touching the filesystem so that a run which cannot start neither
    # wipes previous output nor leaves an empty run directory behind.
    # The first matching directory entry wins.
    sumo_cfg_file = next(
        (entry for entry in os.listdir(args.sumo_config_dir) if entry.lower().endswith(".sumocfg")),
        None
    )
    if sumo_cfg_file is None:
        print("No .sumocfg file found in %s" % args.sumo_config_dir)
        print("***EXITING***")
        return

    # Resolve the binary for either the GUI or the non-GUI alternative.
    sumo_binary = sumolib.checkBinary('sumo-gui' if args.gui else 'sumo')

    # --- Output directory handling ---
    if args.overwrite_output:
        # Don't crash if there is no directory yet.
        shutil.rmtree(output_root, ignore_errors=True)
    # Creates the shared "output" directory if needed, plus this run's own directory.
    os.makedirs(output_dir)

    # --- SUMO startup ---
    traci.start([sumo_binary, "-c", os.path.join(args.sumo_config_dir, sumo_cfg_file),
                 "--tripinfo-output", os.path.join(output_dir, "sutripinfo.xml")])

    # --- Engage the SUMO event loop ---
    run_loop()
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()