Best Python code snippet using lisa_python
platform.py
Source:platform.py  
# NOTE(review): the functions below take `self`/`cls`, so they are presumably
# methods of a libvirt platform class whose header lies outside this excerpt.
@classmethod
def platform_runbook_type(cls) -> type:
    """Return the platform-level runbook schema type."""
    return BaseLibvirtPlatformSchema


@classmethod
def node_runbook_type(cls) -> type:
    """Return the node-level runbook schema type."""
    return BaseLibvirtNodeSchema


def _initialize(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the libvirt events thread, port state and platform runbook."""
    libvirt_events_thread.init()
    # 49152 is the first available private port
    self._next_available_port = 49152
    self._port_forwarding_lock = Lock()
    self.platform_runbook = self.runbook.get_extended_runbook(
        self.__platform_runbook_type(), type_name=type(self).type_name()
    )


def _prepare_environment(self, environment: Environment, log: Logger) -> bool:
    """Connect to the libvirt host and match the environment against it.

    Only the first configured host is used. Returns the result of
    `_configure_node_capabilities`. Raises LisaException when a remote host
    has no username or private key file configured.
    """
    # Ensure environment log directory is created before connecting to any nodes.
    _ = environment.log_path

    if len(self.platform_runbook.hosts) > 1:
        log.warning(
            "Multiple hosts are currently not supported. "
            "Only the first host will be used."
        )

    host = self.platform_runbook.hosts[0]
    if host.is_remote():
        assert host.address
        if not host.username:
            raise LisaException("Username must be provided for remote host")
        if not host.private_key_file:
            raise LisaException("Private key file must be provided for remote host")

        self.host_node = RemoteNode(
            runbook=schema.Node(name="libvirt-host"),
            index=-1,
            logger_name="libvirt-host",
            base_part_path=environment.environment_part_path,
            parent_logger=log,
        )
        self.host_node.set_connection_info(
            address=host.address,
            username=host.username,
            private_key_file=host.private_key_file,
        )
    else:
        self.host_node = local_node_connect(
            name="libvirt-host",
            base_part_path=environment.environment_part_path,
            parent_logger=log,
        )

    self.__init_libvirt_conn_string()
    self._configure_environment(environment, log)

    with libvirt.open(self.libvirt_conn_str) as lv_conn:
        return self._configure_node_capabilities(environment, log, lv_conn)


def _deploy_environment(self, environment: Environment, log: Logger) -> None:
    """Deploy all nodes of the environment."""
    self._deploy_nodes(environment, log)


def _delete_environment(self, environment: Environment, log: Logger) -> None:
    """Delete the nodes, clear port forwarding and save the libvirtd journal."""
    self._delete_nodes(environment, log)

    if self.host_node.is_remote:
        self._stop_port_forwarding(environment, log)

    libvirt_log = self.host_node.tools[Journalctl].logs_for_unit(
        "libvirtd", sudo=self.host_node.is_remote
    )
    libvirt_log_path = self.host_node.local_log_path / "libvirtd.log"
    with open(str(libvirt_log_path), "w") as f:
        f.write(libvirt_log)


def _configure_environment(self, environment: Environment, log: Logger) -> None:
    """Copy the boot timeout (when set) and SSH public key into the context."""
    environment_context = get_environment_context(environment)

    if self.platform_runbook.network_boot_timeout:
        environment_context.network_boot_timeout = (
            self.platform_runbook.network_boot_timeout
        )

    environment_context.ssh_public_key = get_public_key_data(
        self.runbook.admin_private_key_file
    )


def _configure_node_capabilities(
    self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect
) -> bool:
    """Resolve every node requirement against what the host can provide.

    Returns True when there are no requirements or when all of them fit; in
    the latter case the environment's requirements are replaced by the
    generated minimum capabilities. Returns False otherwise.
    """
    if not environment.runbook.nodes_requirement:
        return True

    host_capabilities = self._get_host_capabilities(lv_conn, log)
    nodes_capabilities = self._create_node_capabilities(host_capabilities)

    nodes_requirement = []
    for node_space in environment.runbook.nodes_requirement:
        # Check that the general node capabilities are compatible with this node's
        # specific requirements.
        if not node_space.check(nodes_capabilities):
            return False

        # Rectify the general node capabilities with this node's specific
        # requirements.
        node_requirement = node_space.generate_min_capability(nodes_capabilities)
        nodes_requirement.append(node_requirement)

    if not self._check_host_capabilities(nodes_requirement, host_capabilities, log):
        return False

    environment.runbook.nodes_requirement = nodes_requirement
    return True


def _find_required_child(self, parent: ET.Element, tag: str) -> ET.Element:
    """Return the child `tag` of `parent`, asserting that it exists.

    The check is `is not None` rather than a truth test: an ElementTree
    element without sub-elements is falsy, so `assert element` would reject
    an element that is present but childless.
    """
    child = parent.find(tag)
    assert child is not None, f"<{tag}> element not found under <{parent.tag}>"
    return child


def _get_host_capabilities(
    self, lv_conn: libvirt.virConnect, log: Logger
) -> _HostCapabilities:
    """Read the host's CPU count and free memory from libvirt.

    The core count is the sum of the `num` attribute of every
    host/topology/cells/cell/cpus element in the capabilities XML.
    """
    host_capabilities = _HostCapabilities()

    capabilities_xml = ET.fromstring(lv_conn.getCapabilities())
    host_xml = self._find_required_child(capabilities_xml, "host")
    topology_xml = self._find_required_child(host_xml, "topology")
    cells_xml = self._find_required_child(topology_xml, "cells")

    for cell in cells_xml.findall("cell"):
        cpus_xml = self._find_required_child(cell, "cpus")
        host_capabilities.core_count += int(cpus_xml.attrib["num"])

    # Get free memory.
    # Include the disk cache size, as it will be freed if memory becomes limited.
    memory_stats = lv_conn.getMemoryStats(libvirt.VIR_NODE_MEMORY_STATS_ALL_CELLS)
    host_capabilities.free_memory_kib = (
        memory_stats[libvirt.VIR_NODE_MEMORY_STATS_FREE]
        + memory_stats[libvirt.VIR_NODE_MEMORY_STATS_CACHED]
    )

    log.debug(
        f"QEMU host: "
        f"CPU Cores = {host_capabilities.core_count}, "
        f"Free Memory = {host_capabilities.free_memory_kib} KiB"
    )

    return host_capabilities


# Create the set of capabilities that are generally supported on QEMU nodes.
def _create_node_capabilities(
    self, host_capabilities: _HostCapabilities
) -> schema.NodeSpace:
    """Build the NodeSpace describing what a single QEMU node can offer."""
    node_capabilities = schema.NodeSpace()
    node_capabilities.name = "QEMU"
    node_capabilities.node_count = 1
    node_capabilities.core_count = search_space.IntRange(
        min=1, max=host_capabilities.core_count
    )
    node_capabilities.disk = schema.DiskOptionSettings(
        data_disk_count=search_space.IntRange(min=0),
        data_disk_size=search_space.IntRange(min=1),
    )
    node_capabilities.network_interface = schema.NetworkInterfaceOptionSettings()
    node_capabilities.network_interface.max_nic_count = 1
    node_capabilities.network_interface.nic_count = 1
    node_capabilities.gpu_count = 0
    node_capabilities.features = search_space.SetSpace[schema.FeatureSettings](
        is_allow_set=True,
        items=[
            schema.FeatureSettings.create(SerialConsole.name()),
        ],
    )

    return node_capabilities


# Check that the VM requirements can be fulfilled by the host.
def _check_host_capabilities(
    self,
    nodes_requirements: List[schema.NodeSpace],
    host_capabilities: _HostCapabilities,
    log: Logger,
) -> bool:
    """Return True when the host has enough free memory for all the nodes.

    Logs an error and returns False when the summed `memory_mb` of the
    requirements (converted to KiB) exceeds the host's free memory.
    """
    total_required_memory_mib = 0

    for node_requirements in nodes_requirements:
        # Calculate the total amount of memory required for all the VMs.
        assert isinstance(node_requirements.memory_mb, int)
        total_required_memory_mib += node_requirements.memory_mb

    # Ensure host has enough memory for all the VMs.
    total_required_memory_kib = total_required_memory_mib * 1024
    if total_required_memory_kib > host_capabilities.free_memory_kib:
        log.error(
            f"Nodes require a total of {total_required_memory_kib} KiB memory. "
            f"Host only has {host_capabilities.free_memory_kib} KiB free."
        )
        return False

    return True


# Get the minimum value for a node requirement with an integer type.
# Note: Unlike other orchestrators, we don't want to fill up the capacity of
# the host in case the test is running on a dev box.
def _get_count_space_min(self, count_space: search_space.CountSpace) -> int:
    """Return the minimum capability generated for `count_space`."""
    return search_space.generate_min_capability_countspace(count_space, count_space)


def _deploy_nodes(self, environment: Environment, log: Logger) -> None:
    """Configure and create all nodes, then fill in their connection info.

    On failure the nodes are deleted again, unless the runbook asks to keep
    the environment, and the original exception is re-raised.
    """
    self._configure_nodes(environment, log)

    with libvirt.open(self.libvirt_conn_str) as lv_conn:
        try:
            self._create_nodes(environment, log, lv_conn)
            self._fill_nodes_metadata(environment, log, lv_conn)

        except Exception:
            assert environment.platform
            if (
                environment.platform.runbook.keep_environment
                == constants.ENVIRONMENT_KEEP_NO
            ):
                self._delete_nodes(environment, log)

            # Bare raise keeps the original traceback intact.
            raise


# Pre-determine all the nodes' properties, including the name of all the resources
# to be created.
# This makes it easier to clean up everything after the test is
# finished (or fails).
def _configure_nodes(self, environment: Environment, log: Logger) -> None:
    """Create a node for every requirement and pre-compute its resource names.

    Raises LisaException when a node's disk image file does not exist.
    """
    # Generate a random name for the VMs.
    test_suffix = "".join(random.choice(string.ascii_uppercase) for _ in range(5))
    vm_name_prefix = f"lisa-{test_suffix}"

    self.vm_disks_dir = os.path.join(
        self.platform_runbook.hosts[0].lisa_working_dir, vm_name_prefix
    )

    assert environment.runbook.nodes_requirement
    for i, node_space in enumerate(environment.runbook.nodes_requirement):
        assert isinstance(
            node_space, schema.NodeSpace
        ), f"actual: {type(node_space)}"

        node_runbook: BaseLibvirtNodeSchema = node_space.get_extended_runbook(
            self.__node_runbook_type(), type_name=type(self).type_name()
        )

        if not os.path.exists(node_runbook.disk_img):
            raise LisaException(f"file does not exist: {node_runbook.disk_img}")

        node = environment.create_node_from_requirement(node_space)

        self._configure_node(
            node,
            i,
            node_space,
            node_runbook,
            vm_name_prefix,
        )


def _configure_node(
    self,
    node: Node,
    node_idx: int,
    node_space: schema.NodeSpace,
    node_runbook: BaseLibvirtNodeSchema,
    vm_name_prefix: str,
) -> None:
    """Fill the node context with firmware, naming, disk and cloud-init data.

    Raises LisaException for an unknown firmware type, or when secure boot
    is requested together with BIOS firmware.
    """
    node_context = get_node_context(node)

    # UEFI is the default when no firmware type is given.
    if (
        not node_runbook.firmware_type
        or node_runbook.firmware_type == FIRMWARE_TYPE_UEFI
    ):
        node_context.use_bios_firmware = False

    elif node_runbook.firmware_type == FIRMWARE_TYPE_BIOS:
        node_context.use_bios_firmware = True

        if node_runbook.enable_secure_boot:
            raise LisaException("Secure-boot requires UEFI firmware.")

    else:
        # Fix: the two sentences were previously joined without a space.
        raise LisaException(
            f"Unknown node firmware type: {node_runbook.firmware_type}. "
            f"Expecting either {FIRMWARE_TYPE_UEFI} or {FIRMWARE_TYPE_BIOS}."
        )

    node_context.machine_type = node_runbook.machine_type or None
    node_context.enable_secure_boot = node_runbook.enable_secure_boot

    node_context.vm_name = f"{vm_name_prefix}-{node_idx}"
    if not node.name:
        node.name = node_context.vm_name

    node_context.cloud_init_file_path = os.path.join(
        self.vm_disks_dir, f"{node_context.vm_name}-cloud-init.iso"
    )

    if self.host_node.is_remote:
        # The image is recorded as a source to be copied into the VM disks
        # directory on the host.
        node_context.os_disk_source_file_path = node_runbook.disk_img
        node_context.os_disk_base_file_path = os.path.join(
            self.vm_disks_dir, os.path.basename(node_runbook.disk_img)
        )
    else:
        node_context.os_disk_base_file_path = node_runbook.disk_img

    node_context.os_disk_base_file_fmt = DiskImageFormat(
        node_runbook.disk_img_format
    )

    node_context.os_disk_file_path = os.path.join(
        self.vm_disks_dir, f"{node_context.vm_name}-os.qcow2"
    )

    node_context.console_log_file_path = str(
        node.local_log_path / "qemu-console.log"
    )

    # Read extra cloud-init data.
    extra_user_data = (
        node_runbook.cloud_init and node_runbook.cloud_init.extra_user_data
    )
    if extra_user_data:
        node_context.extra_cloud_init_user_data = []

        # A single path may be given as a plain string.
        if isinstance(extra_user_data, str):
            extra_user_data = [extra_user_data]

        for relative_file_path in extra_user_data:
            if not relative_file_path:
                continue

            file_path = constants.RUNBOOK_PATH.joinpath(relative_file_path)
            with open(file_path, "r") as file:
                node_context.extra_cloud_init_user_data.append(yaml.safe_load(file))

    # Configure data disks.
    if node_space.disk:
        assert isinstance(
            node_space.disk.data_disk_count, int
        ), f"actual: {type(node_space.disk.data_disk_count)}"
        assert isinstance(
            node_space.disk.data_disk_size, int
        ), f"actual: {type(node_space.disk.data_disk_size)}"

        for i in range(node_space.disk.data_disk_count):
            data_disk = DataDiskContext()
            data_disk.file_path = os.path.join(
                self.vm_disks_dir, f"{node_context.vm_name}-data-{i}.qcow2"
            )
            data_disk.size_gib = node_space.disk.data_disk_size

            node_context.data_disks.append(data_disk)


def _create_domain_and_attach_logger(
    self,
    libvirt_conn: libvirt.virConnect,
    node_context: NodeContext,
) -> None:
    """Start the domain paused, attach the console logger, then resume it."""
    # Start the VM in the paused state.
    # This gives the console logger a chance to connect before the VM starts
    # for real.
    assert node_context.domain
    node_context.domain.createWithFlags(libvirt.VIR_DOMAIN_START_PAUSED)

    # Attach the console logger
    node_context.console_logger = QemuConsoleLogger()
    node_context.console_logger.attach(
        libvirt_conn, node_context.domain, node_context.console_log_file_path
    )

    # Start the VM.
    node_context.domain.resume()


# Create all the VMs.
def _create_nodes(
    self,
    environment: Environment,
    log: Logger,
    lv_conn: libvirt.virConnect,
) -> None:
    """Create the VM disks directory on the host and then every node."""
    self.host_node.shell.mkdir(Path(self.vm_disks_dir), exist_ok=True)

    for node in environment.nodes.list():
        node_context = get_node_context(node)
        self._create_node(
            node,
            node_context,
            environment,
            log,
            lv_conn,
        )


def _create_node(
    self,
    node: Node,
    node_context: NodeContext,
    environment: Environment,
    log: Logger,
    lv_conn: libvirt.virConnect,
) -> None:
    """Create the disks and the libvirt domain for one node, then start it."""
    # Create required directories and copy the required files to the host
    # node.
    if node_context.os_disk_source_file_path:
        source_exists = self.host_node.tools[Ls].path_exists(
            path=node_context.os_disk_base_file_path, sudo=True
        )
        if not source_exists:
            self.host_node.shell.copy(
                Path(node_context.os_disk_source_file_path),
                Path(node_context.os_disk_base_file_path),
            )

    # Create cloud-init ISO file.
    self._create_node_cloud_init_iso(environment, log, node)

    # Create OS disk from the provided image.
    self._create_node_os_disk(environment, log, node)

    # Create data disks
    self._create_node_data_disks(node)

    # Create libvirt domain (i.e. VM).
    xml = self._create_node_domain_xml(environment, log, node, lv_conn)
    node_context.domain = lv_conn.defineXML(xml)

    self._create_domain_and_attach_logger(
        lv_conn,
        node_context,
    )


# Delete all the VMs.
def _delete_nodes(self, environment: Environment, log: Logger) -> None:
    """Delete every node, then remove the VM disks directory (best effort)."""
    # Delete nodes.
    for node in environment.nodes.list():
        self._delete_node(node, log)

    # Delete VM disks directory.
    try:
        self.host_node.shell.remove(Path(self.vm_disks_dir), True)
    except Exception as ex:
        log.warning(f"Failed to delete VM files directory: {ex}")


def _delete_node_watchdog_callback(self) -> None:
    """Dump all thread tracebacks to stderr and terminate the process."""
    print("VM delete watchdog timer fired.\n", file=sys.__stderr__)
    faulthandler.dump_traceback(file=sys.__stderr__, all_threads=True)
    os._exit(1)


def _delete_node(self, node: Node, log: Logger) -> None:
    """Stop the VM, close its console log and undefine the domain.

    A 60 second watchdog is armed for the duration of the call. libvirt
    errors while stopping or undefining are logged as warnings.
    """
    node_context = get_node_context(node)

    watchdog = Timer(60.0, self._delete_node_watchdog_callback)
    watchdog.start()

    # The watchdog terminates the whole process when it fires, so it must be
    # cancelled on every exit path, including unexpected exceptions.
    try:
        # Stop the VM.
        if node_context.domain:
            log.debug(f"Stop VM: {node_context.vm_name}")
            try:
                # In the libvirt API, "destroy" means "stop".
                node_context.domain.destroy()
            except libvirt.libvirtError as ex:
                log.warning(f"VM stop failed. {ex}")

        # Wait for console log to close.
        # Note: libvirt can deadlock if you try to undefine the VM while the stream
        # is trying to close.
        if node_context.console_logger:
            log.debug(f"Close VM console log: {node_context.vm_name}")
            node_context.console_logger.close()
            node_context.console_logger = None

        # Undefine the VM.
        if node_context.domain:
            log.debug(f"Delete VM: {node_context.vm_name}")
            try:
                node_context.domain.undefineFlags(self._get_domain_undefine_flags())
            except libvirt.libvirtError as ex:
                log.warning(f"VM delete failed. {ex}")

            node_context.domain = None
    finally:
        watchdog.cancel()


def _get_domain_undefine_flags(self) -> int:
    """Return the flag mask passed to `undefineFlags` when deleting a VM."""
    return int(
        libvirt.VIR_DOMAIN_UNDEFINE_MANAGED_SAVE
        | libvirt.VIR_DOMAIN_UNDEFINE_SNAPSHOTS_METADATA
        | libvirt.VIR_DOMAIN_UNDEFINE_NVRAM
        | libvirt.VIR_DOMAIN_UNDEFINE_CHECKPOINTS_METADATA
    )


def _stop_port_forwarding(self, environment: Environment, log: Logger) -> None:
    """Remove every port-forwarding rule recorded for the environment."""
    log.debug(f"Clearing port forwarding rules for environment {environment.name}")
    environment_context = get_environment_context(environment)
    for (port, address) in environment_context.port_forwarding_list:
        self.host_node.tools[Iptables].stop_forwarding(port, address, 22)


# Retrieve the VMs' dynamic properties (e.g.
IP address).491    def _fill_nodes_metadata(492        self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect493    ) -> None:494        environment_context = get_environment_context(environment)495        # Give all the VMs some time to boot and then acquire an IP address.496        timeout = time.time() + environment_context.network_boot_timeout497        if self.host_node.is_remote:498            remote_node = cast(RemoteNode, self.host_node)499            conn_info = remote_node.connection_info500            address = conn_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS]501        for node in environment.nodes.list():502            assert isinstance(node, RemoteNode)503            # Get the VM's IP address.504            local_address = self._get_node_ip_address(505                environment, log, lv_conn, node, timeout506            )507            node_port = 22508            if self.host_node.is_remote:509                with self._port_forwarding_lock:510                    port_not_found = True511                    while port_not_found:512                        if self._next_available_port > 65535:513                            raise LisaException(514                                "No available ports on the host to forward"515                            )516                        # check if the port is already in use517                        output = self.host_node.execute(518                            f"nc -vz 127.0.0.1 {self._next_available_port}"519                        )520                        if output.exit_code == 1:  # port not in use521                            node_port = self._next_available_port522                            port_not_found = False523                        self._next_available_port += 1524                self.host_node.tools[Iptables].start_forwarding(525                    node_port, local_address, 22526                )527                environment_context.port_forwarding_list.append(528       
             (node_port, local_address)529                )530            else:531                address = local_address532            # Set SSH connection info for the node.533            node.set_connection_info(534                address=local_address,535                public_address=address,536                public_port=node_port,537                username=self.runbook.admin_username,538                private_key_file=self.runbook.admin_private_key_file,539            )540            # Ensure cloud-init completes its setup.541            node.execute(542                "cloud-init status --wait",543                sudo=True,544                expected_exit_code=0,545                expected_exit_code_failure_message="waiting on cloud-init",546            )547    # Create a cloud-init ISO for a VM.548    def _create_node_cloud_init_iso(549        self, environment: Environment, log: Logger, node: Node550    ) -> None:551        environment_context = get_environment_context(environment)552        node_context = get_node_context(node)553        user_data = {554            "users": [555                "default",556                {557                    "name": self.runbook.admin_username,558                    "shell": "/bin/bash",559                    "sudo": ["ALL=(ALL) NOPASSWD:ALL"],560                    "groups": ["sudo", "docker"],561                    "ssh_authorized_keys": [environment_context.ssh_public_key],562                },563            ],564        }565        # Iterate through all the top-level properties.566        for extra_user_data in node_context.extra_cloud_init_user_data:567            for key, value in extra_user_data.items():568                existing_value = user_data.get(key)569                if not existing_value:570                    # Property doesn't exist yet. 
So, add it.571                    user_data[key] = value572                elif isinstance(existing_value, dict) and isinstance(value, dict):573                    # Merge two dictionaries by adding properties from new value and574                    # replacing any existing properties.575                    # Examples: disk_setup, etc.576                    existing_value.update(value)577                elif isinstance(existing_value, list) and isinstance(value, list):578                    # Merge two lists by appending to the end of the existing list.579                    # Examples: write_files, runcmd, etc.580                    existing_value.extend(value)581                else:582                    # String, unknown type or mismatched type.583                    # Just replace the existing property.584                    user_data[key] = value585        meta_data = {586            "local-hostname": node_context.vm_name,587        }588        # Note: cloud-init requires the user-data file to be prefixed with589        # `#cloud-config`.590        user_data_string = "#cloud-config\n" + yaml.safe_dump(user_data)591        meta_data_string = yaml.safe_dump(meta_data)592        iso_path = node_context.cloud_init_file_path593        tmp_dir = tempfile.TemporaryDirectory()594        try:595            iso_path = os.path.join(tmp_dir.name, "cloud-init.iso")596            self._create_iso(597                iso_path,598                [("/user-data", user_data_string), ("/meta-data", meta_data_string)],599            )600            self.host_node.shell.copy(601                Path(iso_path), Path(node_context.cloud_init_file_path)602            )603        finally:604            tmp_dir.cleanup()605    # Create an ISO file.606    def _create_iso(self, file_path: str, files: List[Tuple[str, str]]) -> None:607        iso = pycdlib.PyCdlib()608        iso.new(joliet=3, vol_ident="cidata")609        for i, file in enumerate(files):610            path, contents = 
file611            contents_data = contents.encode()612            iso.add_fp(613                io.BytesIO(contents_data),614                len(contents_data),615                f"/{i}.;1",616                joliet_path=path,617            )618        iso.write(file_path)619    # Create the OS disk.620    def _create_node_os_disk(621        self, environment: Environment, log: Logger, node: Node622    ) -> None:623        raise NotImplementedError()624    def _create_node_data_disks(self, node: Node) -> None:625        node_context = get_node_context(node)626        qemu_img = self.host_node.tools[QemuImg]627        for disk in node_context.data_disks:628            qemu_img.create_new_qcow2(disk.file_path, disk.size_gib * 1024)629    # Create the XML definition for the VM.630    def _create_node_domain_xml(631        self,632        environment: Environment,633        log: Logger,634        node: Node,635        lv_conn: libvirt.virConnect,636    ) -> str:637        node_context = get_node_context(node)638        domain = ET.Element("domain")639        domain.attrib["type"] = "kvm"640        name = ET.SubElement(domain, "name")641        name.text = node_context.vm_name642        memory = ET.SubElement(domain, "memory")643        memory.attrib["unit"] = "MiB"644        assert isinstance(node.capability.memory_mb, int)645        memory.text = str(node.capability.memory_mb)646        vcpu = ET.SubElement(domain, "vcpu")647        assert isinstance(node.capability.core_count, int)648        vcpu.text = str(node.capability.core_count)649        os_tag = ET.SubElement(domain, "os")650        os_type = ET.SubElement(os_tag, "type")651        os_type.text = "hvm"652        if node_context.machine_type:653            os_type.attrib["machine"] = node_context.machine_type654        if not node_context.use_bios_firmware:655            # In an ideal world, we would use libvirt's firmware auto-selection feature.656            # Unfortunatley, it isn't possible to specify the 
secure-boot state until657            # libvirt v7.2.0 and Ubuntu 20.04 only has libvirt v6.0.0. Therefore, we658            # have to select the firmware manually.659            firmware_config = self._get_firmware_config(660                lv_conn, node_context.machine_type, node_context.enable_secure_boot661            )662            print(firmware_config)663            loader = ET.SubElement(os_tag, "loader")664            loader.attrib["readonly"] = "yes"665            loader.attrib["type"] = "pflash"666            loader.attrib["secure"] = "yes" if node_context.enable_secure_boot else "no"667            loader.text = firmware_config["mapping"]["executable"]["filename"]668            nvram = ET.SubElement(os_tag, "nvram")669            nvram.attrib["template"] = firmware_config["mapping"]["nvram-template"][670                "filename"671            ]672        features = ET.SubElement(domain, "features")673        ET.SubElement(features, "acpi")674        ET.SubElement(features, "apic")675        cpu = ET.SubElement(domain, "cpu")676        cpu.attrib["mode"] = "host-passthrough"677        clock = ET.SubElement(domain, "clock")678        clock.attrib["offset"] = "utc"679        on_poweroff = ET.SubElement(domain, "on_poweroff")680        on_poweroff.text = "destroy"681        on_reboot = ET.SubElement(domain, "on_reboot")682        on_reboot.text = "restart"683        on_crash = ET.SubElement(domain, "on_crash")684        on_crash.text = "destroy"685        devices = ET.SubElement(domain, "devices")686        serial = ET.SubElement(devices, "serial")687        serial.attrib["type"] = "pty"688        serial_target = ET.SubElement(serial, "target")689        serial_target.attrib["type"] = "isa-serial"690        serial_target.attrib["port"] = "0"691        serial_target_model = ET.SubElement(serial_target, "model")692        serial_target_model.attrib["name"] = "isa-serial"693        console = ET.SubElement(devices, "console")694        console.attrib["type"] = 
"pty"695        console_target = ET.SubElement(console, "target")696        console_target.attrib["type"] = "serial"697        console_target.attrib["port"] = "0"698        video = ET.SubElement(devices, "video")699        video_model = ET.SubElement(video, "model")700        if isinstance(self.host_node.os, CBLMariner):701            video_model.attrib["type"] = "vga"702        else:703            video_model.attrib["type"] = "qxl"704            graphics = ET.SubElement(devices, "graphics")705            graphics.attrib["type"] = "spice"706        network_interface = ET.SubElement(devices, "interface")707        network_interface.attrib["type"] = "network"708        network_interface_source = ET.SubElement(network_interface, "source")709        network_interface_source.attrib["network"] = "default"710        network_interface_model = ET.SubElement(network_interface, "model")711        network_interface_model.attrib["type"] = "virtio"712        self._add_disk_xml(713            node_context,714            devices,715            node_context.cloud_init_file_path,716            "cdrom",717            "raw",718            "sata",719        )720        self._add_disk_xml(721            node_context,722            devices,723            node_context.os_disk_file_path,724            "disk",725            "qcow2",726            "virtio",727        )728        for data_disk in node_context.data_disks:729            self._add_disk_xml(730                node_context,731                devices,732                data_disk.file_path,733                "disk",734                "qcow2",735                "virtio",736            )737        xml = ET.tostring(domain, "unicode")738        return xml739    def _add_disk_xml(740        self,741        node_context: NodeContext,742        devices: ET.Element,743        file_path: str,744        device_type: str,745        image_type: str,746        bus_type: str,747    ) -> None:748        device_name = 
self._new_disk_device_name(node_context)749        disk = ET.SubElement(devices, "disk")750        disk.attrib["type"] = "file"751        disk.attrib["device"] = device_type752        disk_driver = ET.SubElement(disk, "driver")753        disk_driver.attrib["name"] = "qemu"754        disk_driver.attrib["type"] = image_type755        disk_target = ET.SubElement(disk, "target")756        disk_target.attrib["dev"] = device_name757        disk_target.attrib["bus"] = bus_type758        disk_source = ET.SubElement(disk, "source")759        disk_source.attrib["file"] = file_path760    def _add_virtio_disk_xml(761        self,762        node_context: NodeContext,763        devices: ET.Element,764        file_path: str,765        queues: int,766    ) -> None:767        device_name = self._new_disk_device_name(node_context, True)768        disk = ET.SubElement(devices, "disk")769        disk.attrib["type"] = "file"770        disk_driver = ET.SubElement(disk, "driver")771        disk_driver.attrib["if"] = "virtio"772        disk_driver.attrib["type"] = "raw"773        disk_driver.attrib["queues"] = str(queues)774        disk_target = ET.SubElement(disk, "target")775        disk_target.attrib["dev"] = device_name776        disk_source = ET.SubElement(disk, "source")777        disk_source.attrib["file"] = file_path778    def _new_disk_device_name(779        self,780        node_context: NodeContext,781        is_paravirtualized: bool = False,782    ) -> str:783        disk_index = node_context.next_disk_index784        node_context.next_disk_index += 1785        device_name = self._get_disk_device_name(disk_index, is_paravirtualized)786        return device_name787    def _get_disk_device_name(788        self, disk_index: int, is_paravirtualized: bool = False789    ) -> str:790        # The disk device name is required to follow the standard Linux device naming791        # scheme. That is: [ sda, sdb, ..., sdz, sdaa, sdab, ... ]. 
However, it is792        # unlikely that someone will ever need more than 26 disks. So, keep is simple793        # for now.794        if disk_index < 0 or disk_index > 25:795            raise LisaException(f"Unsupported disk index: {disk_index}.")796        prefix = "v" if is_paravirtualized else "s"797        suffix = chr(ord("a") + disk_index)798        return f"{prefix}d{suffix}"799    # Wait for the VM to boot and then get the IP address.800    def _get_node_ip_address(801        self,802        environment: Environment,803        log: Logger,804        lv_conn: libvirt.virConnect,805        node: Node,806        timeout: float,807    ) -> str:808        node_context = get_node_context(node)809        while True:810            addr = self._try_get_node_ip_address(environment, log, lv_conn, node)811            if addr:812                return addr813            if time.time() > timeout:814                raise LisaException(f"no IP addresses found for {node_context.vm_name}")815    # Try to get the IP address of the VM.816    def _try_get_node_ip_address(817        self,818        environment: Environment,819        log: Logger,820        lv_conn: libvirt.virConnect,821        node: Node,822    ) -> Optional[str]:823        node_context = get_node_context(node)824        domain = lv_conn.lookupByName(node_context.vm_name)825        # Acquire IP address from libvirt's DHCP server.826        interfaces = domain.interfaceAddresses(827            libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE828        )829        if len(interfaces) < 1:830            return None831        interface_name = next(iter(interfaces))832        addrs = interfaces[interface_name]["addrs"]833        if len(addrs) < 1:834            return None835        addr = addrs[0]["addr"]836        assert isinstance(addr, str)837        return addr838    def _get_firmware_config(839        self,840        lv_conn: libvirt.virConnect,841        machine_type: Optional[str],842        
enable_secure_boot: bool,843    ) -> Dict[str, Any]:844        # Resolve the machine type to its full name.845        domain_caps_str = lv_conn.getDomainCapabilities(846            machine=machine_type, virttype="kvm"847        )848        domain_caps = ET.fromstring(domain_caps_str)849        full_machine_type = domain_caps.findall("./machine")[0].text850        arch = domain_caps.findall("./arch")[0].text851        # Read the QEMU firmware config files.852        # Note: "/usr/share/qemu/firmware" is a well known location for these files.853        firmware_configs_str = self.host_node.execute(854            "cat /usr/share/qemu/firmware/*.json",855            shell=True,856            expected_exit_code=0,857            no_debug_log=True,858        ).stdout859        firmware_configs = self._read_concat_json_str(firmware_configs_str)860        # Filter on architecture.861        filtered_firmware_configs = filter(862            lambda f: f["targets"][0]["architecture"] == arch, firmware_configs863        )864        # Filter on machine type.865        filtered_firmware_configs = filter(866            lambda f: any(867                fnmatch.fnmatch(full_machine_type, target_machine)868                for target_machine in f["targets"][0]["machines"]869            ),870            filtered_firmware_configs,871        )872        # Exclude Intel TDX and AMD SEV-ES firmwares.873        filtered_firmware_configs = filter(874            lambda f: "intel-tdx" not in f["features"]875            and "amd-sev-es" not in f["features"],876            filtered_firmware_configs,877        )878        # Filter on secure boot.879        if enable_secure_boot:880            filtered_firmware_configs = filter(881                lambda f: "secure-boot" in f["features"]882                and "enrolled-keys" in f["features"],883                filtered_firmware_configs,884            )885        else:886            filtered_firmware_configs = filter(887                lambda f: 
"secure-boot" not in f["features"], filtered_firmware_configs888            )889        # Get first matching firmware.890        firmware_config = next(filtered_firmware_configs, None)891        if firmware_config is None:892            raise LisaException(893                f"Could not find matching firmware for machine-type={machine_type} "894                f"and secure-boot={enable_secure_boot}."895            )896        return firmware_config897    # Read a bunch of JSON files that have been concatenated together.898    def _read_concat_json_str(self, json_str: str) -> List[Dict[str, Any]]:899        objs = []900        # From: https://stackoverflow.com/a/42985887901        decoder = json.JSONDecoder()902        text = json_str.lstrip()  # decode hates leading whitespace903        while text:904            obj, index = decoder.raw_decode(text)905            text = text[index:].lstrip()906            objs.append(obj)907        return objs908    def _libvirt_uri_schema(self) -> str:909        raise NotImplementedError()910    def __init_libvirt_conn_string(self) -> None:911        hypervisor = self._libvirt_uri_schema()912        host = self.platform_runbook.hosts[0]913        host_addr = ""914        transport = ""915        params = ""916        if host.is_remote():917            assert host.address918            assert host.username919            host_addr = f"{host.username}@{host.address}"920            transport = "+ssh"921            params = f"?keyfile={host.private_key_file}"922        self.libvirt_conn_str = f"{hypervisor}{transport}://{host_addr}/system{params}"923    def __platform_runbook_type(self) -> type:924        platform_runbook_type: type = type(self).platform_runbook_type()925        assert issubclass(platform_runbook_type, BaseLibvirtPlatformSchema)926        return platform_runbook_type927    def __node_runbook_type(self) -> type:928        node_runbook_type: type = type(self).node_runbook_type()929        assert 
issubclass(node_runbook_type, BaseLibvirtNodeSchema)930        return node_runbook_type931    def _get_host_distro(self) -> str:932        result = self.host_node.os.information.full_version if self.host_node else ""933        return result934    def _get_host_kernel_version(self) -> str:935        result = ""936        if self.host_node:937            uname = self.host_node.tools[Uname]938            result = uname.get_linux_information().kernel_version_raw939        return result940    def _get_libvirt_version(self) -> str:941        result = ""942        if self.host_node:...ch_platform.py
Source:ch_platform.py  
...25    @classmethod26    def supported_features(cls) -> List[Type[Feature]]:27        return BaseLibvirtPlatform._supported_features28    @classmethod29    def node_runbook_type(cls) -> type:30        return CloudHypervisorNodeSchema31    def _libvirt_uri_schema(self) -> str:32        return "ch"33    def _configure_node(34        self,35        node: Node,36        node_idx: int,37        node_space: schema.NodeSpace,38        node_runbook: BaseLibvirtNodeSchema,39        vm_name_prefix: str,40    ) -> None:41        super()._configure_node(42            node,43            node_idx,...qemu_platform.py
Source:qemu_platform.py  
...19    @classmethod20    def supported_features(cls) -> List[Type[Feature]]:21        return BaseLibvirtPlatform._supported_features22    @classmethod23    def node_runbook_type(cls) -> type:24        return QemuNodeSchema25    def _libvirt_uri_schema(self) -> str:26        return "qemu"27    # Create the OS disk.28    def _create_node_os_disk(29        self, environment: Environment, log: Logger, node: Node30    ) -> None:31        node_context = get_node_context(node)32        self.host_node.tools[QemuImg].create_diff_qcow2(33            node_context.os_disk_file_path, node_context.os_disk_base_file_path34        )35    def _get_vmm_version(self) -> str:36        result = "Unknown"37        if self.host_node:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
