How to use _set_info method in yandex-tank

Best Python code snippet using yandex-tank

mp_agents.py

Source:mp_agents.py Github

copy

Full Screen

...42 self._sim = env._sim43 self._task = env._task44 def _end_episode(self):45 self._task.should_end = True46 def _set_info(self, k: str, v: Any) -> None:47 self._last_info[k] = v48 def _has_info(self, k: str) -> bool:49 return k in self._last_info50 def get_and_clear_info(self) -> Dict[str, Any]:51 ret = self._last_info52 self._last_info = {}53 return ret54 def reset(self) -> None:55 if self._auto_get_args_fn is not None:56 self.set_args(**self._auto_get_args_fn(self))57 def set_args(self, **kwargs) -> None:58 pass59 def _log(self, txt):60 if self._config.VERBOSE:61 print("%s: %s" % (str(self), txt))62 def act(self, observations: Observations) -> Dict[str, Any]:63 if self._should_auto_end:64 self._end_episode()65 return {}66 def should_term(self, observations: Observations) -> bool:67 return False68class AgentComposition(ParameterizedAgent):69 def __init__(70 self,71 skills,72 env,73 config,74 action_config,75 should_auto_end=True,76 auto_get_args_fn=None,77 ):78 super().__init__(79 env, config, action_config, should_auto_end, auto_get_args_fn80 )81 self.skills: List[ParameterizedAgent] = skills82 self.cur_skill: int = 083 def _set_info(self, k, v):84 self._last_info[k] = v85 def _has_info(self, k):86 return any([skill._has_info(k) for skill in self.skills])87 def get_and_clear_info(self):88 r = {}89 for skill in self.skills:90 r.update(skill.get_and_clear_info())91 return r92 def set_args(self, **kwargs):93 self._enter_kwargs = kwargs94 if self._is_done_with_skills:95 return96 self.skills[self.cur_skill].set_args(**self._enter_kwargs)97 @property98 def _is_done_with_skills(self):99 return self.cur_skill >= len(self.skills)100 def reset(self):101 super().reset()102 self.cur_skill = 0103 self.skills[self.cur_skill].reset()104 def act(self, observations):105 if self.should_term(observations):106 return get_noop_arm_action(self._sim)107 action = self.skills[self.cur_skill].act(observations)108 return action109 def should_term(self, observations):110 if 
self._is_done_with_skills:111 return True112 if self.skills[self.cur_skill].should_term(observations):113 self.cur_skill += 1114 if self.cur_skill < len(self.skills):115 self._log(f"Moving to skill {self.skills[self.cur_skill]}")116 self.skills[self.cur_skill].reset()117 self.skills[self.cur_skill].set_args(**self._enter_kwargs)118 return self.cur_skill >= len(self.skills)119class ArmTargModule(ParameterizedAgent):120 """Reaches the arm to a target position."""121 def __init__(122 self,123 env,124 config,125 action_config,126 should_auto_end=True,127 auto_get_args_fn=None,128 ):129 super().__init__(130 env, config, action_config, should_auto_end, auto_get_args_fn131 )132 self._grasp_thresh = self._agent_config.ARM_ACTION.GRASP_THRESH_DIST133 self._viz_points = []134 self._mp = MotionPlanner(self._sim, self._config)135 self._mp.set_should_render(self._config.MP_RENDER)136 self._enter_kwargs = None137 @property138 def wait_after(self) -> int:139 return 0140 @property141 def timeout(self) -> int:142 return 400143 def set_args(self, **kwargs) -> None:144 self._log(f"Set arm targ args {kwargs}")145 self._enter_kwargs = kwargs146 def reset(self) -> None:147 self._enter_kwargs = None148 super().reset()149 self._log("Entered arm targ")150 self._plan_idx = 0151 self._term = False152 self._clean_viz_points()153 self._viz_points = []154 self._plan: Any = None155 self._has_generated_plan = False156 def _add_debug_viz_point(self, pos):157 pos_name = f"arm_targ_{len(self._viz_points)}"158 self._sim.viz_ids[pos_name] = self._sim.visualize_position(159 pos, self._sim.viz_ids[pos_name]160 )161 self._viz_points.append(pos_name)162 def act(self, observations: Observations) -> Dict[str, Any]:163 assert self._enter_kwargs is not None, "Need to first call `set_args`!"164 if not self._has_generated_plan:165 self._plan = self._generate_plan(166 observations, **self._enter_kwargs167 )168 self._has_generated_plan = True169 cur_plan_ac = self._get_plan_ac(observations)170 if cur_plan_ac is 
None:171 self._term = True172 return get_noop_arm_action(self._sim)173 self._plan_idx += 1174 grip = self._get_gripper_ac(cur_plan_ac)175 return {176 "action": "ARM_ACTION",177 "action_args": {"arm_action": cur_plan_ac, "grip_action": grip},178 }179 def _get_plan_ac(self, observations) -> np.ndarray:180 r"""Get the plan action for the current timestep. By default return the181 action at the current plan index.182 """183 if self._plan is None:184 self._log("Planning failed")185 self._end_episode()186 return None187 if self.adjusted_plan_idx >= len(self._plan):188 return self._plan[-1]189 else:190 return self._plan[self.adjusted_plan_idx]191 def _internal_should_term(self, observations):192 return False193 def should_term(self, observations: Observations) -> bool:194 done = self._term195 if (196 self._plan is not None197 and self.adjusted_plan_idx >= len(self._plan) + self.wait_after198 ):199 self._log("Plan finished")200 done = True201 if self._plan_idx > self.timeout:202 self._log("Skill timed out")203 done = True204 if self._has_generated_plan and self._internal_should_term(205 observations206 ):207 self._log("Skill requested termination")208 done = True209 if done:210 self._log("Skill requested hard termination")211 self._on_done()212 return done213 def _get_force_set_ee(self):214 return None215 def _on_done(self):216 self._clean_viz_points()217 def _clean_viz_points(self):218 if not self._config.VERBOSE:219 return220 for viz_point_name in self._viz_points:221 if self._sim.viz_ids[viz_point_name] is None:222 continue223 self._sim.remove_object(self._sim.viz_ids[viz_point_name])224 del self._sim.viz_ids[viz_point_name]225 self._viz_points = []226 def _get_gripper_ac(self, plan_ac) -> float:227 # keep the gripper state as is.228 if self._sim.robot.is_gripper_open:229 grip = -1.0230 else:231 grip = 1.0232 return grip233 @property234 def adjusted_plan_idx(self) -> bool:235 return self._plan_idx // self._config.RUN_FREQ236 @abc.abstractmethod237 def 
_generate_plan(self, observations, **kwargs) -> np.ndarray:238 r"""Gets the plan this controller will execute.239 :return: Either a sequence of 3D EE targets or a sequence of arm joint240 targets.241 """242 def _clean_mp(self):243 if self._mp.traj_viz_id is not None:244 self._sim._sim.remove_traj_obj(self._mp.traj_viz_id)245 self._mp.traj_viz_id = None246class IkMoveArm(ArmTargModule):247 def _get_plan_ac(self, observations):248 ee_pos = observations[EEPositionSensor.cls_uuid]249 to_target = self._robot_target - ee_pos250 to_target = self._config.IK_SPEED_FACTOR * (251 to_target / np.linalg.norm(to_target)252 )253 return to_target254 def _on_done(self):255 super()._on_done()256 self._set_info("execute_ik_failure", 0)257 def _generate_plan(self, observations, robot_target, **kwargs):258 self._set_info("execute_ik_failure", 1)259 self._robot_target = robot_target260 def _internal_should_term(self, observations):261 dist_to_target = np.linalg.norm(262 observations["ee_pos"] - self._robot_target263 )264 return dist_to_target < self._config.IK_DIST_THRESH265class SpaManipPick(ArmTargModule):266 @property267 def wait_after(self):268 return 5269 def _internal_should_term(self, observations):270 is_holding = observations["is_holding"].item() == 1271 if is_holding:272 self._log("Robot is holding object, leaving pick")273 # Override indicating we succeeded274 self._set_info("execute_pick_failure", 0)275 return True276 else:277 return False278 def _generate_plan(self, observations, obj, **kwargs):279 self._set_info("execute_ee_to_obj_dist", 0)280 self._set_info("execute_ee_dist", 0)281 self._mp.set_config(282 self._config.MP_MARGIN,283 self._config.MP_OBJ,284 self._grasp_thresh,285 self._config.N_GRASPS,286 self._config,287 )288 obj_idx = self._sim.scene_obj_ids[obj]289 robo_targ = self._mp.grasp_gen.gen_target_from_obj_idx(obj_idx)290 self._targ_obj_idx = obj_idx291 self._robo_targ = robo_targ292 if self._config.VERBOSE:293 
self._add_debug_viz_point(robo_targ.ee_target_pos)294 plan = self._mp.motion_plan(295 self._sim.robot.arm_joint_pos,296 robo_targ,297 timeout=self._config.TIMEOUT,298 )299 for k, v in self._mp.get_recent_plan_stats(plan, robo_targ).items():300 self._set_info(k, v)301 self._set_info("execute_bad_coll_failure", int(self._mp.was_bad_coll))302 # Don't double count execute failure.303 self._set_info("execute_failure", int(plan is not None))304 return plan305 def _on_done(self):306 super()._on_done()307 cur_ee = self._sim.robot.ee_transform.translation308 obj_pos = np.array(self._sim.get_translation(self._targ_obj_idx))309 ee_dist = np.linalg.norm(self._robo_targ.ee_target_pos - cur_ee)310 ee_dist_to_obj = np.linalg.norm(obj_pos - cur_ee)311 if (312 ee_dist_to_obj < self._grasp_thresh313 and ee_dist < self._config.EXEC_EE_THRESH314 ):315 self._set_info("execute_failure", 0)316 self._set_info("execute_bad_coll_failure", 0)317 else:318 self._set_info("execute_ee_to_obj_dist", ee_dist_to_obj)319 self._set_info("execute_ee_dist", ee_dist)320 self._clean_mp()321 def _get_gripper_ac(self, plan_ac):322 if self.adjusted_plan_idx >= len(self._plan):323 grip = 1324 else:325 grip = -1326 return grip327class SpaResetModule(ArmTargModule):328 def __init__(329 self,330 env,331 config,332 action_config,333 should_auto_end=True,334 ignore_first=False,335 auto_get_args_fn=None,336 ):337 super().__init__(338 env, config, action_config, should_auto_end, auto_get_args_fn339 )340 self._ignore_first = ignore_first341 def _generate_plan(self, observations, **kwargs):342 self._mp.set_config(343 self._config.MP_MARGIN,344 self._config.MP_OBJ,345 self._grasp_thresh,346 self._config.N_GRASPS,347 self._config,348 ignore_first=self._ignore_first,349 use_prev=True,350 )351 robo_targ = RobotTarget(352 joints_target=self._sim.robot.params.arm_init_params353 )354 plan = self._mp.motion_plan(355 self._sim.robot.arm_joint_pos,356 robo_targ,357 timeout=self._config.TIMEOUT,358 )359 for k, v in 
self._mp.get_recent_plan_stats(360 plan, robo_targ, "reset_"361 ).items():362 self._set_info(k, v)363 self._set_info(364 "execute_reset_bad_coll_failure", int(self._mp.was_bad_coll)365 )366 # Don't double count execute failure.367 self._set_info("execute_reset_failure", int(plan is not None))368 return plan369 def _on_done(self):370 super()._on_done()371 self._set_info("execute_reset_failure", 0)372 self._set_info("execute_reset_bad_coll_failure", 0)373 self._clean_mp()374 def _get_gripper_ac(self, plan_ac):375 if self._sim.robot.is_gripper_open:376 grip = -1.0377 else:378 grip = 1.0379 return grip380 @property381 def wait_after(self):382 return 0383def main():384 parser = argparse.ArgumentParser()385 parser.add_argument("--skill-type", default="pick")386 parser.add_argument("--num-eval", type=int, default=None)...

