Best Python code snippet using lisa_python
platform.py
Source:platform.py  
...655            # In an ideal world, we would use libvirt's firmware auto-selection feature.656            # Unfortunatley, it isn't possible to specify the secure-boot state until657            # libvirt v7.2.0 and Ubuntu 20.04 only has libvirt v6.0.0. Therefore, we658            # have to select the firmware manually.659            firmware_config = self._get_firmware_config(660                lv_conn, node_context.machine_type, node_context.enable_secure_boot661            )662            print(firmware_config)663            loader = ET.SubElement(os_tag, "loader")664            loader.attrib["readonly"] = "yes"665            loader.attrib["type"] = "pflash"666            loader.attrib["secure"] = "yes" if node_context.enable_secure_boot else "no"667            loader.text = firmware_config["mapping"]["executable"]["filename"]668            nvram = ET.SubElement(os_tag, "nvram")669            nvram.attrib["template"] = firmware_config["mapping"]["nvram-template"][670                "filename"671            ]672        features = ET.SubElement(domain, "features")673        ET.SubElement(features, "acpi")674        ET.SubElement(features, "apic")675        cpu = ET.SubElement(domain, "cpu")676        cpu.attrib["mode"] = "host-passthrough"677        clock = ET.SubElement(domain, "clock")678        clock.attrib["offset"] = "utc"679        on_poweroff = ET.SubElement(domain, "on_poweroff")680        on_poweroff.text = "destroy"681        on_reboot = ET.SubElement(domain, "on_reboot")682        on_reboot.text = "restart"683        on_crash = ET.SubElement(domain, "on_crash")684        on_crash.text = "destroy"685        devices = ET.SubElement(domain, "devices")686        serial = ET.SubElement(devices, "serial")687        serial.attrib["type"] = "pty"688        serial_target = ET.SubElement(serial, "target")689        serial_target.attrib["type"] = "isa-serial"690        serial_target.attrib["port"] = "0"691        serial_target_model = 
ET.SubElement(serial_target, "model")692        serial_target_model.attrib["name"] = "isa-serial"693        console = ET.SubElement(devices, "console")694        console.attrib["type"] = "pty"695        console_target = ET.SubElement(console, "target")696        console_target.attrib["type"] = "serial"697        console_target.attrib["port"] = "0"698        video = ET.SubElement(devices, "video")699        video_model = ET.SubElement(video, "model")700        if isinstance(self.host_node.os, CBLMariner):701            video_model.attrib["type"] = "vga"702        else:703            video_model.attrib["type"] = "qxl"704            graphics = ET.SubElement(devices, "graphics")705            graphics.attrib["type"] = "spice"706        network_interface = ET.SubElement(devices, "interface")707        network_interface.attrib["type"] = "network"708        network_interface_source = ET.SubElement(network_interface, "source")709        network_interface_source.attrib["network"] = "default"710        network_interface_model = ET.SubElement(network_interface, "model")711        network_interface_model.attrib["type"] = "virtio"712        self._add_disk_xml(713            node_context,714            devices,715            node_context.cloud_init_file_path,716            "cdrom",717            "raw",718            "sata",719        )720        self._add_disk_xml(721            node_context,722            devices,723            node_context.os_disk_file_path,724            "disk",725            "qcow2",726            "virtio",727        )728        for data_disk in node_context.data_disks:729            self._add_disk_xml(730                node_context,731                devices,732                data_disk.file_path,733                "disk",734                "qcow2",735                "virtio",736            )737        xml = ET.tostring(domain, "unicode")738        return xml739    def _add_disk_xml(740        self,741        node_context: NodeContext,742        devices: 
ET.Element,743        file_path: str,744        device_type: str,745        image_type: str,746        bus_type: str,747    ) -> None:748        device_name = self._new_disk_device_name(node_context)749        disk = ET.SubElement(devices, "disk")750        disk.attrib["type"] = "file"751        disk.attrib["device"] = device_type752        disk_driver = ET.SubElement(disk, "driver")753        disk_driver.attrib["name"] = "qemu"754        disk_driver.attrib["type"] = image_type755        disk_target = ET.SubElement(disk, "target")756        disk_target.attrib["dev"] = device_name757        disk_target.attrib["bus"] = bus_type758        disk_source = ET.SubElement(disk, "source")759        disk_source.attrib["file"] = file_path760    def _add_virtio_disk_xml(761        self,762        node_context: NodeContext,763        devices: ET.Element,764        file_path: str,765        queues: int,766    ) -> None:767        device_name = self._new_disk_device_name(node_context, True)768        disk = ET.SubElement(devices, "disk")769        disk.attrib["type"] = "file"770        disk_driver = ET.SubElement(disk, "driver")771        disk_driver.attrib["if"] = "virtio"772        disk_driver.attrib["type"] = "raw"773        disk_driver.attrib["queues"] = str(queues)774        disk_target = ET.SubElement(disk, "target")775        disk_target.attrib["dev"] = device_name776        disk_source = ET.SubElement(disk, "source")777        disk_source.attrib["file"] = file_path778    def _new_disk_device_name(779        self,780        node_context: NodeContext,781        is_paravirtualized: bool = False,782    ) -> str:783        disk_index = node_context.next_disk_index784        node_context.next_disk_index += 1785        device_name = self._get_disk_device_name(disk_index, is_paravirtualized)786        return device_name787    def _get_disk_device_name(788        self, disk_index: int, is_paravirtualized: bool = False789    ) -> str:790        # The disk device name is required to 
follow the standard Linux device naming791        # scheme. That is: [ sda, sdb, ..., sdz, sdaa, sdab, ... ]. However, it is792        # unlikely that someone will ever need more than 26 disks. So, keep is simple793        # for now.794        if disk_index < 0 or disk_index > 25:795            raise LisaException(f"Unsupported disk index: {disk_index}.")796        prefix = "v" if is_paravirtualized else "s"797        suffix = chr(ord("a") + disk_index)798        return f"{prefix}d{suffix}"799    # Wait for the VM to boot and then get the IP address.800    def _get_node_ip_address(801        self,802        environment: Environment,803        log: Logger,804        lv_conn: libvirt.virConnect,805        node: Node,806        timeout: float,807    ) -> str:808        node_context = get_node_context(node)809        while True:810            addr = self._try_get_node_ip_address(environment, log, lv_conn, node)811            if addr:812                return addr813            if time.time() > timeout:814                raise LisaException(f"no IP addresses found for {node_context.vm_name}")815    # Try to get the IP address of the VM.816    def _try_get_node_ip_address(817        self,818        environment: Environment,819        log: Logger,820        lv_conn: libvirt.virConnect,821        node: Node,822    ) -> Optional[str]:823        node_context = get_node_context(node)824        domain = lv_conn.lookupByName(node_context.vm_name)825        # Acquire IP address from libvirt's DHCP server.826        interfaces = domain.interfaceAddresses(827            libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE828        )829        if len(interfaces) < 1:830            return None831        interface_name = next(iter(interfaces))832        addrs = interfaces[interface_name]["addrs"]833        if len(addrs) < 1:834            return None835        addr = addrs[0]["addr"]836        assert isinstance(addr, str)837        return addr838    def _get_firmware_config(839        
self,840        lv_conn: libvirt.virConnect,841        machine_type: Optional[str],842        enable_secure_boot: bool,843    ) -> Dict[str, Any]:844        # Resolve the machine type to its full name.845        domain_caps_str = lv_conn.getDomainCapabilities(846            machine=machine_type, virttype="kvm"847        )848        domain_caps = ET.fromstring(domain_caps_str)849        full_machine_type = domain_caps.findall("./machine")[0].text850        arch = domain_caps.findall("./arch")[0].text851        # Read the QEMU firmware config files.852        # Note: "/usr/share/qemu/firmware" is a well known location for these files....zcu111_vna_sweep.py
Source:zcu111_vna_sweep.py  
...33        print('Connecting to board: %s' % host)34        self.fpga = casperfpga.CasperFpga(host, transport=casperfpga.KatcpTransport)35        self._parse_fpg(fpgfile)36        try:37            self._get_firmware_config()38        except:39            pass # Probably not programmed40    def _get_firmware_config(self):41        devlist = self.fpga.listdev()42        if 'n_samples' in devlist:43            self.n_samples = self.fpga.read_uint('n_samples')44        else:45            self.n_samples = 046        if 'n_parallel' in devlist:47            self.n_parallel = self.fpga.read_uint('n_parallel')48        else:49            self.n_parallel = FPGA_DEMUX_FACTOR50        self.sample_hz = SAMPLE_HZ // 8 * self.n_parallel // 251        self.fpga_clk_mhz = self.sample_hz / self.n_parallel / 1e652        print('Firmware config detected: %d sample RAM buffer' % self.n_samples)53        print('Firmware config detected: %d ADC/DAC demux' % self.n_parallel)54        print('Firmware config detected: %.2f MHz FPGA clock' % self.fpga_clk_mhz)55        print('Firmware config detected: %.2f MHz ADC clock' % (self.sample_hz / 1e6))56    def _parse_fpg(self, fpgfile):57        self.fpgfile = fpgfile58        print('Parsing %s' % fpgfile)59        self.fpga.get_system_information(fpgfile)60        self.rfdc = self.fpga.adcs['rfdc']61    def program(self, fpgfile=None):62        fpgfile = fpgfile or self.fpgfile63        print('Programming with %s' % fpgfile)64        self.fpga.upload_to_ram_and_program(fpgfile)65        print('Initializing RFDC')66        self.fpga.adcs['rfdc'].init(lmk_file=LMK_FILE, lmx_file=LMX_FILE)67        self._parse_fpg(fpgfile)68        self._get_firmware_config()69        if self.n_parallel > 2:70            self.fpga.write_int('phase_inc_rst', 1)71    def enable_loopback(self):72        print('Enabling DAC->ADC internal loopback')73        self.fpga.write_int('adc_dac_loopback', 1)74    def disable_loopback(self):75        print('Disabling 
DAC->ADC internal loopback')76        self.fpga.write_int('adc_dac_loopback', 0)77    def get_fpga_clk(self):78        fpga_clk_mhz = self.fpga.estimate_fpga_clock()79        print('FPGA board clock: %.2f MHz' % fpga_clk_mhz)80        return fpga_clk_mhz81    def get_rfdc_status(self):82        self.rfdc.status()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
