How to use _formatAction method in fMBT

Best Python code snippet using fMBT_python

AbstractEnv.py

Source:AbstractEnv.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""AbstractEnv to make the link between Gym and Sofa.3"""4__authors__ = ("PSC", "dmarchal", "emenager")5__contact__ = ("pierre.schegg@robocath.com", "damien.marchal@univ-lille.fr", "etienne.menager@ens-rennes.fr")6__version__ = "1.0.0"7__copyright__ = "(c) 2020, Robocath, CNRS, Inria"8__date__ = "Oct 7 2020"9import gym10from gym.utils import seeding11import numpy as np12import copy13import os14import splib315from sofagym.env.common.viewer import Viewer16from sofagym.env.common.rpc_server import start_server, add_new_step, get_result, clean_registry, close_scene17class AbstractEnv(gym.Env):18 """Use Sofa scene with a Gym interface.19 Methods:20 -------21 __init__: classical __init__ method.22 initialization: Initialization of all arguments.23 seed: Initialization of the seed.24 step: Realise a step in the environment.25 async_step: Realise a step without blocking queue.26 reset: Reset the environment and useful arguments.27 render: Use viewer to see the environment.28 _automatic_rendering: Automatically render the intermediate frames29 while an action is still ongoing.30 close: Terminate the simulation.31 configure: Add element in the configuration.32 clean: clean the registery.33 _formataction.. : transforme the type of action to use server.34 Arguments:35 ---------36 config: Dictionary.37 Contains the configuration of the environment.38 Minimum:39 - scene : the name of the simulation.40 Note: define the name of the toolbox <scene>Toolbox and the41 scene <scene>Scene in the directory ../<scene>.42 - deterministic: whether or not the environment is deterministic.43 - source,target: definition of the Sofa camera point of view.44 - goalList : list of the goals to reach (position or index).45 - start_node: the start node (position or index).46 - scale_factor: int that define the number of step in simulation.47 - timer_limit: int that define the maximum number of steps.48 - timeout: int that define the timeout for the server/client requests.49 - display_size: tuple of int that define the size of the Viewer50 window.51 - save_path: path to save the image of the simulation.52 - render: wheter or not the viewer displays images.53 0: no rendering.54 1: render after simulation.55 2: render all steps.56 Warning: we can't change this value after initialization.57 - save_data: wheter or not the data are saved.58 - save_image: wheter or not the images are saved.59 - planning: if realise planning or not.60 - discrete: if the environment is discrete or not.61 - timer_limit: the limit of the time.62 - seed : the seed.63 - start_from_history: list of actions that have to be carried64 out before starting the training.65 - python_version: the version of python.66 - time_before_start: initialize the simulation with time_before_start steps.67 observation_space: spaces.Box68 Define the size of the environment.69 past_actions: list of int.70 Keeps track of past actions. Allows you to retrieve past71 configurations of the environment.72 goalList: list73 List of possible objectives to be achieved.74 goal: list75 Current objective.76 num_envs: int77 The number of environment.78 np_random: np.random.RandomState()79 Exposes a number of methods for generating random numbers80 viewer: <class viewer>81 Allows to manage the visual feedback of the simulation.82 automatic_rendering_callback:83 Callback function used in _automatic_rendering.84 timer:85 Number of steps already completed.86 deterministic:87 Whether the environment is deterministic or not.88 timeout:89 Number of times the queue is blocking. Allows to avoid blocking90 situations.91 Notes:92 -----93 It is necessary to define the specificity of the environment in a94 subclass.95 Usage:96 -----97 Use the reset method before launch the environment.98 """99 def __init__(self, config=None):100 """101 Classic initialization of a class in python.102 Parameters:103 ----------104 config: Dictionary or None, default = None105 Customisable configuration element.106 Returns:107 ---------108 None.109 """110 # Define a DEFAULT_CONFIG in sub-class.111 self.config = copy.deepcopy(self.DEFAULT_CONFIG)112 if config is not None:113 self.config.update(config)114 self.initialization()115 def initialization(self):116 """Initialization of all parameters.117 Parameters:118 ----------119 None.120 Returns:121 -------122 None.123 """124 self.goalList = None125 self.goal = None126 self.past_actions = []127 self.num_envs = 40128 self.np_random = None129 self.seed(self.config['seed'])130 self.viewer = None131 self.automatic_rendering_callback = None132 self.timer = 0133 self.timeout = self.config["timeout"]134 # Start the server which distributes the calculations to its clients135 start_server(self.config)136 if 'save_data' in self.config and self.config['save_data']:137 save_path_results = self.config['save_path']+"/data"138 os.makedirs(save_path_results, exist_ok=True)139 else:140 save_path_results = None141 if 'save_image' in self.config and self.config['save_image']:142 save_path_image = self.config['save_path']+"/img"143 os.makedirs(save_path_image, exist_ok=True)144 else:145 save_path_image = None146 self.configure({"save_path_image": save_path_image, "save_path_results": save_path_results})147 def seed(self, seed=None):148 """149 Computes the random generators of the environment.150 Parameters:151 ----------152 seed: int, 1D array or None, default = None153 seed for the RandomState.154 Returns:155 ---------156 [seed]157 """158 self.np_random, seed = seeding.np_random(seed)159 return [seed]160 def _formataction(self, action):161 """Change the type of action to be in [list, float, int].162 Parameters:163 ----------164 action:165 The action with no control on the type.166 Returns:167 -------168 action: in [list, float, int]169 The action with control on the type.170 """171 if isinstance(action, np.ndarray):172 action = action.tolist()173 elif isinstance(action, np.int64):174 action = int(action)175 elif isinstance(action, np.float64):176 action = float(action)177 elif isinstance(action, tuple):178 action = self._formatactionTuple(action)179 elif isinstance(action, dict):180 action = self._formatactionDict(action)181 return action182 def _formatactionTuple(self, action):183 """Change the type of tuple action to be in [list, float, int].184 Parameters:185 ----------186 action:187 The action with no control on the type.188 Returns:189 -------190 action:191 The action with control on the type.192 """193 return self._formataction(action[0]), self._formataction(action[1])194 def _formatactionDict(self, action):195 """Change the type of tuple action to be in [list, float, int].196 Parameters:197 ----------198 action:199 The action with no control on the type.200 Returns:201 -------202 action:203 The action with control on the type.204 """205 for key in action.keys():206 action[key] = self._formataction(action[key])207 return action208 def clean(self):209 """Function to clean the registery .210 Close clients who are processing unused sequences of actions (for211 planning)212 Parameters:213 ----------214 None.215 Returns:216 -------217 None.218 """219 clean_registry(self.past_actions)220 def step(self, action):221 """Executes one action in the environment.222 Apply action and execute scale_factor simulation steps of 0.01 s.223 Parameters:224 ----------225 action: int226 Action applied in the environment.227 Returns:228 -------229 obs:230 The new state of the agent.231 reward:232 The reward obtain after applying the action in the current state.233 done:234 Whether the goal is reached or not.235 {}: additional information (not used here)236 """237 # assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))238 action = self._formataction(action)239 # Pass the actions to the server to launch the simulation.240 result_id = add_new_step(self.past_actions, action)241 self.past_actions.append(action)242 # Request results from the server.243 # print("[INFO] >>> Result id:", result_id)244 results = get_result(result_id, timeout=self.timeout)245 obs = np.array(results["observation"]) # to work with baseline246 reward = results["reward"]247 done = results["done"]248 # Avoid long explorations by using a timer.249 self.timer += 1250 if self.timer >= self.config["timer_limit"]:251 # reward = -150252 done = True253 if self.config["planning"]:254 self.clean()255 return obs, reward, done, {}256 def async_step(self, action):257 """Executes one action in the environment.258 Apply action and execute scale_factor simulation steps of 0.01 s.259 Like step but useful if you want to parallelise (blocking "get").260 Otherwise use step.261 Parameters:262 ----------263 action: int264 Action applied in the environment.265 Returns:266 -------267 LateResult:268 Class which allows to store the id of the client who performs269 the calculation and to return later the usual information270 (observation, reward, done) thanks to a get method.271 """272 assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))273 result_id = add_new_step(self.past_actions, action)274 self.past_actions.append(action)275 class LateResult:276 def __init__(self, result_id):277 self.result_id = result_id278 def get(self, timeout=None):279 results = get_result(self.result_id, timeout=timeout)280 obs = results["observation"]281 reward = results["reward"]282 done = results["done"]283 return obs, reward, done, {}284 return LateResult(copy.copy(result_id))285 def reset(self):286 """Reset simulation.287 Parameters:288 ----------289 None.290 Returns:291 -------292 None.293 """294 self.close()295 self.initialization()296 splib3.animation.animate.manager = None297 if not self.goalList:298 self.goalList = self.config["goalList"]299 # Set a new random goal from the list300 id_goal = self.np_random.choice(range(len(self.goalList)))301 self.config.update({'goal_node': id_goal})302 self.goal = self.goalList[id_goal]303 self.timer = 0304 self.past_actions = []305 return306 def render(self, mode='rgb_array'):307 """See the current state of the environment.308 Get the OpenGL Context to render an image (snapshot) of the simulation309 state.310 Parameters:311 ----------312 mode: string, default = 'rgb_array'313 Type of representation.314 Returns:315 -------316 None.317 """318 if self.config['render'] != 0:319 # Define the viewer at the first run of render.320 if not self.viewer:321 display_size = self.config["display_size"] # Sim display322 if 'zFar' in self.config:323 zFar = self.config['zFar']324 else:325 zFar = 0326 self.viewer = Viewer(self, display_size, zFar=zFar, save_path=self.config["save_path_image"])327 # Use the viewer to display the environment.328 self.viewer.render()329 else:330 print(">> No rendering")331 def _automatic_rendering(self):332 """Automatically render the intermediate frames while an action is still ongoing.333 This allows to render the whole video and not only single steps334 corresponding to agent decision-making.335 If a callback has been set, use it to perform the rendering. This is336 useful for the environment wrappers such as video-recording monitor that337 need to access these intermediate renderings.338 Parameters:339 ----------340 None.341 Returns:342 -------343 None.344 """345 if self.viewer is not None:346 if self.automatic_rendering_callback:347 self.automatic_rendering_callback()348 else:349 self.render()350 def close(self):351 """Terminate simulation.352 Close the viewer and the scene.353 Parametres:354 ----------355 None.356 Returns:357 -------358 None.359 """360 if self.viewer is not None:361 self.viewer.close()362 close_scene()363 print("All clients are closed. Bye Bye.")364 def configure(self, config):365 """Update the configuration.366 Parameters:367 ----------368 config: Dictionary.369 Elements to be added in the configuration.370 Returns:371 -------372 None.373 """...