Full Screen

Full Screen

Game.py

Source:Game.py Github

copy

Full Screen

...11 if len(text) > 100:12 with open("QuoridorLog2.html", "w") as f:13 f.write(text)14 Glob.ui.infoBottomLabel.setText(text)15def _set_info(text):16 Glob.ui.infoLabel.setText(text)17def _check_turn():18 if Glob.turn == Glob.ui.username:19 return True20 return False21def change_grids(main_grid, wallh_grid, wallv_grid, wallfills_grid, walls=None):22 """Update the grids on the UI."""23 # Update cells.24 for y in range(9):25 for x in range(9):26 if main_grid[y][x] == 0:27 Glob.ui.cells[y][x].setText("")28 else:29 Glob.ui.cells[y][x].setText(str(main_grid[y][x]))30 # Update horizontal walls.31 for y in range(8):32 for x in range(9):33 if wallh_grid[y][x] == 0:34 Glob.ui.wallsh[y][x].setPalette(Glob.ui.empty_wallh_palette)35 else:36 Glob.ui.wallsh[y][x].setPalette(Glob.ui.wall_palette)37 # Update vertical walls.38 for y in range(9):39 for x in range(8):40 if wallv_grid[y][x] == 0:41 Glob.ui.wallsv[y][x].setPalette(Glob.ui.empty_wallv_palette)42 else:43 Glob.ui.wallsv[y][x].setPalette(Glob.ui.wall_palette)44 # Update wallfills.45 for y in range(8):46 for x in range(8):47 if wallfills_grid[y][x] == 0:48 Glob.ui.wallfills[y][x].setPalette(Glob.ui.empty_wallv_palette)49 else:50 Glob.ui.wallfills[y][x].setPalette(Glob.ui.wall_palette)51 # Update wall numbers for each player.52 if walls != None:53 remaining_walls = "Remaining walls:\n"54 for user in ("1", "2", "3", "4"):55 if user in walls:56 remaining_walls += ("%s (%s): %d\n" %57 (walls[user][0], user, walls[user][1]))58 else:59 remaining_walls = ""60 Glob.ui.remainingWallsLabel.setText(remaining_walls)61def _wait_for_turn(old_turn, starting=False):62 """Wait for the player's turn.63Works by connecting frequently to check status.64"""65 while True:66 sleep(Glob.wait_time)67 try:68 response = Glob.ui.session.get(Urls.urls["play_and_status"])69 _set_bottom_info("")70 except requests.exceptions.ConnectionError:71 _set_bottom_info("Connection failed.")72 continue73 74 result = response.text.split("\n")75 try:76 result[0] = 
json.loads(result[0])77 except ValueError:78 _set_bottom_info("Something went wrong: " + response.text)79 continue80 81 if not response.ok:82 _set_bottom_info(result[0]["error"])83 if "new" in result[0] or "waiting" in result[0]:84 _set_info(result[0]["status"])85 _set_bottom_info("")86 if "new" in result[0]:87 Glob.turn = result[0]["turn"]88 if len(result) > 1:89 main_grid = json.loads(result[1])90 wallh_grid = json.loads(result[2])91 wallv_grid = json.loads(result[3])92 wallfills_grid = json.loads(result[4])93 walls = json.loads(result[5])94 change_grids(main_grid, wallh_grid, wallv_grid,95 wallfills_grid, walls)96 continue97 if "winner" in result[0]:98 if result[0]["winner"] == Glob.ui.username:99 _set_info("You have won the game!")100 else:101 _set_info(result[0]["status"])102 _set_bottom_info("")103 Glob.ui.won = True104 if "stopped" in result[0]:105 _set_info(result[0]["status"])106 Glob.ui.stopped = True107 Glob.ui.goTo("twoOrFour")108 return109 main_grid = json.loads(result[1])110 wallh_grid = json.loads(result[2])111 wallv_grid = json.loads(result[3])112 wallfills_grid = json.loads(result[4])113 walls = json.loads(result[5])114 change_grids(main_grid, wallh_grid, wallv_grid,115 wallfills_grid, walls)116 if Glob.ui.won:117 return118 Glob.turn = result[0]["turn"]119 if result[0]["turn"] == Glob.ui.username:120 _set_info("It's your turn now.")121 _set_bottom_info("")122 break123 if result[0]["turn"] != old_turn:124 _set_info("It's %s's turn now. Waiting..." 
% result[0]["turn"])125 _set_bottom_info("")126def _wait_for_turn_thread(old_turn, starting=False):127 thread = threading.Thread(target=_wait_for_turn,128 args=(old_turn,), kwargs={"starting": starting})129 thread.daemon = True130 thread.start()131def _request_move(payload):132 """Sends a request to the server containing the game move."""133 try:134 response = Glob.ui.session.post(Urls.urls["play_and_status"], data=payload)135 except requests.exceptions.ConnectionError:136 _set_bottom_info("Connection failed.")137 return138 result = response.text.split("\n")139 try:140 result[0] = json.loads(result[0])141 except ValueError:142 _set_bottom_info("Something went wrong: " + response.text)143 return144 if "error" in result[0]:145 if response.status_code == 405:146 _set_bottom_info("")147 _set_info(result[0]["error"])148 else:149 _set_bottom_info(result[0]["error"])150 _set_info("")151 return152 # Extract data and update grids.153 status = result[0]["status"]154 if "winner" in result[0]:155 if result[0]["winner"] == Glob.ui.username:156 _set_info("You have won the game!")157 else:158 _set_info(result[0]["status"])159 _set_bottom_info("")160 Glob.ui.won = True161 else:162 turn = result[0]["turn"]163 Glob.turn = result[0]["turn"]164 main_grid = json.loads(result[1])165 wallh_grid = json.loads(result[2])166 wallv_grid = json.loads(result[3])167 wallfills_grid = json.loads(result[4])168 walls = json.loads(result[5])169 change_grids(main_grid, wallh_grid, wallv_grid, wallfills_grid, walls)170 if Glob.ui.won:171 return172 _set_info("Done. It's %s's turn now. Waiting..." 
% turn)173 _set_bottom_info("")174 _wait_for_turn_thread(turn)175def clickedCell(x, y):176 """Slot function for a cell being clicked."""177 if not _check_turn() or Glob.ui.won or Glob.ui.stopped:178 return179 payload = {"move": json.dumps({"type": "move", "x": x, "y": y})}180 _request_move(payload)181def clickedWallh(x, y):182 """Slot function for a horizontal wall being clicked."""183 if not _check_turn() or Glob.ui.won or Glob.ui.stopped:184 return185 payload = {"move": json.dumps({"type": "wall", "direction": "h",186 "x": x, "y": y})}...

