How to use the get_device_channels_info method in lisa

Best Python code snippet using lisa_python

ethtool.py

Source:ethtool.py Github

copy

Full Screen

...454 self._device_set.add(cmd_result.stdout)455 if not self._device_set:456 raise LisaException("Did not find any synthetic network interface.")457 return self._device_set458 def get_device_channels_info(459 self, interface: str, force_run: bool = False460 ) -> DeviceChannel:461 device = self._get_or_create_device_setting(interface)462 if not force_run and device.device_channel:463 return device.device_channel464 result = self.run(f"-l {interface}", force_run=force_run)465 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):466 raise UnsupportedOperationException(467 "ethtool -l {interface} operation not supported."468 )469 result.assert_exit_code(470 message=f"Couldn't get device {interface} channels info."471 )472 device_channel_info = DeviceChannel(interface, result.stdout)473 # Find the vCPU count to accurately get max channels for the device.474 lscpu = self.node.tools[Lscpu]475 vcpu_count = lscpu.get_core_count(force_run=True)476 if vcpu_count < device_channel_info.max_channels:477 device_channel_info.max_channels = vcpu_count478 device.device_channel = device_channel_info479 return device_channel_info480 def change_device_channels_info(481 self,482 interface: str,483 channel_count: int,484 ) -> DeviceChannel:485 change_result = self.run(486 f"-L {interface} combined {channel_count}", sudo=True, force_run=True487 )488 change_result.assert_exit_code(489 message=f" Couldn't change device {interface} channels count."490 )491 return self.get_device_channels_info(interface, force_run=True)492 def get_device_enabled_features(493 self, interface: str, force_run: bool = False494 ) -> DeviceFeatures:495 device = self._get_or_create_device_setting(interface)496 if not force_run and device.device_features:497 return device.device_features498 result = self.run(f"-k {interface}", force_run=force_run)499 result.assert_exit_code()500 device.device_features = DeviceFeatures(interface, result.stdout)501 return device.device_features502 def 
get_device_gro_lro_settings(503 self, interface: str, force_run: bool = False504 ) -> DeviceGroLroSettings:505 device = self._get_or_create_device_setting(interface)506 if not force_run and device.device_gro_lro_settings:507 return device.device_gro_lro_settings508 result = self.run(f"-k {interface}", force_run=force_run)509 result.assert_exit_code()510 device.device_gro_lro_settings = DeviceGroLroSettings(interface, result.stdout)511 return device.device_gro_lro_settings512 def change_device_gro_lro_settings(513 self, interface: str, gro_setting: bool, lro_setting: bool514 ) -> DeviceGroLroSettings:515 gro = "on" if gro_setting else "off"516 lro = "on" if lro_setting else "off"517 change_result = self.run(518 f"-K {interface} gro {gro} lro {lro}",519 sudo=True,520 force_run=True,521 )522 change_result.assert_exit_code(523 message=f" Couldn't change device {interface} GRO LRO settings."524 )525 return self.get_device_gro_lro_settings(interface, force_run=True)526 def get_device_link_settings(self, interface: str) -> DeviceLinkSettings:527 device = self._get_or_create_device_setting(interface)528 if device.device_link_settings:529 return device.device_link_settings530 result = self.run(interface)531 result.assert_exit_code()532 link_settings = DeviceLinkSettings(interface, result.stdout)533 device.device_link_settings = link_settings534 # Caching the message level settings if captured in DeviceLinkSettings.535 # Not returning this info from this method. 
Only caching.536 if link_settings.msg_level_number and link_settings.msg_level_name:537 msg_level_settings = DeviceMessageLevel(538 interface,539 msg_level_number=link_settings.msg_level_number,540 msg_level_name=link_settings.msg_level_name,541 )542 device.device_msg_level = msg_level_settings543 return link_settings544 def get_device_msg_level(545 self, interface: str, force_run: bool = False546 ) -> DeviceMessageLevel:547 device = self._get_or_create_device_setting(interface)548 if not force_run and device.device_msg_level:549 return device.device_msg_level550 result = self.run(interface, force_run=force_run)551 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):552 raise UnsupportedOperationException(553 f"ethtool {interface} operation not supported."554 )555 result.assert_exit_code(556 message=f"Couldn't get device {interface} message level information"557 )558 msg_level_settings = DeviceMessageLevel(interface, result.stdout)559 device.device_msg_level = msg_level_settings560 # caching the link settings if captured in DeviceMessageLevel.561 # will not this info from this method. 
Only caching.562 if msg_level_settings.link_settings:563 link_settings = DeviceLinkSettings(564 interface, link_settings=msg_level_settings.link_settings565 )566 device.device_link_settings = link_settings567 return msg_level_settings568 def set_unset_device_message_flag_by_name(569 self, interface: str, msg_flag: List[str], set: bool570 ) -> DeviceMessageLevel:571 if set:572 result = self.run(573 f"-s {interface} msglvl {' on '.join(flag for flag in msg_flag)} on",574 sudo=True,575 force_run=True,576 )577 result.assert_exit_code(578 message=f" Couldn't set device {interface} message flag/s {msg_flag}."579 )580 else:581 result = self.run(582 f"-s {interface} msglvl {' off '.join(flag for flag in msg_flag)} off",583 sudo=True,584 force_run=True,585 )586 result.assert_exit_code(587 message=f" Couldn't unset device {interface} message flag/s {msg_flag}."588 )589 return self.get_device_msg_level(interface, force_run=True)590 def set_device_message_flag_by_num(591 self, interface: str, msg_flag: str592 ) -> DeviceMessageLevel:593 result = self.run(594 f"-s {interface} msglvl {msg_flag}",595 sudo=True,596 force_run=True,597 )598 result.assert_exit_code(599 message=f" Couldn't set device {interface} message flag {msg_flag}."600 )601 return self.get_device_msg_level(interface, force_run=True)602 def get_device_ring_buffer_settings(603 self, interface: str, force_run: bool = False604 ) -> DeviceRingBufferSettings:605 device = self._get_or_create_device_setting(interface)606 if not force_run and device.device_ringbuffer_settings:607 return device.device_ringbuffer_settings608 result = self.run(f"-g {interface}", force_run=force_run)609 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):610 raise UnsupportedOperationException(611 f"ethtool -g {interface} operation not supported."612 )613 result.assert_exit_code(614 message=f"Couldn't get device {interface} ring buffer settings."615 )616 device.device_ringbuffer_settings = DeviceRingBufferSettings(617 
interface, result.stdout618 )619 return device.device_ringbuffer_settings620 def change_device_ring_buffer_settings(621 self, interface: str, rx: int, tx: int622 ) -> DeviceRingBufferSettings:623 change_result = self.run(624 f"-G {interface} rx {rx} tx {tx}", sudo=True, force_run=True625 )626 change_result.assert_exit_code(627 message=f" Couldn't change device {interface} ring buffer settings."628 )629 return self.get_device_ring_buffer_settings(interface, force_run=True)630 def get_device_rss_hash_key(631 self, interface: str, force_run: bool = False632 ) -> DeviceRssHashKey:633 device = self._get_or_create_device_setting(interface)634 if not force_run and device.device_rss_hash_key:635 return device.device_rss_hash_key636 result = self.run(f"-x {interface}", force_run=force_run)637 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):638 raise UnsupportedOperationException(639 f"ethtool -x {interface} operation not supported."640 )641 result.assert_exit_code(642 message=f"Couldn't get device {interface} ring buffer settings."643 )644 device.device_rss_hash_key = DeviceRssHashKey(interface, result.stdout)645 return device.device_rss_hash_key646 def change_device_rss_hash_key(647 self, interface: str, hash_key: str648 ) -> DeviceRssHashKey:649 result = self.run(f"-X {interface} hkey {hash_key}", sudo=True, force_run=True)650 if (result.exit_code != 0) and ("Operation not supported" in result.stdout):651 raise UnsupportedOperationException(652 f"Changing RSS hash key with 'ethtool -X {interface}' not supported."653 )654 result.assert_exit_code(655 message=f" Couldn't change device {interface} hash key."656 )657 return self.get_device_rss_hash_key(interface, force_run=True)658 def get_device_rx_hash_level(659 self, interface: str, protocol: str, force_run: bool = False660 ) -> DeviceRxHashLevel:661 device = self._get_or_create_device_setting(interface)662 if (663 not force_run664 and device.device_rx_hash_level665 and (protocol in 
device.device_rx_hash_level.protocol_hash_map.keys())666 ):667 return device.device_rx_hash_level668 result = self.run(669 f"-n {interface} rx-flow-hash {protocol}", force_run=force_run670 )671 if "Operation not supported" in result.stdout:672 raise UnsupportedOperationException(673 f"ethtool -n {interface} operation not supported."674 )675 result.assert_exit_code(676 message=f"Couldn't get device {interface} RX flow hash level for"677 f" protocol {protocol}."678 )679 if device.device_rx_hash_level:680 device.device_rx_hash_level._parse_rx_hash_level(681 interface, protocol, result.stdout682 )683 device_rx_hash_level = device.device_rx_hash_level684 else:685 device_rx_hash_level = DeviceRxHashLevel(interface, protocol, result.stdout)686 device.device_rx_hash_level = device_rx_hash_level687 return device_rx_hash_level688 def change_device_rx_hash_level(689 self, interface: str, protocol: str, enable: bool690 ) -> DeviceRxHashLevel:691 param = "sd"692 if enable:693 param = "sdfn"694 result = self.run(695 f"-N {interface} rx-flow-hash {protocol} {param}",696 sudo=True,697 force_run=True,698 )699 if "Operation not supported" in result.stdout:700 raise UnsupportedOperationException(701 f"ethtool -N {interface} rx-flow-hash {protocol} {param}"702 " operation not supported."703 )704 result.assert_exit_code(705 message=f" Couldn't change device {interface} hash level for {protocol}."706 )707 return self.get_device_rx_hash_level(interface, protocol, force_run=True)708 def get_device_sg_settings(709 self, interface: str, force_run: bool = False710 ) -> DeviceSgSettings:711 device = self._get_or_create_device_setting(interface)712 if not force_run and device.device_sg_settings:713 return device.device_sg_settings714 result = self.run(f"-k {interface}", force_run=force_run)715 result.assert_exit_code()716 device.device_sg_settings = DeviceSgSettings(interface, result.stdout)717 return device.device_sg_settings718 def change_device_sg_settings(719 self, interface: str, 
sg_setting: bool720 ) -> DeviceSgSettings:721 sg = "on" if sg_setting else "off"722 change_result = self.run(723 f"-K {interface} sg {sg}",724 sudo=True,725 force_run=True,726 )727 change_result.assert_exit_code(728 message=f" Couldn't change device {interface} scatter-gather settings."729 )730 return self.get_device_sg_settings(interface, force_run=True)731 def get_device_statistics(732 self, interface: str, force_run: bool = False733 ) -> DeviceStatistics:734 device = self._get_or_create_device_setting(interface)735 if not force_run and device.device_statistics:736 return device.device_statistics737 result = self.run(f"-S {interface}", force_run=True)738 if (result.exit_code != 0) and (739 "Operation not supported" in result.stdout740 or "no stats available" in result.stdout741 ):742 raise UnsupportedOperationException(743 f"ethtool -S {interface} operation not supported."744 )745 result.assert_exit_code(message=f"Couldn't get device {interface} statistics.")746 device.device_statistics = DeviceStatistics(interface, result.stdout)747 return device.device_statistics748 def get_device_statistics_delta(749 self, interface: str, previous_statistics: Dict[str, int]750 ) -> Dict[str, int]:751 """752 use this method to get the delta of an operation.753 """754 new_statistics = self.get_device_statistics(755 interface=interface, force_run=True756 ).counters757 for key, value in previous_statistics.items():758 new_statistics[key] = new_statistics.get(key, 0) - value759 self._log.debug(f"none-zero delta statistics on {interface}:")760 self._log.debug(761 {key: value for key, value in new_statistics.items() if value != 0}762 )763 return new_statistics764 def get_device_firmware_version(765 self, interface: str, force_run: bool = False766 ) -> str:767 device = self._get_or_create_device_setting(interface)768 if not force_run and device.device_firmware_version:769 return device.device_firmware_version770 result = self.run(f"-i {interface}", force_run=force_run)771 if 
(result.exit_code != 0) and ("Operation not supported" in result.stdout):772 raise UnsupportedOperationException(773 f"ethtool -i {interface} operation not supported."774 )775 result.assert_exit_code(776 message=f"Couldn't get device {interface} firmware version info."777 )778 firmware_version_pattern = self._firmware_version_pattern.search(result.stdout)779 if not firmware_version_pattern:780 raise LisaException(781 f"Cannot get {interface} device firmware version information"782 )783 firmware_version = firmware_version_pattern.group("value")784 device.device_firmware_version = firmware_version785 return firmware_version786 def get_all_device_channels_info(self) -> List[DeviceChannel]:787 devices_channel_list = []788 devices = self.get_device_list()789 for device in devices:790 devices_channel_list.append(self.get_device_channels_info(device))791 return devices_channel_list792 def get_all_device_enabled_features(793 self, force_run: bool = False794 ) -> List[DeviceFeatures]:795 devices_features_list = []796 devices = self.get_device_list(force_run)797 for device in devices:798 devices_features_list.append(799 self.get_device_enabled_features(device, force_run)800 )801 return devices_features_list802 def get_all_device_gro_lro_settings(self) -> List[DeviceGroLroSettings]:803 devices_gro_lro_settings = []804 devices = self.get_device_list()...

