Best Python code snippet using hypothesis
reached_setpoint.py
Source:reached_setpoint.py  
1from ...math.utils import compare_value2from ophyd import PVPositionerPC, Component as Cpt, Signal, Kind3from ophyd.utils import errors4from ophyd.status import SubscriptionStatus, Status, AndStatus5import logging6logger = logging.getLogger()7class OphydInvalidParameter(ValueError, errors.OpException):8    """given parameter was invalid9    """10    pass11class OphydMethodNotOverloaded(AssertionError, errors.OpException):12    """13    """14    pass15# t_super = Device16#: It has to be PVPositionerPC so that it will work17t_super = PVPositionerPC18class DoneBasedOnReadback(t_super):19    """Wait until readback is matching setpoint within requested precision20    The idea of this class is to mimic a proper done variable using21    setpoint and readback value. Then the done variable should be22    set when the readback value matches the setpoint value within23    the specified precision.24    See :class:`ReachedSetpoint` for an implemementation of this class25    The work is done in26    This checking behaviour is implemented in27    :meth:`_positionReached`.28    The device has to provide two signal like variables:29        * setpoint30        * readback31    Warning:32        Code not yet checked33    Todo:34        Check __init__ handling timeout35    """36    #:37    always_set = Cpt(Signal, name='always_set', value=False, kind=Kind.config)38    #: Tune correction is made by calling method set, but without39    #: writing a value to it40    do_not_set = Cpt(Signal, name='always_set', value=False, kind=Kind.config)41    def __init__(self, *args, **kws):42        """43        Args:44            setting_parameters :45        """46        self._setting_parameters = None47        self._timeout = None48        setting_parameters = kws.pop("setting_parameters", None)49        timeout = kws.pop("timeout", 0.0)50        timeout = float(timeout)51        super().__init__(*args, **kws)52        setpar = self._checkSettingParameters(setting_parameters)53        if setpar is None:54            cls_name = self.__class__.__name__55            txt = (56                f'{cls_name}._checkSettingParameters must return'57                ' valid parameters (returned None)'58            )59            raise AssertionError(txt)60        self._setting_parameters = setpar61        self._timeout = timeout62        self._checkSetup()63        # Required to trace the status of the device64        self._moving = None65    def _checkSetup(self):66        """check that instance contains required variables67        """68        assert(self.readback is not None)69        assert(callable(self.readback.get))70        assert(self.setpoint is not None)71        assert(callable(self.readback.get))72        assert(callable(self.readback.set))73        assert(self._timeout > 0)74    def _checkSettingParameters(self, setting_parameters):75        """Check and store setting Parameters76        Overload this function to check setting parameters77        Returns:78                valid setting parameters79        """80        raise OphydMethodNotOverloaded("Overload this method")81        return setting_parameters82    def _positionReached(self, *args, **kws):83        """check that the position has been reached84        Returns: flag(bool)85        Returns true if position was reached, false otherwise86        """87        raise OphydMethodNotOverloaded("Overload this method")88    def set(self, value):89        """90        Returns:91            :class:`ophyd.status.SubscriptionStatus`92        """93        def callback(*args, **kws):94            pos_valid = self._positionReached(*args, **kws)95            cls_name = self.__class__.__name__96            txt = (97                f'{cls_name}:set cb: args {args}  kws {kws}:'98                f' self._moving {self._moving} pos_valid {pos_valid}'99                )100            self.log.info(txt)101            if self._moving and pos_valid:102                self._moving = False103                self.log.info(txt)104                return True105            else:106                self._moving = True107            return False108        pos_valid = self._positionReached(check_set_value=value)109        always_set = self.always_set.get()110        cls_name = self.__class__.__name__111        name = self.name112        if pos_valid:113            txt = f"{cls_name}: no motion required for value {value} always set {always_set}"114            self.log.info(txt)115        if pos_valid and not always_set:116            status = Status()117            # status.success = 1 -> status.set_finished()118            status.set_finished()119            txt = f"{cls_name}.{name}: no motion required : always set {always_set} False. No motion"120            self.log.debug(txt)121            return status122        self.log.debug(f'{cls_name}.{name}settle time {self.settle_time}')123        stat_rbk = None124        if not pos_valid:125            # No response expected126            stat_rbk = SubscriptionStatus(self.readback, callback,127                                          timeout=self._timeout,128                                          settle_time=self.settle_time)129        txt = f"{cls_name}: setting to value {value}"130        self.log.debug(txt)131        if self.do_not_set.get():132            txt = (133                f'Not setting the value {value} position valid {pos_valid}'134                f' stat_rbk {stat_rbk}'135            )136            self.log.info(txt)137            stat_setp = Status()138            stat_setp.set_finished()139        else:140            stat_setp = self.setpoint.set(value, timeout=self._timeout)141        if stat_rbk is None:142            status = stat_setp143        else:144            status = AndStatus(stat_rbk, stat_setp)145        cls_name = self.__class__.__name__146        txt = f"{cls_name}:set cb: value {value} status = {status}: setp {stat_setp} {stat_rbk}"147        # print(txt)148        self.log.info(txt)149        return status150class ReachedSetpoint(DoneBasedOnReadback):151    """Setpoint within some absolute precision152    Obsolete class use :class:`ReachedSetpointEPS` instead153    """154    def __init__(self, *args, **kwargs):155        msg = 'Obsolete class use ReachedSetpointEPS instead'156        raise NotImplementedError(msg)157class ReachedSetpointEPS(DoneBasedOnReadback):158    """Setpoint within some absolute and relative precision159    """160    eps_rel = Cpt(Signal, name='eps_rel', value=1e-9)161    eps_abs = Cpt(Signal, name='eps_abs', value=1e-9)162    def _correctReadback(self, val):163        return val164    def _positionReached(self, *args, **kws):165        """position within given range?166        """167        rbk = self.readback.get()168        rbk = self._correctReadback(rbk)169        check_set_value = kws.pop("check_set_value", None)170        if check_set_value is None:171            setp = self.setpoint.get()172        else:173            setp = check_set_value174        eps_abs = self.eps_abs.get()175        eps_rel = self.eps_rel.get()176        t_cmp = compare_value(rbk, setp, eps_abs=eps_abs, eps_rel=eps_rel)177        flag = t_cmp == 0178        c_name = str(self.__class__)179        name = self.name180        txt = (181            f'{c_name}:_positionReached: name {name}, set {setp} rbk {rbk} '182            f'eps: abs {eps_abs} rel {eps_rel} '183            f'comparison {t_cmp} position valid {flag}'184        )185        # print(txt)186        if flag:187            self.log.info(txt)188        else:189            self.log.debug(txt)190        return flag191    def _checkSettingParameters(self, unused):192        """Absolute value for setting parameter193        And thus just a float194        """195        eps_abs = self.eps_abs.get()196        try:197            t_range = float(eps_abs)198            assert(t_range > 0)199        except ValueError as des:200            msg = f"Expected eps_abs {eps_abs}  >0 got: error {des}"201            raise OphydInvalidParameter(msg)202        eps_rel = self.eps_rel.get()203        try:204            t_range = float(eps_rel)205            assert(t_range > 0)206        except ValueError as des:207            msg = f"Expected eps_rel {eps_rel}  >0 got: error {des}"208            raise OphydInvalidParameter(msg)...ReachedSetPoint.py
Source:ReachedSetPoint.py  
1from ....math.utils import compare_value2from ophyd import PVPositionerPC, Component as Cpt, Signal3from ophyd.utils import errors4from ophyd.status import SubscriptionStatus, Status, AndStatus5import logging6logger = logging.getLogger()7class OphydInvalidParameter(ValueError, errors.OpException):8    """given parameter was invalid9    """10    pass11class OphydMethodNotOverloaded(AssertionError, errors.OpException):12    """13    """14    pass15# t_super = Device16#: It has to be PVPositionerPC so that it will work17t_super = PVPositionerPC18class DoneBasedOnReadback(t_super):19    """Wait until readback is matching setpoint within requested precision20    The idea of this class is to mimic a proper done variable using21    setpoint and readback value. Then the done variable should be22    set when the readback value matches the setpoint value within23    the specified precision.24    See :class:`ReachedSetpoint` for an implemementation of this class25    The work is done in26    This checking behaviour is implemented in27    :meth:`_positionReached`.28    The device has to provide two signal like variables:29        * setpoint30        * readback31    Warning:32        Code not yet checked33    Todo:34        Check __init__ handling timeout35    """36    def __init__(self, *args, **kws):37        """38        Args:39            setting_parameters :40        """41        self._setting_parameters = None42        self._timeout = None43        setting_parameters = kws.pop("setting_parameters", None)44        timeout = kws.pop("timeout", 0.0)45        timeout = float(timeout)46        super().__init__(*args, **kws)47        setpar = self._checkSettingParameters(setting_parameters)48        if setpar is None:49            cls_name = self.__class__.__name__50            txt = (51                f'{cls_name}._checkSettingParameters must return'52                ' valid parameters (returned None)'53            )54            raise AssertionError(txt)55        self._setting_parameters = setpar56        self._timeout = timeout57        self._checkSetup()58        # Required to trace the status of the device59        self._moving = None60    def _checkSetup(self):61        """check that instance contains required variables62        """63        assert(self.readback is not None)64        self.readback.value65        assert(self.setpoint is not None)66        self.setpoint.value67        self.setpoint.set68        assert(self._timeout > 0)69    def _checkSettingParameters(self, setting_parameters):70        """Check and store setting Parameters71        Overload this function to check setting parameters72        Returns:73                valid setting parameters74        """75        raise OphydMethodNotOverloaded("Overload this method")76        return setting_parameters77    def _positionReached(self, *args, **kws):78        """check that the position has been reached79        Returns: flag(bool)80        Returns true if position was reached, false otherwise81        """82        raise OphydMethodNotOverloaded("Overload this method")83    def set(self, value):84        """85        Returns:86            :class:`ophyd.status.SubscriptionStatus`87        """88        def callback(*args, **kws):89            pos_valid = self._positionReached(*args, **kws)90            cls_name = self.__class__.__name__91            txt = (92                f'{cls_name}:set cb: args {args}  kws {kws}:'93                f' self._moving {self._moving} pos_valid {pos_valid}'94                )95            self.log.debug(txt)96            if self._moving and pos_valid:97                self._moving = False98                return True99            else:100                self._moving = True101            return False102        pos_valid = self._positionReached(check_set_value=value)103        if pos_valid:104            status = Status()105            status.done = 1106            status.success = 1107            cls_name = self.__class__.__name__108            txt = f"{cls_name}: no motion required for value {value}"109            self.log.info(txt)110            return status111        self.log.info(f'settle time {self.settle_time}')112        stat_rbk = SubscriptionStatus(self.readback, callback,113                                      timeout=self._timeout,114                                      settle_time=self.settle_time)115        stat_setp = self.setpoint.set(value, timeout=self._timeout)116        status = AndStatus(stat_rbk, stat_setp)117        cls_name = self.__class__.__name__118        txt = f"{cls_name}:set cb: value {value} status = {status}"119        # print(txt)120        if logger:121            logger.debug(txt)122        return status123class ReachedSetpoint(DoneBasedOnReadback):124    """Setpoint within some absolute precision125    Obsolete class use :class:`ReachedSetpointEPS` instead126    """127    def __init__(self, *args, **kwargs):128        msg = 'Obsolete class use ReachedSetpointEPS instead'129        raise NotImplementedError(msg)130class ReachedSetpointEPS(DoneBasedOnReadback):131    """Setpoint within some absolute and relative precision132    """133    eps_rel = Cpt(Signal, name='eps_rel', value=1e-9)134    eps_abs = Cpt(Signal, name='eps_abs', value=1e-9)135    def _correctReadback(self, val):136        return val137    def _positionReached(self, *args, **kws):138        """position within given range?139        """140        rbk = self.readback.value141        rbk = self._correctReadback(rbk)142        check_set_value = kws.pop("check_set_value", None)143        if check_set_value is None:144            setp = self.setpoint.value145        else:146            setp = check_set_value147        eps_abs = self.eps_abs.value148        eps_rel = self.eps_rel.value149        t_cmp = compare_value(rbk, setp, eps_abs=eps_abs, eps_rel=eps_rel)150        flag = t_cmp == 0151        c_name = str(self.__class__)152        txt = (153            f'{c_name}:_positionReached: set {setp} rbk {rbk} '154            f'eps: abs {eps_abs} rel {eps_rel} '155            f'comparison {t_cmp} position valid {flag}'156        )157        # print(txt)158        self.log.info(txt)159        return flag160    def _checkSettingParameters(self, unused):161        """Absolute value for setting parameter162        And thus just a float163        """164        eps_abs = self.eps_abs.value165        try:166            t_range = float(eps_abs)167            assert(t_range > 0)168        except ValueError as des:169            msg = f"Expected eps_abs {eps_abs}  >0 got: error {des}"170            raise OphydInvalidParameter(msg)171        eps_rel = self.eps_rel.value172        try:173            t_range = float(eps_rel)174            assert(t_range > 0)175        except ValueError as des:176            msg = f"Expected eps_rel {eps_rel}  >0 got: error {des}"177            raise OphydInvalidParameter(msg)...check_set_value.py
Source:check_set_value.py  
1#!/usr/bin/python32# check_set_value.py: Wrapper to demonstrate check_set_value()3# utility function that determines whether a value is a legal4# value of a given SET column.5# Usage: check_set_value.py db_name tbl_name col_name test_value6import sys7import mysql.connector8import cookbook9from cookbook_utils import *10if len(sys.argv) != 5:11  print("Usage: check_enum_value.py db_name tbl_name col_name test_val")12  sys.exit(1)13db_name = sys.argv[1]14tbl_name = sys.argv[2]15col_name = sys.argv[3]16val = sys.argv[4]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
