Best Python code snippet using lisa_python
performance.py
Source:performance.py  
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import inspect
from functools import partial
from time import sleep
from typing import Any, List, Type

from assertpy import assert_that

from lisa import (
    Environment,
    Logger,
    Node,
    SkippedException,
    TestCaseMetadata,
    TestSuite,
    TestSuiteMetadata,
    UnsupportedKernelException,
    simple_requirement,
)
from lisa.executable import Tool
from lisa.features import Sriov, Synthetic
from lisa.nic import NicInfo
from lisa.testsuite import TestResult
from lisa.tools import Firewall, Kill, Lagscope, Lscpu, Ntttcp
from lisa.util.parallel import run_in_parallel
from microsoft.testsuites.performance.common import (
    calculate_middle_average,
    perf_ntttcp,
)
from microsoft.testsuites.xdp.common import (
    get_dropped_count,
    get_forwarded_count,
    get_xdpdump,
    remove_hugepage,
    set_hugepage,
)
from microsoft.testsuites.xdp.pktgen import Pktgen, PktgenResult
from microsoft.testsuites.xdp.xdpdump import BuildType, XdpDump

# the received packets must be at least 90%
_default_received_threshold = 0.9
# the xdp latency shouldn't take more than 40% time.
_default_latency_threshold = 1.4


