Best Python code snippet using lisa_python
nestedperf.py
Source:nestedperf.py  
...336                client_nic_name=self._NIC_NAME,337                test_case_name=inspect.stack()[1][3],338            )339        finally:340            self._linux_cleanup_nat(server_l1, self._BR_NAME, log)341            self._linux_cleanup_nat(client_l1, self._BR_NAME, log)342    @TestCaseMetadata(343        description="""344        This script runs ntttcp test on two nested VMs on different L1 guests345        connected with NAT346        """,347        priority=3,348        timeout=_TIME_OUT,349        requirement=simple_requirement(350            min_count=2,351            supported_os=[Windows],352            supported_platform_type=[AZURE, READY],353        ),354    )355    def perf_nested_hyperv_ntttcp_different_l1_nat(356        self,357        result: TestResult,358        variables: Dict[str, Any],359        log: Logger,360    ) -> None:361        environment = result.environment362        assert environment, "fail to get environment from testresult"363        server_l1 = cast(RemoteNode, environment.nodes[0])364        client_l1 = cast(RemoteNode, environment.nodes[1])365        # parse nested image variables366        (367            nested_image_username,368            nested_image_password,369            _,370            nested_image_url,371        ) = parse_nested_image_variables(variables)372        try:373            # setup nested vm on server in NAT configuration374            server_l2 = self._windows_setup_nat(375                node=server_l1,376                nested_vm_name="server_l2",377                guest_username=nested_image_username,378                guest_password=nested_image_password,379                guest_port=self._SERVER_HOST_FWD_PORT,380                guest_image_url=nested_image_url,381            )382            # setup nested vm on client in NAT configuration383            client_l2 = self._windows_setup_nat(384                node=client_l1,385                nested_vm_name="client_l2",386                
guest_username=nested_image_username,387                guest_password=nested_image_password,388                guest_port=self._CLIENT_HOST_FWD_PORT,389                guest_image_url=nested_image_url,390            )391            # run ntttcp test392            perf_ntttcp(393                result,394                server_l2,395                client_l2,396                test_case_name=inspect.stack()[1][3],397            )398        finally:399            # cleanup server400            try:401                hyperv_remove_nested_vm(server_l1, "server_l2")402            except Exception as e:403                log.debug(f"Failed to clean up server vm: {e}")404                server_l1.mark_dirty()405            # cleanup client406            try:407                hyperv_remove_nested_vm(client_l1, "client_l2")408            except Exception as e:409                log.debug(f"Failed to clean up client vm: {e}")410                client_l1.mark_dirty()411    @TestCaseMetadata(412        description="""413        This script runs netperf test on two nested VMs on different L1 guests414        connected with NAT415        """,416        priority=3,417        timeout=_TIME_OUT,418        requirement=simple_requirement(419            min_count=2,420            network_interface=schema.NetworkInterfaceOptionSettings(421                nic_count=search_space.IntRange(min=2),422            ),423            disk=schema.DiskOptionSettings(424                data_disk_count=search_space.IntRange(min=1),425                data_disk_size=search_space.IntRange(min=12),426            ),427        ),428    )429    def perf_nested_kvm_netperf_pps_nat(430        self,431        result: TestResult,432        variables: Dict[str, Any],433        log: Logger,434    ) -> None:435        environment = result.environment436        assert environment, "fail to get environment from testresult"437        server_l1 = cast(RemoteNode, environment.nodes[0])438        client_l1 = 
cast(RemoteNode, environment.nodes[1])439        # parse nested image variables440        (441            nested_image_username,442            nested_image_password,443            _,444            nested_image_url,445        ) = parse_nested_image_variables(variables)446        try:447            # setup nested vm on server in NAT configuration448            server_l2 = self._linux_setup_nat(449                node=server_l1,450                nested_vm_name="server_l2",451                guest_username=nested_image_username,452                guest_password=nested_image_password,453                guest_port=self._SERVER_HOST_FWD_PORT,454                guest_image_url=nested_image_url,455                guest_internal_ip=self._SERVER_IP_ADDR,456                guest_default_nic=self._NIC_NAME,457                bridge_name=self._BR_NAME,458                bridge_network=self._BR_NETWORK,459                bridge_cidr=self._BR_CIDR,460                bridge_gateway=self._BR_GATEWAY,461            )462            # setup nested vm on client in NAT configuration463            client_l2 = self._linux_setup_nat(464                node=client_l1,465                nested_vm_name="client_l2",466                guest_username=nested_image_username,467                guest_password=nested_image_password,468                guest_port=self._CLIENT_HOST_FWD_PORT,469                guest_image_url=nested_image_url,470                guest_internal_ip=self._CLIENT_IP_ADDR,471                guest_default_nic=self._NIC_NAME,472                bridge_name=self._BR_NAME,473                bridge_network=self._BR_NETWORK,474                bridge_cidr=self._BR_CIDR,475                bridge_gateway=self._BR_GATEWAY,476            )477            # run netperf test478            perf_tcp_pps(result, "singlepps", server_l2, client_l2)479        finally:480            self._linux_cleanup_nat(server_l1, self._BR_NAME, log)481            self._linux_cleanup_nat(client_l1, 
self._BR_NAME, log)482    def _linux_setup_nat(483        self,484        node: RemoteNode,485        nested_vm_name: str,486        guest_username: str,487        guest_password: str,488        guest_port: int,489        guest_image_url: str,490        guest_internal_ip: str,491        guest_default_nic: str,492        bridge_name: str,493        bridge_network: str,494        bridge_cidr: str,495        bridge_gateway: str,496    ) -> RemoteNode:497        """498        Setup NAT on the node with following configurations:499        1. Forward traffic on node's eth0 interface and port `guest_port`500        to the nested VM's port 22.501        2. Forward all traffic on node's eth1 interface to the nested VM.502        """503        # get core count504        core_count = node.tools[Lscpu].get_core_count()505        node_eth1_ip = node.nics.get_nic("eth1").ip_addr506        bridge_dhcp_range = f"{guest_internal_ip},{guest_internal_ip}"507        # enable ip forwarding508        node.tools[Sysctl].write("net.ipv4.ip_forward", "1")509        # setup bridge510        node.tools[Ip].setup_bridge(bridge_name, f"{bridge_gateway}/{bridge_cidr}")511        node.tools[Ip].set_bridge_configuration(bridge_name, "stp_state", "0")512        node.tools[Ip].set_bridge_configuration(bridge_name, "forward_delay", "0")513        # reset bridge lease file to remove old dns leases514        node.execute(515            f"cp /dev/null /var/run/qemu-dnsmasq-{bridge_name}.leases", sudo=True516        )517        # start dnsmasq518        node.tools[Dnsmasq].start(bridge_name, bridge_gateway, bridge_dhcp_range)519        # reset filter table to accept all traffic520        node.tools[Iptables].reset_table()521        # reset nat table and setup nat forwarding522        node.tools[Iptables].reset_table("nat")523        node.tools[Iptables].run(524            f"-t nat -A POSTROUTING -s {bridge_network}/{bridge_cidr} -j MASQUERADE",525            sudo=True,526            force_run=True,527   
     )528        # start nested vm529        nested_vm = qemu_connect_nested_vm(530            node,531            guest_username,532            guest_password,533            guest_port,534            guest_image_url,535            taps=1,536            cores=core_count,537            bridge=bridge_name,538            stop_existing_vm=True,539            name=nested_vm_name,540        )541        # configure rc.local to run dhclient on reboot542        nested_vm.tools[StartConfiguration].add_command("ip link set dev ens4 up")543        nested_vm.tools[StartConfiguration].add_command("dhclient ens4")544        # reboot nested vm and close ssh connection545        nested_vm.execute("reboot", sudo=True)546        nested_vm.close()547        # route traffic on `eth0` and port `guest_port` on l1 vm to548        # port 22 on l2 vm549        node.tools[Iptables].run(550            f"-t nat -A PREROUTING -i eth0 -p tcp --dport {guest_port} "551            f"-j DNAT --to {guest_internal_ip}:22",552            sudo=True,553            force_run=True,554        )555        # route all tcp traffic on `eth1` port on l1 vm to l2 vm556        node.tools[Iptables].run(557            f"-t nat -A PREROUTING -i eth1 -d {node_eth1_ip} "558            f"-p tcp -j DNAT --to {guest_internal_ip}",559            sudo=True,560            force_run=True,561        )562        # wait till nested vm is up563        try_connect(564            schema.ConnectionInfo(565                address=node.public_address,566                port=guest_port,567                username=guest_username,568                password=guest_password,569            )570        )571        # set default nic interfaces on l2 vm572        nested_vm.internal_address = node_eth1_ip573        nested_vm.capability.network_interface = Synthetic()574        return nested_vm575    def _linux_cleanup_nat(576        self,577        node: RemoteNode,578        bridge_name: str,579        log: Logger,580    ) -> None:581        
try:582            # stop running QEMU instances583            node.tools[Qemu].delete_vm()584            # clear bridge and taps585            node.tools[Ip].delete_interface(bridge_name)586            # flush ip tables587            node.tools[Iptables].reset_table()588            node.tools[Iptables].reset_table("nat")589        except Exception as e:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
