How to use scenario_wrapper method in pytest-bdd

Best Python code snippet using pytest-bdd_python

simulations.py

Source:simulations.py Github

copy

Full Screen

1"""2SUMO simulation specific helper methods3"""4__author__ = "Peter Kocsis, Edmond Irani Liu"5__copyright__ = "TUM Cyber-Physical System Group"6__credits__ = []7__version__ = "0.1"8__maintainer__ = "Edmond Irani Liu"9__email__ = "edmond.irani@tum.de"10__status__ = "Integration"11import copy12import os13import pickle14from enum import unique, Enum15from math import sin, cos16from typing import Tuple, Dict, Optional17import numpy as np18from sumocr.sumo_config.default import DefaultConfig19from sumocr.interface.ego_vehicle import EgoVehicle20from sumocr.interface.sumo_simulation import SumoSimulation21from sumocr.maps.sumo_scenario import ScenarioWrapper22from sumocr.visualization.video import create_video23from sumocr.sumo_docker.interface.docker_interface import SumoInterface24from commonroad.scenario.scenario import Scenario25from commonroad.planning.planning_problem import PlanningProblemSet26from commonroad.common.solution import Solution27from commonroad.common.file_reader import CommonRoadFileReader28from main_interactive_CRplanner import InteractiveCRPlanner29import matplotlib.pyplot as plt30from commonroad.visualization.draw_dispatch_cr import draw_object31@unique32class SimulationOption(Enum):33 WITHOUT_EGO = "_without_ego"34 MOTION_PLANNER = "_planner"35 SOLUTION = "_solution"36def simulate_scenario(mode: SimulationOption,37 conf: DefaultConfig,38 scenario_wrapper: ScenarioWrapper,39 scenario_path: str,40 num_of_steps: int = None,41 planning_problem_set: PlanningProblemSet = None,42 solution: Solution = None,43 use_sumo_manager: bool = False) -> Tuple[Scenario, Dict[int, EgoVehicle]]:44 """45 Simulates an interactive scenario with specified mode46 :param mode: 0 = without ego, 1 = with plugged in planner, 2 = with solution trajectory47 :param conf: config of the simulation48 :param scenario_wrapper: scenario wrapper used by the Simulator49 :param scenario_path: path to the interactive scenario folder50 :param num_of_steps: number of steps to simulate51 :param planning_problem_set: planning problem set of the scenario52 :param solution: solution to the planning problem53 :param use_sumo_manager: indicates whether to use the SUMO Manager54 :return: simulated scenario and dictionary with items {planning_problem_id: EgoVehicle}55 """56 if num_of_steps is None:57 num_of_steps = conf.simulation_steps58 sumo_interface = None59 if use_sumo_manager:60 sumo_interface = SumoInterface(use_docker=True)61 sumo_sim = sumo_interface.start_simulator()62 sumo_sim.send_sumo_scenario(conf.scenario_name,63 scenario_path)64 else:65 sumo_sim = SumoSimulation()66 # initialize simulation67 sumo_sim.initialize(conf, scenario_wrapper, None)68 if mode is SimulationOption.WITHOUT_EGO:69 # simulation without ego vehicle70 for step in range(num_of_steps):71 # set to dummy simulation72 sumo_sim.dummy_ego_simulation = True73 sumo_sim.simulate_step()74 elif mode is SimulationOption.MOTION_PLANNER:75 # simulation with plugged in planner76 def run_simulation():77 ego_vehicles = sumo_sim.ego_vehicles78 for step in range(num_of_steps):79 if use_sumo_manager:80 ego_vehicles = sumo_sim.ego_vehicles81 # retrieve the CommonRoad scenario at the current time step, e.g. as an input for a predicition module82 current_scenario = sumo_sim.commonroad_scenario_at_time_step(sumo_sim.current_time_step)83 for idx, ego_vehicle in enumerate(ego_vehicles.values()):84 # retrieve the current state of the ego vehicle85 state_current_ego = ego_vehicle.current_state86 # ====== plug in your motion planner here87 # example motion planner which decelerates to full stop88 planning_problem = list(planning_problem_set.planning_problem_dict.values())[0]89 main_planner = InteractiveCRPlanner(current_scenario, ego_vehicle.current_state)90 last_action = []91 is_new_action_needed = True92 next_state, last_action, is_new_action_needed = main_planner.planning(current_scenario,93 planning_problem,94 ego_vehicle,95 sumo_sim.current_time_step,96 last_action,97 is_new_action_needed)98 # plt.clf()99 # draw_parameters = {100 # 'time_begin': sumo_sim.current_time_step,101 # 'scenario':102 # {'dynamic_obstacle': {'show_label': True, },103 # 'lanelet_network': {'lanelet': {'show_label': False, }, },104 # },105 # }106 # draw_object(current_scenario, draw_params=draw_parameters)107 # draw_object(planning_problem_set)108 # plt.gca().set_aspect('equal')109 # plt.pause(0.001)110 # next_state = copy.deepcopy(state_current_ego)111 # next_state.steering_angle = 0.0112 # a = -5.0113 # dt = 0.1114 # if next_state.velocity > 0:115 # v = next_state.velocity116 # x, y = next_state.position117 # o = next_state.orientation118 # next_state.position = np.array([x + v * cos(o) * dt, y + v * sin(o) * dt])119 # next_state.velocity += a * dt120 # ====== end of motion planner121 # update the ego vehicle with new trajectory with only 1 state for the current step122 next_state.time_step = 1123 trajectory_ego = [next_state]124 ego_vehicle.set_planned_trajectory(trajectory_ego)125 if use_sumo_manager:126 # set the modified ego vehicles to synchronize in case of using sumo_docker127 sumo_sim.ego_vehicles = ego_vehicles128 sumo_sim.simulate_step()129 run_simulation()130 elif mode is SimulationOption.SOLUTION:131 # simulation with given solution trajectory132 def run_simulation():133 ego_vehicles = sumo_sim.ego_vehicles134 for time_step in range(num_of_steps):135 if use_sumo_manager:136 ego_vehicles = sumo_sim.ego_vehicles137 for idx_ego, ego_vehicle in enumerate(ego_vehicles.values()):138 # update the ego vehicles with solution trajectories139 trajectory_solution = solution.planning_problem_solutions[idx_ego].trajectory140 next_state = copy.deepcopy(trajectory_solution.state_list[time_step])141 next_state.time_step = 1142 trajectory_ego = [next_state]143 ego_vehicle.set_planned_trajectory(trajectory_ego)144 if use_sumo_manager:145 # set the modified ego vehicles to synchronize in case of using SUMO Manager146 sumo_sim.ego_vehicles = ego_vehicles147 sumo_sim.simulate_step()148 check_trajectories(solution, planning_problem_set, conf)149 run_simulation()150 # retrieve the simulated scenario in CR format151 simulated_scenario = sumo_sim.commonroad_scenarios_all_time_steps()152 # stop the simulation153 sumo_sim.stop()154 if use_sumo_manager:155 sumo_interface.stop_simulator()156 ego_vechicles = {list(planning_problem_set.planning_problem_dict.keys())[0]:157 ego_v for _, ego_v in sumo_sim.ego_vehicles.items()}158 return simulated_scenario, ego_vechicles159def simulate_without_ego(interactive_scenario_path: str,160 output_folder_path: str = None,161 create_video: bool = False,162 use_sumo_manager: bool = False) -> Tuple[Scenario, PlanningProblemSet]:163 """164 Simulates an interactive scenario without ego vehicle165 :param interactive_scenario_path: path to the interactive scenario folder166 :param output_folder_path: path to the output folder167 :param create_video: indicates whether to create a mp4 of the simulated scenario168 :param use_sumo_manager: indicates whether to use the SUMO Manager169 :return: Tuple of the simulated scenario and the planning problem set170 """171 conf = load_sumo_configuration(interactive_scenario_path)172 scenario_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.cr.xml")173 scenario, planning_problem_set = CommonRoadFileReader(scenario_file).open()174 scenario_wrapper = ScenarioWrapper()175 scenario_wrapper.sumo_cfg_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.sumo.cfg")176 scenario_wrapper.initial_scenario = scenario177 # simulation without ego vehicle178 simulated_scenario_without_ego, _ = simulate_scenario(SimulationOption.WITHOUT_EGO, conf,179 scenario_wrapper,180 interactive_scenario_path,181 num_of_steps=conf.simulation_steps,182 planning_problem_set=planning_problem_set,183 solution=None,184 use_sumo_manager=use_sumo_manager)185 simulated_scenario_without_ego.scenario_id = scenario.scenario_id186 if create_video:187 create_video_for_simulation(simulated_scenario_without_ego, output_folder_path, planning_problem_set,188 {}, SimulationOption.WITHOUT_EGO.value)189 return simulated_scenario_without_ego, planning_problem_set190def simulate_with_solution(interactive_scenario_path: str,191 output_folder_path: str = None,192 solution: Solution = None,193 create_video: bool = False,194 use_sumo_manager: bool = False,195 create_ego_obstacle: bool = False) -> Tuple[Scenario, PlanningProblemSet, Dict[int, EgoVehicle]]:196 """197 Simulates an interactive scenario with a given solution198 :param interactive_scenario_path: path to the interactive scenario folder199 :param output_folder_path: path to the output folder200 :param solution: solution to the planning problem201 :param create_video: indicates whether to create a mp4 of the simulated scenario202 :param use_sumo_manager: indicates whether to use the SUMO Manager203 :param create_ego_obstacle: indicates whether to create obstacles as the ego vehicles204 :return: Tuple of the simulated scenario and the planning problem set205 """206 if not isinstance(solution, Solution):207 raise Exception("Solution to the planning problem is not given.")208 conf = load_sumo_configuration(interactive_scenario_path)209 scenario_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.cr.xml")210 scenario, planning_problem_set = CommonRoadFileReader(scenario_file).open()211 scenario_wrapper = ScenarioWrapper()212 scenario_wrapper.sumo_cfg_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.sumo.cfg")213 scenario_wrapper.initial_scenario = scenario214 scenario_with_solution, ego_vehicles = simulate_scenario(SimulationOption.SOLUTION, conf,215 scenario_wrapper,216 interactive_scenario_path,217 num_of_steps=conf.simulation_steps,218 planning_problem_set=planning_problem_set,219 solution=solution,220 use_sumo_manager=use_sumo_manager)221 scenario_with_solution.scenario_id = scenario.scenario_id222 if create_video:223 create_video_for_simulation(scenario_with_solution, output_folder_path, planning_problem_set,224 ego_vehicles, SimulationOption.SOLUTION.value)225 if create_ego_obstacle:226 for pp_id, planning_problem in planning_problem_set.planning_problem_dict.items():227 obstacle_ego = ego_vehicles[pp_id].get_dynamic_obstacle()228 scenario_with_solution.add_objects(obstacle_ego)229 return scenario_with_solution, planning_problem_set, ego_vehicles230def simulate_with_planner(interactive_scenario_path: str,231 output_folder_path: str = None,232 create_video: bool = False,233 use_sumo_manager: bool = False,234 create_ego_obstacle: bool = False) \235 -> Tuple[Scenario, PlanningProblemSet, Dict[int, EgoVehicle]]:236 """237 Simulates an interactive scenario with a plugged in motion planner238 :param interactive_scenario_path: path to the interactive scenario folder239 :param output_folder_path: path to the output folder240 :param create_video: indicates whether to create a mp4 of the simulated scenario241 :param use_sumo_manager: indicates whether to use the SUMO Manager242 :param create_ego_obstacle: indicates whether to create obstacles from the planned trajectories as the ego vehicles243 :return: Tuple of the simulated scenario, planning problem set, and list of ego vehicles244 """245 conf = load_sumo_configuration(interactive_scenario_path)246 scenario_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.cr.xml")247 scenario, planning_problem_set = CommonRoadFileReader(scenario_file).open()248 scenario_wrapper = ScenarioWrapper()249 scenario_wrapper.sumo_cfg_file = os.path.join(interactive_scenario_path, f"{conf.scenario_name}.sumo.cfg")250 scenario_wrapper.initial_scenario = scenario251 scenario_with_planner, ego_vehicles = simulate_scenario(SimulationOption.MOTION_PLANNER, conf,252 scenario_wrapper,253 interactive_scenario_path,254 num_of_steps=conf.simulation_steps,255 planning_problem_set=planning_problem_set,256 use_sumo_manager=use_sumo_manager)257 scenario_with_planner.scenario_id = scenario.scenario_id258 if create_video:259 create_video_for_simulation(scenario_with_planner, output_folder_path, planning_problem_set,260 ego_vehicles, SimulationOption.MOTION_PLANNER.value)261 if create_ego_obstacle:262 for pp_id, planning_problem in planning_problem_set.planning_problem_dict.items():263 obstacle_ego = ego_vehicles[pp_id].get_dynamic_obstacle()264 scenario_with_planner.add_objects(obstacle_ego)265 return scenario_with_planner, planning_problem_set, ego_vehicles266def load_sumo_configuration(interactive_scenario_path: str) -> DefaultConfig:267 with open(os.path.join(interactive_scenario_path, "simulation_config.p"), "rb") as input_file:268 conf = pickle.load(input_file)269 return conf270def check_trajectories(solution: Solution, pps: PlanningProblemSet, config: DefaultConfig):271 assert len(set(solution.planning_problem_ids) - set(pps.planning_problem_dict.keys())) == 0, \272 f"Provided solution trajectories with IDs {solution.planning_problem_ids} don't match " \273 f"planning problem IDs{list(pps.planning_problem_dict.keys())}"274 for s in solution.planning_problem_solutions:275 if s.trajectory.final_state.time_step < config.simulation_steps:276 raise ValueError(f"The simulation requires {config.simulation_steps}"277 f"states, but the solution only provides"278 f"{s.trajectory.final_state.time_step} time steps!")279def create_video_for_simulation(scenario_with_planner: Scenario, output_folder_path: str,280 planning_problem_set: PlanningProblemSet,281 ego_vehicles: Optional[Dict[int, EgoVehicle]],282 suffix: str, follow_ego: bool = True):283 """Creates the mp4 animation for the simulation result."""284 if not output_folder_path:285 print("Output folder not specified, skipping mp4 generation.")286 return287 # create list of planning problems and trajectories288 list_planning_problems = []289 # create mp4 animation290 create_video(scenario_with_planner,291 output_folder_path,292 planning_problem_set=planning_problem_set,293 trajectory_pred=ego_vehicles,294 follow_ego=follow_ego,...