@TestSuiteMetadata(
    area="xdp",
    category="performance",
    description="""
    This test suite is to validate XDP performance.
    """,
)
class XdpPerformance(TestSuite):
    def before_case(self, log: Logger, **kwargs: Any) -> None:
        """Stop the firewall on every node of the environment before a case."""
        environment: Environment = kwargs.pop("environment")
        for node in environment.nodes.list():
            node.tools[Firewall].stop()

    @TestCaseMetadata(
        description="""
        This case tests the packet forwarded rate of the XDP TX forwarding on
        the single core Synthetic networking. The pktgen samples in Linux code
        base is used to generate packets.

        The minimum cpu count is 8, it makes sure the performance is won't too
        low.

        Three roles in this test environment, 1) sender is to send packets, 2)
        the forwarder is to forward packets to receiver, 3) and the receiver is
        used to receive packets and drop.

        Finally, it checks how many packets arrives to the forwarder or
        receiver. If it's lower than 90%, the test fails. Note, it counts the
        rx_xdp_tx_xmit (mlx5), rx_xdp_tx (mlx4), or dropped count for synthetic
        nic.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2,
            min_count=3,
            min_core_count=8,
            network_interface=Synthetic(),
        ),
    )
    def perf_xdp_tx_forward_singlecore_synthetic(
        self, environment: Environment, log: Logger
    ) -> None:
        self._execute_tx_forward_test(environment, log)

    @TestCaseMetadata(
        description="""
        This case tests the packet forwarded rate of XDP TX forwarding on the
        single core SRIOV networking.

        Refer to perf_xdp_tx_forward_singlecore_synthetic for more details.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2, min_count=3, min_core_count=8, network_interface=Sriov()
        ),
    )
    def perf_xdp_tx_forward_singlecore_sriov(
        self, environment: Environment, log: Logger
    ) -> None:
        self._execute_tx_forward_test(environment, log)

    @TestCaseMetadata(
        description="""
        This case tests the packet forwarded rate of XDP TX forwarding on the
        multi core Syntethic networking.

        Refer to perf_xdp_tx_forward_singlecore_synthetic for more details.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2,
            min_count=3,
            min_core_count=8,
            network_interface=Synthetic(),
        ),
    )
    def perf_xdp_tx_forward_multicore_synthetic(
        self, environment: Environment, log: Logger
    ) -> None:
        self._execute_tx_forward_test(environment, log, is_multi_threads=True)

    @TestCaseMetadata(
        description="""
        This case tests the packet forwarded rate of XDP TX forwarding on the
        multi core SRIOV networking.

        Refer to perf_xdp_tx_forward_singlecore_synthetic for more details.

        The threshold of this test is lower than standard, it's 85%. Because the
        UDP packets count is big in this test scenario, and easy to lost.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2, min_count=3, min_core_count=8, network_interface=Sriov()
        ),
    )
    def perf_xdp_tx_forward_multicore_sriov(
        self, environment: Environment, log: Logger
    ) -> None:
        self._execute_tx_forward_test(
            environment, log, is_multi_threads=True, threshold=0.85
        )

    @TestCaseMetadata(
        description="""
        This case tests the XDP drop performance by measuring Packets Per Second
        (PPS) and received rate with multiple send threads.

        * If the received packets rate is lower than 90% the test case fails.
        * If the PPS is lower than 1M, the test case fails.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2,
            min_count=2,
            min_core_count=8,
            network_interface=Sriov(),
        ),
    )
    def perf_xdp_rx_drop_multithread_sriov(
        self, environment: Environment, log: Logger
    ) -> None:
        self._execute_rx_drop_test(
            environment,
            True,
            log,
        )

    @TestCaseMetadata(
        description="""
        This case tests the XDP drop performance by measuring Packets Per Second
        (PPS) and received rate with single send thread.

        see details from perf_xdp_rx_drop_multithread_sriov.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2, min_count=2, min_core_count=8, network_interface=Sriov()
        ),
    )
    def perf_xdp_rx_drop_singlethread_sriov(
        self, environment: Environment, log: Logger
    ) -> None:
        self._execute_rx_drop_test(
            environment,
            False,
            log,
        )

    @TestCaseMetadata(
        description="""
        This case compare and record latency impact of XDP component.

        The test use lagscope to send tcp packets. And then compare the latency
        with/without XDP component. If the gap is more than 40%, the test case
        fails.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2, min_count=2, min_core_count=8, network_interface=Sriov()
        ),
    )
    def perf_xdp_lagscope_latency(self, result: TestResult, log: Logger) -> None:
        self._execute_latency_test(
            result,
            Lagscope,
            log,
        )

    @TestCaseMetadata(
        description="""
        This case compare and record latency impact of XDP component.

        The test use ntttcp to send tcp packets. And then compare the latency
        with/without XDP component. If the gap is more than 40%, the test case
        fails.
        """,
        priority=3,
        requirement=simple_requirement(
            min_nic_count=2, min_count=2, min_core_count=8, network_interface=Sriov()
        ),
    )
    def perf_xdp_ntttcp_latency(self, result: TestResult, log: Logger) -> None:
        self._execute_latency_test(
            result,
            Ntttcp,
            log,
        )

    def _execute_latency_test(
        self,
        test_result: TestResult,
        tool_type: Type[Tool],
        log: Logger,
    ) -> None:
        """
        Measure latency with and without xdpdump running on the server and
        fail if the ratio exceeds _default_latency_threshold.

        Node 0 of the environment is used as the server, node 1 as the client.
        Each configuration is measured several times and aggregated with
        calculate_middle_average.
        """
        environment = test_result.environment
        assert environment, "fail to get environment from testresult"

        server = environment.nodes[0]
        client = environment.nodes[1]

        server_xdpdump = get_xdpdump(server)
        server_xdpdump.make_by_build_type(BuildType.PERF)
        server_nic = server.nics.get_nic_by_index(1)

        # the latency is not stable in cloud environment, test multiple times
        # and aggregate the result.
        tested_runs = 5
        latency_without_xdp: List[float] = []
        latency_with_xdp: List[float] = []
        for _ in range(tested_runs):
            latency_without_xdp.append(
                self._send_packets_for_latency(server, client, test_result, tool_type)
            )
            try:
                server_xdpdump.start_async(nic_name=server_nic.upper, timeout=0)
                latency_with_xdp.append(
                    self._send_packets_for_latency(
                        server, client, test_result, tool_type
                    )
                )
            finally:
                # always stop xdpdump, so the next baseline run is not affected.
                server_kill = server.tools[Kill]
                server_kill.by_name("xdpdump")

        final_without_xdp = calculate_middle_average(latency_without_xdp)
        final_with_xdp = calculate_middle_average(latency_with_xdp)

        log.info(
            f"Latency with XDP: {final_with_xdp}us, "
            f"without XDP: {final_without_xdp}us. "
            f"Raw with XDP: {latency_with_xdp}, "
            f"without XDP: {latency_without_xdp}. "
        )
        # a zero baseline would turn the ratio check below into an
        # unexplained ZeroDivisionError.
        assert_that(final_without_xdp).described_as(
            "the latency without XDP must be greater than 0 to compare. "
            f"Raw without XDP: {latency_without_xdp}"
        ).is_greater_than(0)
        assert_that(final_with_xdp / final_without_xdp).described_as(
            f"The XDP latency: {final_with_xdp}us shouldn't slower 40% than "
            f"the normal latency: {final_without_xdp}us."
        ).is_less_than_or_equal_to(_default_latency_threshold)

    def _send_packets_for_latency(
        self,
        server: Node,
        client: Node,
        test_result: TestResult,
        tool_type: Type[Tool],
    ) -> float:
        """
        Run one latency measurement with the given tool (Lagscope or Ntttcp)
        and return the average latency over the produced messages.
        """
        assert_that(tool_type).described_as("the tool is not supported").is_in(
            Lagscope, Ntttcp
        )

        # mypy doesn't work with generic type method "get". So use an
        # intermediate variable tools to store it.
        tools: List[Any] = run_in_parallel(
            [
                partial(server.tools.get, tool_type),
                partial(client.tools.get, tool_type),
            ]
        )

        server_nic = server.nics.get_nic_by_index(1)

        if tool_type is Lagscope:
            server_lagscope: Lagscope = tools[0]
            client_lagscope: Lagscope = tools[1]
            try:
                run_in_parallel(
                    [server_lagscope.set_busy_poll, client_lagscope.set_busy_poll]
                )
                server_lagscope.run_as_server(ip=server_nic.ip_addr)

                result = client_lagscope.run_as_client(server_ip=server_nic.ip_addr)
                lagscope_messages = client_lagscope.create_latency_performance_messages(
                    result=result,
                    # stack()[2] is the caller of _execute_latency_test, which
                    # is the test case method when called as in this class.
                    test_case_name=inspect.stack()[2].function,
                    test_result=test_result,
                )

                assert lagscope_messages
                assert_that(len(lagscope_messages)).described_as(
                    "at least one message is necessary"
                ).is_greater_than(0)
                return float(
                    sum(x.average_latency_us for x in lagscope_messages)
                    / len(lagscope_messages)
                )
            finally:
                for lagscope in [server_lagscope, client_lagscope]:
                    lagscope.kill()
                    lagscope.restore_busy_poll()
        else:
            ntttcp_messages = perf_ntttcp(
                test_result=test_result,
                udp_mode=False,
                connections=[1],
                test_case_name=inspect.stack()[2].function,
            )
            # same guard as the lagscope branch, so an empty result fails
            # with a clear message instead of ZeroDivisionError.
            assert_that(len(ntttcp_messages)).described_as(
                "at least one message is necessary"
            ).is_greater_than(0)

            return float(
                # The type is always TCP message, because the above line set udp
                # to False. Ignore type error here, because UDP message has no
                # latency metrics.
                sum(x.latency_us for x in ntttcp_messages)  # type: ignore
                / len(ntttcp_messages)
            )

    def _execute_rx_drop_test(
        self,
        environment: Environment,
        is_multi_thread: bool,
        log: Logger,
        threshold: float = _default_received_threshold,
    ) -> None:
        """
        Send packets from node 0 to node 1 while xdpdump (PERF_DROP build)
        runs on the receiver, then check the dropped rate against threshold
        and that the sender reached at least 1M pps.

        Raises SkippedException when a tool reports an unsupported kernel.
        """
        sender = environment.nodes[0]
        receiver = environment.nodes[1]

        # install pktgen on sender, and xdpdump on receiver.
        try:
            tools: List[Any] = run_in_parallel(
                [partial(sender.tools.get, Pktgen), partial(get_xdpdump, receiver)]
            )
        except UnsupportedKernelException as identifier:
            # chain the cause, so the original traceback is kept.
            raise SkippedException(identifier) from identifier

        # type annotations
        pktgen: Pktgen = tools[0]
        xdpdump: XdpDump = tools[1]

        sender_nic = sender.nics.get_nic_by_index(1)
        receiver_nic = receiver.nics.get_nic_by_index(1)

        xdpdump.make_by_build_type(build_type=BuildType.PERF_DROP)

        # capture existing stats to calculate delta
        original_dropped_count = get_dropped_count(
            node=receiver,
            nic=receiver_nic,
            previous_count=0,
            log=log,
        )

        try:
            xdpdump.start_async(nic_name=receiver_nic.upper, timeout=0)

            pktgen_result = self._send_packets(
                is_multi_thread, sender, pktgen, sender_nic, receiver_nic
            )

            self._wait_packets_proceeded(
                log, receiver, receiver_nic, original_dropped_count
            )
        finally:
            receiver_kill = receiver.tools[Kill]
            receiver_kill.by_name("xdpdump")

        # capture stats to calculate delta
        dropped_count = get_dropped_count(
            node=receiver,
            nic=receiver_nic,
            previous_count=original_dropped_count,
            log=log,
        )

        log.debug(
            f"sender pktgen result: {pktgen_result}, "
            f"dropped on receiver: {dropped_count}"
        )

        self._check_threshold(
            pktgen_result.sent_count, dropped_count, threshold, "dropped packets"
        )

        assert_that(pktgen_result.pps).described_as(
            "pps must be greater than 1M."
        ).is_greater_than_or_equal_to(1000000)

    def _execute_tx_forward_test(
        self,
        environment: Environment,
        log: Logger,
        is_multi_threads: bool = False,
        threshold: float = _default_received_threshold,
    ) -> None:
        """
        Send packets from node 0 to the forwarder (node 1), which forwards
        them to the receiver (node 2), then check the forwarded (or, when no
        forwarded counter is available, dropped) rate against threshold.

        Raises SkippedException when pktgen reports an unsupported kernel.
        """
        sender = environment.nodes[0]
        forwarder = environment.nodes[1]
        receiver = environment.nodes[2]

        # install pktgen on sender
        try:
            pktgen = sender.tools[Pktgen]
        except UnsupportedKernelException as identifier:
            # chain the cause, so the original traceback is kept.
            raise SkippedException(identifier) from identifier

        # install xdp dump on forwarder and receiver
        forwarder_xdpdump, receiver_xdpdump = run_in_parallel(
            [
                partial(get_xdpdump, forwarder),
                partial(get_xdpdump, receiver),
            ],
            log=log,
        )

        sender_nic = sender.nics.get_nic_by_index(1)
        forwarder_nic = forwarder.nics.get_nic_by_index(1)
        receiver_nic = receiver.nics.get_nic_by_index(1)

        run_in_parallel(
            [
                partial(
                    forwarder_xdpdump.make_on_forwarder_role,
                    forwarder_nic=forwarder_nic,
                    receiver_nic=receiver_nic,
                ),
                partial(
                    receiver_xdpdump.make_by_build_type, build_type=BuildType.PERF_DROP
                ),
            ]
        )

        # capture existing stats to calculate delta
        original_forwarded_count = get_forwarded_count(
            node=forwarder,
            nic=forwarder_nic,
            previous_count=0,
            log=log,
        )
        original_dropped_count = get_dropped_count(
            node=receiver,
            nic=receiver_nic,
            previous_count=0,
            log=log,
        )

        try:
            # start xdpdump
            forwarder_xdpdump.start_async(nic_name=forwarder_nic.upper, timeout=0)
            receiver_xdpdump.start_async(nic_name=receiver_nic.upper, timeout=0)

            pktgen_result = self._send_packets(
                is_multi_threads, sender, pktgen, sender_nic, forwarder_nic
            )

            self._wait_packets_proceeded(
                log, receiver, receiver_nic, original_dropped_count
            )
        finally:
            # kill xdpdump processes.
            forwarder_kill = forwarder.tools[Kill]
            forwarder_kill.by_name("xdpdump")
            receiver_kill = receiver.tools[Kill]
            receiver_kill.by_name("xdpdump")

        # capture stats to calculate delta
        dropped_count = get_dropped_count(
            node=receiver,
            nic=receiver_nic,
            previous_count=original_dropped_count,
            log=log,
        )
        forwarded_count = get_forwarded_count(
            node=forwarder,
            nic=forwarder_nic,
            previous_count=original_forwarded_count,
            log=log,
        )

        # In some nodes like synthetic nic, there is no forward counter,
        # so count it by dropped count.
        validate_count = forwarded_count
        if not validate_count:
            validate_count = dropped_count

        log.debug(
            f"sender pktgen result: {pktgen_result}, "
            f"on forwarder: {forwarded_count}, "
            f"on receiver: {dropped_count}"
        )
        self._check_threshold(
            pktgen_result.sent_count, validate_count, threshold, "forwarded packets"
        )

    def _check_threshold(
        self, expected_count: int, actual_count: int, threshold: float, packet_name: str
    ) -> None:
        """
        Assert that actual_count / expected_count is at least threshold.

        Fails with a descriptive assertion when expected_count is not
        positive, instead of raising ZeroDivisionError.
        """
        assert_that(expected_count).described_as(
            f"expected count of {packet_name} must be greater than 0 "
            "to calculate the rate."
        ).is_greater_than(0)
        assert_that(actual_count / expected_count).described_as(
            f"{packet_name} rate should be above the threshold. "
            f"expected count: {expected_count}, actual count: {actual_count}"
        ).is_greater_than_or_equal_to(threshold)

    def _wait_packets_proceeded(
        self, log: Logger, receiver: Node, receiver_nic: NicInfo, original_count: int
    ) -> int:
        """
        Poll the receiver's dropped count once per interval until it stops
        changing, and return the last observed count.
        """
        # wait until the dropped count is not increased, it means there is
        # no more packets in queue.
        interval_seconds = 1
        current_count = 0
        delta_count = 1
        while delta_count:
            sleep(interval_seconds)
            previous_count = current_count
            current_count = get_dropped_count(
                node=receiver,
                nic=receiver_nic,
                previous_count=original_count,
                log=log,
            )
            delta_count = current_count - previous_count
            # report the interval that is actually slept above.
            message = (
                f"received {delta_count} new dropped packets "
                f"in the last {interval_seconds} second(s)."
            )
            if delta_count > 0:
                message += " keep checking."
            log.debug(message)
        # the loop exits when delta is 0, so current equals previous here.
        return current_count

    def _send_packets(
        self,
        is_multi_threads: bool,
        sender: Node,
        pktgen: Pktgen,
        sender_nic: NicInfo,
        forwarder_nic: NicInfo,
    ) -> PktgenResult:
        """
        Send packets with pktgen from sender_nic to the ip/mac of
        forwarder_nic, using up to 8 threads when is_multi_threads is set.
        Hugepages are set before sending and removed afterwards.
        """
        # send packets use the second nic to make sure the performance is not
        # impact by LISA.
        forwarder_ip = forwarder_nic.ip_addr
        forwarder_mac = forwarder_nic.mac_addr
        if is_multi_threads:
            lscpu = sender.tools[Lscpu]
            # max 8 thread to prevent too big concurrency
            thread_count = int(min(lscpu.get_core_count(), 8))
        else:
            thread_count = 1

        try:
            set_hugepage(sender)
            result = pktgen.send_packets(
                destination_ip=forwarder_ip,
                destination_mac=forwarder_mac,
                nic_name=sender_nic.upper,
                thread_count=thread_count,
            )
        finally:
            remove_hugepage(sender)

        # NOTE(review): the scraped snippet is truncated right after the
        # finally block; returning the pktgen result matches the declared
        # return type and how callers use it - confirm against upstream.
        return result


# ... (the scraped snippet of performance.py is truncated here; the next
# snippet on the page is xdpdump.py)
Source:xdpdump.py  
...164        # discard local changes after built, it's used to cleanup changes from165        # the forwarder role.166        git = self.node.tools[Git]167        git.discard_local_changes(cwd=self._code_path)168    def make_on_forwarder_role(169        self,170        forwarder_nic: NicInfo,171        receiver_nic: NicInfo,172    ) -> None:173        sed = self.node.tools[Sed]174        # replace hard code mac and ip addresses in code, the changes will be175        # reset after built.176        forwarder_mac = self._convert_mac_to_c_style(forwarder_nic.mac_addr)177        forwarder_ip = self._convert_ip_to_c_style(forwarder_nic.ip_addr)178        receiver_mac = self._convert_mac_to_c_style(receiver_nic.mac_addr)179        receiver_ip = self._convert_ip_to_c_style(receiver_nic.ip_addr)180        sed.substitute(181            regexp=r"unsigned char newethsrc \[\] = "182            "{ 0x00, 0x22, 0x48, 0x4c, 0xc4, 0x4d };",...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
