Best Python code snippet using pytest-bdd_python
simulations.py
Source:simulations.py  
1"""2SUMO simulation specific helper methods3"""4__author__ = "Peter Kocsis, Edmond Irani Liu"5__copyright__ = "TUM Cyber-Physical System Group"6__credits__ = []7__version__ = "0.1"8__maintainer__ = "Edmond Irani Liu"9__email__ = "edmond.irani@tum.de"10__status__ = "Integration"11import copy12import os13import pickle14from enum import unique, Enum15from math import sin, cos16from typing import Tuple, Dict, Optional17import numpy as np18from sumocr.sumo_config.default import DefaultConfig19from sumocr.interface.ego_vehicle import EgoVehicle20from sumocr.interface.sumo_simulation import SumoSimulation21from sumocr.maps.sumo_scenario import ScenarioWrapper22from sumocr.visualization.video import create_video23from sumocr.sumo_docker.interface.docker_interface import SumoInterface24from commonroad.scenario.scenario import Scenario25from commonroad.planning.planning_problem import PlanningProblemSet26from commonroad.common.solution import Solution27from commonroad.common.file_reader import CommonRoadFileReader28from main_interactive_CRplanner import InteractiveCRPlanner29import matplotlib.pyplot as plt30from commonroad.visualization.draw_dispatch_cr import draw_object31@unique32class SimulationOption(Enum):33    WITHOUT_EGO = "_without_ego"34    MOTION_PLANNER = "_planner"35    SOLUTION = "_solution"36def simulate_scenario(mode: SimulationOption,37                      conf: DefaultConfig,38                      scenario_wrapper: ScenarioWrapper,39                      scenario_path: str,40                      num_of_steps: int = None,41                      planning_problem_set: PlanningProblemSet = None,42                      solution: Solution = None,43                      use_sumo_manager: bool = False) -> Tuple[Scenario, Dict[int, EgoVehicle]]:44    """45    Simulates an interactive scenario with specified mode46    :param mode: 0 = without ego, 1 = with plugged in planner, 2 = with solution trajectory47    :param conf: config of the simulation48    :param scenario_wrapper: scenario wrapper used by the Simulator49    :param scenario_path: path to the interactive scenario folder50    :param num_of_steps: number of steps to simulate51    :param planning_problem_set: planning problem set of the scenario52    :param solution: solution to the planning problem53    :param use_sumo_manager: indicates whether to use the SUMO Manager54    :return: simulated scenario and dictionary with items {planning_problem_id: EgoVehicle}55    """56    if num_of_steps is None:57        num_of_steps = conf.simulation_steps58    sumo_interface = None59    if use_sumo_manager:60        sumo_interface = SumoInterface(use_docker=True)61        sumo_sim = sumo_interface.start_simulator()62        sumo_sim.send_sumo_scenario(conf.scenario_name,63                                    scenario_path)64    else:65        sumo_sim = SumoSimulation()66    # initialize simulation67    sumo_sim.initialize(conf, scenario_wrapper, None)68    if mode is SimulationOption.WITHOUT_EGO:69        # simulation without ego vehicle70        for step in range(num_of_steps):71            # set to dummy simulation72            sumo_sim.dummy_ego_simulation = True73            sumo_sim.simulate_step()74    elif mode is SimulationOption.MOTION_PLANNER:75        # simulation with plugged in planner76        def run_simulation():77            ego_vehicles = sumo_sim.ego_vehicles78            for step in range(num_of_steps):79                if use_sumo_manager:80                    ego_vehicles = sumo_sim.ego_vehicles81                # retrieve the CommonRoad scenario at the current time step, e.g. as an input for a predicition module82                current_scenario = sumo_sim.commonroad_scenario_at_time_step(sumo_sim.current_time_step)83                for idx, ego_vehicle in enumerate(ego_vehicles.values()):84                    # retrieve the current state of the ego vehicle85                    state_current_ego = ego_vehicle.current_state86                    # ====== plug in your motion planner here87                    # example motion planner which decelerates to full stop88                    planning_problem = list(planning_problem_set.planning_problem_dict.values())[0]89                    main_planner = InteractiveCRPlanner(current_scenario, ego_vehicle.current_state)90                    last_action = []91                    is_new_action_needed = True92                    next_state, last_action, is_new_action_needed = main_planner.planning(current_scenario,93                                                                                        planning_problem,94                                                                                        ego_vehicle,95                                                                                        sumo_sim.current_time_step,96                                                                                        last_action,97                                                                                        is_new_action_needed)98                    # plt.clf()99                    # draw_parameters = {100                    #     'time_begin': sumo_sim.current_time_step,101                    #     'scenario':102                    #         {'dynamic_obstacle': {'show_label': True, },103                    #         'lanelet_network': {'lanelet': {'show_label': False, }, },104                    #         },105                    # }106                    # draw_object(current_scenario, draw_params=draw_parameters)107                    # draw_object(planning_problem_set)108                    # plt.gca().set_aspect('equal')109                    # plt.pause(0.001)110                    # next_state = copy.deepcopy(state_current_ego)111                    # next_state.steering_angle = 0.0112                    # a = -5.0113                    # dt = 0.1114                    # if next_state.velocity > 0:115                    #     v = next_state.velocity116                    #     x, y = next_state.position117                    #     o = next_state.orientation118                    #     next_state.position = np.array([x + v * cos(o) * dt, y + v * sin(o) * dt])119                    #     next_state.velocity += a * dt120                    # ====== end of motion planner121                    # update the ego vehicle with new trajectory with only 1 state for the current step122                    next_state.time_step = 1123                    trajectory_ego = [next_state]124                    ego_vehicle.set_planned_trajectory(trajectory_ego)125                if use_sumo_manager:126                    # set the modified ego vehicles to synchronize in case of using sumo_docker127                    sumo_sim.ego_vehicles = ego_vehicles128                sumo_sim.simulate_step()129        run_simulation()130    elif mode is SimulationOption.SOLUTION:131        # simulation with given solution trajectory132        def run_simulation():133            ego_vehicles = sumo_sim.ego_vehicles134            for time_step in range(num_of_steps):135                if use_sumo_manager:136                    ego_vehicles = sumo_sim.ego_vehicles137                for idx_ego, ego_vehicle in enumerate(ego_vehicles.values()):138                    # update the ego vehicles with solution trajectories139                    trajectory_solution = solution.planning_problem_solutions[idx_ego].trajectory140                    next_state = copy.deepcopy(trajectory_solution.state_list[time_step])141                    next_state.time_step = 1142                    trajectory_ego = [next_state]143                    ego_vehicle.set_planned_trajectory(trajectory_ego)144                if use_sumo_manager:145                    # set the modified ego vehicles to synchronize in case of using SUMO Manager146                    sumo_sim.ego_vehicles = ego_vehicles147                sumo_sim.simulate_step()148        check_trajectories(solution, planning_problem_set, conf)149        run_simulation()150    # retrieve the simulated scenario in CR format151    simulated_scenario = sumo_sim.commonroad_scenarios_all_time_steps()152    # stop the simulation153    sumo_sim.stop()154    if use_sumo_manager:155        sumo_interface.stop_simulator()156    ego_vechicles = {list(planning_problem_set.planning_problem_dict.keys())[0]:157                         ego_v for _, ego_v in sumo_sim.ego_vehicles.items()}158    return simulated_scenario, ego_vechicles159def simulate_without_ego(interactive_scenario_path: str,160                         output_folder_path: str = None,161                         create_video: bool = False,162                         use_sumo_manager: bool = False) -> Tuple[Scenario, PlanningProblemSet]:163    """164    Simulates an interactive scenario without ego vehicle165    :param interactive_scenario_path: path to the interactive scenario folder166    :param output_folder_path: path to the output folder167    :param create_video: indicates whether to create a mp4 of the simulated scenario168    :param use_sumo_manager: indicates whether to use the SUMO Manager169    :return: Tuple of the simulated scenario and the planning problem set170    """171    conf = load_sumo_configuration(interactive_scenario_path)172    scenario_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.cr.xml")173    scenario, planning_problem_set = CommonRoadFileReader(scenario_file).open()174    scenario_wrapper = ScenarioWrapper()175    scenario_wrapper.sumo_cfg_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.sumo.cfg")176    scenario_wrapper.initial_scenario = scenario177    # simulation without ego vehicle178    simulated_scenario_without_ego, _ = simulate_scenario(SimulationOption.WITHOUT_EGO, conf,179                                                          scenario_wrapper,180                                                          interactive_scenario_path,181                                                          num_of_steps=conf.simulation_steps,182                                                          planning_problem_set=planning_problem_set,183                                                          solution=None,184                                                          use_sumo_manager=use_sumo_manager)185    simulated_scenario_without_ego.scenario_id = scenario.scenario_id186    if create_video:187        create_video_for_simulation(simulated_scenario_without_ego, output_folder_path, planning_problem_set,188                                    {}, SimulationOption.WITHOUT_EGO.value)189    return simulated_scenario_without_ego, planning_problem_set190def simulate_with_solution(interactive_scenario_path: str,191                           output_folder_path: str = None,192                           solution: Solution = None,193                           create_video: bool = False,194                           use_sumo_manager: bool = False,195                           create_ego_obstacle: bool = False) -> Tuple[Scenario, PlanningProblemSet, Dict[int, EgoVehicle]]:196    """197    Simulates an interactive scenario with a given solution198    :param interactive_scenario_path: path to the interactive scenario folder199    :param output_folder_path: path to the output folder200    :param solution: solution to the planning problem201    :param create_video: indicates whether to create a mp4 of the simulated scenario202    :param use_sumo_manager: indicates whether to use the SUMO Manager203    :param create_ego_obstacle: indicates whether to create obstacles as the ego vehicles204    :return: Tuple of the simulated scenario and the planning problem set205    """206    if not isinstance(solution, Solution):207        raise Exception("Solution to the planning problem is not given.")208    conf = load_sumo_configuration(interactive_scenario_path)209    scenario_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.cr.xml")210    scenario, planning_problem_set = CommonRoadFileReader(scenario_file).open()211    scenario_wrapper = ScenarioWrapper()212    scenario_wrapper.sumo_cfg_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.sumo.cfg")213    scenario_wrapper.initial_scenario = scenario214    scenario_with_solution, ego_vehicles = simulate_scenario(SimulationOption.SOLUTION, conf,215                                                                       scenario_wrapper,216                                                                       interactive_scenario_path,217                                                                       num_of_steps=conf.simulation_steps,218                                                                       planning_problem_set=planning_problem_set,219                                                                       solution=solution,220                                                                       use_sumo_manager=use_sumo_manager)221    scenario_with_solution.scenario_id = scenario.scenario_id222    if create_video:223        create_video_for_simulation(scenario_with_solution, output_folder_path, planning_problem_set,224                                    ego_vehicles, SimulationOption.SOLUTION.value)225    if create_ego_obstacle:226        for pp_id, planning_problem in planning_problem_set.planning_problem_dict.items():227            obstacle_ego = ego_vehicles[pp_id].get_dynamic_obstacle()228            scenario_with_solution.add_objects(obstacle_ego)229    return scenario_with_solution, planning_problem_set, ego_vehicles230def simulate_with_planner(interactive_scenario_path: str,231                          output_folder_path: str = None,232                          create_video: bool = False,233                          use_sumo_manager: bool = False,234                          create_ego_obstacle: bool = False) \235        -> Tuple[Scenario, PlanningProblemSet, Dict[int, EgoVehicle]]:236    """237    Simulates an interactive scenario with a plugged in motion planner238    :param interactive_scenario_path: path to the interactive scenario folder239    :param output_folder_path: path to the output folder240    :param create_video: indicates whether to create a mp4 of the simulated scenario241    :param use_sumo_manager: indicates whether to use the SUMO Manager242    :param create_ego_obstacle: indicates whether to create obstacles from the planned trajectories as the ego vehicles243    :return: Tuple of the simulated scenario, planning problem set, and list of ego vehicles244    """245    conf = load_sumo_configuration(interactive_scenario_path)246    scenario_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.cr.xml")247    scenario, planning_problem_set = CommonRoadFileReader(scenario_file).open()248    scenario_wrapper = ScenarioWrapper()249    scenario_wrapper.sumo_cfg_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.sumo.cfg")250    scenario_wrapper.initial_scenario = scenario251    scenario_with_planner, ego_vehicles = simulate_scenario(SimulationOption.MOTION_PLANNER, conf,252                                                                      scenario_wrapper,253                                                                      interactive_scenario_path,254                                                                      num_of_steps=conf.simulation_steps,255                                                                      planning_problem_set=planning_problem_set,256                                                                      use_sumo_manager=use_sumo_manager)257    scenario_with_planner.scenario_id = scenario.scenario_id258    if create_video:259        create_video_for_simulation(scenario_with_planner, output_folder_path, planning_problem_set,260                                    ego_vehicles, SimulationOption.MOTION_PLANNER.value)261    if create_ego_obstacle:262        for pp_id, planning_problem in planning_problem_set.planning_problem_dict.items():263            obstacle_ego = ego_vehicles[pp_id].get_dynamic_obstacle()264            scenario_with_planner.add_objects(obstacle_ego)265    return scenario_with_planner, planning_problem_set, ego_vehicles266def load_sumo_configuration(interactive_scenario_path: str) -> DefaultConfig:267    with open(os.path.join(interactive_scenario_path, "simulation_config.p"), "rb") as input_file:268        conf = pickle.load(input_file)269    return conf270def check_trajectories(solution: Solution, pps: PlanningProblemSet, config: DefaultConfig):271    assert len(set(solution.planning_problem_ids) - set(pps.planning_problem_dict.keys())) == 0, \272        f"Provided solution trajectories with IDs {solution.planning_problem_ids} don't match " \273        f"planning problem IDs{list(pps.planning_problem_dict.keys())}"274    for s in solution.planning_problem_solutions:275        if s.trajectory.final_state.time_step < config.simulation_steps:276            raise ValueError(f"The simulation requires {config.simulation_steps}"277                             f"states, but the solution only provides"278                             f"{s.trajectory.final_state.time_step} time steps!")279def create_video_for_simulation(scenario_with_planner: Scenario, output_folder_path: str,280                                planning_problem_set: PlanningProblemSet,281                                ego_vehicles: Optional[Dict[int, EgoVehicle]],282                                suffix: str, follow_ego: bool = True):283    """Creates the mp4 animation for the simulation result."""284    if not output_folder_path:285        print("Output folder not specified, skipping mp4 generation.")286        return287    # create list of planning problems and trajectories288    list_planning_problems = []289    # create mp4 animation290    create_video(scenario_with_planner,291                 output_folder_path,292                 planning_problem_set=planning_problem_set,293                 trajectory_pred=ego_vehicles,294                 follow_ego=follow_ego,...scenario.py
Source:scenario.py  
...140        for arg in scenario.get_example_params():141            if arg not in function_args:142                function_args.append(arg)143        @pytest.mark.usefixtures(*function_args)144        def scenario_wrapper(request):145            _execute_scenario(feature, scenario, request, encoding)146            return fn(*[request.getfixturevalue(arg) for arg in args])147        for param_set in scenario.get_params():148            if param_set:149                scenario_wrapper = pytest.mark.parametrize(*param_set)(scenario_wrapper)150        for tag in scenario.tags.union(feature.tags):151            config = CONFIG_STACK[-1]152            config.hook.pytest_bdd_apply_tag(tag=tag, function=scenario_wrapper)153        scenario_wrapper.__doc__ = u"{feature_name}: {scenario_name}".format(154            feature_name=feature_name, scenario_name=scenario_name155        )156        scenario_wrapper.__scenario__ = scenario157        scenario_wrapper.__pytest_bdd_counter__ = counter158        scenario.test_function = scenario_wrapper...test_multiset.py
Source:test_multiset.py  
1from copy import deepcopy2from itertools import product3import pytest4from tiny_algos.minmultiset import MaxMultiSet, MinMultiSet5from tiny_algos.multiset import MultiSet6PINF = 9223372036854775807  # 2 ** 63 - 17NINF = -PINF8MIN_DEFAULT = PINF9MAX_DEFAULT = NINF10class ScenarioWrapper:11    def __init__(self, base, action, description):12        self.base = base13        self.action = action14        self.description = description15    def __repr__(self):16        return f'"{self.description}"'17scenarios = [18    ScenarioWrapper(19        base=[],20        action=[21            ("add", 1),22            ("add", 2),23            ("add", -1),24        ],25        description="construct with empty list",26    ),27    ScenarioWrapper(28        base=[1, -1],29        action=[30            ("remove", 1),31            ("remove", -1),32            ("add", 1),33            ("add", -1),34        ],35        description="Remove all and add",36    ),37    ScenarioWrapper(38        base=[0, 1, 1, 2],39        action=[40            ("add", 3),41            ("add", 2),42            ("add", -1),43            ("add", -2),44        ],45        description="add",46    ),47    ScenarioWrapper(48        base=[0, 1, 1, 2],49        action=[50            ("remove", 1),51            ("remove", 2),52            ("remove", 0),53        ],54        description="remove",55    ),56    ScenarioWrapper(57        base=[2, 4, 0, 2],58        action=[59            ("add", 3),60            ("remove", 4),61            ("add", 1),62            ("remove", 0),63            ("add", 1),64            ("add", 3),65            ("remove", 1),66            ("remove", 3),67            ("remove", 1),68            ("remove", 2),69        ],70        description="add and remove",71    ),72]73class MultiSetWrapper:74    def __init__(self, ms_cls, extremes):75        self.ms_cls = ms_cls76        self.extremes = extremes77    def __repr__(self):78        return self.ms_cls.__name__79multisets = [80    MultiSetWrapper(MultiSet, extremes=("min", "max")),81    MultiSetWrapper(MinMultiSet, extremes=("min")),82    MultiSetWrapper(MaxMultiSet, extremes=("max")),83]84@pytest.mark.parametrize(85    ["scenario_wrapper", "multiset_wrapper"],86    product(scenarios, multisets),87)88def test_multiset(scenario_wrapper, multiset_wrapper):89    ms_cls = multiset_wrapper.ms_cls90    extremes = multiset_wrapper.extremes91    base = deepcopy(scenario_wrapper.base)92    action = deepcopy(scenario_wrapper.action)93    test_list = list(base)94    test_ms = ms_cls(list(base))95    def check_extremes():96        if "min" in extremes:97            assert test_ms.min == min(test_list, default=MIN_DEFAULT)98        if "max" in extremes:99            assert test_ms.max == max(test_list, default=MAX_DEFAULT)100    check_extremes()101    for action, value in action:102        if action == "add":103            test_list.append(value)104            test_ms.add(value)105        elif action == "remove":106            test_list.remove(value)107            test_ms.remove(value)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
