How to use _get_or_create_device_setting method in lisa

Best Python code snippet using lisa_python

ethtool.py

Source:ethtool.py Github

copy

Full Screen

...457 return self._device_set458 def get_device_channels_info(459 self, interface: str, force_run: bool = False460 ) -> DeviceChannel:461 device = self._get_or_create_device_setting(interface)462 if not force_run and device.device_channel:463 return device.device_channel464 result = self.run(f"-l {interface}", force_run=force_run)465 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):466 raise UnsupportedOperationException(467 "ethtool -l {interface} operation not supported."468 )469 result.assert_exit_code(470 message=f"Couldn't get device {interface} channels info."471 )472 device_channel_info = DeviceChannel(interface, result.stdout)473 # Find the vCPU count to accurately get max channels for the device.474 lscpu = self.node.tools[Lscpu]475 vcpu_count = lscpu.get_core_count(force_run=True)476 if vcpu_count < device_channel_info.max_channels:477 device_channel_info.max_channels = vcpu_count478 device.device_channel = device_channel_info479 return device_channel_info480 def change_device_channels_info(481 self,482 interface: str,483 channel_count: int,484 ) -> DeviceChannel:485 change_result = self.run(486 f"-L {interface} combined {channel_count}", sudo=True, force_run=True487 )488 change_result.assert_exit_code(489 message=f" Couldn't change device {interface} channels count."490 )491 return self.get_device_channels_info(interface, force_run=True)492 def get_device_enabled_features(493 self, interface: str, force_run: bool = False494 ) -> DeviceFeatures:495 device = self._get_or_create_device_setting(interface)496 if not force_run and device.device_features:497 return device.device_features498 result = self.run(f"-k {interface}", force_run=force_run)499 result.assert_exit_code()500 device.device_features = DeviceFeatures(interface, result.stdout)501 return device.device_features502 def get_device_gro_lro_settings(503 self, interface: str, force_run: bool = False504 ) -> DeviceGroLroSettings:505 device = self._get_or_create_device_setting(interface)506 if not force_run and device.device_gro_lro_settings:507 return device.device_gro_lro_settings508 result = self.run(f"-k {interface}", force_run=force_run)509 result.assert_exit_code()510 device.device_gro_lro_settings = DeviceGroLroSettings(interface, result.stdout)511 return device.device_gro_lro_settings512 def change_device_gro_lro_settings(513 self, interface: str, gro_setting: bool, lro_setting: bool514 ) -> DeviceGroLroSettings:515 gro = "on" if gro_setting else "off"516 lro = "on" if lro_setting else "off"517 change_result = self.run(518 f"-K {interface} gro {gro} lro {lro}",519 sudo=True,520 force_run=True,521 )522 change_result.assert_exit_code(523 message=f" Couldn't change device {interface} GRO LRO settings."524 )525 return self.get_device_gro_lro_settings(interface, force_run=True)526 def get_device_link_settings(self, interface: str) -> DeviceLinkSettings:527 device = self._get_or_create_device_setting(interface)528 if device.device_link_settings:529 return device.device_link_settings530 result = self.run(interface)531 result.assert_exit_code()532 link_settings = DeviceLinkSettings(interface, result.stdout)533 device.device_link_settings = link_settings534 # Caching the message level settings if captured in DeviceLinkSettings.535 # Not returning this info from this method. Only caching.536 if link_settings.msg_level_number and link_settings.msg_level_name:537 msg_level_settings = DeviceMessageLevel(538 interface,539 msg_level_number=link_settings.msg_level_number,540 msg_level_name=link_settings.msg_level_name,541 )542 device.device_msg_level = msg_level_settings543 return link_settings544 def get_device_msg_level(545 self, interface: str, force_run: bool = False546 ) -> DeviceMessageLevel:547 device = self._get_or_create_device_setting(interface)548 if not force_run and device.device_msg_level:549 return device.device_msg_level550 result = self.run(interface, force_run=force_run)551 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):552 raise UnsupportedOperationException(553 f"ethtool {interface} operation not supported."554 )555 result.assert_exit_code(556 message=f"Couldn't get device {interface} message level information"557 )558 msg_level_settings = DeviceMessageLevel(interface, result.stdout)559 device.device_msg_level = msg_level_settings560 # caching the link settings if captured in DeviceMessageLevel.561 # will not this info from this method. Only caching.562 if msg_level_settings.link_settings:563 link_settings = DeviceLinkSettings(564 interface, link_settings=msg_level_settings.link_settings565 )566 device.device_link_settings = link_settings567 return msg_level_settings568 def set_unset_device_message_flag_by_name(569 self, interface: str, msg_flag: List[str], set: bool570 ) -> DeviceMessageLevel:571 if set:572 result = self.run(573 f"-s {interface} msglvl {' on '.join(flag for flag in msg_flag)} on",574 sudo=True,575 force_run=True,576 )577 result.assert_exit_code(578 message=f" Couldn't set device {interface} message flag/s {msg_flag}."579 )580 else:581 result = self.run(582 f"-s {interface} msglvl {' off '.join(flag for flag in msg_flag)} off",583 sudo=True,584 force_run=True,585 )586 result.assert_exit_code(587 message=f" Couldn't unset device {interface} message flag/s {msg_flag}."588 )589 return self.get_device_msg_level(interface, force_run=True)590 def set_device_message_flag_by_num(591 self, interface: str, msg_flag: str592 ) -> DeviceMessageLevel:593 result = self.run(594 f"-s {interface} msglvl {msg_flag}",595 sudo=True,596 force_run=True,597 )598 result.assert_exit_code(599 message=f" Couldn't set device {interface} message flag {msg_flag}."600 )601 return self.get_device_msg_level(interface, force_run=True)602 def get_device_ring_buffer_settings(603 self, interface: str, force_run: bool = False604 ) -> DeviceRingBufferSettings:605 device = self._get_or_create_device_setting(interface)606 if not force_run and device.device_ringbuffer_settings:607 return device.device_ringbuffer_settings608 result = self.run(f"-g {interface}", force_run=force_run)609 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):610 raise UnsupportedOperationException(611 f"ethtool -g {interface} operation not supported."612 )613 result.assert_exit_code(614 message=f"Couldn't get device {interface} ring buffer settings."615 )616 device.device_ringbuffer_settings = DeviceRingBufferSettings(617 interface, result.stdout618 )619 return device.device_ringbuffer_settings620 def change_device_ring_buffer_settings(621 self, interface: str, rx: int, tx: int622 ) -> DeviceRingBufferSettings:623 change_result = self.run(624 f"-G {interface} rx {rx} tx {tx}", sudo=True, force_run=True625 )626 change_result.assert_exit_code(627 message=f" Couldn't change device {interface} ring buffer settings."628 )629 return self.get_device_ring_buffer_settings(interface, force_run=True)630 def get_device_rss_hash_key(631 self, interface: str, force_run: bool = False632 ) -> DeviceRssHashKey:633 device = self._get_or_create_device_setting(interface)634 if not force_run and device.device_rss_hash_key:635 return device.device_rss_hash_key636 result = self.run(f"-x {interface}", force_run=force_run)637 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):638 raise UnsupportedOperationException(639 f"ethtool -x {interface} operation not supported."640 )641 result.assert_exit_code(642 message=f"Couldn't get device {interface} ring buffer settings."643 )644 device.device_rss_hash_key = DeviceRssHashKey(interface, result.stdout)645 return device.device_rss_hash_key646 def change_device_rss_hash_key(647 self, interface: str, hash_key: str648 ) -> DeviceRssHashKey:649 result = self.run(f"-X {interface} hkey {hash_key}", sudo=True, force_run=True)650 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):651 raise UnsupportedOperationException(652 f"Changing RSS hash key with 'ethtool -X {interface}' not supported."653 )654 result.assert_exit_code(655 message=f" Couldn't change device {interface} hash key."656 )657 return self.get_device_rss_hash_key(interface, force_run=True)658 def get_device_rx_hash_level(659 self, interface: str, protocol: str, force_run: bool = False660 ) -> DeviceRxHashLevel:661 device = self._get_or_create_device_setting(interface)662 if (663 not force_run664 and device.device_rx_hash_level665 and (protocol in device.device_rx_hash_level.protocol_hash_map.keys())666 ):667 return device.device_rx_hash_level668 result = self.run(669 f"-n {interface} rx-flow-hash {protocol}", force_run=force_run670 )671 if "Operation not supported" in result.stdout:672 raise UnsupportedOperationException(673 f"ethtool -n {interface} operation not supported."674 )675 result.assert_exit_code(676 message=f"Couldn't get device {interface} RX flow hash level for"677 f" protocol {protocol}."678 )679 if device.device_rx_hash_level:680 device.device_rx_hash_level._parse_rx_hash_level(681 interface, protocol, result.stdout682 )683 device_rx_hash_level = device.device_rx_hash_level684 else:685 device_rx_hash_level = DeviceRxHashLevel(interface, protocol, result.stdout)686 device.device_rx_hash_level = device_rx_hash_level687 return device_rx_hash_level688 def change_device_rx_hash_level(689 self, interface: str, protocol: str, enable: bool690 ) -> DeviceRxHashLevel:691 param = "sd"692 if enable:693 param = "sdfn"694 result = self.run(695 f"-N {interface} rx-flow-hash {protocol} {param}",696 sudo=True,697 force_run=True,698 )699 if "Operation not supported" in result.stdout:700 raise UnsupportedOperationException(701 f"ethtool -N {interface} rx-flow-hash {protocol} {param}"702 " operation not supported."703 )704 result.assert_exit_code(705 message=f" Couldn't change device {interface} hash level for {protocol}."706 )707 return self.get_device_rx_hash_level(interface, protocol, force_run=True)708 def get_device_sg_settings(709 self, interface: str, force_run: bool = False710 ) -> DeviceSgSettings:711 device = self._get_or_create_device_setting(interface)712 if not force_run and device.device_sg_settings:713 return device.device_sg_settings714 result = self.run(f"-k {interface}", force_run=force_run)715 result.assert_exit_code()716 device.device_sg_settings = DeviceSgSettings(interface, result.stdout)717 return device.device_sg_settings718 def change_device_sg_settings(719 self, interface: str, sg_setting: bool720 ) -> DeviceSgSettings:721 sg = "on" if sg_setting else "off"722 change_result = self.run(723 f"-K {interface} sg {sg}",724 sudo=True,725 force_run=True,726 )727 change_result.assert_exit_code(728 message=f" Couldn't change device {interface} scatter-gather settings."729 )730 return self.get_device_sg_settings(interface, force_run=True)731 def get_device_statistics(732 self, interface: str, force_run: bool = False733 ) -> DeviceStatistics:734 device = self._get_or_create_device_setting(interface)735 if not force_run and device.device_statistics:736 return device.device_statistics737 result = self.run(f"-S {interface}", force_run=True)738 if (result.exit_code != 0) and (739 "Operation not supported" in result.stdout740 or "no stats available" in result.stdout741 ):742 raise UnsupportedOperationException(743 f"ethtool -S {interface} operation not supported."744 )745 result.assert_exit_code(message=f"Couldn't get device {interface} statistics.")746 device.device_statistics = DeviceStatistics(interface, result.stdout)747 return device.device_statistics748 def get_device_statistics_delta(749 self, interface: str, previous_statistics: Dict[str, int]750 ) -> Dict[str, int]:751 """752 use this method to get the delta of an operation.753 """754 new_statistics = self.get_device_statistics(755 interface=interface, force_run=True756 ).counters757 for key, value in previous_statistics.items():758 new_statistics[key] = new_statistics.get(key, 0) - value759 self._log.debug(f"none-zero delta statistics on {interface}:")760 self._log.debug(761 {key: value for key, value in new_statistics.items() if value != 0}762 )763 return new_statistics764 def get_device_firmware_version(765 self, interface: str, force_run: bool = False766 ) -> str:767 device = self._get_or_create_device_setting(interface)768 if not force_run and device.device_firmware_version:769 return device.device_firmware_version770 result = self.run(f"-i {interface}", force_run=force_run)771 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):772 raise UnsupportedOperationException(773 f"ethtool -i {interface} operation not supported."774 )775 result.assert_exit_code(776 message=f"Couldn't get device {interface} firmware version info."777 )778 firmware_version_pattern = self._firmware_version_pattern.search(result.stdout)779 if not firmware_version_pattern:780 raise LisaException(781 f"Cannot get {interface} device firmware version information"782 )783 firmware_version = firmware_version_pattern.group("value")784 device.device_firmware_version = firmware_version785 return firmware_version786 def get_all_device_channels_info(self) -> List[DeviceChannel]:787 devices_channel_list = []788 devices = self.get_device_list()789 for device in devices:790 devices_channel_list.append(self.get_device_channels_info(device))791 return devices_channel_list792 def get_all_device_enabled_features(793 self, force_run: bool = False794 ) -> List[DeviceFeatures]:795 devices_features_list = []796 devices = self.get_device_list(force_run)797 for device in devices:798 devices_features_list.append(799 self.get_device_enabled_features(device, force_run)800 )801 return devices_features_list802 def get_all_device_gro_lro_settings(self) -> List[DeviceGroLroSettings]:803 devices_gro_lro_settings = []804 devices = self.get_device_list()805 for device in devices:806 devices_gro_lro_settings.append(self.get_device_gro_lro_settings(device))807 return devices_gro_lro_settings808 def get_all_device_link_settings(self) -> List[DeviceLinkSettings]:809 devices_link_settings_list = []810 devices = self.get_device_list()811 for device in devices:812 devices_link_settings_list.append(self.get_device_link_settings(device))813 return devices_link_settings_list814 def get_all_device_msg_level(self) -> List[DeviceMessageLevel]:815 devices_msg_level_list = []816 devices = self.get_device_list()817 for device in devices:818 devices_msg_level_list.append(self.get_device_msg_level(device))819 return devices_msg_level_list820 def get_all_device_ring_buffer_settings(self) -> List[DeviceRingBufferSettings]:821 devices_ring_buffer_settings_list = []822 devices = self.get_device_list()823 for device in devices:824 devices_ring_buffer_settings_list.append(825 self.get_device_ring_buffer_settings(device)826 )827 return devices_ring_buffer_settings_list828 def get_all_device_rss_hash_key(self) -> List[DeviceRssHashKey]:829 devices_rss_hash_keys = []830 devices = self.get_device_list()831 for device in devices:832 devices_rss_hash_keys.append(self.get_device_rss_hash_key(device))833 return devices_rss_hash_keys834 def get_all_device_rx_hash_level(self, protocol: str) -> List[DeviceRxHashLevel]:835 devices_rx_hash_level = []836 devices = self.get_device_list()837 for device in devices:838 devices_rx_hash_level.append(839 self.get_device_rx_hash_level(device, protocol)840 )841 return devices_rx_hash_level842 def get_all_device_statistics(self) -> List[DeviceStatistics]:843 devices_statistics = []844 devices = self.get_device_list()845 for device in devices:846 devices_statistics.append(847 self.get_device_statistics(device, force_run=True)848 )849 return devices_statistics850 def get_all_device_firmware_version(self) -> Dict[str, str]:851 devices_firmware_versions: Dict[str, str] = {}852 devices = self.get_device_list()853 for device in devices:854 devices_firmware_versions[device] = self.get_device_firmware_version(855 device, force_run=True856 )857 return devices_firmware_versions858 def _get_or_create_device_setting(self, interface: str) -> DeviceSettings:859 settings = self._device_settings_map.get(interface, None)860 if settings is None:861 settings = DeviceSettings(interface)862 self._device_settings_map[interface] = settings...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful