Best Python code snippet using lisa_python
platform_.py
Source: platform_.py
...
            product=plan_product,
            publisher=plan_publisher,
        )
        return plan

    def _generate_max_capability(self, vm_size: str, location: str) -> AzureCapability:
        # some vm sizes cannot be queried from the API, so use a default
        # capability to run with the best guess.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=search_space.IntRange(min=1),
            memory_mb=search_space.IntRange(min=0),
            gpu_count=search_space.IntRange(min=0),
        )
        node_space.disk = features.AzureDiskOptionSettings()
        node_space.disk.data_disk_count = search_space.IntRange(min=0)
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        node_space.disk.disk_type.add(schema.DiskType.Ephemeral)
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)
        node_space.network_interface.nic_count = search_space.IntRange(min=1)
        # currently, the max nic count supported in Azure is 8
        node_space.network_interface.max_nic_count = 8
        azure_capability = AzureCapability(
            location=location,
            vm_size=vm_size,
            capability=node_space,
            resource_sku={},
        )
        node_space.name = f"{location}_{vm_size}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        # all nodes support the following features
        all_features = self.supported_features()
        node_space.features.update(
            [schema.FeatureSettings.create(x.name()) for x in all_features]
        )
        _convert_to_azure_node_space(node_space)
        return azure_capability

    def _generate_min_capability(
        self,
        requirement: schema.NodeSpace,
        azure_capability: AzureCapability,
        location: str,
    ) -> schema.NodeSpace:
        min_cap: schema.NodeSpace = requirement.generate_min_capability(
            azure_capability.capability
        )
        # Apply Azure-specific values. They will be passed into the ARM template.
        azure_node_runbook = min_cap.get_extended_runbook(AzureNodeSchema, AZURE)
        if azure_node_runbook.location:
            assert location in azure_node_runbook.location, (
                f"predefined location [{azure_node_runbook.location}] "
                f"must be same as "
                f"cap location [{location}]"
            )
        # the location may not be set
        azure_node_runbook.location = location
        azure_node_runbook.vm_size = azure_capability.vm_size
        return min_cap

    def _generate_sas_token(self, result_dict: Dict[str, str]) -> Any:
        sc_name = result_dict["account_name"]
        container_name = result_dict["container_name"]
        rg = result_dict["resource_group_name"]
        blob_name = result_dict["blob_name"]
        source_container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            container_name=container_name,
            resource_group_name=rg,
        )
        source_blob = source_container_client.get_blob_client(blob_name)
        sas_token = generate_sas_token(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            resource_group_name=rg,
        )
        source_url = source_blob.url + "?" + sas_token
        return source_url

    @lru_cache(maxsize=10)  # noqa: B019
    def _get_deployable_vhd_path(
        self, vhd_path: str, location: str, log: Logger
    ) -> str:
        """
        A sas url cannot be used to create a vm directly, so this method checks
        whether vhd_path is a sas url. If so, copy it to a location in the
        current subscription, so it can be deployed.
        """
        matches = SAS_URL_PATTERN.match(vhd_path)
        if not matches:
            vhd_details = self._get_vhd_details(vhd_path)
            vhd_location = vhd_details["location"]
            if location == vhd_location:
                return vhd_path
            else:
                vhd_path = self._generate_sas_token(vhd_details)
                matches = SAS_URL_PATTERN.match(vhd_path)
                assert matches, f"fail to generate sas url for {vhd_path}"
                log.debug(
                    f"the vhd location {location} is not same with running case "
                    f"location {vhd_location}, generate a sas url for source vhd, "
                    f"it needs to be copied into location {location}."
                )
        else:
            log.debug("found the vhd is a sas url, it may need to be copied.")
        # get the original vhd's hash key for comparison.
        original_key: Optional[bytearray] = None
        original_vhd_path = vhd_path
        original_blob_client = BlobClient.from_blob_url(original_vhd_path)
        properties = original_blob_client.get_blob_properties()
        if properties.content_settings:
            original_key = properties.content_settings.get(
                "content_md5", None
            )  # type: ignore
        storage_name = get_storage_account_name(
            subscription_id=self.subscription_id, location=location, type="t"
        )
        check_or_create_storage_account(
            self.credential,
            self.subscription_id,
            storage_name,
            self._azure_runbook.shared_resource_group_name,
            location,
            log,
        )
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=storage_name,
            container_name=SAS_COPIED_CONTAINER_NAME,
            resource_group_name=self._azure_runbook.shared_resource_group_name,
        )
        normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)
        year = matches["year"] if matches["year"] else "9999"
        month = matches["month"] if matches["month"] else "01"
        day = matches["day"] if matches["day"] else "01"
        # use the expiry date to generate the path, so it's easy to identify
        # when the cache can be removed.
        vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"
        full_vhd_path = f"{container_client.url}/{vhd_path}"
        # lock here to prevent the same vhd being copied by multiple threads
        global _global_sas_vhd_copy_lock
        cached_key: Optional[bytearray] = None
        with _global_sas_vhd_copy_lock:
            blobs = container_client.list_blobs(name_starts_with=vhd_path)
            for blob in blobs:
                if blob:
                    # check if the hash key matches the original key.
                    if blob.content_settings:
                        cached_key = blob.content_settings.get("content_md5", None)
                    if original_key == cached_key:
                        # if it exists, return the link instead of copying again.
                        log.debug("the sas url is copied already, use it directly.")
                        return full_vhd_path
                    else:
                        log.debug("found cached vhd, but the hash key mismatched.")
            blob_client = container_client.get_blob_client(vhd_path)
            blob_client.start_copy_from_url(
                original_vhd_path, metadata=None, incremental_copy=False
            )
            wait_copy_blob(blob_client, vhd_path, log)
        return full_vhd_path

    def _get_vhd_details(self, vhd_path: str) -> Any:
        matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)
        assert matched, f"fail to get matched info from {vhd_path}"
        sc_name = matched.group("sc")
        container_name = matched.group("container")
        blob_name = matched.group("blob")
        storage_client = get_storage_client(self.credential, self.subscription_id)
        sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]
        assert sc[
            0
        ], f"fail to get storage account {sc_name} from {self.subscription_id}"
        rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)
        return {
            "location": sc[0].location,
            "resource_group_name": rg,
            "account_name": sc_name,
            "container_name": container_name,
            "blob_name": blob_name,
        }

    def _generate_data_disks(
        self,
        node: Node,
        azure_node_runbook: AzureNodeArmParameter,
    ) -> List[DataDiskSchema]:
        data_disks: List[DataDiskSchema] = []
        assert node.capability.disk
        if azure_node_runbook.marketplace:
            marketplace = self._get_image_info(
                azure_node_runbook.location, azure_node_runbook.marketplace
            )
            # some images have data disks by default,
            # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.27
            # we have to inject the part below when the dataDisks section is
            #  added in the arm template,
            # otherwise the deployment fails with:
            # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun
            #  does not have required value(s) for image specified in
            #  storage profile.
            for default_data_disk in marketplace.data_disk_images:
                data_disks.append(
                    DataDiskSchema(
                        node.capability.disk.data_disk_caching_type,
                        default_data_disk.additional_properties["sizeInGb"],
                        azure_node_runbook.disk_type,
                        DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,
                    )
                )
        assert isinstance(
            node.capability.disk.data_disk_count, int
        ), f"actual: {type(node.capability.disk.data_disk_count)}"
        for _ in range(node.capability.disk.data_disk_count):
            assert isinstance(node.capability.disk.data_disk_size, int)
            data_disks.append(
                DataDiskSchema(
                    node.capability.disk.data_disk_caching_type,
                    node.capability.disk.data_disk_size,
                    azure_node_runbook.disk_type,
                    DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,
                )
            )
        return data_disks

    @lru_cache(maxsize=10)  # noqa: B019
    def _get_image_info(
        self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]
    ) -> VirtualMachineImage:
        compute_client = get_compute_client(self)
        assert isinstance(marketplace, AzureVmMarketplaceSchema)
        with global_credential_access_lock:
            image_info = compute_client.virtual_machine_images.get(
                location=location,
                publisher_name=marketplace.publisher,
                offer=marketplace.offer,
                skus=marketplace.sku,
                version=marketplace.version,
            )
        return image_info

    def _get_location_key(self, location: str) -> str:
        return f"{self.subscription_id}_{location}"

    def _enable_ssh_on_windows(self, node: Node) -> None:
        runbook = node.capability.get_extended_runbook(AzureNodeSchema)
        if runbook.is_linux:
            return
        context = get_node_context(node)
        remote_node = cast(RemoteNode, node)
        log = node.log
        log.debug(
            f"checking if SSH port {remote_node.public_port} is reachable "
            f"on {remote_node.name}..."
        )
        connected, _ = wait_tcp_port_ready(
            address=remote_node.public_address,
            port=remote_node.public_port,
            log=log,
            timeout=3,
        )
        if connected:
            log.debug("SSH port is reachable.")
            return
        log.debug("SSH port is not open, enabling ssh on Windows ...")
        # The SSH port is not open, try to enable it.
        public_key_data = get_public_key_data(self.runbook.admin_private_key_file)
        with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:
            script = f.read()
        parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)
        command = RunCommandInput(
            command_id="RunPowerShellScript",
            script=[script],
            parameters=[parameters],
        )
        compute_client = get_compute_client(self)
        operation = compute_client.virtual_machines.begin_run_command(
            resource_group_name=context.resource_group_name,
            vm_name=context.vm_name,
            parameters=command,
        )
        result = wait_operation(operation=operation, failure_identity="enable ssh")
        log.debug("SSH script result:")
        log.dump_json(logging.DEBUG, result)

    def _get_vhd_os_disk_size(self, blob_url: str) -> int:
        result_dict = self._get_vhd_details(blob_url)
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=result_dict["account_name"],
            container_name=result_dict["container_name"],
            resource_group_name=result_dict["resource_group_name"],
        )
        vhd_blob = container_client.get_blob_client(result_dict["blob_name"])
        properties = vhd_blob.get_blob_properties()
        assert properties.size, f"fail to get blob size of {blob_url}"
        # Azure requires only megabyte alignment of vhds, round size up
        # for cases where the size is megabyte aligned
        return math.ceil(properties.size / 1024 / 1024 / 1024)

    def _get_sig_info(
        self, shared_image: SharedImageGallerySchema
    ) -> GalleryImageVersion:
        compute_client = get_compute_client(self)
        return compute_client.gallery_image_versions.get(
            resource_group_name=shared_image.resource_group_name,
            gallery_name=shared_image.image_gallery,
            gallery_image_name=shared_image.image_definition,
            gallery_image_version_name=shared_image.image_version,
            expand="ReplicationStatus",
        )

    def _get_sig_os_disk_size(self, shared_image: SharedImageGallerySchema) -> int:
        found_image = self._get_sig_info(shared_image)
        assert found_image.storage_profile.os_disk_image.size_in_gb
        return int(found_image.storage_profile.os_disk_image.size_in_gb)

    def _get_normalized_vm_sizes(
        self, name: str, location: str, log: Logger
    ) -> List[str]:
        split_vm_sizes: List[str] = [x.strip() for x in name.split(",")]
        for index, vm_size in enumerate(split_vm_sizes):
            split_vm_sizes[index] = self._get_normalized_vm_size(vm_size, location, log)
        return [x for x in split_vm_sizes if x]

    def _get_normalized_vm_size(self, name: str, location: str, log: Logger) -> str:
        # find the predefined vm size among all available ones.
        location_info: AzureLocation = self.get_location_info(location, log)
        matched_score: float = 0
        matched_name: str = ""
        matcher = SequenceMatcher(None, name.lower(), "")
        for vm_size in location_info.capabilities:
            matcher.set_seq2(vm_size.lower())
            if name.lower() in vm_size.lower() and matched_score < matcher.ratio():
                matched_name = vm_size
                matched_score = matcher.ratio()
        return matched_name

    def _get_capabilities(
        self, vm_sizes: List[str], location: str, use_max_capability: bool, log: Logger
    ) -> List[AzureCapability]:
        candidate_caps: List[AzureCapability] = []
        caps = self.get_location_info(location, log).capabilities
        for vm_size in vm_sizes:
            # force the max capability to run as many test cases as possible,
            # or force support of a non-existent vm size.
            if use_max_capability:
                candidate_caps.append(self._generate_max_capability(vm_size, location))
                continue
            if vm_size in caps:
                candidate_caps.append(caps[vm_size])
        return candidate_caps

    def _get_matched_capability(
        self,
        requirement: schema.NodeSpace,
        candidate_capabilities: List[AzureCapability],
    ) -> Optional[schema.NodeSpace]:
        matched_cap: Optional[schema.NodeSpace] = None
        # filter allowed vm sizes
        for azure_cap in candidate_capabilities:
            check_result = requirement.check(azure_cap.capability)
            if check_result.result:
...
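The _get_normalized_vm_size method above resolves a partially typed VM size against the sizes available in a location by combining a substring check with a difflib.SequenceMatcher similarity score. The standalone sketch below reproduces only that matching logic; the normalize_vm_size helper and the sample size list are illustrative assumptions, while LISA itself reads the real candidates from get_location_info().

from difflib import SequenceMatcher
from typing import Iterable


# Illustrative helper, not part of LISA: mirrors the fuzzy matching in
# _get_normalized_vm_size for a caller-supplied list of VM sizes.
def normalize_vm_size(name: str, available_sizes: Iterable[str]) -> str:
    matched_name = ""
    matched_score = 0.0
    matcher = SequenceMatcher(None, name.lower(), "")
    for vm_size in available_sizes:
        matcher.set_seq2(vm_size.lower())
        # the candidate must contain the requested name; among those,
        # keep the one with the highest similarity ratio.
        if name.lower() in vm_size.lower() and matcher.ratio() > matched_score:
            matched_name = vm_size
            matched_score = matcher.ratio()
    return matched_name


# sample sizes are made up for the example.
sizes = ["Standard_D2s_v3", "Standard_DS2_v2", "Standard_D2_v4"]
print(normalize_vm_size("D2s_v3", sizes))  # prints: Standard_D2s_v3

The substring check filters out unrelated SKUs, and the similarity ratio prefers the closest (typically shortest) candidate that still contains the requested fragment, so a loose name like "D2s_v3" maps to a single concrete size.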
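_get_deployable_vhd_path names the cached copy of a SAS-hosted VHD after the URL's expiry date ({year}{month}{day}/{normalized-name}.vhd), which makes stale cache entries easy to spot and clean up by date. The sketch below shows only that naming scheme under simplified assumptions: SAS_EXPIRY_PATTERN and sanitize() are hypothetical stand-ins for LISA's SAS_URL_PATTERN and constants.NORMALIZE_PATTERN, which are defined elsewhere in the file.

import re

# Hypothetical stand-ins for LISA's SAS_URL_PATTERN and NORMALIZE_PATTERN:
# pull the expiry date (se=YYYY-MM-DD) out of the SAS query string, and
# collapse everything that is not alphanumeric into a dash.
SAS_EXPIRY_PATTERN = re.compile(r"se=(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})")
NON_ALNUM_PATTERN = re.compile(r"[^a-zA-Z0-9]")


def sanitize(url: str) -> str:
    return NON_ALNUM_PATTERN.sub("-", url)


def cached_blob_name(sas_url: str) -> str:
    matches = SAS_EXPIRY_PATTERN.search(sas_url)
    # mirror the "9999"/"01" fallbacks in the original code when the
    # expiry date cannot be parsed.
    year = matches["year"] if matches else "9999"
    month = matches["month"] if matches else "01"
    day = matches["day"] if matches else "01"
    return f"{year}{month}{day}/{sanitize(sas_url)}.vhd"


url = "https://account.blob.core.windows.net/vhds/image.vhd?se=2024-06-30&sig=abc"
print(cached_blob_name(url))  # starts with "20240630/"

Grouping cached copies under an expiry-date prefix means every blob whose date has passed can be removed in one sweep, which is what the original comment about identifying when the cache can be removed refers to.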