Full Screen

Full Screen

fmbtlogger.py

Source:fmbtlogger.py Github

copy

Full Screen

...28import sys29import traceback30import types31import fmbt32def _formatAction(fmt, actionName):33 values = {34 "action": actionName35 }36 return fmt % values37def _formatKwArgs(kwargs):38 l = []39 for key in sorted(kwargs):40 v = kwargs[key]41 if type(v) == str: l.append("%s=%s" % (key, repr(v)))42 else: l.append("%s=%s" % (key, str(v)))43 return l44def _formatCall(fmt, func, args, kwargs):45 arglist = []46 for a in args:47 if type(a) == str: arglist.append(repr(a))48 else: arglist.append(str(a))49 values = {50 "func": func.__name__,51 "args": ", ".join(arglist),52 "kwargs": ", ".join([""] + _formatKwArgs(kwargs))53 }54 return fmt % values55def _formatRetunValue(fmt, retval):56 if type(retval) == str: values={'value': repr(retval)}57 else: values={'value': str(retval)}58 return fmt % values59def _formatException(fmt):60 s = traceback.format_exc()61 exc, msg, _ = sys.exc_info()62 values = {63 "tb": s,64 "exc": exc.__name__,65 "msg": msg66 }67 return fmt % values68class FileToLogFunc(object):69 def __init__(self, fileObj, timeFormat="%s.%f "):70 self._fileObj = fileObj71 self._timeFormat = timeFormat72 def __call__(self, msg):73 self._fileObj.write("%s%s\n" % (74 datetime.datetime.now().strftime(self._timeFormat), msg))75class LogWriter(object):76 """77 LogWriter interface has the following methods:78 start(actionName)79 called after the execution of next action in fMBT model80 has started.81 end(actionName)82 called after the execution of previously executed action83 has ended.84 call(func, args, kwargs)85 called before func(args, kwargs) is called in logged86 interface.87 ret(returnValue)88 called after function returned returnValue.89 exc()90 called after function raised exception. The exception91 can be inspected using sys.exc_info().92 """93 defaultFormats = {94 "start": "%(action)s",95 "end": "",96 "call": "%(func)s(%(args)s%(kwargs)s)",97 "ret": "= %(value)s",98 "exc": "! %(exc)s (%(msg)s)"99 }100class CSVLogWriter(LogWriter):101 # TODO: fmbtandroid should add here some checks include screenshots102 # to the log where appropriate103 def __init__(self, logFunc, separator=";", formats=None):104 if formats == None:105 self.formats = CSVLogWriter.defaultFormats106 else:107 self.formats = {}108 for key in CSVLogWriter.defaultFormats:109 self.formats = formats.get(key, CSVLogWriter.defaultFormats[key])110 self.logFunc = logFunc111 self.depth = 0112 self.separator = separator113 def _log(self, msg):114 if len(msg.strip()) > 0:115 prefix = (self.separator * self.depth)116 self.logFunc(prefix + msg)117 def start(self, actionName):118 msg = _formatAction(self.formats["start"], actionName)119 self._log(msg)120 self.depth += 1121 def end(self, actionName):122 msg = _formatAction(self.formats["end"], actionName)123 self._log(msg)124 self.depth -= 1125 def call(self, func, args, kwargs):126 msg = _formatCall(self.formats["call"], func, args, kwargs)127 self._log(msg)128 self.depth += 1129 def ret(self, returnValue):130 self.depth -= 1131 msg = _formatRetunValue(self.formats["ret"], returnValue)132 self._log(msg)133 def exc(self):134 self.depth -= 1135 msg = _formatException(self.formats["exc"])136 self._log(msg)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful