How to use the __send_single_request method in yandex-tank

Best Python code snippet using yandex-tank

pid_controller.py

Source:pid_controller.py Github

copy

Full Screen

...137 # This can only happen, if another process is using the same sequence_number138 raise InvalidReplyError(139 f"Invalid reply received for request id {key}. Is someone using the same socket? Data: {result}"140 )141 async def __send_single_request(self, key: int, value: Any | None = None) -> Any:142 result = await self.send_multi_request(143 data={144 key: value,145 }146 )147 self.__test_for_errors(result, key)148 if key > 0:149 return ErrorCode(result[key])150 return result[key]151 async def send_multi_request(self, data: dict[PidFunctionID | int, Any]) -> dict:152 """153 Send one or more requests to the device.154 Parameters155 ----------156 data157 Returns158 -------159 Raises160 ------161 ValueError162 If an unknown function id was sent and `response_expected` was set to True163 """164 if self.api_version < (0, 11, 0):165 # We need to rewrite some function ids166 if PidFunctionID.GET_HUMIDITY in data:167 data[-21] = data[PidFunctionID.GET_HUMIDITY]168 del data[PidFunctionID.GET_HUMIDITY]169 if PidFunctionID.GET_BOARD_TEMPERATURE in data:170 data[-20] = data[PidFunctionID.GET_BOARD_TEMPERATURE]171 del data[PidFunctionID.GET_BOARD_TEMPERATURE]172 result = await self.connection.send_request(data=data, response_expected=True)173 assert result is not None174 if self.api_version < (0, 11, 0):175 # We need to rewrite some function ids176 if -21 in result:177 result[PidFunctionID.GET_HUMIDITY.value] = result[-21]178 del result[-21]179 if -20 in result:180 result[PidFunctionID.GET_BOARD_TEMPERATURE.value] = result[-20]181 del result[-20]182 try:183 result = {PidFunctionID(key): value for key, value in result.items()}184 except ValueError:185 # Raised by PidFunctionID(key)186 self.__logger.error("Received unknown function id in data: %(data)s", {"data": data})187 return result188 return result189 async def get_software_version(self) -> tuple[int, int, int]:190 """191 Get the firmware version running on the device. 
The response is a tuple of ints, that represents the version192 number.193 Returns194 -------195 Tuple of int196 The version number197 """198 return await self.get_by_function_id(PidFunctionID.GET_SOFTWARE_VERSION)199 async def get_hardware_version(self) -> tuple[int, int, int]:200 """201 Get the hardware revision of the device. The response is a tuple of ints, that represents the version number.202 Returns203 -------204 Tuple of int205 The revision number206 """207 return await self.get_by_function_id(PidFunctionID.GET_HARDWARE_VERSION)208 async def get_serial(self) -> int:209 """210 Get the serial number of the device.211 Returns212 -------213 int214 The serial number of the device215 """216 return await self.get_by_function_id(PidFunctionID.GET_SERIAL_NUMBER)217 async def get_device_temperature(self) -> float:218 """219 Query the temperature of the onboard sensor.220 Returns221 -------222 float223 The temperature of the onboard sensor in Kelvin224 """225 return await self.get_by_function_id(PidFunctionID.GET_BOARD_TEMPERATURE)226 async def get_humidity(self) -> float:227 """228 Returns the humidity as read by the onboard sensor.229 Returns230 -------231 float232 The humidity in %rH233 """234 return await self.get_by_function_id(PidFunctionID.GET_HUMIDITY)235 async def get_mac_address(self) -> bytearray:236 """237 Get the MAC address used by the ethernet port.238 Returns239 -------240 bytearray241 An array of length 6 which contains the MAC242 """243 return await self.get_by_function_id(PidFunctionID.GET_MAC_ADDRESS)244 async def set_mac_address(self, mac: tuple[int, int, int, int, int, int] | list[int] | bytearray) -> None:245 """246 Set the MAC address used by the ethernet port.247 Parameters248 ----------249 mac: bytearray or tuple of int250 The MAC address as a bytearray of length 6251 """252 assert len(mac) == 6253 mac = bytearray(mac) # convert to bytearray, this also makes sure, that all values are in range(0, 256)254 await 
self.__send_single_request(PidFunctionID.SET_MAC_ADDRESS, mac)255 async def get_uuid(self) -> UUID:256 """257 Get the universally unique identifier of the node.258 Returns259 -------260 UUID261 The universally unique identifier of the node262 Raises263 ------264 FunctionNotImplementedError265 If the firmware version does not support the request.266 """267 if self.api_version < (0, 12, 0):268 raise FunctionNotImplementedError(269 f"{PidFunctionID.GET_UUID.name} is only supported in api version >= 0.12.0"270 )271 result = await self.get_by_function_id(PidFunctionID.GET_UUID)272 return UUID(bytes=bytes(result))273 async def set_uuid(self, uuid: UUID) -> None:274 """275 Set the universally unique identifier of the node.276 Parameters277 ----------278 uuid: UUID279 The universally unique identifier of the node280 Raises281 ------282 FunctionNotImplementedError283 If the firmware version does not support the request.284 """285 if self.api_version < (0, 12, 0):286 raise FunctionNotImplementedError(287 f"{PidFunctionID.SET_UUID.name} is only supported in api version >= 0.12.0"288 )289 await self.__send_single_request(PidFunctionID.SET_UUID, tuple(uuid.bytes))290 async def get_auto_resume(self) -> bool:291 """292 Query the node, whether is automatically resumes after a reset (intentionally or unintentionally). If set, the293 node will return to the same state. 
If not, the node will always come up as disabled with default settings.294 Returns295 -------296 bool297 True if the node keeps its enabled state after a reset298 """299 return await self.get_by_function_id(PidFunctionID.GET_AUTO_RESUME)300 async def set_auto_resume(self, value: bool):301 """302 Set the controller to automatically load the previous settings and resume its action.303 Parameters304 ----------305 value: bool306 Set to True to load the node settings on boot307 """308 await self.__send_single_request(PidFunctionID.SET_AUTO_RESUME, bool(value))309 async def set_lower_output_limit(self, limit: int) -> None:310 """311 Set the minimum output of the DAC in bit values.312 Parameters313 ----------314 limit: int315 The minimum output value of the DAC316 Raises317 ------318 ValueError319 If the limit was out of bounds.320 """321 try:322 await self.__send_single_request(PidFunctionID.SET_LOWER_OUTPUT_LIMIT, limit)323 except InvalidFormatError:324 raise ValueError("Invalid limit") from None325 async def get_lower_output_limit(self) -> int:326 """327 Get the minimum output value of the DAC.328 Returns329 -------330 int331 The minimum output in bit332 """333 return await self.get_by_function_id(PidFunctionID.GET_LOWER_OUTPUT_LIMIT)334 async def set_upper_output_limit(self, limit: int) -> None:335 """336 Set the maximum output of the DAC in bit values.337 Parameters338 ----------339 limit: int340 The maximum output value of the DAC341 Raises342 ------343 ValueError344 If the limit was out of bounds.345 """346 try:347 await self.__send_single_request(PidFunctionID.SET_UPPER_OUTPUT_LIMIT, limit)348 except InvalidFormatError:349 raise ValueError("Invalid limit") from None350 async def get_upper_output_limit(self) -> int:351 """352 Get the maximum output of the DAC in bit values.353 Returns354 -------355 int356 The upper limit for the output DAC357 """358 return await self.get_by_function_id(PidFunctionID.GET_UPPER_OUTPUT_LIMIT)359 async def set_timeout(self, timeout: 
int) -> None:360 """361 Set the timeout, that defines when the controller switches to fallback mode. The time is in ms.362 Parameters363 ----------364 timeout365 The timeout in ms366 """367 assert timeout > 0368 await self.__send_single_request(PidFunctionID.SET_TIMEOUT, int(timeout))369 async def get_timeout(self) -> int:370 """371 Get the time, that mass pass between updates, before the controller switches to fallback mode.372 Returns373 -------374 int375 The time in ms, that the controller waits between updates376 """377 return await self.get_by_function_id(PidFunctionID.GET_TIMEOUT)378 async def set_dac_gain(self, enable: bool) -> None:379 """380 Set the gain of the DAC to x2. This will increase the output voltage range from 0..5V to 0..10V.381 Parameters382 ----------383 enable384 True to enable the gain385 """386 await self.__send_single_request(PidFunctionID.SET_GAIN, bool(enable))387 async def is_dac_gain_enabled(self) -> bool:388 """389 Return True if the DAC output goes from 0-10 V. False if the DAC gain is disabled and the output is 0-5 V.390 Returns391 -------392 bool393 True if the gain is enabled394 """395 return await self.get_by_function_id(PidFunctionID.GET_GAIN)396 async def set_pid_feedback_direction(self, feedback: FeedbackDirection) -> None:397 """398 Set the sign of the pid output. This needs to be adjusted according to the actuator used to399 control the plant. Typically, it is assumed, that the feedback is negative. For example, when400 dealing with e.g. 
temperature control, this means, that if the temperature is too high,401 an increase in the feedback will increase the cooling action.402 In short: If set to `FeedbackDirection.NEGATIVE`, a positive error will result in a negative plant response.403 Parameters404 ----------405 feedback: FeedbackDirection406 The direction of the controller response407 """408 feedback = FeedbackDirection(feedback)409 await self.__send_single_request(PidFunctionID.SET_DIRECTION, feedback.value)410 async def get_pid_feedback_direction(self) -> FeedbackDirection:411 """412 Get the sign of the pid output. If set to `FeedbackDirection.NEGATIVE`, a positive error will result in a413 negative plant response.414 Returns415 -------416 FeedbackDirection417 The direction of the controller response418 """419 return FeedbackDirection(await self.get_by_function_id(PidFunctionID.GET_DIRECTION))420 async def set_output(self, value: int) -> None:421 """422 Set the output value of the DAC. This function only works, when the controller mode is set to disabled (manual).423 Use `set_enabled(false)`.424 Parameters425 ----------426 value: int427 The output in bit428 """429 await self.__send_single_request(PidFunctionID.SET_OUTPUT, int(value))430 async def get_output(self) -> int:431 """432 Queries the output value of the DAC.433 Returns434 -------435 int436 The output of the DAC in bit437 """438 return await self.get_by_function_id(PidFunctionID.GET_OUTPUT)439 async def set_enabled(self, enabled: bool) -> None:440 """441 Set the PID controller to enabled/automatic or disabled/manual mode.442 Parameters443 ----------444 enabled: bool445 True to enable the PID controller446 """447 await self.__send_single_request(PidFunctionID.SET_ENABLED, bool(enabled))448 async def is_enabled(self) -> bool:449 """450 Queries the state of the PID controller.451 Returns452 -------453 bool454 True if the controller is enabled455 """456 return await self.get_by_function_id(PidFunctionID.GET_ENABLED)457 async def __set_kx(self, 
function_id: PidFunctionID, kx: int) -> None: # pylint: disable=invalid-name458 """459 Set the PID K{p,i,d} parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format460 Parameters461 ----------462 function_id: PidFunctionID463 Select which parameter set should be updated464 kx: int465 The value of the Kp, Ki or Kd parameter466 Raises467 ------468 InvalidFormatError469 If the pid constant was rejected.470 """471 if self.api_version < (0, 11, 0) and function_id in (472 PidFunctionID.SET_SECONDARY_KP,473 PidFunctionID.SET_SECONDARY_KI,474 PidFunctionID.SET_SECONDARY_KD,475 ):476 raise FunctionNotImplementedError(f"{function_id.name} is only supported in api version >= 0.11.0")477 try:478 await self.__send_single_request(function_id, int(kx))479 except InvalidFormatError:480 raise ValueError("Invalid PID constant") from None481 async def set_kp(self, kp: int, config_id: int = 0) -> None: # pylint: disable=invalid-name482 """483 Set the PID Kp parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.484 Parameters485 ----------486 kp: int487 The PID k_p parameter in Q16.16 format488 config_id: {0, 1}, default=0489 The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.490 """491 assert config_id in (0, 1)492 if config_id == 0:493 await self.__set_kx(PidFunctionID.SET_KP, kp)494 else:495 await self.__set_kx(PidFunctionID.SET_SECONDARY_KP, kp)496 async def get_kp(self, config_id: int = 0) -> int:497 """498 Get the PID Kp parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.499 Parameters500 ----------501 config_id: {0, 1}, default=0502 The id of the parameter set. The controller supports two pid parameter sets. 
Either 0 or 1.503 Raises504 ------505 FunctionNotImplementedError506 If the firmware version does not support the request.507 """508 assert config_id in (0, 1)509 if config_id == 0:510 return await self.get_by_function_id(PidFunctionID.GET_KP)511 if self.api_version >= (0, 11, 0):512 return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_KP)513 raise FunctionNotImplementedError(514 f"{PidFunctionID.GET_SECONDARY_KP.name} is only supported in api version >= 0.11.0"515 )516 async def set_ki(self, ki: int, config_id: int = 0) -> None: # pylint: disable=invalid-name517 """518 Set the PID Ki parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.519 Parameters520 ----------521 ki: int522 The parameter value in Q16.16 format (32 bit)523 config_id: {0, 1}, default=0524 The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.525 """526 assert config_id in (0, 1)527 if config_id == 0:528 await self.__set_kx(PidFunctionID.SET_KI, ki)529 else:530 await self.__set_kx(PidFunctionID.SET_SECONDARY_KI, ki)531 async def get_ki(self, config_id: int = 0) -> int:532 """533 Get the PID Ki parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.534 Parameters535 ----------536 config_id: {0, 1}, default=0537 The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.538 Raises539 ------540 FunctionNotImplementedError541 If the firmware version does not support the request.542 """543 assert config_id in (0, 1)544 if config_id == 0:545 return await self.get_by_function_id(PidFunctionID.GET_KI)546 if self.api_version >= (0, 11, 0):547 return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_KI)548 raise FunctionNotImplementedError(549 f"{PidFunctionID.GET_SECONDARY_KI.name} is only supported in api version >= 0.11.0"550 )551 async def set_kd(self, kd: int, config_id: int = 0) -> None: # pylint: disable=invalid-name552 """553 Set the PID Kd parameter. 
The Kp, Ki, Kd parameters are stored in Q16.16 format.554 Parameters555 ----------556 kd: int557 The parameter value in Q16.16 format (32 bit)558 config_id: {0, 1}, default=0559 The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.560 """561 assert config_id in (0, 1)562 if config_id == 0:563 await self.__set_kx(PidFunctionID.SET_KD, kd)564 else:565 await self.__set_kx(PidFunctionID.SET_SECONDARY_KD, kd)566 async def get_kd(self, config_id: int = 0) -> int:567 """568 Get the PID Kd parameter. The Kp, Ki, Kd parameters are stored in Q16.16 format.569 Parameters570 ----------571 config_id: {0, 1}, default=0572 The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.573 Raises574 ------575 FunctionNotImplementedError576 If the firmware version does not support the request.577 """578 assert config_id in (0, 1)579 if config_id == 0:580 return await self.get_by_function_id(PidFunctionID.GET_KD)581 if self.api_version >= (0, 11, 0):582 return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_KD)583 raise FunctionNotImplementedError(584 f"{PidFunctionID.GET_SECONDARY_KD.name} is only supported in api version >= 0.11.0"585 )586 async def set_input(self, value: int, return_output: bool = False) -> int | None:587 """588 Set the input, which is fed to the PID controller. 
The value is in Q16.16 format.589 Parameters590 ----------591 value: int592 The input value in Q16.16 format593 return_output: bool594 Returns the output of the controller if True595 Returns596 -------597 int or None:598 Returns the output of the controller if return_output is set599 """600 # We need to send a multi_request, because if return_output is True, we want to get the601 # output after the input has been set602 request: dict[int, int | None] = {PidFunctionID.SET_INPUT: int(value)}603 if return_output:604 request[PidFunctionID.GET_OUTPUT] = None605 result = await self.send_multi_request(request)606 # We need to test for errors, which would normally be done by __send_single_request()607 self.__test_for_errors(result, PidFunctionID.SET_INPUT)608 if return_output:609 return result[PidFunctionID.GET_OUTPUT]610 return None611 async def set_setpoint(self, value: int, config_id: int = 0) -> None:612 """613 Set the PID setpoint. The value is in Q16.16 format.614 Parameters615 ----------616 value: int617 The setpoint of the PID controller in Q16.16 format.618 config_id: {0, 1}, default=0619 The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.620 Raises621 ------622 FunctionNotImplementedError623 If the firmware version does not support the request.624 """625 assert config_id in (0, 1)626 try:627 if config_id == 0:628 await self.__send_single_request(PidFunctionID.SET_SETPOINT, int(value))629 else:630 if self.api_version >= (0, 11, 0):631 await self.__send_single_request(PidFunctionID.SET_SECONDARY_SETPOINT, int(value))632 else:633 raise FunctionNotImplementedError(634 f"{PidFunctionID.SET_SECONDARY_SETPOINT.name} is only supported in api version >= 0.11.0"635 )636 except InvalidFormatError:637 raise ValueError("Invalid setpoint") from None638 async def get_setpoint(self, config_id: int = 0) -> int:639 """640 Get the PID setpoint. 
The value is in Q16.16 format.641 Parameters642 ----------643 config_id: {0, 1}, default=0644 The id of the parameter set. The controller supports two pid parameter sets. Either 0 or 1.645 Returns646 -------647 int648 The setpoint value for the given config set.649 Raises650 ------651 FunctionNotImplementedError652 If the firmware version does not support the request.653 """654 assert config_id in (0, 1)655 if config_id == 0:656 return await self.get_by_function_id(PidFunctionID.GET_SETPOINT)657 # Only allow the secondary parameter set on API version >=0.11.0658 if self.api_version >= (0, 11, 0):659 return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_SETPOINT)660 raise FunctionNotImplementedError(661 f"{PidFunctionID.GET_SECONDARY_SETPOINT.name} is only supported in api version >= 0.11.0"662 )663 async def set_secondary_config(self, config_id: int) -> None:664 """665 Set the configuration used when running in fallback mode.666 Parameters667 ----------668 config_id: {0, 1}669 The configuration to be used. The controller supports two pid parameter sets. Either 0 or 1.670 Raises671 ------672 FunctionNotImplementedError673 If the firmware version does not support the request.674 InvalidFormatError675 If the config_id is not in {0, 1}.676 """677 assert config_id in (0, 1)678 if self.api_version < (0, 11, 0):679 raise FunctionNotImplementedError(680 f"{PidFunctionID.SET_SECONDARY_PID_PARAMETER_SET.name} is only supported in api version >= 0.11.0"681 )682 try:683 await self.__send_single_request(PidFunctionID.SET_SECONDARY_PID_PARAMETER_SET, int(config_id))684 except InvalidFormatError:685 raise ValueError("Invalid configuration set. Use either 0 or 1.") from None686 async def get_secondary_config(self) -> int:687 """688 Set the configuration id used when running in fallback mode.689 Returns690 -------691 int692 The configuration used, when running in fallback mode. 
The controller supports two pid parameter sets.693 Either 0 or 1.694 """695 if self.api_version < (0, 11, 0):696 raise FunctionNotImplementedError(697 f"{PidFunctionID.GET_SECONDARY_PID_PARAMETER_SET.name} is only supported in api version >= 0.11.0"698 )699 return await self.get_by_function_id(PidFunctionID.GET_SECONDARY_PID_PARAMETER_SET)700 async def set_fallback_update_interval(self, value: int):701 """702 Set the update interval, when running in fallback mode. The Controller will feed a value from the internal703 sensor to its PID controller every {value} ms.704 Parameters705 ----------706 value: int707 The update interval in ms708 """709 assert value > 0710 try:711 await self.__send_single_request(PidFunctionID.SET_FALLBACK_UPDATE_INTERVAL, int(value))712 except InvalidFormatError:713 raise ValueError("Invalid calibration offset") from None714 async def get_fallback_update_interval(self) -> int:715 """716 The update interval, which is used when running in fallback mode. The return value is in ms.717 Returns718 -------719 int:720 The number of ms between updates with the backup sensor when running in fallback mode.721 """722 return await self.get_by_function_id(PidFunctionID.GET_FALLBACK_UPDATE_INTERVAL)723 async def reset(self) -> None:724 """725 Resets the device. This will trigger a hardware reset.726 """727 await self.__send_single_request(PidFunctionID.RESET)728 async def reset_settings(self) -> None:729 """730 Resets the device to default values.731 """732 await self.__send_single_request(PidFunctionID.RESET_SETTINGS)733 async def set_serial(self, serial: int) -> None:734 """735 Set the serial number of the device736 Parameters737 ----------738 serial: int739 The serial number. 
Maximum: 4294967295 (32 bit)740 Raises741 ------742 ValueError743 If the serial number is not valid.744 """745 try:746 await self.__send_single_request(PidFunctionID.SET_SERIAL_NUMBER, int(serial))747 except InvalidFormatError:748 raise ValueError("Invalid serial number") from None749 async def get_active_connection_count(self) -> int:750 """751 Get the number of open sockets.752 Returns753 -------754 int755 The number of sockets756 Raises757 ------758 FunctionNotImplementedError759 If the firmware version does not support the request.760 """761 if self.api_version < (0, 11, 0):762 raise FunctionNotImplementedError(763 f"{PidFunctionID.GET_ACTIVE_CONNECTION_COUNT.name} is only supported in api version >= 0.11.0"764 )765 return await self.get_by_function_id(PidFunctionID.GET_ACTIVE_CONNECTION_COUNT)766 async def get_by_function_id(self, function_id: PidFunctionID | int) -> Any:767 """768 Query a value by function id, instead of calling the named function.769 Parameters770 ----------771 function_id: PidFunctionID or int772 The function to query773 Returns774 -------775 Any776 The result of the query.777 Raises778 ------779 InvalidCommandError780 If the function id was given as an integer and is unknown781 """782 try:783 function_id = PidFunctionID(function_id)784 except ValueError:785 raise InvalidCommandError(f"Command {function_id} is invalid.") from None786 assert function_id.value < 0 # all getter have negative ids787 result = await self.__send_single_request(function_id)788 if function_id in self.__raw_to_unit:789 result = self.__raw_to_unit[function_id](result)...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

...80 for h in boring:81 if h in headers:82 del (headers[h])83 return headers84 def __send_single_request(self, req, trace=False):85 p = self.session.prepare_request(req)86 if trace:87 logger.debug("Making request: %s %s Headers: %s Body: %s",88 p.method, p.url, p.headers, p.body)89 resp = self.session.send(p, timeout=self.connection_timeout)90 if trace:91 logger.debug("Got response in %ss: %s %s Headers: %s Body: %s",92 resp.elapsed.total_seconds(), resp.reason,93 resp.status_code, self.filter_headers(resp.headers),94 resp.content)95 if resp.status_code in [500, 502, 503, 504]:96 raise self.NotAvailable(97 request="request: %s %s\n\tHeaders: %s\n\tBody: %s" %98 (p.method,99 p.url,100 p.headers,101 p.body),102 response="Got response in %ss: %s %s\n\tHeaders: %s\n\tBody: %s" %103 (resp.elapsed.total_seconds(),104 resp.reason,105 resp.status_code,106 self.filter_headers(107 resp.headers),108 resp.content))109 elif resp.status_code == 410:110 raise self.StoppedFromOnline111 elif resp.status_code == 423:112 raise self.UnderMaintenance113 else:114 resp.raise_for_status()115 return resp116 def __make_api_request(117 self,118 http_method,119 path,120 data=None,121 response_callback=lambda x: x,122 writer=False,123 trace=False,124 json=None,125 maintenance_timeouts=None,126 maintenance_msg=None):127 url = urljoin(self.base_url, path)128 if json:129 request = requests.Request(130 http_method, url, json=json, headers={'User-Agent': self.user_agent}, params=self.params)131 else:132 request = requests.Request(133 http_method, url, data=data, headers={'User-Agent': self.user_agent}, params=self.params)134 network_timeouts = self.network_timeouts()135 maintenance_timeouts = maintenance_timeouts or self.maintenance_timeouts()136 maintenance_msg = maintenance_msg or "%s is under maintenance" % (self._base_url)137 while True:138 try:139 response = self.__send_single_request(request, trace=trace)140 return response_callback(response)141 except (Timeout, ConnectionError):142 
logger.warn(traceback.format_exc())143 try:144 timeout = next(network_timeouts)145 logger.warn(146 "Network error, will retry in %ss..." %147 timeout)148 time.sleep(timeout)149 continue150 except StopIteration:151 raise self.NetworkError()152 except self.UnderMaintenance as e:153 try:154 timeout = next(maintenance_timeouts)155 logger.warn(maintenance_msg)156 logger.warn("Retrying in %ss..." % timeout)157 time.sleep(timeout)158 continue159 except StopIteration:160 raise e161 def __make_writer_request(162 self,163 params=None,164 json=None,165 http_method="POST",166 trace=False):167 '''168 Send request to writer service.169 '''170 request = requests.Request(171 http_method,172 self.writer_url,173 params=params,174 json=json,175 headers={176 'User-Agent': self.user_agent})177 network_timeouts = self.network_timeouts()178 maintenance_timeouts = self.maintenance_timeouts()179 while True:180 try:181 response = self.__send_single_request(request, trace=trace)182 return response183 except (Timeout, ConnectionError):184 logger.warn(traceback.format_exc())185 try:186 timeout = next(network_timeouts)187 logger.warn(188 "Network error, will retry in %ss..." %189 timeout)190 time.sleep(timeout)191 continue192 except StopIteration:193 raise self.NetworkError()194 except self.UnderMaintenance as e:195 try:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful