How to use the get_sorted_vm_sizes method in lisa

Best Python code snippet using lisa_python
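
In LISA's Azure orchestrator (lisa/sut_orchestrator/azure/platform_.py, quoted below), get_sorted_vm_sizes takes a list of AzureCapability objects and returns them ordered by the predefined VM_SIZE_FALLBACK_PATTERNS levels, with each level sorted by rough cost. A minimal usage sketch, assuming you already hold an initialized AzurePlatform instance named platform and a lisa Logger named log (both names, and the location, are illustrative and not part of the quoted snippets):

# Sketch only: `platform` (AzurePlatform) and `log` (Logger) are assumed to
# exist in the caller; "westus2" is an example location.
location_info = platform.get_location_info("westus2", log)
capabilities = [value for _, value in location_info.capabilities.items()]
sorted_sizes = platform.get_sorted_vm_sizes(capabilities, log)
for azure_cap in sorted_sizes[:5]:
    log.info(f"{azure_cap.vm_size}: rough cost {azure_cap.capability.cost}")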

platform_.py

Source: platform_.py (GitHub)


...
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        return node_space

    def get_sorted_vm_sizes(
        self, capabilities: List[AzureCapability], log: Logger
    ) -> List[AzureCapability]:
        # sort vm size by predefined pattern
        sorted_capabilities: List[AzureCapability] = []
        found_vm_sizes: Set[str] = set()
        # loop all fall back levels
        for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:
            level_capabilities: List[AzureCapability] = []
            # loop all capabilities
            for capability in capabilities:
                vm_size = capability.vm_size
                if fallback_pattern.match(vm_size) and vm_size not in found_vm_sizes:
                    level_capabilities.append(capability)
                    found_vm_sizes.add(vm_size)
            # sort by rough cost
            level_capabilities.sort(key=lambda x: (x.capability.cost))
            sorted_capabilities.extend(level_capabilities)
        return sorted_capabilities

    def load_public_ip(self, node: Node, log: Logger) -> str:
        node_context = get_node_context(node)
        vm_name = node_context.vm_name
        resource_group_name = node_context.resource_group_name
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            resource_group_name=resource_group_name, log=self._log
        )
        return public_ips_map[vm_name]

    @lru_cache(maxsize=10)  # noqa: B019
    def _resolve_marketplace_image(
        self, location: str, marketplace: AzureVmMarketplaceSchema
    ) -> AzureVmMarketplaceSchema:
        new_marketplace = copy.copy(marketplace)
        # latest doesn't work, it needs a specified version.
        if marketplace.version.lower() == "latest":
            compute_client = get_compute_client(self)
            with global_credential_access_lock:
                versioned_images = compute_client.virtual_machine_images.list(
                    location=location,
                    publisher_name=marketplace.publisher,
                    offer=marketplace.offer,
                    skus=marketplace.sku,
                )
            if 0 == len(versioned_images):
                raise LisaException(
                    f"cannot find any version of image {marketplace.publisher} "
                    f"{marketplace.offer} {marketplace.sku} in {location}"
                )
            # any one should be the same to get purchase plan
            new_marketplace.version = versioned_images[-1].name
        return new_marketplace

    def _parse_shared_gallery_image(
        self, location: str, shared_image: SharedImageGallerySchema
    ) -> SharedImageGallerySchema:
        new_shared_image = copy.copy(shared_image)
        compute_client = get_compute_client(self)
        if not shared_image.resource_group_name:
            # /subscriptions/xxxx/resourceGroups/xxxx/providers/Microsoft.Compute/
            # galleries/xxxx
            rg_pattern = re.compile(r"resourceGroups/(.*)/providers", re.M)
            galleries = compute_client.galleries.list()
            rg_name = ""
            for gallery in galleries:
                if gallery.name.lower() == shared_image.image_gallery:
                    rg_name = get_matched_str(gallery.id, rg_pattern)
                    break
            if not rg_name:
                raise LisaException(
                    f"not find matched gallery {shared_image.image_gallery}"
                )
            new_shared_image.resource_group_name = rg_name
        if shared_image.image_version.lower() == "latest":
            gallery_images = (
                compute_client.gallery_image_versions.list_by_gallery_image(
                    resource_group_name=new_shared_image.resource_group_name,
                    gallery_name=new_shared_image.image_gallery,
                    gallery_image_name=new_shared_image.image_definition,
                )
            )
            image: GalleryImageVersion = None
            time: Optional[datetime] = None
            for image in gallery_images:
                gallery_image = compute_client.gallery_image_versions.get(
                    resource_group_name=new_shared_image.resource_group_name,
                    gallery_name=new_shared_image.image_gallery,
                    gallery_image_name=new_shared_image.image_definition,
                    gallery_image_version_name=image.name,
                    expand="ReplicationStatus",
                )
                if not time:
                    time = gallery_image.publishing_profile.published_date
                if gallery_image.publishing_profile.published_date > time:
                    time = gallery_image.publishing_profile.published_date
                    new_shared_image.image_version = image.name
        return new_shared_image

    @lru_cache(maxsize=10)  # noqa: B019
    def _process_marketplace_image_plan(
        self,
        marketplace: AzureVmMarketplaceSchema,
        plan_name: str,
        plan_product: str,
        plan_publisher: str,
    ) -> Optional[PurchasePlan]:
        """
        this method to fill plan, if a VM needs it. If don't fill it, the deployment
        will be failed.
        1. Get image_info to check if there is a plan.
        2. If there is a plan, it may need to check and accept terms.
        """
        plan: Optional[AzureVmPurchasePlanSchema] = None
        # if there is a plan, it may need to accept term.
        marketplace_client = get_marketplace_ordering_client(self)
        term: Optional[AgreementTerms] = None
        try:
            with global_credential_access_lock:
                term = marketplace_client.marketplace_agreements.get(
                    offer_type="virtualmachine",
                    publisher_id=marketplace.publisher,
                    offer_id=marketplace.offer,
                    plan_id=plan_name,
                )
        except Exception as identifier:
            raise LisaException(f"error on getting marketplace agreement: {identifier}")
        assert term
        if term.accepted is False:
            term.accepted = True
            marketplace_client.marketplace_agreements.create(
                offer_type="virtualmachine",
                publisher_id=marketplace.publisher,
                offer_id=marketplace.offer,
                plan_id=plan_name,
                parameters=term,
            )
        plan = AzureVmPurchasePlanSchema(
            name=plan_name,
            product=plan_product,
            publisher=plan_publisher,
        )
        return plan

    def _generate_max_capability(self, vm_size: str, location: str) -> AzureCapability:
        # some vm size cannot be queried from API, so use default capability to
        # run with best guess on capability.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=search_space.IntRange(min=1),
            memory_mb=search_space.IntRange(min=0),
            gpu_count=search_space.IntRange(min=0),
        )
        node_space.disk = features.AzureDiskOptionSettings()
        node_space.disk.data_disk_count = search_space.IntRange(min=0)
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        node_space.disk.disk_type.add(schema.DiskType.Ephemeral)
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)
        node_space.network_interface.nic_count = search_space.IntRange(min=1)
        # till now, the max nic number supported in Azure is 8
        node_space.network_interface.max_nic_count = 8
        azure_capability = AzureCapability(
            location=location,
            vm_size=vm_size,
            capability=node_space,
            resource_sku={},
        )
        node_space.name = f"{location}_{vm_size}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        # all nodes support following features
        all_features = self.supported_features()
        node_space.features.update(
            [schema.FeatureSettings.create(x.name()) for x in all_features]
        )
        _convert_to_azure_node_space(node_space)
        return azure_capability

    def _generate_min_capability(
        self,
        requirement: schema.NodeSpace,
        azure_capability: AzureCapability,
        location: str,
    ) -> schema.NodeSpace:
        min_cap: schema.NodeSpace = requirement.generate_min_capability(
            azure_capability.capability
        )
        # Apply azure specified values. They will pass into arm template
        azure_node_runbook = min_cap.get_extended_runbook(AzureNodeSchema, AZURE)
        if azure_node_runbook.location:
            assert location in azure_node_runbook.location, (
                f"predefined location [{azure_node_runbook.location}] "
                f"must be same as "
                f"cap location [{location}]"
            )
        # the location may not be set
        azure_node_runbook.location = location
        azure_node_runbook.vm_size = azure_capability.vm_size
        return min_cap

    def _generate_sas_token(self, result_dict: Dict[str, str]) -> Any:
        sc_name = result_dict["account_name"]
        container_name = result_dict["container_name"]
        rg = result_dict["resource_group_name"]
        blob_name = result_dict["blob_name"]
        source_container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            container_name=container_name,
            resource_group_name=rg,
        )
        source_blob = source_container_client.get_blob_client(blob_name)
        sas_token = generate_sas_token(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            resource_group_name=rg,
        )
        source_url = source_blob.url + "?" + sas_token
        return source_url

    @lru_cache(maxsize=10)  # noqa: B019
    def _get_deployable_vhd_path(
        self, vhd_path: str, location: str, log: Logger
    ) -> str:
        """
        The sas url is not able to create a vm directly, so this method check if
        the vhd_path is a sas url. If so, copy it to a location in current
        subscription, so it can be deployed.
        """
        matches = SAS_URL_PATTERN.match(vhd_path)
        if not matches:
            vhd_details = self._get_vhd_details(vhd_path)
            vhd_location = vhd_details["location"]
            if location == vhd_location:
                return vhd_path
            else:
                vhd_path = self._generate_sas_token(vhd_details)
                matches = SAS_URL_PATTERN.match(vhd_path)
                assert matches, f"fail to generate sas url for {vhd_path}"
                log.debug(
                    f"the vhd location {location} is not same with running case "
                    f"location {vhd_location}, generate a sas url for source vhd, "
                    f"it needs to be copied into location {location}."
                )
        else:
            log.debug("found the vhd is a sas url, it may need to be copied.")
        # get original vhd's hash key for comparing.
        original_key: Optional[bytearray] = None
        original_vhd_path = vhd_path
        original_blob_client = BlobClient.from_blob_url(original_vhd_path)
        properties = original_blob_client.get_blob_properties()
        if properties.content_settings:
            original_key = properties.content_settings.get(
                "content_md5", None
            )  # type: ignore
        storage_name = get_storage_account_name(
            subscription_id=self.subscription_id, location=location, type="t"
        )
        check_or_create_storage_account(
            self.credential,
            self.subscription_id,
            storage_name,
            self._azure_runbook.shared_resource_group_name,
            location,
            log,
        )
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=storage_name,
            container_name=SAS_COPIED_CONTAINER_NAME,
            resource_group_name=self._azure_runbook.shared_resource_group_name,
        )
        normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)
        year = matches["year"] if matches["year"] else "9999"
        month = matches["month"] if matches["month"] else "01"
        day = matches["day"] if matches["day"] else "01"
        # use the expire date to generate the path. It's easy to identify when
        # the cache can be removed.
        vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"
        full_vhd_path = f"{container_client.url}/{vhd_path}"
        # lock here to prevent a vhd is copied in multi-thread
        global _global_sas_vhd_copy_lock
        cached_key: Optional[bytearray] = None
        with _global_sas_vhd_copy_lock:
            blobs = container_client.list_blobs(name_starts_with=vhd_path)
            for blob in blobs:
                if blob:
                    # check if hash key matched with original key.
                    if blob.content_settings:
                        cached_key = blob.content_settings.get("content_md5", None)
                    if original_key == cached_key:
                        # if it exists, return the link, not to copy again.
                        log.debug("the sas url is copied already, use it directly.")
                        return full_vhd_path
                    else:
                        log.debug("found cached vhd, but the hash key mismatched.")
            blob_client = container_client.get_blob_client(vhd_path)
            blob_client.start_copy_from_url(
                original_vhd_path, metadata=None, incremental_copy=False
            )
            wait_copy_blob(blob_client, vhd_path, log)
        return full_vhd_path

    def _get_vhd_details(self, vhd_path: str) -> Any:
        matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)
        assert matched, f"fail to get matched info from {vhd_path}"
        sc_name = matched.group("sc")
        container_name = matched.group("container")
        blob_name = matched.group("blob")
        storage_client = get_storage_client(self.credential, self.subscription_id)
        sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]
        assert sc[
            0
        ], f"fail to get storage account {sc_name} from {self.subscription_id}"
        rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)
        return {
            "location": sc[0].location,
            "resource_group_name": rg,
            "account_name": sc_name,
            "container_name": container_name,
            "blob_name": blob_name,
        }

    def _generate_data_disks(
        self,
        node: Node,
        azure_node_runbook: AzureNodeArmParameter,
    ) -> List[DataDiskSchema]:
        data_disks: List[DataDiskSchema] = []
        assert node.capability.disk
        if azure_node_runbook.marketplace:
            marketplace = self._get_image_info(
                azure_node_runbook.location, azure_node_runbook.marketplace
            )
            # some images has data disks by default
            # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.27
            # we have to inject below part when dataDisks section added in
            # arm template,
            # otherwise will see below exception:
            # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun
            # does not have required value(s) for image specified in
            # storage profile.
            for default_data_disk in marketplace.data_disk_images:
                data_disks.append(
                    DataDiskSchema(
                        node.capability.disk.data_disk_caching_type,
                        default_data_disk.additional_properties["sizeInGb"],
                        azure_node_runbook.disk_type,
                        DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,
                    )
                )
        assert isinstance(
            node.capability.disk.data_disk_count, int
        ), f"actual: {type(node.capability.disk.data_disk_count)}"
        for _ in range(node.capability.disk.data_disk_count):
            assert isinstance(node.capability.disk.data_disk_size, int)
            data_disks.append(
                DataDiskSchema(
                    node.capability.disk.data_disk_caching_type,
                    node.capability.disk.data_disk_size,
                    azure_node_runbook.disk_type,
                    DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,
                )
            )
        return data_disks

    @lru_cache(maxsize=10)  # noqa: B019
    def _get_image_info(
        self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]
    ) -> VirtualMachineImage:
        compute_client = get_compute_client(self)
        assert isinstance(marketplace, AzureVmMarketplaceSchema)
        with global_credential_access_lock:
            image_info = compute_client.virtual_machine_images.get(
                location=location,
                publisher_name=marketplace.publisher,
                offer=marketplace.offer,
                skus=marketplace.sku,
                version=marketplace.version,
            )
        return image_info

    def _get_location_key(self, location: str) -> str:
        return f"{self.subscription_id}_{location}"

    def _enable_ssh_on_windows(self, node: Node) -> None:
        runbook = node.capability.get_extended_runbook(AzureNodeSchema)
        if runbook.is_linux:
            return
        context = get_node_context(node)
        remote_node = cast(RemoteNode, node)
        log = node.log
        log.debug(
            f"checking if SSH port {remote_node.public_port} is reachable "
            f"on {remote_node.name}..."
        )
        connected, _ = wait_tcp_port_ready(
            address=remote_node.public_address,
            port=remote_node.public_port,
            log=log,
            timeout=3,
        )
        if connected:
            log.debug("SSH port is reachable.")
            return
        log.debug("SSH port is not open, enabling ssh on Windows ...")
        # The SSH port is not opened, try to enable it.
        public_key_data = get_public_key_data(self.runbook.admin_private_key_file)
        with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:
            script = f.read()
        parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)
        command = RunCommandInput(
            command_id="RunPowerShellScript",
            script=[script],
            parameters=[parameters],
        )
        compute_client = get_compute_client(self)
        operation = compute_client.virtual_machines.begin_run_command(
            resource_group_name=context.resource_group_name,
            vm_name=context.vm_name,
            parameters=command,
        )
        result = wait_operation(operation=operation, failure_identity="enable ssh")
        log.debug("SSH script result:")
        log.dump_json(logging.DEBUG, result)

    def _get_vhd_os_disk_size(self, blob_url: str) -> int:
        result_dict = self._get_vhd_details(blob_url)
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=result_dict["account_name"],
            container_name=result_dict["container_name"],
            resource_group_name=result_dict["resource_group_name"],
        )
        vhd_blob = container_client.get_blob_client(result_dict["blob_name"])
        properties = vhd_blob.get_blob_properties()
        assert properties.size, f"fail to get blob size of {blob_url}"
        # Azure requires only megabyte alignment of vhds, round size up
        # for cases where the size is megabyte aligned
        return math.ceil(properties.size / 1024 / 1024 / 1024)

    def _get_sig_info(
        self, shared_image: SharedImageGallerySchema
    ) -> GalleryImageVersion:
        compute_client = get_compute_client(self)
        return compute_client.gallery_image_versions.get(
            resource_group_name=shared_image.resource_group_name,
            gallery_name=shared_image.image_gallery,
            gallery_image_name=shared_image.image_definition,
            gallery_image_version_name=shared_image.image_version,
            expand="ReplicationStatus",
        )

    def _get_sig_os_disk_size(self, shared_image: SharedImageGallerySchema) -> int:
        found_image = self._get_sig_info(shared_image)
        assert found_image.storage_profile.os_disk_image.size_in_gb
        return int(found_image.storage_profile.os_disk_image.size_in_gb)

    def _get_normalized_vm_sizes(
        self, name: str, location: str, log: Logger
    ) -> List[str]:
        split_vm_sizes: List[str] = [x.strip() for x in name.split(",")]
        for index, vm_size in enumerate(split_vm_sizes):
            split_vm_sizes[index] = self._get_normalized_vm_size(vm_size, location, log)
        return [x for x in split_vm_sizes if x]

    def _get_normalized_vm_size(self, name: str, location: str, log: Logger) -> str:
        # find predefined vm size on all available's.
        location_info: AzureLocation = self.get_location_info(location, log)
        matched_score: float = 0
        matched_name: str = ""
        matcher = SequenceMatcher(None, name.lower(), "")
        for vm_size in location_info.capabilities:
            matcher.set_seq2(vm_size.lower())
            if name.lower() in vm_size.lower() and matched_score < matcher.ratio():
                matched_name = vm_size
                matched_score = matcher.ratio()
        return matched_name

    def _get_capabilities(
        self, vm_sizes: List[str], location: str, use_max_capability: bool, log: Logger
    ) -> List[AzureCapability]:
        candidate_caps: List[AzureCapability] = []
        caps = self.get_location_info(location, log).capabilities
        for vm_size in vm_sizes:
            # force to use max capability to run test cases as much as possible,
            # or force to support non-exists vm size.
            if use_max_capability:
                candidate_caps.append(self._generate_max_capability(vm_size, location))
                continue
            if vm_size in caps:
                candidate_caps.append(caps[vm_size])
        return candidate_caps

    def _get_matched_capability(
        self,
        requirement: schema.NodeSpace,
        candidate_capabilities: List[AzureCapability],
    ) -> Optional[schema.NodeSpace]:
        matched_cap: Optional[schema.NodeSpace] = None
        # filter allowed vm sizes
        for azure_cap in candidate_capabilities:
            check_result = requirement.check(azure_cap.capability)
            if check_result.result:
                min_cap = self._generate_min_capability(
                    requirement, azure_cap, azure_cap.location
                )
                matched_cap = min_cap
                break
        return matched_cap

    def _get_matched_capabilities(
        self, location: str, nodes_requirement: List[schema.NodeSpace], log: Logger
    ) -> Tuple[List[Union[schema.NodeSpace, bool]], str]:
        # capability or if it's able to wait.
        caps: List[Union[schema.NodeSpace, bool]] = [False] * len(nodes_requirement)
        # one of errors for all requirements. It's enough for troubleshooting.
        error: str = ""
        # get allowed vm sizes. Either it's from the runbook defined, or
        # from subscription supported .
        for req_index, req in enumerate(nodes_requirement):
            candidate_caps, sub_error = self._get_allowed_capabilities(
                req, location, log
            )
            if sub_error:
                # no candidate found, so try next one.
                error = sub_error
                continue
            # filter vm sizes and return two list. 1st is deployable, 2nd is
            # wait able for released resource.
            (
                available_capabilities,
                awaitable_capabilities,
            ) = self._parse_cap_availabilities(candidate_caps)
            # sort vm sizes to match
            available_capabilities = self.get_sorted_vm_sizes(
                available_capabilities, log
            )
            # match vm sizes by capability or use the predefined vm sizes.
            candidate_cap = self._get_matched_capability(req, available_capabilities)
            if candidate_cap:
                caps[req_index] = candidate_cap
            else:
                # the error will be overwritten, if there is vm sizes without
                # quota.
                error = f"no available vm size found on '{location}'."
            if not candidate_cap:
                # check if there is awaitable VMs
                candidate_cap = self._get_matched_capability(
                    req, awaitable_capabilities...
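
Note that get_sorted_vm_sizes is not an alphabetical sort: it walks VM_SIZE_FALLBACK_PATTERNS in order, collects the capabilities whose vm_size matches the current pattern (skipping sizes already claimed by an earlier level), sorts each level by the rough cost stored on the capability, and concatenates the levels. A self-contained sketch of the same idea, using made-up patterns and a simplified (vm_size, cost) tuple in place of AzureCapability:

import re
from typing import List, Set, Tuple

# Simplified stand-ins for illustration only; real LISA uses AzureCapability
# objects and its own VM_SIZE_FALLBACK_PATTERNS list.
Capability = Tuple[str, float]  # (vm_size, rough_cost)
FALLBACK_PATTERNS = [
    re.compile(r"^Standard_DS?\d"),  # a preferred family, tried first
    re.compile(r".*"),  # everything else as the last fallback level
]


def sort_by_fallback_then_cost(capabilities: List[Capability]) -> List[Capability]:
    sorted_caps: List[Capability] = []
    found_sizes: Set[str] = set()
    for pattern in FALLBACK_PATTERNS:
        level: List[Capability] = []
        for cap in capabilities:
            if pattern.match(cap[0]) and cap[0] not in found_sizes:
                level.append(cap)
                found_sizes.add(cap[0])
        level.sort(key=lambda cap: cap[1])  # cheaper sizes first within a level
        sorted_caps.extend(level)
    return sorted_caps


print(
    sort_by_fallback_then_cost(
        [("Standard_E8s_v3", 8.0), ("Standard_D2s_v3", 2.0), ("Standard_DS1_v2", 1.0)]
    )
)
# -> [('Standard_DS1_v2', 1.0), ('Standard_D2s_v3', 2.0), ('Standard_E8s_v3', 8.0)]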

features.py

Source: features.py (GitHub)


...
        )
        # Get list of vm sizes available in the current location
        location_info = platform.get_location_info(node_runbook.location, self._log)
        capabilities = [value for _, value in location_info.capabilities.items()]
        sorted_sizes = platform.get_sorted_vm_sizes(capabilities, self._log)
        current_vm_size = next(
            (x for x in sorted_sizes if x.vm_size == node_runbook.vm_size),
            None,
        )
        assert current_vm_size, "cannot find current vm size in eligible list"
        # Intersection of available_sizes and eligible_sizes
        avail_eligible_intersect: List[AzureCapability] = []
        # Populating avail_eligible_intersect with vm sizes that are available in the
        # current location and that are available for the current vm size to resize to
        for size in available_sizes:
            vm_size_name = size.as_dict()["name"]
            # Getting eligible vm sizes and their capability data
            new_vm_size = next(
                (x for x in sorted_sizes if x.vm_size == vm_size_name), None...
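
In this feature code, the sorted_sizes list returned by get_sorted_vm_sizes also serves as a lookup table keyed by vm_size. A tiny illustrative lookup outside the feature code, assuming sorted_sizes was produced as above and using an assumed size name:

# Illustrative only: "Standard_D2s_v3" is an assumed size name; `sorted_sizes`
# is the list returned by get_sorted_vm_sizes above.
target = "Standard_D2s_v3"
target_cap = next((x for x in sorted_sizes if x.vm_size == target), None)
if target_cap is None:
    self._log.debug(f"{target} is not available in this location")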


test_prepare.py

Source: test_prepare.py (GitHub)


...
        self, location: str, vm_size: str, expect_exists: bool
    ) -> Optional[platform_.AzureCapability]:
        result = None
        location_info = self._platform.get_location_info(location, self._log)
        sorted_capabilities = self._platform.get_sorted_vm_sizes(
            [value for _, value in location_info.capabilities.items()], self._log
        )
        self.assertEqual(
            expect_exists,
            any([x.vm_size == vm_size for x in sorted_capabilities]),
        )
        if expect_exists:
            result = next(x for x in sorted_capabilities if x.vm_size == vm_size)
        return result

    def load_environment(
        self,
        node_req_count: int = 2,
    ) -> Environment:
        runbook = schema.Environment()...
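
This helper from LISA's prepare unit tests sorts every capability in a location and asserts whether a given vm_size appears in the result, returning its AzureCapability when it does. A hedged example of calling such a helper from another test method follows; the helper name is truncated in the excerpt, so _verify_vm_size below is a hypothetical stand-in, as are the location and size values.

# Hypothetical call: the helper name, location and vm_size are illustrative.
cap = self._verify_vm_size(
    location="westus2", vm_size="Standard_DS2_v2", expect_exists=True
)
assert cap is not None and cap.vm_size == "Standard_DS2_v2"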


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

