Best Python code snippet using lisa_python
features.py
Source:features.py  
...571                    f"networking into status [{enable}]"572                ).is_equal_to(enable)573        # wait settings effective574        if wait:575            self._check_sriov_enabled(enable, reset_connections)576    def is_enabled_sriov(self) -> bool:577        azure_platform: AzurePlatform = self._platform  # type: ignore578        network_client = get_network_client(azure_platform)579        sriov_enabled: bool = False580        vm = get_vm(azure_platform, self._node)581        nic = self._get_primary(vm.network_profile.network_interfaces)582        nic_name = nic.id.split("/")[-1]583        primary_nic = network_client.network_interfaces.get(584            self._resource_group_name, nic_name585        )586        sriov_enabled = primary_nic.enable_accelerated_networking587        return sriov_enabled588    def attach_nics(589        self, extra_nic_count: int, enable_accelerated_networking: bool = True590    ) -> None:591        if 0 == extra_nic_count:592            return593        azure_platform: AzurePlatform = self._platform  # type: ignore594        network_client = get_network_client(azure_platform)595        compute_client = get_compute_client(azure_platform)596        vm = get_vm(azure_platform, self._node)597        current_nic_count = len(vm.network_profile.network_interfaces)598        nic_count_after_add_extra = extra_nic_count + current_nic_count599        assert (600            self._node.capability.network_interface601            and self._node.capability.network_interface.max_nic_count602        )603        assert isinstance(604            self._node.capability.network_interface.max_nic_count, int605        ), f"actual: {type(self._node.capability.network_interface.max_nic_count)}"606        node_capability_nic_count = (607            self._node.capability.network_interface.max_nic_count608        )609        if nic_count_after_add_extra > node_capability_nic_count:610            raise LisaException(611                f"nic count 
after add extra nics is {nic_count_after_add_extra},"612                f" it exceeds the vm size's capability {node_capability_nic_count}."613            )614        nic = self._get_primary(vm.network_profile.network_interfaces)615        nic_name = nic.id.split("/")[-1]616        primary_nic = network_client.network_interfaces.get(617            self._resource_group_name, nic_name618        )619        startstop = self._node.features[StartStop]620        startstop.stop()621        network_interfaces_section = []622        index = 0623        while index < current_nic_count + extra_nic_count - 1:624            extra_nic_name = f"{self._node.name}-extra-{index}"625            self._log.debug(f"start to create the nic {extra_nic_name}.")626            params = {627                "location": vm.location,628                "enable_accelerated_networking": enable_accelerated_networking,629                "ip_configurations": [630                    {631                        "name": extra_nic_name,632                        "subnet": {"id": primary_nic.ip_configurations[0].subnet.id},633                        "primary": False,634                    }635                ],636            }637            network_client.network_interfaces.begin_create_or_update(638                resource_group_name=self._resource_group_name,639                network_interface_name=extra_nic_name,640                parameters=params,641            )642            self._log.debug(f"create the nic {extra_nic_name} successfully.")643            extra_nic = network_client.network_interfaces.get(644                network_interface_name=extra_nic_name,645                resource_group_name=self._resource_group_name,646            )647            network_interfaces_section.append({"id": extra_nic.id, "primary": False})648            index += 1649        network_interfaces_section.append({"id": primary_nic.id, "primary": True})650        self._log.debug(f"start to attach the nics into VM 
{self._node.name}.")651        compute_client.virtual_machines.begin_update(652            resource_group_name=self._resource_group_name,653            vm_name=self._node.name,654            parameters={655                "network_profile": {"network_interfaces": network_interfaces_section},656            },657        )658        self._log.debug(f"attach the nics into VM {self._node.name} successfully.")659        startstop.start()660    def get_nic_count(self, is_sriov_enabled: bool = True) -> int:661        return len(662            [663                x664                for x in self._get_all_nics()665                if x.enable_accelerated_networking == is_sriov_enabled666            ]667        )668    def remove_extra_nics(self) -> None:669        azure_platform: AzurePlatform = self._platform  # type: ignore670        network_client = get_network_client(azure_platform)671        compute_client = get_compute_client(azure_platform)672        vm = get_vm(azure_platform, self._node)673        if len(vm.network_profile.network_interfaces) == 1:674            self._log.debug("No existed extra nics can be disassociated.")675            return676        nic = self._get_primary(vm.network_profile.network_interfaces)677        nic_name = nic.id.split("/")[-1]678        primary_nic = network_client.network_interfaces.get(679            self._resource_group_name, nic_name680        )681        network_interfaces_section = []682        network_interfaces_section.append({"id": primary_nic.id, "primary": True})683        startstop = self._node.features[StartStop]684        startstop.stop()685        compute_client.virtual_machines.begin_update(686            resource_group_name=self._resource_group_name,687            vm_name=self._node.name,688            parameters={689                "network_profile": {"network_interfaces": network_interfaces_section},690            },691        )692        self._log.debug(693            f"Only associated nic {primary_nic.id} into VM 
{self._node.name}."694        )695        startstop.start()696    def reload_module(self) -> None:697        modprobe_tool = self._node.tools[Modprobe]698        modprobe_tool.reload(["hv_netvsc"])699    @retry(tries=60, delay=10)700    def _check_sriov_enabled(701        self, enabled: bool, reset_connections: bool = True702    ) -> None:703        if reset_connections:704            self._node.close()705        self._node.nics.reload()706        default_nic = self._node.nics.get_nic_by_index(0)707        if enabled and not default_nic.lower:708            raise LisaException("SRIOV is enabled, but VF is not found.")709        elif not enabled and default_nic.lower:710            raise LisaException("SRIOV is disabled, but VF exists.")711        else:712            # the enabled flag is consistent with VF presents.713            ...714    def _get_primary(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