Full Screen

Full Screen

Slots.py

Source:Slots.py Github

copy

Full Screen

...10 if len(text) > 100:11 with open("QuoridorLog1.html", "w") as f:12 f.write(text)13 self.infoBottomLabel.setText(text)14 def _set_info(self, text):15 self.infoLabel.setText(text)16 def quit(self):17 self.MainWindow.close()18 def loginButtonSlot(self):19 self.goTo("signin")20 def signupButtonSlot(self):21 self.goTo("signup")22 def exitButtonSlot(self):23 self.quit()24 def actionExitSlot(self):25 self.quit()26 def signinOkButtonSlot(self):27 username = self.signinUsernameInput.text()28 password = self.signinPasswordInput.text()29 payload = {"username": username, "password": password}30 try:31 response = self.session.post(Urls.urls["signin"], data=payload)32 except requests.exceptions.ConnectionError:33 self._set_bottom_info("Connection failed.")34 return35 if response.ok:36 self._set_info(response.text)37 self._set_bottom_info("")38 self.username = username39 if response.text != "You're already in a game.":40 self.goTo("twoOrFour")41 else:42 self.goTo("game")43 Game._wait_for_turn_thread("")44 else:45 self._set_info("")46 self._set_bottom_info(response.text)47 def signupOkButtonSlot(self):48 name = self.signupNameInput.text()49 username = self.signupUsernameInput.text()50 password = self.signupPasswordInput.text()51 payload = {"username": username, "password": password, "name": name}52 try:53 response = self.session.post(Urls.urls["signup"], data=payload)54 except requests.exceptions.ConnectionError:55 self._set_bottom_info("Connection failed.")56 self._set_info("")57 return58 if response.ok:59 self._set_info(response.text)60 self._set_bottom_info("")61 self.goTo("menu")62 else:63 self._set_info("")64 self._set_bottom_info(response.text)65 def twoButtonSlot(self):66 payload = {"players": "two"}67 try:68 response = self.session.post(Urls.urls["two_or_four"], data=payload)69 except requests.exceptions.ConnectionError:70 self._set_bottom_info("Connection failed.")71 return72 if response.ok:73 self._set_info(response.text)74 self._set_bottom_info("")75 
self.goTo("game")76 Game._wait_for_turn_thread("", starting=True)77 else:78 self._set_info("")79 self._set_bottom_info(response.text)80 def fourButtonSlot(self):81 payload = {"players": "four"}82 try:83 response = self.session.post(Urls.urls["two_or_four"], data=payload)84 except requests.exceptions.ConnectionError:85 self._set_bottom_info("Connection failed.")86 return87 if response.ok:88 self._set_info(response.text)89 self._set_bottom_info("")90 self.goTo("game")91 Game._wait_for_turn_thread("", starting=True)92 else:93 self._set_info("")94 self._set_bottom_info(response.text)95 def leaveButtonSlot(self):96 if self.stopped:97 self.goTo("twoOrFour")98 return99 try:100 response = self.session.get(Urls.urls["leave"])101 except requests.exceptions.ConnectionError:102 self._set_bottom_info("Connection failed.")103 return104 if response.ok:105 result = json.loads(response.text)106 self._set_info(result["status"])107 self._set_bottom_info("")108 self.stopped = True109 self.goTo("twoOrFour")110 else:111 try:112 result = json.loads(response.text)113 except ValueError:114 self._set_info("")115 self._set_bottom_info(response.text)116 return117 self._set_info("")118 self._set_bottom_info(result["error"])119 self.goTo("twoOrFour")120 def scoresButtonSlot(self):121 try:122 response = self.session.get(Urls.urls["scores"])123 except requests.exceptions.ConnectionError:124 self._set_bottom_info("Connection failed.")125 return126 if response.ok:127 self.scoresBox.setText(response.text)128 self.scoresBox.show()129 else:130 self._set_info("")131 self._set_bottom_info(response.text)132 def infoButtonSlot(self):133 try:134 response = self.session.get(Urls.urls["user_info"])135 except requests.exceptions.ConnectionError:136 self._set_bottom_info("Connection failed.")137 return138 if response.ok:139 self.infoBox.setText(response.text)140 self.infoBox.show()141 else:142 self._set_info("")143 self._set_bottom_info(response.text)144 def signoutButtonSlot(self):145 try:146 response = 
self.session.get(Urls.urls["logout"])147 except requests.exceptions.ConnectionError:148 self._set_bottom_info("Connection failed.")149 return150 if response.ok:151 self._set_info(response.text)152 self._set_bottom_info("")153 self.username = ""154 self.goTo("menu")155 else:156 self._set_info("")157 self._set_bottom_info(response.text)158 def signinBackButtonSlot(self):159 self.goTo("menu")160 def signupBackButtonSlot(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful