Best Python code snippet using fMBT_python
AbstractEnv.py
Source:AbstractEnv.py  
# -*- coding: utf-8 -*-
"""AbstractEnv to make the link between Gym and Sofa.
"""

__authors__ = ("PSC", "dmarchal", "emenager")
__contact__ = ("pierre.schegg@robocath.com", "damien.marchal@univ-lille.fr", "etienne.menager@ens-rennes.fr")
__version__ = "1.0.0"
__copyright__ = "(c) 2020, Robocath, CNRS, Inria"
__date__ = "Oct 7 2020"

import gym
from gym.utils import seeding

import numpy as np
import copy
import os

import splib3

from sofagym.env.common.viewer import Viewer
from sofagym.env.common.rpc_server import start_server, add_new_step, get_result, clean_registry, close_scene


class AbstractEnv(gym.Env):
    """Use Sofa scene with a Gym interface.

    Methods:
    -------
        __init__: classical __init__ method.
        initialization: Initialization of all arguments.
        seed: Initialization of the seed.
        step: Realise a step in the environment.
        async_step: Realise a step without blocking queue.
        reset: Reset the environment and useful arguments.
        render: Use viewer to see the environment.
        _automatic_rendering: Automatically render the intermediate frames
            while an action is still ongoing.
        close: Terminate the simulation.
        configure: Add element in the configuration.
        clean: clean the registry.
        _formataction: transform the type of action to use the server.

    Arguments:
    ---------
        config: Dictionary.
            Contains the configuration of the environment.
            Minimum:
                - scene : the name of the simulation.
                    Note: define the name of the toolbox <scene>Toolbox and the
                    scene <scene>Scene in the directory ../<scene>.
                - deterministic: whether or not the environment is deterministic.
                - source,target: definition of the Sofa camera point of view.
                - goalList : list of the goals to reach (position or index).
                - start_node: the start node (position or index).
                - scale_factor: int that defines the number of steps in simulation.
                - timer_limit: int that defines the maximum number of steps.
                - timeout: int that defines the timeout for the server/client requests.
                - display_size: tuple of int that defines the size of the Viewer
                    window.
                - save_path: path to save the image of the simulation.
                - render: whether or not the viewer displays images.
                    0: no rendering.
                    1: render after simulation.
                    2: render all steps.
                    Warning: we can't change this value after initialization.
                - save_data: whether or not the data are saved.
                - save_image: whether or not the images are saved.
                - planning: if realise planning or not.
                - discrete: if the environment is discrete or not.
                - timer_limit: the limit of the time.
                - seed : the seed.
                - start_from_history: list of actions that have to be carried
                    out before starting the training.
                - python_version: the version of python.
                - time_before_start: initialize the simulation with time_before_start steps.
        observation_space: spaces.Box
            Define the size of the environment.
        past_actions: list of int.
            Keeps track of past actions. Allows you to retrieve past
            configurations of the environment.
        goalList: list
            List of possible objectives to be achieved.
        goal: list
            Current objective.
        num_envs: int
            The number of environments.
        np_random:  np.random.RandomState()
             Exposes a number of methods for generating random numbers
        viewer: <class viewer>
            Allows to manage the visual feedback of the simulation.
        automatic_rendering_callback:
            Callback function used in _automatic_rendering.
        timer:
            Number of steps already completed.
        deterministic:
            Whether the environment is deterministic or not.
        timeout:
            Number of times the queue is blocking. Allows to avoid blocking
            situations.

    Notes:
    -----
        It is necessary to define the specificity of the environment in a
        subclass.

    Usage:
    -----
        Use the reset method before launching the environment.
    """

    def __init__(self, config=None):
        """
        Classic initialization of a class in python.

        Parameters:
        ----------
        config: Dictionary or None, default = None
            Customisable configuration element.

        Returns:
        ---------
            None.
        """
        # Define a DEFAULT_CONFIG in sub-class.
        self.config = copy.deepcopy(self.DEFAULT_CONFIG)
        if config is not None:
            self.config.update(config)
        self.initialization()

    def initialization(self):
        """Initialization of all parameters.

        Parameters:
        ----------
            None.

        Returns:
        -------
            None.
        """
        self.goalList = None
        self.goal = None
        self.past_actions = []
        self.num_envs = 40
        self.np_random = None
        self.seed(self.config['seed'])
        self.viewer = None
        self.automatic_rendering_callback = None
        self.timer = 0
        self.timeout = self.config["timeout"]

        # Start the server which distributes the calculations to its clients
        start_server(self.config)

        if 'save_data' in self.config and self.config['save_data']:
            save_path_results = self.config['save_path']+"/data"
            os.makedirs(save_path_results, exist_ok=True)
        else:
            save_path_results = None

        if 'save_image' in self.config and self.config['save_image']:
            save_path_image = self.config['save_path']+"/img"
            os.makedirs(save_path_image, exist_ok=True)
        else:
            save_path_image = None

        self.configure({"save_path_image": save_path_image, "save_path_results": save_path_results})

    def seed(self, seed=None):
        """
        Computes the random generators of the environment.

        Parameters:
        ----------
        seed: int, 1D array or None, default = None
            seed for the RandomState.

        Returns:
        ---------
            [seed]
        """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _formataction(self, action):
        """Change the type of action to be in [list, float, int].

        Parameters:
        ----------
            action:
                The action with no control on the type.

        Returns:
        -------
            action: in [list, float, int]
                The action with control on the type.
        """
        if isinstance(action, np.ndarray):
            action = action.tolist()
        elif isinstance(action, np.int64):
            action = int(action)
        elif isinstance(action, np.float64):
            action = float(action)
        elif isinstance(action, tuple):
            action = self._formatactionTuple(action)
        elif isinstance(action, dict):
            action = self._formatactionDict(action)
        return action

    def _formatactionTuple(self, action):
        """Change the type of tuple action to be in [list, float, int].

        Parameters:
        ----------
            action:
                The action with no control on the type.

        Returns:
        -------
            action:
                The action with control on the type.
        """
        return self._formataction(action[0]), self._formataction(action[1])

    def _formatactionDict(self, action):
        """Change the type of dict action to be in [list, float, int].

        Parameters:
        ----------
            action:
                The action with no control on the type.

        Returns:
        -------
            action:
                The action with control on the type.
        """
        for key in action.keys():
            action[key] = self._formataction(action[key])
        return action

    def clean(self):
        """Function to clean the registry.

        Close clients who are processing unused sequences of actions (for
        planning)

        Parameters:
        ----------
            None.

        Returns:
        -------
            None.
        """
        clean_registry(self.past_actions)

    def step(self, action):
        """Executes one action in the environment.

        Apply action and execute scale_factor simulation steps of 0.01 s.

        Parameters:
        ----------
            action: int
                Action applied in the environment.

        Returns:
        -------
            obs:
                The new state of the agent.
            reward:
                The reward obtained after applying the action in the current state.
            done:
                Whether the goal is reached or not.
            {}: additional information (not used here)
        """
        # assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
        action = self._formataction(action)

        # Pass the actions to the server to launch the simulation.
        result_id = add_new_step(self.past_actions, action)
        self.past_actions.append(action)

        # Request results from the server.
        # print("[INFO]   >>> Result id:", result_id)
        results = get_result(result_id, timeout=self.timeout)

        obs = np.array(results["observation"])  # to work with baseline
        reward = results["reward"]
        done = results["done"]

        # Avoid long explorations by using a timer.
        self.timer += 1
        if self.timer >= self.config["timer_limit"]:
            # reward = -150
            done = True

        if self.config["planning"]:
            self.clean()
        return obs, reward, done, {}

    def async_step(self, action):
        """Executes one action in the environment.

        Apply action and execute scale_factor simulation steps of 0.01 s.
        Like step but useful if you want to parallelise (blocking "get").
        Otherwise use step.

        Parameters:
        ----------
            action: int
                Action applied in the environment.

        Returns:
        -------
            LateResult:
                Class which allows to store the id of the client who performs
                the calculation and to return later the usual information
                (observation, reward, done) thanks to a get method.
        """
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))

        result_id = add_new_step(self.past_actions, action)
        self.past_actions.append(action)

        class LateResult:
            def __init__(self, result_id):
                self.result_id = result_id

            def get(self, timeout=None):
                results = get_result(self.result_id, timeout=timeout)
                obs = results["observation"]
                reward = results["reward"]
                done = results["done"]
                return obs, reward, done, {}

        return LateResult(copy.copy(result_id))

    def reset(self):
        """Reset simulation.

        Parameters:
        ----------
            None.

        Returns:
        -------
            None.
        """
        self.close()
        self.initialization()
        splib3.animation.animate.manager = None

        if not self.goalList:
            self.goalList = self.config["goalList"]

        # Set a new random goal from the list
        id_goal = self.np_random.choice(range(len(self.goalList)))
        self.config.update({'goal_node': id_goal})
        self.goal = self.goalList[id_goal]

        self.timer = 0
        self.past_actions = []
        return

    def render(self, mode='rgb_array'):
        """See the current state of the environment.

        Get the OpenGL Context to render an image (snapshot) of the simulation
        state.

        Parameters:
        ----------
            mode: string, default = 'rgb_array'
                Type of representation.

        Returns:
        -------
            None.
        """
        if self.config['render'] != 0:
            # Define the viewer at the first run of render.
            if not self.viewer:
                display_size = self.config["display_size"]  # Sim display
                if 'zFar' in self.config:
                    zFar = self.config['zFar']
                else:
                    zFar = 0
                self.viewer = Viewer(self, display_size, zFar=zFar, save_path=self.config["save_path_image"])

            # Use the viewer to display the environment.
            self.viewer.render()
        else:
            print(">> No rendering")

    def _automatic_rendering(self):
        """Automatically render the intermediate frames while an action is still ongoing.

        This allows to render the whole video and not only single steps
        corresponding to agent decision-making.
        If a callback has been set, use it to perform the rendering. This is
        useful for the environment wrappers such as video-recording monitor that
        need to access these intermediate renderings.

        Parameters:
        ----------
            None.

        Returns:
        -------
            None.
        """
        if self.viewer is not None:
            if self.automatic_rendering_callback:
                self.automatic_rendering_callback()
            else:
                self.render()

    def close(self):
        """Terminate simulation.

        Close the viewer and the scene.

        Parameters:
        ----------
            None.

        Returns:
        -------
            None.
        """
        if self.viewer is not None:
            self.viewer.close()

        close_scene()
        print("All clients are closed. Bye Bye.")

    def configure(self, config):
        """Update the configuration.

        Parameters:
        ----------
            config: Dictionary.
                Elements to be added in the configuration.

        Returns:
        -------
            None.
        """
...

fmbtlogger.py
Source:fmbtlogger.py  
...
import sys
import traceback
import types

import fmbt


def _formatAction(fmt, actionName):
    values = {
        "action": actionName
        }
    return fmt % values


def _formatKwArgs(kwargs):
    l = []
    for key in sorted(kwargs):
        v = kwargs[key]
        # Quote string values with repr(); print everything else with str().
        if isinstance(v, str):
            l.append("%s=%s" % (key, repr(v)))
        else:
            l.append("%s=%s" % (key, str(v)))
    return l


def _formatCall(fmt, func, args, kwargs):
    arglist = []
    for a in args:
        if isinstance(a, str):
            arglist.append(repr(a))
        else:
            arglist.append(str(a))
    values = {
        "func": func.__name__,
        "args": ", ".join(arglist),
        # The leading empty string puts ", " in front of the first keyword argument.
        "kwargs": ", ".join([""] + _formatKwArgs(kwargs))
        }
    return fmt % values


def _formatRetunValue(fmt, retval):
    if isinstance(retval, str):
        values = {'value': repr(retval)}
    else:
        values = {'value': str(retval)}
    return fmt % values


def _formatException(fmt):
    s = traceback.format_exc()
    exc, msg, _ = sys.exc_info()
    values = {
        "tb": s,
        "exc": exc.__name__,
        "msg": msg
        }
    return fmt % values


class FileToLogFunc(object):
    def __init__(self, fileObj, timeFormat="%s.%f "):
        self._fileObj = fileObj
        self._timeFormat = timeFormat

    def __call__(self, msg):
        self._fileObj.write("%s%s\n" % (
                datetime.datetime.now().strftime(self._timeFormat), msg))


class LogWriter(object):
    """
    LogWriter interface has the following methods:

      start(actionName)
              called after the execution of next action in fMBT model
              has started.

      end(actionName)
              called after the execution of previously executed action
              has ended.

      call(func, args, kwargs)
              called before func(args, kwargs) is called in logged
              interface.

      ret(returnValue)
              called after function returned returnValue.

      exc()
              called after function raised exception. The exception
              can be inspected using sys.exc_info().
    """
    defaultFormats = {
        "start": "%(action)s",
        "end": "",
        "call": "%(func)s(%(args)s%(kwargs)s)",
        "ret": "= %(value)s",
        "exc": "! %(exc)s (%(msg)s)"
        }


class CSVLogWriter(LogWriter):
    # TODO: fmbtandroid should add here some checks include screenshots
    # to the log where appropriate
    def __init__(self, logFunc, separator=";", formats=None):
        if formats is None:
            self.formats = CSVLogWriter.defaultFormats
        else:
            self.formats = {}
            for key in CSVLogWriter.defaultFormats:
                # Fall back to the default format for any key the caller omitted.
                self.formats[key] = formats.get(key, CSVLogWriter.defaultFormats[key])
        self.logFunc = logFunc
        self.depth = 0
        self.separator = separator

    def _log(self, msg):
        if len(msg.strip()) > 0:
            # One separator per nesting level places the message in its own column.
            prefix = (self.separator * self.depth)
            self.logFunc(prefix + msg)

    def start(self, actionName):
        msg = _formatAction(self.formats["start"], actionName)
        self._log(msg)
        self.depth += 1

    def end(self, actionName):
        msg = _formatAction(self.formats["end"], actionName)
        self._log(msg)
        self.depth -= 1

    def call(self, func, args, kwargs):
        msg = _formatCall(self.formats["call"], func, args, kwargs)
        self._log(msg)
        self.depth += 1

    def ret(self, returnValue):
        self.depth -= 1
        msg = _formatRetunValue(self.formats["ret"], returnValue)
        self._log(msg)

    def exc(self):
        self.depth -= 1
        msg = _formatException(self.formats["exc"])
        self._log(msg)
...
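
The call/ret/exc protocol described in the LogWriter docstring can be tried out without fMBT installed. The block below is a minimal sketch of the same idea, not fmbtlogger's own API: IndentLog, logged, square, sum_of_squares and divide are hypothetical names introduced only for this illustration, and the sketch formats every argument with repr() instead of reproducing the str/repr distinction made in the listing above.

```python
import io


class IndentLog:
    """Hypothetical stand-in for a LogWriter: each message is prefixed
    with one separator per nesting level, as CSVLogWriter._log does."""

    def __init__(self, log_func, separator=";"):
        self.log_func = log_func
        self.separator = separator
        self.depth = 0

    def _log(self, msg):
        if msg.strip():  # skip empty messages
            self.log_func(self.separator * self.depth + msg)

    def call(self, func, args, kwargs):
        parts = [repr(a) for a in args]
        parts += ["%s=%r" % (k, kwargs[k]) for k in sorted(kwargs)]
        self._log("%s(%s)" % (func.__name__, ", ".join(parts)))
        self.depth += 1

    def ret(self, value):
        self.depth -= 1
        self._log("= %r" % (value,))

    def exc(self, error):
        self.depth -= 1
        self._log("! %s (%s)" % (type(error).__name__, error))


def logged(func, writer):
    """Wrap func so that each call, return value and exception is logged."""
    def wrapper(*args, **kwargs):
        writer.call(func, args, kwargs)
        try:
            result = func(*args, **kwargs)
        except Exception as error:
            writer.exc(error)
            raise
        writer.ret(result)
        return result
    return wrapper


buffer = io.StringIO()
writer = IndentLog(lambda msg: buffer.write(msg + "\n"))


def square(x):
    return x * x


logged_square = logged(square, writer)


def sum_of_squares(a, b):
    # The nested logged calls are written one level deeper.
    return logged_square(a) + logged_square(b)


def divide(a, b):
    return a / b


logged(sum_of_squares, writer)(2, 3)
try:
    logged(divide, writer)(1, b=0)
except ZeroDivisionError:
    pass

log_lines = buffer.getvalue().splitlines()
print("\n".join(log_lines))
```

Because the depth is decremented before the return value or exception is written, each result appears in the same column as the call that produced it, while nested calls are shifted one separator to the right.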
