Example Python code snippets found via the `lisa_python` search.
First snippet: platform.py
Source: platform.py (LISA libvirt/QEMU platform orchestrator)
...131        self._deploy_nodes(environment, log)132    def _delete_environment(self, environment: Environment, log: Logger) -> None:133        self._delete_nodes(environment, log)134        if self.host_node.is_remote:135            self._stop_port_forwarding(environment, log)136        libvirt_log = self.host_node.tools[Journalctl].logs_for_unit(137            "libvirtd", sudo=self.host_node.is_remote138        )139        libvirt_log_path = self.host_node.local_log_path / "libvirtd.log"140        with open(str(libvirt_log_path), "w") as f:141            f.write(libvirt_log)142    def _configure_environment(self, environment: Environment, log: Logger) -> None:143        environment_context = get_environment_context(environment)144        if self.platform_runbook.network_boot_timeout:145            environment_context.network_boot_timeout = (146                self.platform_runbook.network_boot_timeout147            )148        environment_context.ssh_public_key = get_public_key_data(149            self.runbook.admin_private_key_file150        )151    def _configure_node_capabilities(152        self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect153    ) -> bool:154        if not environment.runbook.nodes_requirement:155            return True156        host_capabilities = self._get_host_capabilities(lv_conn, log)157        nodes_capabilities = self._create_node_capabilities(host_capabilities)158        nodes_requirement = []159        for node_space in environment.runbook.nodes_requirement:160            # Check that the general node capabilities are compatible with this node's161            # specific requirements.162            if not node_space.check(nodes_capabilities):163                return False164            # Rectify the general node capabilities with this node's specific165            # requirements.166            node_requirement = node_space.generate_min_capability(nodes_capabilities)167            
nodes_requirement.append(node_requirement)168        if not self._check_host_capabilities(nodes_requirement, host_capabilities, log):169            return False170        environment.runbook.nodes_requirement = nodes_requirement171        return True172    def _get_host_capabilities(173        self, lv_conn: libvirt.virConnect, log: Logger174    ) -> _HostCapabilities:175        host_capabilities = _HostCapabilities()176        capabilities_xml_str = lv_conn.getCapabilities()177        capabilities_xml = ET.fromstring(capabilities_xml_str)178        host_xml = capabilities_xml.find("host")179        assert host_xml180        topology_xml = host_xml.find("topology")181        assert topology_xml182        cells_xml = topology_xml.find("cells")183        assert cells_xml184        for cell in cells_xml.findall("cell"):185            cpus_xml = cell.find("cpus")186            assert cpus_xml187            host_capabilities.core_count += int(cpus_xml.attrib["num"])188        # Get free memory.189        # Include the disk cache size, as it will be freed if memory becomes limited.190        memory_stats = lv_conn.getMemoryStats(libvirt.VIR_NODE_MEMORY_STATS_ALL_CELLS)191        host_capabilities.free_memory_kib = (192            memory_stats[libvirt.VIR_NODE_MEMORY_STATS_FREE]193            + memory_stats[libvirt.VIR_NODE_MEMORY_STATS_CACHED]194        )195        log.debug(196            f"QEMU host: "197            f"CPU Cores = {host_capabilities.core_count}, "198            f"Free Memory = {host_capabilities.free_memory_kib} KiB"199        )200        return host_capabilities201    # Create the set of capabilities that are generally supported on QEMU nodes.202    def _create_node_capabilities(203        self, host_capabilities: _HostCapabilities204    ) -> schema.NodeSpace:205        node_capabilities = schema.NodeSpace()206        node_capabilities.name = "QEMU"207        node_capabilities.node_count = 1208        node_capabilities.core_count = 
search_space.IntRange(209            min=1, max=host_capabilities.core_count210        )211        node_capabilities.disk = schema.DiskOptionSettings(212            data_disk_count=search_space.IntRange(min=0),213            data_disk_size=search_space.IntRange(min=1),214        )215        node_capabilities.network_interface = schema.NetworkInterfaceOptionSettings()216        node_capabilities.network_interface.max_nic_count = 1217        node_capabilities.network_interface.nic_count = 1218        node_capabilities.gpu_count = 0219        node_capabilities.features = search_space.SetSpace[schema.FeatureSettings](220            is_allow_set=True,221            items=[222                schema.FeatureSettings.create(SerialConsole.name()),223            ],224        )225        return node_capabilities226    # Check that the VM requirements can be fulfilled by the host.227    def _check_host_capabilities(228        self,229        nodes_requirements: List[schema.NodeSpace],230        host_capabilities: _HostCapabilities,231        log: Logger,232    ) -> bool:233        total_required_memory_mib = 0234        for node_requirements in nodes_requirements:235            # Calculate the total amount of memory required for all the VMs.236            assert isinstance(node_requirements.memory_mb, int)237            total_required_memory_mib += node_requirements.memory_mb238        # Ensure host has enough memory for all the VMs.239        total_required_memory_kib = total_required_memory_mib * 1024240        if total_required_memory_kib > host_capabilities.free_memory_kib:241            log.error(242                f"Nodes require a total of {total_required_memory_kib} KiB memory. 
"243                f"Host only has {host_capabilities.free_memory_kib} KiB free."244            )245            return False246        return True247    # Get the minimum value for a node requirement with an interger type.248    # Note: Unlike other orchestrators, we don't want to fill up the capacity of249    # the host in case the test is running on a dev box.250    def _get_count_space_min(self, count_space: search_space.CountSpace) -> int:251        return search_space.generate_min_capability_countspace(count_space, count_space)252    def _deploy_nodes(self, environment: Environment, log: Logger) -> None:253        self._configure_nodes(environment, log)254        with libvirt.open(self.libvirt_conn_str) as lv_conn:255            try:256                self._create_nodes(environment, log, lv_conn)257                self._fill_nodes_metadata(environment, log, lv_conn)258            except Exception as ex:259                assert environment.platform260                if (261                    environment.platform.runbook.keep_environment262                    == constants.ENVIRONMENT_KEEP_NO263                ):264                    self._delete_nodes(environment, log)265                raise ex266    # Pre-determine all the nodes' properties, including the name of all the resouces267    # to be created. 
This makes it easier to cleanup everything after the test is268    # finished (or fails).269    def _configure_nodes(self, environment: Environment, log: Logger) -> None:270        # Generate a random name for the VMs.271        test_suffix = "".join(random.choice(string.ascii_uppercase) for _ in range(5))272        vm_name_prefix = f"lisa-{test_suffix}"273        self.vm_disks_dir = os.path.join(274            self.platform_runbook.hosts[0].lisa_working_dir, vm_name_prefix275        )276        assert environment.runbook.nodes_requirement277        for i, node_space in enumerate(environment.runbook.nodes_requirement):278            assert isinstance(279                node_space, schema.NodeSpace280            ), f"actual: {type(node_space)}"281            node_runbook: BaseLibvirtNodeSchema = node_space.get_extended_runbook(282                self.__node_runbook_type(), type_name=type(self).type_name()283            )284            if not os.path.exists(node_runbook.disk_img):285                raise LisaException(f"file does not exist: {node_runbook.disk_img}")286            node = environment.create_node_from_requirement(node_space)287            self._configure_node(288                node,289                i,290                node_space,291                node_runbook,292                vm_name_prefix,293            )294    def _configure_node(295        self,296        node: Node,297        node_idx: int,298        node_space: schema.NodeSpace,299        node_runbook: BaseLibvirtNodeSchema,300        vm_name_prefix: str,301    ) -> None:302        node_context = get_node_context(node)303        if (304            not node_runbook.firmware_type305            or node_runbook.firmware_type == FIRMWARE_TYPE_UEFI306        ):307            node_context.use_bios_firmware = False308        elif node_runbook.firmware_type == FIRMWARE_TYPE_BIOS:309            node_context.use_bios_firmware = True310            if node_runbook.enable_secure_boot:311                
raise LisaException("Secure-boot requires UEFI firmware.")312        else:313            raise LisaException(314                f"Unknown node firmware type: {node_runbook.firmware_type}."315                f"Expecting either {FIRMWARE_TYPE_UEFI} or {FIRMWARE_TYPE_BIOS}."316            )317        node_context.machine_type = node_runbook.machine_type or None318        node_context.enable_secure_boot = node_runbook.enable_secure_boot319        node_context.vm_name = f"{vm_name_prefix}-{node_idx}"320        if not node.name:321            node.name = node_context.vm_name322        node_context.cloud_init_file_path = os.path.join(323            self.vm_disks_dir, f"{node_context.vm_name}-cloud-init.iso"324        )325        if self.host_node.is_remote:326            node_context.os_disk_source_file_path = node_runbook.disk_img327            node_context.os_disk_base_file_path = os.path.join(328                self.vm_disks_dir, os.path.basename(node_runbook.disk_img)329            )330        else:331            node_context.os_disk_base_file_path = node_runbook.disk_img332        node_context.os_disk_base_file_fmt = DiskImageFormat(333            node_runbook.disk_img_format334        )335        node_context.os_disk_file_path = os.path.join(336            self.vm_disks_dir, f"{node_context.vm_name}-os.qcow2"337        )338        node_context.console_log_file_path = str(339            node.local_log_path / "qemu-console.log"340        )341        # Read extra cloud-init data.342        extra_user_data = (343            node_runbook.cloud_init and node_runbook.cloud_init.extra_user_data344        )345        if extra_user_data:346            node_context.extra_cloud_init_user_data = []347            if isinstance(extra_user_data, str):348                extra_user_data = [extra_user_data]349            for relative_file_path in extra_user_data:350                if not relative_file_path:351                    continue352                file_path = 
constants.RUNBOOK_PATH.joinpath(relative_file_path)353                with open(file_path, "r") as file:354                    node_context.extra_cloud_init_user_data.append(yaml.safe_load(file))355        # Configure data disks.356        if node_space.disk:357            assert isinstance(358                node_space.disk.data_disk_count, int359            ), f"actual: {type(node_space.disk.data_disk_count)}"360            assert isinstance(361                node_space.disk.data_disk_size, int362            ), f"actual: {type(node_space.disk.data_disk_size)}"363            for i in range(node_space.disk.data_disk_count):364                data_disk = DataDiskContext()365                data_disk.file_path = os.path.join(366                    self.vm_disks_dir, f"{node_context.vm_name}-data-{i}.qcow2"367                )368                data_disk.size_gib = node_space.disk.data_disk_size369                node_context.data_disks.append(data_disk)370    def _create_domain_and_attach_logger(371        self,372        libvirt_conn: libvirt.virConnect,373        node_context: NodeContext,374    ) -> None:375        # Start the VM in the paused state.376        # This gives the console logger a chance to connect before the VM starts377        # for real.378        assert node_context.domain379        node_context.domain.createWithFlags(libvirt.VIR_DOMAIN_START_PAUSED)380        # Attach the console logger381        node_context.console_logger = QemuConsoleLogger()382        node_context.console_logger.attach(383            libvirt_conn, node_context.domain, node_context.console_log_file_path384        )385        # Start the VM.386        node_context.domain.resume()387    # Create all the VMs.388    def _create_nodes(389        self,390        environment: Environment,391        log: Logger,392        lv_conn: libvirt.virConnect,393    ) -> None:394        self.host_node.shell.mkdir(Path(self.vm_disks_dir), exist_ok=True)395        for node in 
environment.nodes.list():396            node_context = get_node_context(node)397            self._create_node(398                node,399                node_context,400                environment,401                log,402                lv_conn,403            )404    def _create_node(405        self,406        node: Node,407        node_context: NodeContext,408        environment: Environment,409        log: Logger,410        lv_conn: libvirt.virConnect,411    ) -> None:412        # Create required directories and copy the required files to the host413        # node.414        if node_context.os_disk_source_file_path:415            source_exists = self.host_node.tools[Ls].path_exists(416                path=node_context.os_disk_base_file_path, sudo=True417            )418            if not source_exists:419                self.host_node.shell.copy(420                    Path(node_context.os_disk_source_file_path),421                    Path(node_context.os_disk_base_file_path),422                )423        # Create cloud-init ISO file.424        self._create_node_cloud_init_iso(environment, log, node)425        # Create OS disk from the provided image.426        self._create_node_os_disk(environment, log, node)427        # Create data disks428        self._create_node_data_disks(node)429        # Create libvirt domain (i.e. 
VM).430        xml = self._create_node_domain_xml(environment, log, node, lv_conn)431        node_context.domain = lv_conn.defineXML(xml)432        self._create_domain_and_attach_logger(433            lv_conn,434            node_context,435        )436    # Delete all the VMs.437    def _delete_nodes(self, environment: Environment, log: Logger) -> None:438        # Delete nodes.439        for node in environment.nodes.list():440            self._delete_node(node, log)441        # Delete VM disks directory.442        try:443            self.host_node.shell.remove(Path(self.vm_disks_dir), True)444        except Exception as ex:445            log.warning(f"Failed to delete VM files directory: {ex}")446    def _delete_node_watchdog_callback(self) -> None:447        print("VM delete watchdog timer fired.\n", file=sys.__stderr__)448        faulthandler.dump_traceback(file=sys.__stderr__, all_threads=True)449        os._exit(1)450    def _delete_node(self, node: Node, log: Logger) -> None:451        node_context = get_node_context(node)452        watchdog = Timer(60.0, self._delete_node_watchdog_callback)453        watchdog.start()454        # Stop the VM.455        if node_context.domain:456            log.debug(f"Stop VM: {node_context.vm_name}")457            try:458                # In the libvirt API, "destroy" means "stop".459                node_context.domain.destroy()460            except libvirt.libvirtError as ex:461                log.warning(f"VM stop failed. 
{ex}")462        # Wait for console log to close.463        # Note: libvirt can deadlock if you try to undefine the VM while the stream464        # is trying to close.465        if node_context.console_logger:466            log.debug(f"Close VM console log: {node_context.vm_name}")467            node_context.console_logger.close()468            node_context.console_logger = None469        # Undefine the VM.470        if node_context.domain:471            log.debug(f"Delete VM: {node_context.vm_name}")472            try:473                node_context.domain.undefineFlags(self._get_domain_undefine_flags())474            except libvirt.libvirtError as ex:475                log.warning(f"VM delete failed. {ex}")476            node_context.domain = None477        watchdog.cancel()478    def _get_domain_undefine_flags(self) -> int:479        return int(480            libvirt.VIR_DOMAIN_UNDEFINE_MANAGED_SAVE481            | libvirt.VIR_DOMAIN_UNDEFINE_SNAPSHOTS_METADATA482            | libvirt.VIR_DOMAIN_UNDEFINE_NVRAM483            | libvirt.VIR_DOMAIN_UNDEFINE_CHECKPOINTS_METADATA484        )485    def _stop_port_forwarding(self, environment: Environment, log: Logger) -> None:486        log.debug(f"Clearing port forwarding rules for environment {environment.name}")487        environment_context = get_environment_context(environment)488        for (port, address) in environment_context.port_forwarding_list:489            self.host_node.tools[Iptables].stop_forwarding(port, address, 22)490    # Retrieve the VMs' dynamic properties (e.g. 
IP address).491    def _fill_nodes_metadata(492        self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect493    ) -> None:494        environment_context = get_environment_context(environment)495        # Give all the VMs some time to boot and then acquire an IP address.496        timeout = time.time() + environment_context.network_boot_timeout497        if self.host_node.is_remote:498            remote_node = cast(RemoteNode, self.host_node)499            conn_info = remote_node.connection_info...snippet_client_v2.py
Source: snippet_client_v2.py (Mobly Android snippet client)
...486        self._conn.close()487        self._conn = None488    finally:489      # Always clear the host port as part of the close step490      self._stop_port_forwarding()491  def _stop_port_forwarding(self):492    """Stops the adb port forwarding used by this client."""493    if self.host_port:494      self._device.adb.forward(['--remove', f'tcp:{self.host_port}'])495      self.host_port = None496  def _stop_server(self):497    """Releases all the resources acquired in `start_server`.498    Raises:499      android_device_lib_errors.DeviceError: if the server exited with errors on500        the device side.501    """502    # Although killing the snippet server would abort this subprocess anyway, we503    # want to call stop_standing_subprocess() to perform a health check,504    # print the failure stack trace if there was any, and reap it from the505    # process table. Note that it's much more important to ensure releasing all...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for free!