Full Screen

Full Screen

cpusuite.py

Source:cpusuite.py Github

copy

Full Screen

...172 idle_cpus = get_idle_cpus(node)173 log.debug(f"idle cpus: {idle_cpus}")174 # save origin current channel175 origin_device_channel = (176 node.tools[Ethtool].get_device_channels_info("eth0", True)177 ).current_channels178 # set channel count into 1 to get idle cpus179 if len(idle_cpus) == 0:180 node.tools[Ethtool].change_device_channels_info("eth0", 1)181 idle_cpus = get_idle_cpus(node)182 log.debug(f"idle cpus: {idle_cpus}")183 if len(idle_cpus) == 0:184 raise SkippedException(185 "all of the cpu are associated vmbus channels, "186 "no idle cpu can be used to test hotplug."187 )188 # set idle cpu state offline and change channels189 # current max channel will be cpu_count - len(idle_cpus)190 # check channels of synthetic network adapter align with current setting channel191 try:192 # take idle cpu to offline193 set_cpu_state_serial(log, node, idle_cpus, CPUState.OFFLINE)194 # get vmbus channels of synthetic network adapter. the synthetic network195 # drivers have class id "f8615163-df3e-46c5-913f-f2d2f965ed0e"196 node.tools[Lsvmbus].get_device_channels(force_run=True)197 cpu_count = node.tools[Lscpu].get_core_count()198 # current max channel count need minus count of idle cpus199 max_channel_count = cpu_count - len(idle_cpus)200 # change current channel201 first_channel_count = random.randint(1, min(max_channel_count, 64))202 node.tools[Ethtool].change_device_channels_info("eth0", first_channel_count)203 # verify that the added channels do not handle interrupts on offline cpu204 lsvmbus_channels = node.tools[Lsvmbus].get_device_channels(force_run=True)205 for channel in lsvmbus_channels:206 # verify synthetic network adapter channels align with expected value207 if channel.class_id == "f8615163-df3e-46c5-913f-f2d2f965ed0e":208 log.debug(f"Network synthetic channel: {channel}")209 assert_that(channel.channel_vp_map).is_length(first_channel_count)210 # verify that devices do not handle interrupts on offline cpu211 for channel_vp in channel.channel_vp_map:212 
assert_that(channel_vp.target_cpu).is_not_in(idle_cpus)213 # reset idle cpu to online214 set_cpu_state_serial(log, node, idle_cpus, CPUState.ONLINE)215 # reset max and current channel count into original ones216 # by reloading hv_netvsc driver217 node.tools[Modprobe].reload(["hv_netvsc"])218 # change the combined channels count after all cpus online219 second_channel_count = random.randint(1, min(cpu_count, 64))220 while True:221 if first_channel_count != second_channel_count:222 break223 second_channel_count = random.randint(1, min(cpu_count, 64))224 node.tools[Ethtool].change_device_channels_info(225 "eth0", second_channel_count226 )227 # verify that the network adapter channels count changed228 # into new channel count229 lsvmbus_channels = node.tools[Lsvmbus].get_device_channels(force_run=True)230 for channel in lsvmbus_channels:231 # verify that channels were added to synthetic network adapter232 if channel.class_id == "f8615163-df3e-46c5-913f-f2d2f965ed0e":233 log.debug(f"Network synthetic channel: {channel}")234 assert_that(channel.channel_vp_map).is_length(second_channel_count)235 finally:236 # reset idle cpu to online237 set_cpu_state_serial(log, node, idle_cpus, CPUState.ONLINE)238 # restore channel count into origin value239 current_device_channel = (240 node.tools[Ethtool].get_device_channels_info("eth0", True)241 ).current_channels242 if current_device_channel != origin_device_channel:243 node.tools[Ethtool].change_device_channels_info(244 "eth0", origin_device_channel245 )246 # when kernel doesn't support set vmbus channels target cpu feature, the247 # dict which stores original status is empty, nothing need to be restored....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful