Best Python code snippet using yandex-tank
pid_controller.py
Source:pid_controller.py  
...137            # This can only happen, if another process is using the same sequence_number138            raise InvalidReplyError(139                f"Invalid reply received for request id {key}. Is someone using the same socket? Data: {result}"140            )141    async def __send_single_request(self, key: int, value: Any | None = None) -> Any:142        result = await self.send_multi_request(143            data={144                key: value,145            }146        )147        self.__test_for_errors(result, key)148        if key > 0:149            return ErrorCode(result[key])150        return result[key]151    async def send_multi_request(self, data: dict[PidFunctionID | int, Any]) -> dict:152        """153        Send one or more requests to the device.154        Parameters155        ----------156        data157        Returns158        -------159        Raises160        ------161        ValueError162            If an unknown function id was sent and `response_expected` was set to True163        """164        if self.api_version < (0, 11, 0):165            # We need to rewrite some function ids166            if PidFunctionID.GET_HUMIDITY in data:167                data[-21] = data[PidFunctionID.GET_HUMIDITY]168                del data[PidFunctionID.GET_HUMIDITY]169            if PidFunctionID.GET_BOARD_TEMPERATURE in data:170                data[-20] = data[PidFunctionID.GET_BOARD_TEMPERATURE]171                del data[PidFunctionID.GET_BOARD_TEMPERATURE]172        result = await self.connection.send_request(data=data, response_expected=True)173        assert result is not None174        if self.api_version < (0, 11, 0):175            # We need to rewrite some function ids176            if -21 in result:177                result[PidFunctionID.GET_HUMIDITY.value] = result[-21]178                del result[-21]179            if -20 in result:180                result[PidFunctionID.GET_BOARD_TEMPERATURE.value] = result[-20]181                del result[-20]182        try:183            result = {PidFunctionID(key): value for key, value in result.items()}184        except ValueError:185            # Raised by PidFunctionID(key)186            self.__logger.error("Received unknown function id in data: %(data)s", {"data": data})187            return result188        return result189    async def get_software_version(self) -> tuple[int, int, int]:190        """191        Get the firmware version running on the device. The response is a tuple of ints, that represents the version192        number.193        Returns194        -------195        Tuple of int196            The version number197        """198        return await self.get_by_function_id(PidFunctionID.GET_SOFTWARE_VERSION)199    async def get_hardware_version(self) -> tuple[int, int, int]:200        """201        Get the hardware revision of the device. The response is a tuple of ints, that represents the version number.202        Returns203        -------204        Tuple of int205            The revision number206        """207        return await self.get_by_function_id(PidFunctionID.GET_HARDWARE_VERSION)208    async def get_serial(self) -> int:209        """210        Get the serial number of the device.211        Returns212        -------213        int214            The serial number of the device215        """216        return await self.get_by_function_id(PidFunctionID.GET_SERIAL_NUMBER)217    async def get_device_temperature(self) -> float:218        """219        Query the temperature of the onboard sensor.220        Returns221        -------222        float223            The temperature of the onboard sensor in Kelvin224        """225        return await self.get_by_function_id(PidFunctionID.GET_BOARD_TEMPERATURE)226    async def get_humidity(self) -> float:227        """228        Returns the humidity as read by the onboard sensor.229        Returns230        -------231        float232            The humidity in %rH233        """234        return await self.get_by_function_id(PidFunctionID.GET_HUMIDITY)235    async def get_mac_address(self) -> bytearray:236        """237        Get the MAC address used by the ethernet port.238        Returns239        -------240        bytearray241            An array of length 6 which contains the MAC242        """243        return await self.get_by_function_id(PidFunctionID.GET_MAC_ADDRESS)244    async def set_mac_address(self, mac: tuple[int, int, int, int, int, int] | list[int] | bytearray) -> None:245        """246        Set the MAC address used by the ethernet port.247        Parameters248        ----------249        mac: bytearray or tuple of int250            The MAC address as a bytearray of length 6251        """252        assert len(mac) == 6253        mac = bytearray(mac)  # convert to bytearray, this also makes sure, that all values are in range(0, 256)254        await self.__send_single_request(PidFunctionID.SET_MAC_ADDRESS, mac)255    async def get_uuid(self) -> UUID:256        """257        Get the universally unique identifier of the node.258        Returns259        -------260        UUID261            The universally unique identifier of the node262        Raises263        ------264        FunctionNotImplementedError265            If the firmware version does not support the request.266        """267        if self.api_version < (0, 12, 0):268            raise FunctionNotImplementedError(269                f"{PidFunctionID.GET_UUID.name} is only supported in api version >= 0.12.0"270            )271        result = await self.get_by_function_id(PidFunctionID.GET_UUID)272        return UUID(bytes=bytes(result))273    async def set_uuid(self, uuid: UUID) -> None:274        """275        Set the universally unique identifier of the node.276        Parameters277        ----------278        uuid: UUID279            The universally unique identifier of the node280        Raises281        ------282        FunctionNotImplementedError283            If the firmware version does not support the request.284        """285        if self.api_version < (0, 12, 0):286            raise FunctionNotImplementedError(287                f"{PidFunctionID.SET_UUID.name} is only supported in api version >= 0.12.0"288            )289        await self.__send_single_request(PidFunctionID.SET_UUID, tuple(uuid.bytes))290    async def get_auto_resume(self) -> bool:291        """292        Query the node, whether is automatically resumes after a reset (intentionally or unintentionally). If set, the293        node will return to the same state. If not, the node will always come up as disabled with default settings.294        Returns295        -------296        bool297            True if the node keeps its enabled state after a reset298        """299        return await self.get_by_function_id(PidFunctionID.GET_AUTO_RESUME)300    async def set_auto_resume(self, value: bool):301        """302        Set the controller to automatically load the previous settings and resume its action.303        Parameters304        ----------305        value: bool306            Set to True to load the node settings on boot307        """308        await self.__send_single_request(PidFunctionID.SET_AUTO_RESUME, bool(value))309    async def set_lower_output_limit(self, limit: int) -> None:310        """311        Set the minimum output of the DAC in bit values.312        Parameters313        ----------314        limit: int315            The minimum output value of the DAC316        Raises317        ------318        ValueError319            If the limit was out of bounds.320        """321        try:322            await self.__send_single_request(PidFunctionID.SET_LOWER_OUTPUT_LIMIT, limit)323        except InvalidFormatError:324            raise ValueError("Invalid limit") from None325    async def get_lower_output_limit(self) -> int:326        """327        Get the minimum output value of the DAC.328        Returns329        -------330        int331            The minimum output in bit332        """333        return await self.get_by_function_id(PidFunctionID.GET_LOWER_OUTPUT_LIMIT)334    async def set_upper_output_limit(self, limit: int) -> None:335        """336        Set the maximum output of the DAC in bit values.337        Parameters338        ----------339        limit: int340            The maximum output value of the DAC341        Raises342        ------343        ValueError344            If the limit was out of bounds.345        """346        try:347            await self.__send_single_request(PidFunctionID.SET_UPPER_OUTPUT_LIMIT, limit)348        except InvalidFormatError:349            raise ValueError("Invalid limit") from None350    async def get_upper_output_limit(self) -> int:351        """352        Get the maximum output of the DAC in bit values.353        Returns354        -------355        int356            The upper limit for the output DAC357        """358        return await self.get_by_function_id(PidFunctionID.GET_UPPER_OUTPUT_LIMIT)359    async def set_timeout(self, timeout: int) -> None:360        """361        Set the timeout, that defines when the controller switches to fallback mode. The time is in ms.362        Parameters363        ----------364        timeout365            The timeout in ms366        """367        assert timeout > 0368        await self.__send_single_request(PidFunctionID.SET_TIMEOUT, int(timeout))369    async def get_timeout(self) -> int:370        """371        Get the time, that mass pass between updates, before the controller switches to fallback mode.372        Returns373        -------374        int375            The time in ms, that the controller waits between updates376        """377        return await self.get_by_function_id(PidFunctionID.GET_TIMEOUT)378    async def set_dac_gain(self, enable: bool) -> None:379        """380        Set the gain of the DAC to x2. This will increase the output voltage range from 0..5V to 0..10V.381        Parameters382        ----------383        enable384            True to enable the gain385        """386        await self.__send_single_request(PidFunctionID.SET_GAIN, bool(enable))387    async def is_dac_gain_enabled(self) -> bool:388        """389        Return True if the DAC output goes from 0-10 V. False if the DAC gain is disabled and the output is 0-5 V.390        Returns391        -------392        bool393            True if the gain is enabled394        """395        return await self.get_by_function_id(PidFunctionID.GET_GAIN)396    async def set_pid_feedback_direction(self, feedback: FeedbackDirection) -> None:397        """398        Set the sign of the pid output. This needs to be adjusted according to the actuator used to399        control the plant. Typically, it is assumed, that the feedback is negative. For example, when400        dealing with e.g. temperature control, this means, that if the temperature is too high,401        an increase in the feedback will increase the cooling action.402        In short: If set to `FeedbackDirection.NEGATIVE`, a positive error will result in a negative plant response.403        Parameters404        ----------405        feedback: FeedbackDirection406            The direction of the controller response407        """408        feedback = FeedbackDirection(feedback)409        await self.__send_single_request(PidFunctionID.SET_DIRECTION, feedback.value)410    async def get_pid_feedback_direction(self) -> FeedbackDirection:411        """412        Get the sign of the pid output. If set to `FeedbackDirection.NEGATIVE`, a positive error will result in a413        negative plant response.414        Returns415        -------416        FeedbackDirection417            The direction of the controller response418        """419        return FeedbackDirection(await self.get_by_function_id(PidFunctionID.GET_DIRECTION))420    async def set_output(self, value: int) -> None:421        """422        Set the output value of the DAC. This function only works, when the controller mode is set to disabled (manual).423        Use `set_enabled(false)`.424        Parameters425        ----------426        value: int427            The output in bit428        """429        await self.__send_single_request(PidFunctionID.SET_OUTPUT, int(value))430    async def get_output(self) -> int:431        """432        Queries the output value of the DAC.433        Returns434        -------435        int436            The output of the DAC in bit437        """438        return await self.get_by_function_id(PidFunctionID.GET_OUTPUT)439    async def set_enabled(self, enabled: bool) -> None:440        """441        Set the PID controller to enabled/automatic or disabled/manual mode.442        Parameters443        ----------444        enabled: bool445            True to enable the PID controller446        """447        await self.__send_single_request(PidFunctionID.SET_ENABLED, bool(enabled))448    async def is_enabled(self) -> bool:449        """450        Queries the state of the PID controller.451        Returns452        -------453        bool454            True if the controller is enabled455        """456        return await self.get_by_function_id(PidFunctionID.GET_ENABLED)457    async def __set_kx(self, function_id: PidFunctionID, kx: int) -> None:  # pylint: disable=invalid-name458        """459        Set the PID K{p,i,d} parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format460        Parameters461        ----------462        function_id: PidFunctionID463            Select which parameter set should be updated464        kx: int465            The value of the Kp, Ki or Kd parameter466        Raises467        ------468        InvalidFormatError469            If the pid constant was rejected.470        """471        if self.api_version < (0, 11, 0) and function_id in (472            PidFunctionID.SET_SECONDARY_KP,473            PidFunctionID.SET_SECONDARY_KI,474            PidFunctionID.SET_SECONDARY_KD,475        ):476            raise FunctionNotImplementedError(f"{function_id.name} is only supported in api version >= 0.11.0")477        try:478            await self.__send_single_request(function_id, int(kx))479        except InvalidFormatError:480            raise ValueError("Invalid PID constant") from None481    async def set_kp(self, kp: int, config_id: int = 0) -> None:  # pylint: disable=invalid-name482        """483        Set the PID Kp parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.484        Parameters485        ----------486        kp: int487            The PID k_p parameter in Q16.16 format488        config_id: {0, 1}, default=0489            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.490        """491        assert config_id in (0, 1)492        if config_id == 0:493            await self.__set_kx(PidFunctionID.SET_KP, kp)494        else:495            await self.__set_kx(PidFunctionID.SET_SECONDARY_KP, kp)496    async def get_kp(self, config_id: int = 0) -> int:497        """498        Get the PID Kp parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.499        Parameters500        ----------501        config_id: {0, 1}, default=0502            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.503        Raises504        ------505        FunctionNotImplementedError506            If the firmware version does not support the request.507        """508        assert config_id in (0, 1)509        if config_id == 0:510            return await self.get_by_function_id(PidFunctionID.GET_KP)511        if self.api_version >= (0, 11, 0):512            return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_KP)513        raise FunctionNotImplementedError(514            f"{PidFunctionID.GET_SECONDARY_KP.name} is only supported in api version >= 0.11.0"515        )516    async def set_ki(self, ki: int, config_id: int = 0) -> None:  # pylint: disable=invalid-name517        """518        Set the PID Ki parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.519        Parameters520        ----------521        ki: int522            The parameter value in Q16.16 format (32 bit)523        config_id: {0, 1}, default=0524            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.525        """526        assert config_id in (0, 1)527        if config_id == 0:528            await self.__set_kx(PidFunctionID.SET_KI, ki)529        else:530            await self.__set_kx(PidFunctionID.SET_SECONDARY_KI, ki)531    async def get_ki(self, config_id: int = 0) -> int:532        """533        Get the PID Ki parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.534        Parameters535        ----------536        config_id: {0, 1}, default=0537            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.538        Raises539        ------540        FunctionNotImplementedError541            If the firmware version does not support the request.542        """543        assert config_id in (0, 1)544        if config_id == 0:545            return await self.get_by_function_id(PidFunctionID.GET_KI)546        if self.api_version >= (0, 11, 0):547            return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_KI)548        raise FunctionNotImplementedError(549            f"{PidFunctionID.GET_SECONDARY_KI.name} is only supported in api version >= 0.11.0"550        )551    async def set_kd(self, kd: int, config_id: int = 0) -> None:  # pylint: disable=invalid-name552        """553        Set the PID Kd parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.554        Parameters555        ----------556        kd: int557            The parameter value in Q16.16 format (32 bit)558        config_id: {0, 1}, default=0559            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.560        """561        assert config_id in (0, 1)562        if config_id == 0:563            await self.__set_kx(PidFunctionID.SET_KD, kd)564        else:565            await self.__set_kx(PidFunctionID.SET_SECONDARY_KD, kd)566    async def get_kd(self, config_id: int = 0) -> int:567        """568        Get the PID Kd parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.569        Parameters570        ----------571        config_id: {0, 1}, default=0572            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.573        Raises574        ------575        FunctionNotImplementedError576            If the firmware version does not support the request.577        """578        assert config_id in (0, 1)579        if config_id == 0:580            return await self.get_by_function_id(PidFunctionID.GET_KD)581        if self.api_version >= (0, 11, 0):582            return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_KD)583        raise FunctionNotImplementedError(584            f"{PidFunctionID.GET_SECONDARY_KD.name} is only supported in api version >= 0.11.0"585        )586    async def set_input(self, value: int, return_output: bool = False) -> int | None:587        """588        Set the input, which is fed to the PID controller. The value is in Q16.16 format.589        Parameters590        ----------591        value: int592            The input value in Q16.16 format593        return_output: bool594            Returns the output of the controller if True595        Returns596        -------597        int or None:598            Returns the output of the controller if return_output is set599        """600        # We need to send a multi_request, because if return_output is True, we want to get the601        # output after the input has been set602        request: dict[int, int | None] = {PidFunctionID.SET_INPUT: int(value)}603        if return_output:604            request[PidFunctionID.GET_OUTPUT] = None605        result = await self.send_multi_request(request)606        # We need to test for errors, which would normally be done by __send_single_request()607        self.__test_for_errors(result, PidFunctionID.SET_INPUT)608        if return_output:609            return result[PidFunctionID.GET_OUTPUT]610        return None611    async def set_setpoint(self, value: int, config_id: int = 0) -> None:612        """613        Set the PID setpoint. The value is in Q16.16 format.614        Parameters615        ----------616        value: int617            The setpoint of the PID controller in Q16.16 format.618        config_id: {0, 1}, default=0619            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.620        Raises621        ------622        FunctionNotImplementedError623            If the firmware version does not support the request.624        """625        assert config_id in (0, 1)626        try:627            if config_id == 0:628                await self.__send_single_request(PidFunctionID.SET_SETPOINT, int(value))629            else:630                if self.api_version >= (0, 11, 0):631                    await self.__send_single_request(PidFunctionID.SET_SECONDARY_SETPOINT, int(value))632                else:633                    raise FunctionNotImplementedError(634                        f"{PidFunctionID.SET_SECONDARY_SETPOINT.name} is only supported in api version >= 0.11.0"635                    )636        except InvalidFormatError:637            raise ValueError("Invalid setpoint") from None638    async def get_setpoint(self, config_id: int = 0) -> int:639        """640        Get the PID setpoint. The value is in Q16.16 format.641        Parameters642        ----------643        config_id: {0, 1}, default=0644            The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.645        Returns646        -------647        int648            The setpoint value for the given config set.649        Raises650        ------651        FunctionNotImplementedError652            If the firmware version does not support the request.653        """654        assert config_id in (0, 1)655        if config_id == 0:656            return await self.get_by_function_id(PidFunctionID.GET_SETPOINT)657        # Only allow the secondary parameter set on API version >=0.11.0658        if self.api_version >= (0, 11, 0):659            return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_SETPOINT)660        raise FunctionNotImplementedError(661            f"{PidFunctionID.GET_SECONDARY_SETPOINT.name} is only supported in api version >= 0.11.0"662        )663    async def set_secondary_config(self, config_id: int) -> None:664        """665        Set the configuration used when running in fallback mode.666        Parameters667        ----------668        config_id: {0, 1}669            The configuration to be used. The controller supports two pid parameter sets. Either 0 or 1.670        Raises671        ------672        FunctionNotImplementedError673            If the firmware version does not support the request.674        InvalidFormatError675            If the config_id is not in {0, 1}.676        """677        assert config_id in (0, 1)678        if self.api_version < (0, 11, 0):679            raise FunctionNotImplementedError(680                f"{PidFunctionID.SET_SECONDARY_PID_PARAMETER_SET.name} is only supported in api version >= 0.11.0"681            )682        try:683            await self.__send_single_request(PidFunctionID.SET_SECONDARY_PID_PARAMETER_SET, int(config_id))684        except InvalidFormatError:685            raise ValueError("Invalid configuration set. Use either 0 or 1.") from None686    async def get_secondary_config(self) -> int:687        """688        Set the configuration id used when running in fallback mode.689        Returns690        -------691        int692            The configuration used, when running in fallback mode. The controller supports two pid parameter sets.693            Either 0 or 1.694        """695        if self.api_version < (0, 11, 0):696            raise FunctionNotImplementedError(697                f"{PidFunctionID.GET_SECONDARY_PID_PARAMETER_SET.name} is only supported in api version >= 0.11.0"698            )699        return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_PID_PARAMETER_SET)700    async def set_fallback_update_interval(self, value: int):701        """702        Set the update interval, when running in fallback mode. The Controller will feed a value from the internal703        sensor to its PID controller every {value} ms.704        Parameters705        ----------706        value: int707            The update interval in ms708        """709        assert value > 0710        try:711            await self.__send_single_request(PidFunctionID.SET_FALLBACK_UPDATE_INTERVAL, int(value))712        except InvalidFormatError:713            raise ValueError("Invalid calibration offset") from None714    async def get_fallback_update_interval(self) -> int:715        """716        The update interval, which is used when running in fallback mode. The return value is in ms.717        Returns718        -------719        int:720            The number of ms between updates with the backup sensor when running in fallback mode.721        """722        return await self.get_by_function_id(PidFunctionID.GET_FALLBACK_UPDATE_INTERVAL)723    async def reset(self) -> None:724        """725        Resets the device. This will trigger a hardware reset.726        """727        await self.__send_single_request(PidFunctionID.RESET)728    async def reset_settings(self) -> None:729        """730        Resets the device to default values.731        """732        await self.__send_single_request(PidFunctionID.RESET_SETTINGS)733    async def set_serial(self, serial: int) -> None:734        """735        Set the serial number of the device736        Parameters737        ----------738        serial: int739            The serial number. Maximum: 4294967295 (32 bit)740        Raises741        ------742        ValueError743            If the serial number is not valid.744        """745        try:746            await self.__send_single_request(PidFunctionID.SET_SERIAL_NUMBER, int(serial))747        except InvalidFormatError:748            raise ValueError("Invalid serial number") from None749    async def get_active_connection_count(self) -> int:750        """751        Get the number of open sockets.752        Returns753        -------754        int755            The number of sockets756        Raises757        ------758        FunctionNotImplementedError759            If the firmware version does not support the request.760        """761        if self.api_version < (0, 11, 0):762            raise FunctionNotImplementedError(763                f"{PidFunctionID.GET_ACTIVE_CONNECTION_COUNT.name} is only supported in api version >= 0.11.0"764            )765        return await self.get_by_function_id(PidFunctionID.GET_ACTIVE_CONNECTION_COUNT)766    async def get_by_function_id(self, function_id: PidFunctionID | int) -> Any:767        """768        Query a value by function id, instead of calling the named function.769        Parameters770        ----------771        function_id: PidFunctionID or int772            The function to query773        Returns774        -------775        Any776            The result of the query.777        Raises778        ------779        InvalidCommandError780            If the function id was given as an integer and is unknown781        """782        try:783            function_id = PidFunctionID(function_id)784        except ValueError:785            raise InvalidCommandError(f"Command {function_id} is invalid.") from None786        assert function_id.value < 0  # all getter have negative ids787        result = await self.__send_single_request(function_id)788        if function_id in self.__raw_to_unit:789            result = self.__raw_to_unit[function_id](result)...client.py
Source:client.py  
...80        for h in boring:81            if h in headers:82                del (headers[h])83        return headers84    def __send_single_request(self, req, trace=False):85        p = self.session.prepare_request(req)86        if trace:87            logger.debug("Making request: %s %s Headers: %s Body: %s",88                         p.method, p.url, p.headers, p.body)89        resp = self.session.send(p, timeout=self.connection_timeout)90        if trace:91            logger.debug("Got response in %ss: %s %s Headers: %s Body: %s",92                         resp.elapsed.total_seconds(), resp.reason,93                         resp.status_code, self.filter_headers(resp.headers),94                         resp.content)95        if resp.status_code in [500, 502, 503, 504]:96            raise self.NotAvailable(97                request="request: %s %s\n\tHeaders: %s\n\tBody: %s" %98                (p.method,99                 p.url,100                 p.headers,101                 p.body),102                response="Got response in %ss: %s %s\n\tHeaders: %s\n\tBody: %s" %103                (resp.elapsed.total_seconds(),104                 resp.reason,105                 resp.status_code,106                 self.filter_headers(107                    resp.headers),108                    resp.content))109        elif resp.status_code == 410:110            raise self.StoppedFromOnline111        elif resp.status_code == 423:112            raise self.UnderMaintenance113        else:114            resp.raise_for_status()115            return resp116    def __make_api_request(117            self,118            http_method,119            path,120            data=None,121            response_callback=lambda x: x,122            writer=False,123            trace=False,124            json=None,125            maintenance_timeouts=None,126            maintenance_msg=None):127        url = urljoin(self.base_url, path)128        if json:129            request = requests.Request(130                http_method, url, json=json, headers={'User-Agent': self.user_agent}, params=self.params)131        else:132            request = requests.Request(133                http_method, url, data=data, headers={'User-Agent': self.user_agent}, params=self.params)134        network_timeouts = self.network_timeouts()135        maintenance_timeouts = maintenance_timeouts or self.maintenance_timeouts()136        maintenance_msg = maintenance_msg or "%s is under maintenance" % (self._base_url)137        while True:138            try:139                response = self.__send_single_request(request, trace=trace)140                return response_callback(response)141            except (Timeout, ConnectionError):142                logger.warn(traceback.format_exc())143                try:144                    timeout = next(network_timeouts)145                    logger.warn(146                        "Network error, will retry in %ss..." %147                        timeout)148                    time.sleep(timeout)149                    continue150                except StopIteration:151                    raise self.NetworkError()152            except self.UnderMaintenance as e:153                try:154                    timeout = next(maintenance_timeouts)155                    logger.warn(maintenance_msg)156                    logger.warn("Retrying in %ss..." % timeout)157                    time.sleep(timeout)158                    continue159                except StopIteration:160                    raise e161    def __make_writer_request(162            self,163            params=None,164            json=None,165            http_method="POST",166            trace=False):167        '''168        Send request to writer service.169        '''170        request = requests.Request(171            http_method,172            self.writer_url,173            params=params,174            json=json,175            headers={176                'User-Agent': self.user_agent})177        network_timeouts = self.network_timeouts()178        maintenance_timeouts = self.maintenance_timeouts()179        while True:180            try:181                response = self.__send_single_request(request, trace=trace)182                return response183            except (Timeout, ConnectionError):184                logger.warn(traceback.format_exc())185                try:186                    timeout = next(network_timeouts)187                    logger.warn(188                        "Network error, will retry in %ss..." %189                        timeout)190                    time.sleep(timeout)191                    continue192                except StopIteration:193                    raise self.NetworkError()194            except self.UnderMaintenance as e:195                try:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
