Best Python code snippet using lisa_python
sriov.py
Source:sriov.py  
...74        ),75    )76    def sriov_basic_validation(self, environment: Environment) -> None:77        vm_nics = initialize_nic_info(environment)78        sriov_basic_test(environment, vm_nics)79    @TestCaseMetadata(80        description="""81        This case verifies module of sriov network interface is loaded and82         each synthetic nic is paired with one VF, and check rx statistics of source83         and tx statistics of dest increase after send 200 Mb file from source to dest.84        Steps,85        1. Check VF of synthetic nic is paired.86        2. Check module of sriov network device is loaded.87        3. Check VF counts listed from lspci is expected.88        4. Setup SSH connection between source and dest with key authentication.89        5. Ping the dest IP from the source machine to check connectivity.90        6. Generate 200Mb file, copy from source to dest.91        7. Check rx statistics of source VF and tx statistics of dest VF is increased.92        """,93        priority=1,94        requirement=simple_requirement(95            min_count=2,96            network_interface=features.Sriov(),97        ),98    )99    def verify_sriov_single_vf_connection(self, environment: Environment) -> None:100        vm_nics = initialize_nic_info(environment)101        sriov_basic_test(environment, vm_nics)102        sriov_vf_connection_test(environment, vm_nics)103    @TestCaseMetadata(104        description="""105        This case needs 2 nodes and 64 Vcpus. And it verifies module of sriov network106         interface is loaded and each synthetic nic is paired with one VF, and check107         rx statistics of source and tx statistics of dest increase after send 200 Mb108         file from source to dest.109        Steps,110        1. Check VF of synthetic nic is paired.111        2. Check module of sriov network device is loaded.112        3. Check VF counts listed from lspci is expected.113        4. 
Setup SSH connection between source and dest with key authentication.114        5. Ping the dest IP from the source machine to check connectivity.115        6. Generate 200Mb file, copy from source to dest.116        7. Check rx statistics of source VF and tx statistics of dest VF is increased.117        """,118        priority=2,119        requirement=simple_requirement(120            min_count=2,121            min_core_count=64,122            network_interface=features.Sriov(),123        ),124    )125    def verify_sriov_single_vf_connection_max_cpu(126        self, environment: Environment127    ) -> None:128        vm_nics = initialize_nic_info(environment)129        sriov_basic_test(environment, vm_nics)130        sriov_vf_connection_test(environment, vm_nics)131    @TestCaseMetadata(132        description="""133        This case needs 2 nodes and 8 nics. And it verifies module of sriov network134         interface is loaded and each synthetic nic is paired with one VF, and check135         rx statistics of source and tx statistics of dest increase after send 200 Mb136         file from source to dest.137        Steps,138        1. Check VF of synthetic nic is paired.139        2. Check module of sriov network device is loaded.140        3. Check VF counts listed from lspci is expected.141        4. Setup SSH connection between source and dest with key authentication.142        5. Ping the dest IP from the source machine to check connectivity.143        6. Generate 200Mb file, copy from source to dest.144        7. 
Check rx statistics of source VF and tx statistics of dest VF is increased.145        """,146        priority=2,147        requirement=simple_requirement(148            min_count=2,149            network_interface=schema.NetworkInterfaceOptionSettings(150                nic_count=8,151                data_path=schema.NetworkDataPath.Sriov,152            ),153        ),154    )155    def verify_sriov_max_vf_connection(self, environment: Environment) -> None:156        vm_nics = initialize_nic_info(environment)157        sriov_basic_test(environment, vm_nics)158        sriov_vf_connection_test(environment, vm_nics)159    @TestCaseMetadata(160        description="""161        This case needs 2 nodes, 8 nics and 64 Vcpus. And it verifies module of sriov162         network interface is loaded and each synthetic nic is paired with one VF, and163         check rx statistics of source and tx statistics of dest increase after send 200164         Mb file from source to dest.165        Steps,166        1. Check VF of synthetic nic is paired.167        2. Check module of sriov network device is loaded.168        3. Check VF counts listed from lspci is expected.169        4. Setup SSH connection between source and dest with key authentication.170        5. Ping the dest IP from the source machine to check connectivity.171        6. Generate 200Mb file, copy from source to dest.172        7. 
Check rx statistics of source VF and tx statistics of dest VF is increased.173        """,174        priority=2,175        requirement=simple_requirement(176            min_count=2,177            min_core_count=64,178            network_interface=schema.NetworkInterfaceOptionSettings(179                nic_count=8,180                data_path=schema.NetworkDataPath.Sriov,181            ),182        ),183    )184    def verify_sriov_max_vf_connection_max_cpu(self, environment: Environment) -> None:185        vm_nics = initialize_nic_info(environment)186        sriov_basic_test(environment, vm_nics)187        sriov_vf_connection_test(environment, vm_nics)188    @TestCaseMetadata(189        description="""190        This case verify VM works well after disable and enable accelerated network in191         network interface through sdk.192        Steps,193        1. Do the basic sriov check.194        2. Set enable_accelerated_networking as False to disable sriov.195        3. Set enable_accelerated_networking as True to enable sriov.196        4. Do the basic sriov check.197        5. Do step 2 ~ step 4 for 2 times.198        """,199        priority=2,200        requirement=simple_requirement(201            network_interface=features.Sriov(),202            supported_platform_type=[AZURE],203        ),204    )205    def verify_sriov_disable_enable(self, environment: Environment) -> None:206        sriov_disable_enable(environment)207    @TestCaseMetadata(208        description="""209        This case verify VM works well after disable and enable PCI device inside VM.210        Steps,211        1. Disable sriov PCI device inside the VM.212        2. Enable sriov PCI device inside the VM.213        3. Do the basic sriov check.214        4. 
Do VF connection test.215        """,216        priority=2,217        requirement=simple_requirement(218            min_count=2,219            network_interface=features.Sriov(),220        ),221    )222    def verify_sriov_disable_enable_pci(self, environment: Environment) -> None:223        for node in environment.nodes.list():224            lspci = node.tools[Lspci]225            lspci.disable_devices_by_type(constants.DEVICE_TYPE_SRIOV)226            lspci.enable_devices()227        vm_nics = initialize_nic_info(environment)228        sriov_basic_test(environment, vm_nics)229        sriov_vf_connection_test(environment, vm_nics)230    @TestCaseMetadata(231        description="""232        This case verify VM works well after down the VF nic and up VF nic inside VM.233        Steps,234        1. Do the basic sriov check.235        2. Do network connection test with bring down the VF nic.236        3. After copy 200Mb file from source to desc.237        4. Check rx statistics of source synthetic nic and tx statistics of dest238         synthetic nic is increased.239        5. Bring up VF nic.240        """,241        priority=2,242        requirement=simple_requirement(243            min_count=2,244            network_interface=features.Sriov(),245        ),246    )247    def verify_sriov_disable_enable_on_guest(self, environment: Environment) -> None:248        vm_nics = initialize_nic_info(environment)249        sriov_basic_test(environment, vm_nics)250        sriov_vf_connection_test(environment, vm_nics, turn_off_vf=True)251    @TestCaseMetadata(252        description="""253        This case verify VM works well after attached the max sriov nics after254         provision.255        Steps,256        1. Attach 7 extra sriov nic into the VM.257        2. 
Do the basic sriov testing.258        """,259        priority=2,260        use_new_environment=True,261        requirement=simple_requirement(262            network_interface=schema.NetworkInterfaceOptionSettings(263                data_path=schema.NetworkDataPath.Sriov,264                max_nic_count=8,265            ),266        ),267    )268    def verify_sriov_add_max_nics(269        self, log_path: Path, log: Logger, environment: Environment270    ) -> None:271        remove_extra_nics(environment)272        try:273            node = cast(RemoteNode, environment.nodes[0])274            network_interface_feature = node.features[NetworkInterface]275            network_interface_feature.attach_nics(extra_nic_count=7)276            is_ready, tcp_error_code = wait_tcp_port_ready(277                node.public_address, node.public_port, log=log, timeout=self.TIME_OUT278            )279            if is_ready:280                vm_nics = initialize_nic_info(environment)281                sriov_basic_test(environment, vm_nics)282            else:283                serial_console = node.features[SerialConsole]284                serial_console.check_panic(285                    saved_path=log_path, stage="after_attach_nics"286                )287                raise TcpConnectionException(288                    node.public_address,289                    node.public_port,290                    tcp_error_code,291                    "no panic found in serial log after attach nics",292                )293        finally:294            restore_extra_nics(environment)295    @TestCaseMetadata(296        description="""297        This case verify VM works well when provisioning with max (8) sriov nics.298        Steps,299        1. Provision VM with max network interfaces with enabling accelerated network.300        2. 
Do the basic sriov testing.301        """,302        priority=2,303        requirement=simple_requirement(304            min_nic_count=8,305            network_interface=features.Sriov(),306        ),307    )308    def verify_sriov_provision_with_max_nics(self, environment: Environment) -> None:309        vm_nics = initialize_nic_info(environment)310        sriov_basic_test(environment, vm_nics)311    @TestCaseMetadata(312        description="""313        This case verify VM works well when provisioning with max (8) sriov nics.314        Steps,315        1. Provision VM with max network interfaces with enabling accelerated network.316        2. Do the basic sriov testing.317        3. Reboot VM from guest.318        4. Do the basic sriov testing.319        """,320        priority=2,321        requirement=simple_requirement(322            min_nic_count=8,323            network_interface=features.Sriov(),324        ),325    )326    def verify_sriov_provision_with_max_nics_reboot(327        self, environment: Environment328    ) -> None:329        vm_nics = initialize_nic_info(environment)330        sriov_basic_test(environment, vm_nics)331        for node in environment.nodes.list():332            node.reboot()333        sriov_basic_test(environment, vm_nics)334    @TestCaseMetadata(335        description="""336        This case verify VM works well when provisioning with max (8) sriov nics.337        Steps,338        1. Provision VM with max network interfaces with enabling accelerated network.339        2. Do the basic sriov testing.340        3. Reboot VM from API.341        4. 
Do the basic sriov testing.342        """,343        priority=2,344        requirement=simple_requirement(345            min_nic_count=8,346            network_interface=features.Sriov(),347        ),348    )349    def verify_sriov_provision_with_max_nics_reboot_from_platform(350        self, environment: Environment351    ) -> None:352        vm_nics = initialize_nic_info(environment)353        sriov_basic_test(environment, vm_nics)354        for node in environment.nodes.list():355            start_stop = node.features[StartStop]356            start_stop.restart()357        sriov_basic_test(environment, vm_nics)358    @TestCaseMetadata(359        description="""360        This case verify VM works well when provisioning with max (8) sriov nics.361        Steps,362        1. Provision VM with max network interfaces with enabling accelerated network.363        2. Do the basic sriov testing.364        3. Stop and Start VM from API.365        4. Do the basic sriov testing.366        """,367        priority=2,368        requirement=simple_requirement(369            min_nic_count=8,370            network_interface=features.Sriov(),371        ),372    )373    def verify_sriov_provision_with_max_nics_stop_start_from_platform(374        self, environment: Environment375    ) -> None:376        vm_nics = initialize_nic_info(environment)377        sriov_basic_test(environment, vm_nics)378        for node in environment.nodes.list():379            start_stop = node.features[StartStop]380            start_stop.stop()381            start_stop.start()382        sriov_basic_test(environment, vm_nics)383    @TestCaseMetadata(384        description="""385        This case verify VM works well during remove and load sriov modules.386        Steps,387        1. Provision VM with max network interfaces with enabling accelerated network.388        2. Do the basic sriov testing.389        3. Remove sriov module, check network traffic through synthetic nic.390        4. 
Load sriov module, check network traffic through VF.391        """,392        priority=2,393        requirement=simple_requirement(394            min_count=2,395            min_nic_count=8,396            network_interface=features.Sriov(),397        ),398    )399    def verify_sriov_reload_modules(self, environment: Environment) -> None:400        for node in environment.nodes.list():401            if node.tools[KernelConfig].is_built_in(get_used_config(node)):402                raise SkippedException(403                    "current VM's mlx driver is built-in, can not reload."404                )405        vm_nics = initialize_nic_info(environment)406        sriov_basic_test(environment, vm_nics)407        module_in_used: Dict[str, str] = {}408        for node in environment.nodes.list():409            module_in_used[node.name] = remove_module(node)410        sriov_vf_connection_test(environment, vm_nics, remove_module=True)411        for node in environment.nodes.list():412            load_module(node, module_in_used[node.name])413        vm_nics = initialize_nic_info(environment)414        sriov_vf_connection_test(environment, vm_nics)415    @TestCaseMetadata(416        description="""417        This case verify below two kernel patches.418        1. hv_netvsc: Sync offloading features to VF NIC419           https://github.com/torvalds/linux/commit/68622d071e555e1528f3e7807f30f73311c1acae#diff-007213ba7199932efdb096be47d209a2f83e4d425c486b3adaba861d0a0c80c5 # noqa: E501420        2. hv_netvsc: Allow scatter-gather feature to be tunable...stress.py
Source:stress.py  
...223    def verify_stress_sriov_with_max_nics_reboot(224        self, environment: Environment225    ) -> None:226        vm_nics = initialize_nic_info(environment)227        sriov_basic_test(environment, vm_nics)228        for _ in range(10):229            for node in environment.nodes.list():230                node.reboot()231            sriov_basic_test(environment, vm_nics)232    @TestCaseMetadata(233        description="""234        This case verify VM works well when provisioning with max (8) sriov nics.235        Steps,236        1. Provision VM with max network interfaces with enabling accelerated network.237        2. Do the basic sriov testing.238        3. Reboot VM from API.239        4. Do the basic sriov testing.240        5. Repeat step 3 and 4 for 10 times.241        """,242        priority=2,243        requirement=simple_requirement(244            min_nic_count=8,245            network_interface=features.Sriov(),246        ),247    )248    def verify_sriov_with_max_nics_reboot_from_platform(249        self, environment: Environment250    ) -> None:251        vm_nics = initialize_nic_info(environment)252        sriov_basic_test(environment, vm_nics)253        for _ in range(10):254            for node in environment.nodes.list():255                start_stop = node.features[StartStop]256                start_stop.restart()257            sriov_basic_test(environment, vm_nics)258    @TestCaseMetadata(259        description="""260        This case verify VM works well when provisioning with max (8) sriov nics.261        Steps,262        1. Provision VM with max network interfaces with enabling accelerated network.263        2. Do the basic sriov testing.264        3. Stop and Start VM from API.265        4. Do the basic sriov testing.266        5. 
Repeat step 3 and 4 for 10 times.267        """,268        priority=2,269        requirement=simple_requirement(270            min_nic_count=8,271            network_interface=features.Sriov(),272        ),273    )274    def verify_sriov_with_max_nics_stop_start_from_platform(275        self, environment: Environment276    ) -> None:277        vm_nics = initialize_nic_info(environment)278        sriov_basic_test(environment, vm_nics)279        for _ in range(10):280            for node in environment.nodes.list():281                start_stop = node.features[StartStop]282                start_stop.stop()283                start_stop.start()284            sriov_basic_test(environment, vm_nics)285    def after_case(self, log: Logger, **kwargs: Any) -> None:286        environment: Environment = kwargs.pop("environment")...common.py
Source:common.py  
# Excerpt from common.py: shared helpers used by the SR-IOV test cases. The
# module's imports and earlier definitions lie before this excerpt.
def get_packets(node: Node, nic_name: str, name: str = "tx_packets") -> int:
    """Read one NIC statistics counter from sysfs on ``node``.

    Args:
        node: node on which the counter file is read.
        nic_name: interface name under ``/sys/class/net``.
        name: file name under the interface's ``statistics`` directory.

    Returns:
        The content of the counter file converted to ``int``.
    """
    cat = node.tools[Cat]
    # NOTE(review): force_run=True presumably bypasses a cached result so the
    # current counter value is read each time - confirm against the Cat tool.
    return int(cat.read(f"/sys/class/net/{nic_name}/statistics/{name}", force_run=True))


# Retried on AssertionError (tries=150, delay=2), so a failed check is repeated
# rather than failing on the first attempt.
@retry(exceptions=AssertionError, tries=150, delay=2)
def sriov_basic_test(
    environment: Environment, vm_nics: Dict[str, Dict[str, NicInfo]]
) -> None:
    """Check on every node that the SR-IOV module is loaded and the VFs are listed.

    Args:
        environment: environment whose nodes are checked.
        vm_nics: per-node NIC info, keyed by node name and then by NIC name.

    Raises:
        AssertionError: the module is not loaded, or the number of SR-IOV
            devices reported by lspci differs from the number of NICs whose
            ``lower`` attribute is non-empty.
    """
    for node in environment.nodes.list():
        # 1. Check module of sriov network device is loaded.
        used_module = get_used_module(node)
        # A built-in driver is skipped here; only a non-built-in one is looked
        # up through lsmod.
        if not node.tools[KernelConfig].is_built_in(modules_config_dict[used_module]):
            lsmod = node.tools[Lsmod]
            assert_that(lsmod.module_exists(used_module, force_run=True)).described_as(
                "The module of sriov network device isn't loaded."
            ).is_true()
        # 2. Check VF counts listed from lspci is expected.
        lspci = node.tools[Lspci]
        devices_slots = lspci.get_device_names_by_type(
            constants.DEVICE_TYPE_SRIOV, force_run=True
        )
        # Only NICs with a non-empty `lower` entry are counted as expected VFs.
        expected_vf_count = len(
            [x for x in vm_nics[node.name].values() if x.lower != ""]
        )
        assert_that(devices_slots).described_as(
            "count of sriov devices listed from lspci is not expected,"
            " please check the driver works properly"
        ).is_length(expected_vf_count)


def sriov_vf_connection_test(
    environment: Environment,
    vm_nics: Dict[str, Dict[str, NicInfo]],
    turn_off_vf: bool = False,
    remove_module: bool = False,
) -> None:
    """Copy a file between the first two nodes over each NIC pair and check counters.

    For every NIC of the source node (``environment.nodes[0]``) a NIC of the
    dest node (``environment.nodes[1]``) whose IP shares the same first three
    octets is selected. The source pings the dest, copies ``large_file`` with
    scp, and the source ``tx_packets`` and dest ``rx_packets`` counters must
    both have increased.

    Args:
        environment: environment providing at least two nodes.
        vm_nics: per-node NIC info, keyed by node name and then by NIC name.
        turn_off_vf: bring the VF (``lower``) links down for the transfer and
            read the counters from the synthetic (``upper``) NICs instead.
        remove_module: read the counters from the synthetic (``upper``) NICs.

    Raises:
        AssertionError: no dest NIC matches a source NIC, or a counter did not
            increase. Failures of the ping and scp commands are reported by the
            command-result helpers.
    """
    source_node = cast(RemoteNode, environment.nodes[0])
    dest_node = cast(RemoteNode, environment.nodes[1])
    source_ssh = source_node.tools[Ssh]
    dest_ssh = dest_node.tools[Ssh]
    dest_ssh.enable_public_key(source_ssh.generate_key_pairs())
    # generate 200Mb file
    source_node.execute("dd if=/dev/urandom of=large_file bs=100 count=0 seek=2M")
    max_retry_times = 10
    for source_nic_info in vm_nics[source_node.name].values():
        matched_dest_nic_name = ""
        for dest_nic_name, dest_nic_info in vm_nics[dest_node.name].items():
            # only when IPs are in the same subnet, IP1 of machine A can connect to
            # IP2 of machine B
            # e.g. eth2 IP is 10.0.2.3 on machine A, eth2 IP is 10.0.3.4 on machine
            # B, use nic name doesn't work in this situation
            if (
                dest_nic_info.ip_addr.rsplit(".", maxsplit=1)[0]
                == source_nic_info.ip_addr.rsplit(".", maxsplit=1)[0]
            ):
                matched_dest_nic_name = dest_nic_name
                break
        assert_that(matched_dest_nic_name).described_as(
            f"can't find the same subnet nic with {source_nic_info.ip_addr} on"
            f" machine {source_node.name}, please check network setting of "
            f"machine {dest_node.name}."
        ).is_not_empty()
        matched_dest_nic_info = vm_nics[dest_node.name][matched_dest_nic_name]
        dest_ip = matched_dest_nic_info.ip_addr
        source_ip = source_nic_info.ip_addr
        source_synthetic_nic = source_nic_info.upper
        dest_synthetic_nic = matched_dest_nic_info.upper
        source_nic = source_vf_nic = source_nic_info.lower
        dest_nic = dest_vf_nic = matched_dest_nic_info.lower
        if remove_module or turn_off_vf:
            # Without a usable VF the counters are read from the synthetic NICs.
            source_nic = source_synthetic_nic
            dest_nic = dest_synthetic_nic
        try:
            if turn_off_vf:
                source_node.execute(f"ip link set dev {source_vf_nic} down", sudo=True)
                dest_node.execute(f"ip link set dev {dest_vf_nic} down", sudo=True)
            # get origin tx_packets and rx_packets before copy file
            source_tx_packets_origin = get_packets(source_node, source_nic)
            dest_rx_packets_origin = get_packets(dest_node, dest_nic, "rx_packets")
            # check the connectivity between source and dest machine using ping
            for _ in range(max_retry_times):
                cmd_result = source_node.execute(
                    f"ping -c 1 {dest_ip} -I {source_synthetic_nic}", sudo=True
                )
                if cmd_result.exit_code == 0:
                    break
            cmd_result.assert_exit_code(
                message=f"fail to ping {dest_ip} from {source_node.name} to "
                f"{dest_node.name} after retry {max_retry_times}"
            )
            # copy 200 Mb file from source ip to dest ip
            source_node.execute(
                f"scp -o BindAddress={source_ip} -i ~/.ssh/id_rsa -o"
                f" StrictHostKeyChecking=no large_file "
                f"$USER@{dest_ip}:/tmp/large_file",
                shell=True,
                expected_exit_code=0,
                expected_exit_code_failure_message="Fail to copy file large_file from"
                f" {source_ip} to {dest_ip}",
            )
            source_tx_packets = get_packets(source_node, source_nic)
            dest_rx_packets = get_packets(dest_node, dest_nic, "rx_packets")
            # verify tx_packets value of source nic is increased after coping 200Mb
            #  file from source to dest
            assert_that(
                source_tx_packets, "insufficient TX packets sent"
            ).is_greater_than(source_tx_packets_origin)
            # verify rx_packets value of dest nic is increased after receiving 200Mb
            #  file from source to dest
            assert_that(
                dest_rx_packets, "insufficient RX packets received"
            ).is_greater_than(dest_rx_packets_origin)
        finally:
            # Bring the VF links back up even when a check above failed, so a
            # failing run does not leave the nodes with their VFs down.
            if turn_off_vf:
                source_node.execute(f"ip link set dev {source_vf_nic} up", sudo=True)
                dest_node.execute(f"ip link set dev {dest_vf_nic} up", sudo=True)


def cleanup_iperf3(environment: Environment) -> None:
    """Kill processes named ``iperf3`` on every node of the environment."""
    for node in environment.nodes.list():
        kill = node.tools[Kill]
        kill.by_name("iperf3")


def sriov_disable_enable(environment: Environment, times: int = 4) -> None:
    """Toggle SR-IOV on the first node ``times`` times.

    The basic SR-IOV check runs once up front and again at the start of every
    iteration that finds SR-IOV enabled; each iteration then switches SR-IOV to
    the opposite state.

    Args:
        environment: environment whose first node is toggled.
        times: number of switch operations to perform.
    """
    vm_nics = initialize_nic_info(environment)
    sriov_basic_test(environment, vm_nics)
    node = cast(RemoteNode, environment.nodes[0])
    network_interface_feature = node.features[NetworkInterface]
    for _ in range(times):
        sriov_is_enabled = network_interface_feature.is_enabled_sriov()
        if sriov_is_enabled:
            sriov_basic_test(environment, vm_nics)
        network_interface_feature.switch_sriov(enable=(not sriov_is_enabled))


def remove_extra_nics_per_node(node: Node) -> None:
    """Call ``remove_extra_nics`` on the node's network-interface feature."""
    node = cast(RemoteNode, node)
    network_interface_feature = node.features[NetworkInterface]
    network_interface_feature.remove_extra_nics()


def remove_extra_nics(environment: Environment) -> None:
    """Remove the extra NICs from every node of the environment."""
    for node in environment.nodes.list():
        remove_extra_nics_per_node(node)


# The common.py excerpt is truncated inside `restore_extra_nics_per_node`. Its
# visible part is kept verbatim below and is not completed here:
#
#   def restore_extra_nics_per_node(node: Node) -> None:
#       remove_extra_nics_per_node(node)
#       network_interface_feature = node.features[NetworkInterface]
#       network_interface_feature.attach_nics(
#           network_interface_feature.origin_extra_sriov_nics_count,
#           enable_accelerated_networking=True,...
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
