Best Python code snippet using lisa_python
platform_.py
Source: platform_.py
...
            )
        elif arm_parameters.shared_gallery:
            os_disk_size = max(
                os_disk_size,
                self._get_sig_os_disk_size(arm_parameters.shared_gallery),
            )
        else:
            assert (
                arm_parameters.marketplace
            ), "not set one of marketplace, shared_gallery or vhd."
            image_info = self._get_image_info(
                arm_parameters.location, arm_parameters.marketplace
            )
            os_disk_size = max(
                os_disk_size, image_info.os_disk_image.additional_properties["sizeInGb"]
            )
            if not arm_parameters.purchase_plan and image_info.plan:
                # expand values for lru cache
                plan_name = image_info.plan.name
                plan_product = image_info.plan.product
                plan_publisher = image_info.plan.publisher
                # accept the default purchase plan automatically.
                arm_parameters.purchase_plan = self._process_marketplace_image_plan(
                    marketplace=arm_parameters.marketplace,
                    plan_name=plan_name,
                    plan_product=plan_product,
                    plan_publisher=plan_publisher,
                )
        arm_parameters.osdisk_size_in_gb = os_disk_size

        # Set disk type
        assert capability.disk, "node space must have disk defined."
        assert isinstance(capability.disk.disk_type, schema.DiskType)
        arm_parameters.disk_type = features.get_azure_disk_type(
            capability.disk.disk_type
        )

        assert capability.network_interface
        assert isinstance(
            capability.network_interface.nic_count, int
        ), f"actual: {capability.network_interface.nic_count}"
        arm_parameters.nic_count = capability.network_interface.nic_count
        assert isinstance(
            capability.network_interface.data_path, schema.NetworkDataPath
        ), f"actual: {type(capability.network_interface.data_path)}"
        if capability.network_interface.data_path == schema.NetworkDataPath.Sriov:
            arm_parameters.enable_sriov = True

        return arm_parameters

    def _validate_template(
        self, deployment_parameters: Dict[str, Any], log: Logger
    ) -> None:
        log.debug("validating deployment")
        validate_operation: Any = None
        try:
            with global_credential_access_lock:
                validate_operation = self._rm_client.deployments.begin_validate(
                    **deployment_parameters
                )
            wait_operation(validate_operation, failure_identity="validation")
        except Exception as identifier:
            error_messages: List[str] = [str(identifier)]
            if isinstance(identifier, HttpResponseError) and identifier.error:
                # no validate_operation returned, the message may include
                # some errors, so check details
                error_messages = self._parse_detail_errors(identifier.error)
            error_message = "\n".join(error_messages)
            plugin_manager.hook.azure_deploy_failed(error_message=error_message)
            raise LisaException(error_message)

    def _deploy(
        self, location: str, deployment_parameters: Dict[str, Any], log: Logger
    ) -> None:
        resource_group_name = deployment_parameters[AZURE_RG_NAME_KEY]
        storage_account_name = get_storage_account_name(self.subscription_id, location)
        check_or_create_storage_account(
            self.credential,
            self.subscription_id,
            storage_account_name,
            self._azure_runbook.shared_resource_group_name,
            location,
            log,
        )
        log.info(f"resource group '{resource_group_name}' deployment is in progress...")
        deployment_operation: Any = None
        deployments = self._rm_client.deployments
        try:
            deployment_operation = deployments.begin_create_or_update(
                **deployment_parameters
            )
            wait_operation(deployment_operation, failure_identity="deploy")
        except HttpResponseError as identifier:
            # Some errors happen at a lower layer, so there are no detailed errors
            # from the API. For example,
            # azure.core.exceptions.HttpResponseError:
            #    Operation returned an invalid status 'OK'
            assert identifier.error, f"HttpResponseError: {identifier}"
            error_message = "\n".join(self._parse_detail_errors(identifier.error))
            if (
                self._azure_runbook.ignore_provisioning_error
                and "OSProvisioningTimedOut: OS Provisioning for VM" in error_message
            ):
                # Provisioning timeout is caused by waagent not being ready.
                # In a smoke test, it can still verify some information.
                # Swallow the exception here, to run the test case anyway.
                #
                # It may cause other cases to fail on their assumptions. In that case,
                # we can define a flag in config, to mark whether this exception is
                # ignorable or not.
                log.error(
                    f"provisioning time out, try to run case. "
                    f"Exception: {error_message}"
                )
            elif self._azure_runbook.ignore_provisioning_error and get_matched_str(
                error_message, AZURE_INTERNAL_ERROR_PATTERN
            ):
                # Similar situation to OSProvisioningTimedOut.
                # Some OSProvisioningInternalError occur because the image doesn't
                # support SSH key authentication,
                # e.g. hpe hpestoreoncevsa hpestoreoncevsa-3187 3.18.7
                # After passing through this exception, port 22 of the VM is
                # actually open.
                log.error(
                    f"provisioning failed for an internal error, try to run case. "
"1145                    f"Exception: {error_message}"1146                )1147            else:1148                plugin_manager.hook.azure_deploy_failed(error_message=error_message)1149                raise LisaException(error_message)1150    def _parse_detail_errors(self, error: Any) -> List[str]:1151        # original message may be a summary, get lowest level details.1152        if hasattr(error, "details") and error.details:1153            errors: List[str] = []1154            for detail in error.details:1155                errors.extend(self._parse_detail_errors(detail))1156        else:1157            try:1158                # it returns serialized json string in message sometime1159                parsed_error = json.loads(1160                    error.message, object_hook=lambda x: SimpleNamespace(**x)1161                )1162                errors = self._parse_detail_errors(parsed_error.error)1163            except Exception:1164                # load failed, it should be a real error message string1165                errors = [f"{error.code}: {error.message}"]1166        return errors1167    # the VM may not be queried after deployed. use retry to mitigate it.1168    @retry(exceptions=LisaException, tries=150, delay=2)1169    def _load_vms(1170        self, environment: Environment, log: Logger1171    ) -> Dict[str, VirtualMachine]:1172        compute_client = get_compute_client(self, api_version="2020-06-01")1173        environment_context = get_environment_context(environment=environment)1174        log.debug(1175            f"listing vm in resource group "1176            f"'{environment_context.resource_group_name}'"1177        )1178        vms_map: Dict[str, VirtualMachine] = {}1179        vms = compute_client.virtual_machines.list(1180            environment_context.resource_group_name1181        )1182        for vm in vms:1183            vms_map[vm.name] = vm1184            log.debug(f"  found vm {vm.name}")1185        if not vms_map:1186            raise LisaException(1187                f"deployment succeeded, but VM not found in 5 minutes "1188                f"from '{environment_context.resource_group_name}'"1189            )1190        return vms_map1191    # Use Exception, because there may be credential conflict error. 
    # retriable.
    @retry(exceptions=Exception, tries=150, delay=2)
    def _load_nics(
        self, environment: Environment, log: Logger
    ) -> Dict[str, NetworkInterface]:
        network_client = get_network_client(self)
        environment_context = get_environment_context(environment=environment)
        log.debug(
            f"listing network interfaces in resource group "
            f"'{environment_context.resource_group_name}'"
        )
        # load nics
        nics_map: Dict[str, NetworkInterface] = {}
        network_interfaces = network_client.network_interfaces.list(
            environment_context.resource_group_name
        )
        for nic in network_interfaces:
            # nic name is like lisa-test-20220316-182126-985-e0-n0-nic-2; take the vm
            # name part for a later pick. Only the primary nic is kept, which ends
            # with -nic-0
            node_name_from_nic = RESOURCE_ID_NIC_PATTERN.findall(nic.name)
            if node_name_from_nic:
                name = node_name_from_nic[0]
                nics_map[name] = nic
                log.debug(f"  found nic '{nic.name}', and saved for next step.")
            else:
                log.debug(
                    f"  found nic '{nic.name}', but dropped, "
                    "because it's not primary nic."
                )
        if not nics_map:
            raise LisaException(
                f"deployment succeeded, but network interfaces not found in 5 minutes "
                f"from '{environment_context.resource_group_name}'"
            )
        return nics_map

    @retry(exceptions=LisaException, tries=150, delay=2)
    def load_public_ips_from_resource_group(
        self, resource_group_name: str, log: Logger
    ) -> Dict[str, str]:
        network_client = get_network_client(self)
        log.debug(f"listing public ips in resource group '{resource_group_name}'")
        # get public IP
        public_ip_addresses = network_client.public_ip_addresses.list(
            resource_group_name
        )
        public_ips_map: Dict[str, str] = {}
        for ip_address in public_ip_addresses:
            # public IP name is like node-0-nic-2; take the vm name part for a later
            # pick. Only the primary nic is kept, which ends with -nic-0
            node_name_from_public_ip = RESOURCE_ID_PUBLIC_IP_PATTERN.findall(
                ip_address.name
            )
            assert (
                ip_address
            ), f"public IP address cannot be empty, ip_address object: {ip_address}"
            if node_name_from_public_ip:
                name = node_name_from_public_ip[0]
                public_ips_map[name] = ip_address.ip_address
                log.debug(
                    f"  found public IP '{ip_address.name}', and saved for next step."
                )
            else:
                log.debug(
                    f"  found public IP '{ip_address.name}', but dropped "
                    "because it's not primary nic."
                )
        if not public_ips_map:
            raise LisaException(
                f"deployment succeeded, but public ips not found in 5 minutes "
                f"from '{resource_group_name}'"
            )
        return public_ips_map

    def initialize_environment(self, environment: Environment, log: Logger) -> None:
        node_context_map: Dict[str, Node] = {}
        for node in environment.nodes.list():
            node_context = get_node_context(node)
            node_context_map[node_context.vm_name] = node

        vms_map: Dict[str, VirtualMachine] = self._load_vms(environment, log)
        nics_map: Dict[str, NetworkInterface] = self._load_nics(environment, log)
        environment_context = get_environment_context(environment=environment)
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            environment_context.resource_group_name, log
        )

        for vm_name, node in node_context_map.items():
            node_context = get_node_context(node)
            vm = vms_map.get(vm_name, None)
            if not vm:
                raise LisaException(
                    f"cannot find vm: '{vm_name}', make sure deployment is correct."
                )
            nic = nics_map[vm_name]
            public_ip = public_ips_map[vm_name]
            address = nic.ip_configurations[0].private_ip_address

            if not node.name:
                node.name = vm_name

            assert isinstance(node, RemoteNode)
            node.set_connection_info(
                address=address,
                port=22,
                public_address=public_ip,
                public_port=22,
                username=node_context.username,
                password=node_context.password,
                private_key_file=node_context.private_key_file,
            )

        # enable ssh for windows; if a node is not Windows, or SSH is already
        # reachable, it will be skipped.
        run_in_parallel(
            [
                partial(self._enable_ssh_on_windows, node=x)
                for x in environment.nodes.list()
            ]
        )

    def _resource_sku_to_capability(  # noqa: C901
        self, location: str, resource_sku: ResourceSku
    ) -> schema.NodeSpace:
        # fill in default values, in case no capability is met.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=0,
            memory_mb=0,
            gpu_count=0,
        )
        node_space.name = f"{location}_{resource_sku.name}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        node_space.disk = features.AzureDiskOptionSettings()
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.data_disk_iops = search_space.IntRange(min=0)
        node_space.disk.data_disk_size = search_space.IntRange(min=0)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])

        # fill supported features
        azure_raw_capabilities: Dict[str, str] = {}
        for sku_capability in resource_sku.capabilities:
            # avoid looping over every feature
            azure_raw_capabilities[sku_capability.name] = sku_capability.value

        # calculate cpu count. Some vm sizes, like Standard_HC44rs, don't have
        # vCPUsAvailable, so use vCPUs.
        vcpus_available = int(azure_raw_capabilities.get("vCPUsAvailable", "0"))
        if vcpus_available:
            node_space.core_count = vcpus_available
        else:
            node_space.core_count = int(azure_raw_capabilities.get("vCPUs", "0"))

        memory_value = azure_raw_capabilities.get("MemoryGB", None)
        if memory_value:
            node_space.memory_mb = int(float(memory_value) * 1024)

        max_disk_count = azure_raw_capabilities.get("MaxDataDiskCount", None)
        if max_disk_count:
            node_space.disk.max_data_disk_count = int(max_disk_count)
            node_space.disk.data_disk_count = search_space.IntRange(
                max=node_space.disk.max_data_disk_count
            )

        max_nic_count = azure_raw_capabilities.get("MaxNetworkInterfaces", None)
        if max_nic_count:
            # set a min value for nic_count as a workaround for an azure python sdk bug:
            # nic_count is 0 when getting capability for some sizes, e.g. Standard_D8a_v3
            sku_nic_count = int(max_nic_count)
            if sku_nic_count == 0:
                sku_nic_count = 1
            node_space.network_interface.nic_count = search_space.IntRange(
                min=1, max=sku_nic_count
            )
            node_space.network_interface.max_nic_count = sku_nic_count

        premium_io_supported = azure_raw_capabilities.get("PremiumIO", None)
        if premium_io_supported and eval(premium_io_supported) is True:
            node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)

        ephemeral_supported = azure_raw_capabilities.get(
            "EphemeralOSDiskSupported", None
        )
        if ephemeral_supported and eval(ephemeral_supported) is True:
            # Check if CachedDiskBytes is greater than 30GB.
            # We use the diff disk as the cache disk for the ephemeral OS disk.
            cached_disk_bytes = azure_raw_capabilities.get("CachedDiskBytes", 0)
            cached_disk_bytes_gb = int(cached_disk_bytes) / 1024 / 1024 / 1024
            if cached_disk_bytes_gb >= 30:
                node_space.disk.disk_type.add(schema.DiskType.Ephemeral)

        # set AN
        an_enabled = azure_raw_capabilities.get("AcceleratedNetworkingEnabled", None)
        if an_enabled and eval(an_enabled) is True:
            # refer
            # https://docs.microsoft.com/en-us/azure/virtual-machines/dcv2-series#configuration
            # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv2-series
            # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv3-series
            # https://docs.microsoft.com/en-us/azure/virtual-machines/nd-series
            # The VM size families below don't support `Accelerated Networking`, but
            # the API returns `True`. Work around this issue temporarily; it will be
            # reverted once the bug is fixed.
            if resource_sku.family not in [
                "standardDCSv2Family",
                "standardNCSv2Family",
                "standardNCSv3Family",
                "standardNDSFamily",
            ]:
                # update data path types if sriov feature is supported
                node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)

        # for some new sizes, there is no MaxNetworkInterfaces capability
        # and we have to set a default value for max_nic_count
        if not node_space.network_interface.max_nic_count:
            node_space.network_interface.max_nic_count = 1

        # some vm sizes do not have a resource disk present
        # https://docs.microsoft.com/en-us/azure/virtual-machines/azure-vms-no-temp-disk
        if resource_sku.family in [
            "standardDv4Family",
            "standardDSv4Family",
            "standardEv4Family",
            "standardESv4Family",
            "standardEASv4Family",
            "standardEASv5Family",
            "standardESv5Family",
            "standardEADSv5Family",
            "standardDASv5Family",
            "standardDSv5Family",
            "standardFSv2Family",
            "standardNCFamily",
            "standardESv3Family",
            "standardDPSv5Family",
            "standardEBSv5Family",
            "standardEv5Family",
        ]:
            node_space.disk.has_resource_disk = False
        else:
            node_space.disk.has_resource_disk = True

        for supported_feature in self.supported_features():
            if supported_feature.name() in [
                features.Disk.name(),
                features.NetworkInterface.name(),
            ]:
                # Skip the disk and network interface features. They are
                # handled by node_space directly.
                continue
            feature_setting = supported_feature.create_setting(
                raw_capabilities=azure_raw_capabilities,
                resource_sku=resource_sku,
                node_space=node_space,
            )
            if feature_setting:
                node_space.features.add(feature_setting)

        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        return node_space

    def get_sorted_vm_sizes(
        self, capabilities: List[AzureCapability], log: Logger
    ) -> List[AzureCapability]:
        # sort vm sizes by the predefined patterns
        sorted_capabilities: List[AzureCapability] = []
        found_vm_sizes: Set[str] = set()
        # loop through all fallback levels
        for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:
            level_capabilities: List[AzureCapability] = []
            # loop through all capabilities
            for capability in capabilities:
                vm_size = capability.vm_size
                if fallback_pattern.match(vm_size) and vm_size not in found_vm_sizes:
                    level_capabilities.append(capability)
                    found_vm_sizes.add(vm_size)
            # sort by rough cost
            level_capabilities.sort(key=lambda x: (x.capability.cost))
            sorted_capabilities.extend(level_capabilities)
        return sorted_capabilities

    def load_public_ip(self, node: Node, log: Logger) -> str:
        node_context = get_node_context(node)
        vm_name = node_context.vm_name
        resource_group_name = node_context.resource_group_name
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            resource_group_name=resource_group_name, log=self._log
        )
        return public_ips_map[vm_name]

    @lru_cache(maxsize=10)  # noqa: B019
    def _resolve_marketplace_image(
        self, location: str, marketplace: AzureVmMarketplaceSchema
    ) -> AzureVmMarketplaceSchema:
        new_marketplace = copy.copy(marketplace)
        # "latest" doesn't work; it needs a specified version.
        if marketplace.version.lower() == "latest":
            compute_client = get_compute_client(self)
            with global_credential_access_lock:
                versioned_images = compute_client.virtual_machine_images.list(
                    location=location,
                    publisher_name=marketplace.publisher,
                    offer=marketplace.offer,
                    skus=marketplace.sku,
                )
            if 0 == len(versioned_images):
                raise LisaException(
                    f"cannot find any version of image {marketplace.publisher} "
                    f"{marketplace.offer} {marketplace.sku} in {location}"
                )
            # any one of them should be the same for getting the purchase plan
            new_marketplace.version = versioned_images[-1].name
        return new_marketplace

    def _parse_shared_gallery_image(
        self, location: str, shared_image: SharedImageGallerySchema
    ) -> SharedImageGallerySchema:
        new_shared_image = copy.copy(shared_image)
        compute_client = get_compute_client(self)
        if not shared_image.resource_group_name:
            # /subscriptions/xxxx/resourceGroups/xxxx/providers/Microsoft.Compute/
            # galleries/xxxx
            rg_pattern = re.compile(r"resourceGroups/(.*)/providers", re.M)
            galleries = compute_client.galleries.list()
            rg_name = ""
            for gallery in galleries:
                if gallery.name.lower() == shared_image.image_gallery:
                    rg_name = get_matched_str(gallery.id, rg_pattern)
                    break
            if not rg_name:
                raise LisaException(
                    f"cannot find a matched gallery {shared_image.image_gallery}"
                )
            new_shared_image.resource_group_name = rg_name
        if shared_image.image_version.lower() == "latest":
            gallery_images = (
                compute_client.gallery_image_versions.list_by_gallery_image(
                    resource_group_name=new_shared_image.resource_group_name,
                    gallery_name=new_shared_image.image_gallery,
                    gallery_image_name=new_shared_image.image_definition,
                )
            )
            image: GalleryImageVersion = None
            time: Optional[datetime] = None
            for image in gallery_images:
                gallery_image = compute_client.gallery_image_versions.get(
                    resource_group_name=new_shared_image.resource_group_name,
                    gallery_name=new_shared_image.image_gallery,
                    gallery_image_name=new_shared_image.image_definition,
                    gallery_image_version_name=image.name,
                    expand="ReplicationStatus",
                )
                if not time:
                    time = gallery_image.publishing_profile.published_date
                if gallery_image.publishing_profile.published_date > time:
                    time = gallery_image.publishing_profile.published_date
                    new_shared_image.image_version = image.name
        return new_shared_image

    @lru_cache(maxsize=10)  # noqa: B019
    def _process_marketplace_image_plan(
        self,
        marketplace: AzureVmMarketplaceSchema,
        plan_name: str,
        plan_product: str,
        plan_publisher: str,
    ) -> Optional[PurchasePlan]:
        """
        this method fills in the plan, if a VM needs it. If it's not filled in,
        the deployment will fail.
        1. Get image_info to check if there is a plan.
        2. If there is a plan, it may need to check and accept terms.
        """
        plan: Optional[AzureVmPurchasePlanSchema] = None
        # if there is a plan, it may need to accept the terms.
        marketplace_client = get_marketplace_ordering_client(self)
        term: Optional[AgreementTerms] = None
        try:
            with global_credential_access_lock:
                term = marketplace_client.marketplace_agreements.get(
                    offer_type="virtualmachine",
                    publisher_id=marketplace.publisher,
                    offer_id=marketplace.offer,
                    plan_id=plan_name,
                )
        except Exception as identifier:
            raise LisaException(f"error on getting marketplace agreement: {identifier}")

        assert term
        if term.accepted is False:
            term.accepted = True
            marketplace_client.marketplace_agreements.create(
                offer_type="virtualmachine",
                publisher_id=marketplace.publisher,
                offer_id=marketplace.offer,
                plan_id=plan_name,
                parameters=term,
            )
        plan = AzureVmPurchasePlanSchema(
            name=plan_name,
            product=plan_product,
            publisher=plan_publisher,
        )
        return plan

    def _generate_max_capability(self, vm_size: str, location: str) -> AzureCapability:
        # some vm sizes cannot be queried from the API, so use a default capability to
        # run with the best guess on capability.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=search_space.IntRange(min=1),
            memory_mb=search_space.IntRange(min=0),
            gpu_count=search_space.IntRange(min=0),
        )
        node_space.disk = features.AzureDiskOptionSettings()
        node_space.disk.data_disk_count = search_space.IntRange(min=0)
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        node_space.disk.disk_type.add(schema.DiskType.Ephemeral)
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)
        node_space.network_interface.nic_count = search_space.IntRange(min=1)
        # till now, the max nic number supported in Azure is 8
        node_space.network_interface.max_nic_count = 8

        azure_capability = AzureCapability(
            location=location,
            vm_size=vm_size,
            capability=node_space,
            resource_sku={},
        )
        node_space.name = f"{location}_{vm_size}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        # all nodes support the following features
        all_features = self.supported_features()
        node_space.features.update(
            [schema.FeatureSettings.create(x.name()) for x in all_features]
        )
        _convert_to_azure_node_space(node_space)
        return azure_capability

    def _generate_min_capability(
        self,
        requirement: schema.NodeSpace,
        azure_capability: AzureCapability,
        location: str,
    ) -> schema.NodeSpace:
        min_cap: schema.NodeSpace = requirement.generate_min_capability(
            azure_capability.capability
        )
        # Apply azure specified values. They will be passed into the arm template.
        azure_node_runbook = min_cap.get_extended_runbook(AzureNodeSchema, AZURE)
        if azure_node_runbook.location:
            assert location in azure_node_runbook.location, (
                f"predefined location [{azure_node_runbook.location}] "
                f"must be same as "
                f"cap location [{location}]"
            )
        # the location may not be set
        azure_node_runbook.location = location
        azure_node_runbook.vm_size = azure_capability.vm_size
        return min_cap

    def _generate_sas_token(self, result_dict: Dict[str, str]) -> Any:
        sc_name = result_dict["account_name"]
        container_name = result_dict["container_name"]
        rg = result_dict["resource_group_name"]
        blob_name = result_dict["blob_name"]

        source_container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            container_name=container_name,
            resource_group_name=rg,
        )
        source_blob = source_container_client.get_blob_client(blob_name)
        sas_token = generate_sas_token(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            resource_group_name=rg,
        )
        source_url = source_blob.url + "?" + sas_token
        return source_url

    @lru_cache(maxsize=10)  # noqa: B019
    def _get_deployable_vhd_path(
        self, vhd_path: str, location: str, log: Logger
    ) -> str:
        """
        A sas url cannot be used to create a vm directly, so this method checks
        whether the vhd_path is a sas url. If so, the vhd is copied to a location in
        the current subscription, so it can be deployed.
        """
        matches = SAS_URL_PATTERN.match(vhd_path)
        if not matches:
            vhd_details = self._get_vhd_details(vhd_path)
            vhd_location = vhd_details["location"]
            if location == vhd_location:
                return vhd_path
            else:
                vhd_path = self._generate_sas_token(vhd_details)
                matches = SAS_URL_PATTERN.match(vhd_path)
                assert matches, f"fail to generate sas url for {vhd_path}"
                log.debug(
                    f"the vhd location {vhd_location} is not same with running case "
                    f"location {location}, generate a sas url for source vhd, "
                    f"it needs to be copied into location {location}."
                )
        else:
            log.debug("found the vhd is a sas url, it may need to be copied.")

        # get the original vhd's hash key for comparing.
        original_key: Optional[bytearray] = None
        original_vhd_path = vhd_path
        original_blob_client = BlobClient.from_blob_url(original_vhd_path)
        properties = original_blob_client.get_blob_properties()
        if properties.content_settings:
            original_key = properties.content_settings.get(
                "content_md5", None
            )  # type: ignore

        storage_name = get_storage_account_name(
            subscription_id=self.subscription_id, location=location, type="t"
        )
        check_or_create_storage_account(
            self.credential,
            self.subscription_id,
            storage_name,
            self._azure_runbook.shared_resource_group_name,
            location,
            log,
        )
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=storage_name,
            container_name=SAS_COPIED_CONTAINER_NAME,
            resource_group_name=self._azure_runbook.shared_resource_group_name,
        )

        normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)
        year = matches["year"] if matches["year"] else "9999"
        month = matches["month"] if matches["month"] else "01"
        day = matches["day"] if matches["day"] else "01"
        # use the expiry date to generate the path. It's easy to identify when
        # the cache can be removed.
        vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"
        full_vhd_path = f"{container_client.url}/{vhd_path}"

        # lock here to prevent the same vhd from being copied by multiple threads
        global _global_sas_vhd_copy_lock
        cached_key: Optional[bytearray] = None
        with _global_sas_vhd_copy_lock:
            blobs = container_client.list_blobs(name_starts_with=vhd_path)
            for blob in blobs:
                if blob:
                    # check if the hash key matches the original key.
                    if blob.content_settings:
                        cached_key = blob.content_settings.get("content_md5", None)
                    if original_key == cached_key:
                        # if it exists, return the link instead of copying again.
                        log.debug("the sas url is copied already, use it directly.")
                        return full_vhd_path
                    else:
                        log.debug("found cached vhd, but the hash key mismatched.")

            blob_client = container_client.get_blob_client(vhd_path)
            blob_client.start_copy_from_url(
                original_vhd_path, metadata=None, incremental_copy=False
            )
            wait_copy_blob(blob_client, vhd_path, log)

        return full_vhd_path

    def _get_vhd_details(self, vhd_path: str) -> Any:
        matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)
        assert matched, f"fail to get matched info from {vhd_path}"
        sc_name = matched.group("sc")
        container_name = matched.group("container")
        blob_name = matched.group("blob")
        storage_client = get_storage_client(self.credential, self.subscription_id)
        sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]
        assert sc[
            0
        ], f"fail to get storage account {sc_name} from {self.subscription_id}"
        rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)
        return {
            "location": sc[0].location,
            "resource_group_name": rg,
            "account_name": sc_name,
            "container_name": container_name,
            "blob_name": blob_name,
        }

    def _generate_data_disks(
        self,
        node: Node,
        azure_node_runbook: AzureNodeArmParameter,
    ) -> List[DataDiskSchema]:
        data_disks: List[DataDiskSchema] = []
        assert node.capability.disk
        if azure_node_runbook.marketplace:
            marketplace = self._get_image_info(
                azure_node_runbook.location, azure_node_runbook.marketplace
            )
            # some images have data disks by default
            # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.27
            # we have to inject the part below when a dataDisks section is added
            #  in the arm template,
            # otherwise the following exception will be raised:
            # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun
            #  does not have required value(s) for image specified in
            #  storage profile.
            for default_data_disk in marketplace.data_disk_images:
                data_disks.append(
                    DataDiskSchema(
                        node.capability.disk.data_disk_caching_type,
                        default_data_disk.additional_properties["sizeInGb"],
                        azure_node_runbook.disk_type,
                        DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,
                    )
                )
        assert isinstance(
            node.capability.disk.data_disk_count, int
        ), f"actual: {type(node.capability.disk.data_disk_count)}"
        for _ in range(node.capability.disk.data_disk_count):
            assert isinstance(node.capability.disk.data_disk_size, int)
            data_disks.append(
                DataDiskSchema(
                    node.capability.disk.data_disk_caching_type,
                    node.capability.disk.data_disk_size,
                    azure_node_runbook.disk_type,
                    DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,
                )
            )
        return data_disks

    @lru_cache(maxsize=10)  # noqa: B019
    def _get_image_info(
        self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]
    ) -> VirtualMachineImage:
        compute_client = get_compute_client(self)
        assert isinstance(marketplace, AzureVmMarketplaceSchema)
        with global_credential_access_lock:
            image_info = compute_client.virtual_machine_images.get(
                location=location,
                publisher_name=marketplace.publisher,
                offer=marketplace.offer,
                skus=marketplace.sku,
                version=marketplace.version,
            )
        return image_info

    def _get_location_key(self, location: str) -> str:
        return f"{self.subscription_id}_{location}"

    def _enable_ssh_on_windows(self, node: Node) -> None:
        runbook = node.capability.get_extended_runbook(AzureNodeSchema)
        if runbook.is_linux:
            return
        context = get_node_context(node)
        remote_node = cast(RemoteNode, node)
        log = node.log
        log.debug(
            f"checking if SSH port {remote_node.public_port} is reachable "
            f"on {remote_node.name}..."
        )
        connected, _ = wait_tcp_port_ready(
            address=remote_node.public_address,
            port=remote_node.public_port,
            log=log,
            timeout=3,
        )
        if connected:
            log.debug("SSH port is reachable.")
            return

        log.debug("SSH port is not open, enabling ssh on Windows ...")
        # The SSH port is not opened, try to enable it.
        public_key_data = get_public_key_data(self.runbook.admin_private_key_file)
        with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:
            script = f.read()

        parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)
        command = RunCommandInput(
            command_id="RunPowerShellScript",
            script=[script],
            parameters=[parameters],
        )
        compute_client = get_compute_client(self)
        operation = compute_client.virtual_machines.begin_run_command(
            resource_group_name=context.resource_group_name,
            vm_name=context.vm_name,
            parameters=command,
        )
        result = wait_operation(operation=operation, failure_identity="enable ssh")
        log.debug("SSH script result:")
        log.dump_json(logging.DEBUG, result)

    def _get_vhd_os_disk_size(self, blob_url: str) -> int:
        result_dict = self._get_vhd_details(blob_url)
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=result_dict["account_name"],
            container_name=result_dict["container_name"],
            resource_group_name=result_dict["resource_group_name"],
        )
        vhd_blob = container_client.get_blob_client(result_dict["blob_name"])
        properties = vhd_blob.get_blob_properties()
        assert properties.size, f"fail to get blob size of {blob_url}"
        # Azure requires only megabyte alignment of vhds; round the size up
        # for cases where it is megabyte aligned but not gigabyte aligned
        return math.ceil(properties.size / 1024 / 1024 / 1024)

    def _get_sig_info(
        self, shared_image: SharedImageGallerySchema
    ) -> GalleryImageVersion:
        compute_client = get_compute_client(self)
        return compute_client.gallery_image_versions.get(
            resource_group_name=shared_image.resource_group_name,
            gallery_name=shared_image.image_gallery,
            gallery_image_name=shared_image.image_definition,
            gallery_image_version_name=shared_image.image_version,
            expand="ReplicationStatus",
        )

    def _get_sig_os_disk_size(self, shared_image: SharedImageGallerySchema) -> int:
        found_image = self._get_sig_info(shared_image)
        assert found_image.storage_profile.os_disk_image.size_in_gb
        return int(found_image.storage_profile.os_disk_image.size_in_gb)

    def _get_normalized_vm_sizes(
        self, name: str, location: str, log: Logger
    ) -> List[str]:
        split_vm_sizes: List[str] = [x.strip() for x in name.split(",")]
        for index, vm_size in enumerate(split_vm_sizes):
            split_vm_sizes[index] = self._get_normalized_vm_size(vm_size, location, log)
        return [x for x in split_vm_sizes if x]

    def _get_normalized_vm_size(self, name: str, location: str, log: Logger) -> str:
        # find the predefined vm size among all available ones.
        location_info: AzureLocation = self.get_location_info(location, log)
        matched_score: float = 0
...
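The error-parsing helper above (`_parse_detail_errors`) walks nested Azure deployment errors: it recurses into `details` entries and, when a `message` field itself holds a serialized JSON error payload, re-parses it with `json.loads` and a `SimpleNamespace` object hook. A minimal standalone sketch of that pattern is shown below; the `parse_detail_errors` function and the fake payload are illustrative, not part of LISA.

import json
from types import SimpleNamespace


def parse_detail_errors(error):
    # recurse into nested "details" entries; otherwise try to parse the message
    # as a serialized error payload, and fall back to "code: message" text.
    if getattr(error, "details", None):
        errors = []
        for detail in error.details:
            errors.extend(parse_detail_errors(detail))
        return errors
    try:
        parsed = json.loads(error.message, object_hook=lambda x: SimpleNamespace(**x))
        return parse_detail_errors(parsed.error)
    except Exception:
        return [f"{error.code}: {error.message}"]


# fake top-level error whose message embeds a serialized inner error (illustrative)
inner = json.dumps({"error": {"code": "InvalidParameter", "message": "bad size"}})
fake = SimpleNamespace(code="DeploymentFailed", message=inner, details=[])
print(parse_detail_errors(fake))  # ['InvalidParameter: bad size']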