Full Screen

Full Screen

scenario.py

Source:scenario.py Github

copy

Full Screen

...140 for arg in scenario.get_example_params():141 if arg not in function_args:142 function_args.append(arg)143 @pytest.mark.usefixtures(*function_args)144 def scenario_wrapper(request):145 _execute_scenario(feature, scenario, request, encoding)146 return fn(*[request.getfixturevalue(arg) for arg in args])147 for param_set in scenario.get_params():148 if param_set:149 scenario_wrapper = pytest.mark.parametrize(*param_set)(scenario_wrapper)150 for tag in scenario.tags.union(feature.tags):151 config = CONFIG_STACK[-1]152 config.hook.pytest_bdd_apply_tag(tag=tag, function=scenario_wrapper)153 scenario_wrapper.__doc__ = u"{feature_name}: {scenario_name}".format(154 feature_name=feature_name, scenario_name=scenario_name155 )156 scenario_wrapper.__scenario__ = scenario157 scenario_wrapper.__pytest_bdd_counter__ = counter158 scenario.test_function = scenario_wrapper...

Full Screen

Full Screen

test_multiset.py

Source:test_multiset.py Github

copy

Full Screen

1from copy import deepcopy2from itertools import product3import pytest4from tiny_algos.minmultiset import MaxMultiSet, MinMultiSet5from tiny_algos.multiset import MultiSet6PINF = 9223372036854775807 # 2 ** 63 - 17NINF = -PINF8MIN_DEFAULT = PINF9MAX_DEFAULT = NINF10class ScenarioWrapper:11 def __init__(self, base, action, description):12 self.base = base13 self.action = action14 self.description = description15 def __repr__(self):16 return f'"{self.description}"'17scenarios = [18 ScenarioWrapper(19 base=[],20 action=[21 ("add", 1),22 ("add", 2),23 ("add", -1),24 ],25 description="construct with empty list",26 ),27 ScenarioWrapper(28 base=[1, -1],29 action=[30 ("remove", 1),31 ("remove", -1),32 ("add", 1),33 ("add", -1),34 ],35 description="Remove all and add",36 ),37 ScenarioWrapper(38 base=[0, 1, 1, 2],39 action=[40 ("add", 3),41 ("add", 2),42 ("add", -1),43 ("add", -2),44 ],45 description="add",46 ),47 ScenarioWrapper(48 base=[0, 1, 1, 2],49 action=[50 ("remove", 1),51 ("remove", 2),52 ("remove", 0),53 ],54 description="remove",55 ),56 ScenarioWrapper(57 base=[2, 4, 0, 2],58 action=[59 ("add", 3),60 ("remove", 4),61 ("add", 1),62 ("remove", 0),63 ("add", 1),64 ("add", 3),65 ("remove", 1),66 ("remove", 3),67 ("remove", 1),68 ("remove", 2),69 ],70 description="add and remove",71 ),72]73class MultiSetWrapper:74 def __init__(self, ms_cls, extremes):75 self.ms_cls = ms_cls76 self.extremes = extremes77 def __repr__(self):78 return self.ms_cls.__name__79multisets = [80 MultiSetWrapper(MultiSet, extremes=("min", "max")),81 MultiSetWrapper(MinMultiSet, extremes=("min")),82 MultiSetWrapper(MaxMultiSet, extremes=("max")),83]84@pytest.mark.parametrize(85 ["scenario_wrapper", "multiset_wrapper"],86 product(scenarios, multisets),87)88def test_multiset(scenario_wrapper, multiset_wrapper):89 ms_cls = multiset_wrapper.ms_cls90 extremes = multiset_wrapper.extremes91 base = deepcopy(scenario_wrapper.base)92 action = deepcopy(scenario_wrapper.action)93 test_list = list(base)94 test_ms = ms_cls(list(base))95 def check_extremes():96 if "min" in extremes:97 assert test_ms.min == min(test_list, default=MIN_DEFAULT)98 if "max" in extremes:99 assert test_ms.max == max(test_list, default=MAX_DEFAULT)100 check_extremes()101 for action, value in action:102 if action == "add":103 test_list.append(value)104 test_ms.add(value)105 elif action == "remove":106 test_list.remove(value)107 test_ms.remove(value)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pytest-bdd automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful