How to use _get_vhd_os_disk_size method in lisa

Best Python code snippet using lisa_python

platform_.py

Source:platform_.py Github

copy

Full Screen

...1021 arm_parameters.vhd = self._get_deployable_vhd_path(1022 arm_parameters.vhd, arm_parameters.location, log1023 )1024 os_disk_size = max(1025 os_disk_size, self._get_vhd_os_disk_size(arm_parameters.vhd)1026 )1027 elif arm_parameters.shared_gallery:1028 os_disk_size = max(1029 os_disk_size,1030 self._get_sig_os_disk_size(arm_parameters.shared_gallery),1031 )1032 else:1033 assert (1034 arm_parameters.marketplace1035 ), "not set one of marketplace, shared_gallery or vhd."1036 image_info = self._get_image_info(1037 arm_parameters.location, arm_parameters.marketplace1038 )1039 os_disk_size = max(1040 os_disk_size, image_info.os_disk_image.additional_properties["sizeInGb"]1041 )1042 if not arm_parameters.purchase_plan and image_info.plan:1043 # expand values for lru cache1044 plan_name = image_info.plan.name1045 plan_product = image_info.plan.product1046 plan_publisher = image_info.plan.publisher1047 # accept the default purchase plan automatically.1048 arm_parameters.purchase_plan = self._process_marketplace_image_plan(1049 marketplace=arm_parameters.marketplace,1050 plan_name=plan_name,1051 plan_product=plan_product,1052 plan_publisher=plan_publisher,1053 )1054 arm_parameters.osdisk_size_in_gb = os_disk_size1055 # Set disk type1056 assert capability.disk, "node space must have disk defined."1057 assert isinstance(capability.disk.disk_type, schema.DiskType)1058 arm_parameters.disk_type = features.get_azure_disk_type(1059 capability.disk.disk_type1060 )1061 assert capability.network_interface1062 assert isinstance(1063 capability.network_interface.nic_count, int1064 ), f"actual: {capability.network_interface.nic_count}"1065 arm_parameters.nic_count = capability.network_interface.nic_count1066 assert isinstance(1067 capability.network_interface.data_path, schema.NetworkDataPath1068 ), f"actual: {type(capability.network_interface.data_path)}"1069 if capability.network_interface.data_path == schema.NetworkDataPath.Sriov:1070 arm_parameters.enable_sriov = True1071 
return arm_parameters1072 def _validate_template(1073 self, deployment_parameters: Dict[str, Any], log: Logger1074 ) -> None:1075 log.debug("validating deployment")1076 validate_operation: Any = None1077 try:1078 with global_credential_access_lock:1079 validate_operation = self._rm_client.deployments.begin_validate(1080 **deployment_parameters1081 )1082 wait_operation(validate_operation, failure_identity="validation")1083 except Exception as identifier:1084 error_messages: List[str] = [str(identifier)]1085 if isinstance(identifier, HttpResponseError) and identifier.error:1086 # no validate_operation returned, the message may include1087 # some errors, so check details1088 error_messages = self._parse_detail_errors(identifier.error)1089 error_message = "\n".join(error_messages)1090 plugin_manager.hook.azure_deploy_failed(error_message=error_message)1091 raise LisaException(error_message)1092 def _deploy(1093 self, location: str, deployment_parameters: Dict[str, Any], log: Logger1094 ) -> None:1095 resource_group_name = deployment_parameters[AZURE_RG_NAME_KEY]1096 storage_account_name = get_storage_account_name(self.subscription_id, location)1097 check_or_create_storage_account(1098 self.credential,1099 self.subscription_id,1100 storage_account_name,1101 self._azure_runbook.shared_resource_group_name,1102 location,1103 log,1104 )1105 log.info(f"resource group '{resource_group_name}' deployment is in progress...")1106 deployment_operation: Any = None1107 deployments = self._rm_client.deployments1108 try:1109 deployment_operation = deployments.begin_create_or_update(1110 **deployment_parameters1111 )1112 wait_operation(deployment_operation, failure_identity="deploy")1113 except HttpResponseError as identifier:1114 # Some errors happens underlying, so there is no detail errors from API.1115 # For example,1116 # azure.core.exceptions.HttpResponseError:1117 # Operation returned an invalid status 'OK'1118 assert identifier.error, f"HttpResponseError: {identifier}"1119 
error_message = "\n".join(self._parse_detail_errors(identifier.error))1120 if (1121 self._azure_runbook.ignore_provisioning_error1122 and "OSProvisioningTimedOut: OS Provisioning for VM" in error_message1123 ):1124 # Provisioning timeout causes by waagent is not ready.1125 # In smoke test, it still can verify some information.1126 # Eat information here, to run test case any way.1127 #1128 # It may cause other cases fail on assumptions. In this case, we can1129 # define a flag in config, to mark this exception is ignorable or not.1130 log.error(1131 f"provisioning time out, try to run case. "1132 f"Exception: {error_message}"1133 )1134 elif self._azure_runbook.ignore_provisioning_error and get_matched_str(1135 error_message, AZURE_INTERNAL_ERROR_PATTERN1136 ):1137 # Similar situation with OSProvisioningTimedOut1138 # Some OSProvisioningInternalError caused by it doesn't support1139 # SSH key authentication1140 # e.g. hpe hpestoreoncevsa hpestoreoncevsa-3187 3.18.71141 # After passthrough this exception,1142 # actually the 22 port of this VM is open.1143 log.error(1144 f"provisioning failed for an internal error, try to run case. 
"1145 f"Exception: {error_message}"1146 )1147 else:1148 plugin_manager.hook.azure_deploy_failed(error_message=error_message)1149 raise LisaException(error_message)1150 def _parse_detail_errors(self, error: Any) -> List[str]:1151 # original message may be a summary, get lowest level details.1152 if hasattr(error, "details") and error.details:1153 errors: List[str] = []1154 for detail in error.details:1155 errors.extend(self._parse_detail_errors(detail))1156 else:1157 try:1158 # it returns serialized json string in message sometime1159 parsed_error = json.loads(1160 error.message, object_hook=lambda x: SimpleNamespace(**x)1161 )1162 errors = self._parse_detail_errors(parsed_error.error)1163 except Exception:1164 # load failed, it should be a real error message string1165 errors = [f"{error.code}: {error.message}"]1166 return errors1167 # the VM may not be queried after deployed. use retry to mitigate it.1168 @retry(exceptions=LisaException, tries=150, delay=2)1169 def _load_vms(1170 self, environment: Environment, log: Logger1171 ) -> Dict[str, VirtualMachine]:1172 compute_client = get_compute_client(self, api_version="2020-06-01")1173 environment_context = get_environment_context(environment=environment)1174 log.debug(1175 f"listing vm in resource group "1176 f"'{environment_context.resource_group_name}'"1177 )1178 vms_map: Dict[str, VirtualMachine] = {}1179 vms = compute_client.virtual_machines.list(1180 environment_context.resource_group_name1181 )1182 for vm in vms:1183 vms_map[vm.name] = vm1184 log.debug(f" found vm {vm.name}")1185 if not vms_map:1186 raise LisaException(1187 f"deployment succeeded, but VM not found in 5 minutes "1188 f"from '{environment_context.resource_group_name}'"1189 )1190 return vms_map1191 # Use Exception, because there may be credential conflict error. 
Make it1192 # retriable.1193 @retry(exceptions=Exception, tries=150, delay=2)1194 def _load_nics(1195 self, environment: Environment, log: Logger1196 ) -> Dict[str, NetworkInterface]:1197 network_client = get_network_client(self)1198 environment_context = get_environment_context(environment=environment)1199 log.debug(1200 f"listing network interfaces in resource group "1201 f"'{environment_context.resource_group_name}'"1202 )1203 # load nics1204 nics_map: Dict[str, NetworkInterface] = {}1205 network_interfaces = network_client.network_interfaces.list(1206 environment_context.resource_group_name1207 )1208 for nic in network_interfaces:1209 # nic name is like lisa-test-20220316-182126-985-e0-n0-nic-2, get vm1210 # name part for later pick only find primary nic, which is ended by1211 # -nic-01212 node_name_from_nic = RESOURCE_ID_NIC_PATTERN.findall(nic.name)1213 if node_name_from_nic:1214 name = node_name_from_nic[0]1215 nics_map[name] = nic1216 log.debug(f" found nic '{nic.name}', and saved for next step.")1217 else:1218 log.debug(1219 f" found nic '{nic.name}', but dropped, "1220 "because it's not primary nic."1221 )1222 if not nics_map:1223 raise LisaException(1224 f"deployment succeeded, but network interfaces not found in 5 minutes "1225 f"from '{environment_context.resource_group_name}'"1226 )1227 return nics_map1228 @retry(exceptions=LisaException, tries=150, delay=2)1229 def load_public_ips_from_resource_group(1230 self, resource_group_name: str, log: Logger1231 ) -> Dict[str, str]:1232 network_client = get_network_client(self)1233 log.debug(f"listing public ips in resource group '{resource_group_name}'")1234 # get public IP1235 public_ip_addresses = network_client.public_ip_addresses.list(1236 resource_group_name1237 )1238 public_ips_map: Dict[str, str] = {}1239 for ip_address in public_ip_addresses:1240 # nic name is like node-0-nic-2, get vm name part for later pick1241 # only find primary nic, which is ended by -nic-01242 node_name_from_public_ip = 
RESOURCE_ID_PUBLIC_IP_PATTERN.findall(1243 ip_address.name1244 )1245 assert (1246 ip_address1247 ), f"public IP address cannot be empty, ip_address object: {ip_address}"1248 if node_name_from_public_ip:1249 name = node_name_from_public_ip[0]1250 public_ips_map[name] = ip_address.ip_address1251 log.debug(1252 f" found public IP '{ip_address.name}', and saved for next step."1253 )1254 else:1255 log.debug(1256 f" found public IP '{ip_address.name}', but dropped "1257 "because it's not primary nic."1258 )1259 if not public_ips_map:1260 raise LisaException(1261 f"deployment succeeded, but public ips not found in 5 minutes "1262 f"from '{resource_group_name}'"1263 )1264 return public_ips_map1265 def initialize_environment(self, environment: Environment, log: Logger) -> None:1266 node_context_map: Dict[str, Node] = {}1267 for node in environment.nodes.list():1268 node_context = get_node_context(node)1269 node_context_map[node_context.vm_name] = node1270 vms_map: Dict[str, VirtualMachine] = self._load_vms(environment, log)1271 nics_map: Dict[str, NetworkInterface] = self._load_nics(environment, log)1272 environment_context = get_environment_context(environment=environment)1273 public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(1274 environment_context.resource_group_name, log1275 )1276 for vm_name, node in node_context_map.items():1277 node_context = get_node_context(node)1278 vm = vms_map.get(vm_name, None)1279 if not vm:1280 raise LisaException(1281 f"cannot find vm: '{vm_name}', make sure deployment is correct."1282 )1283 nic = nics_map[vm_name]1284 public_ip = public_ips_map[vm_name]1285 address = nic.ip_configurations[0].private_ip_address1286 if not node.name:1287 node.name = vm_name1288 assert isinstance(node, RemoteNode)1289 node.set_connection_info(1290 address=address,1291 port=22,1292 public_address=public_ip,1293 public_port=22,1294 username=node_context.username,1295 password=node_context.password,1296 
private_key_file=node_context.private_key_file,1297 )1298 # enable ssh for windows, if it's not Windows, or SSH reachable, it will1299 # skip.1300 run_in_parallel(1301 [1302 partial(self._enable_ssh_on_windows, node=x)1303 for x in environment.nodes.list()1304 ]1305 )1306 def _resource_sku_to_capability( # noqa: C9011307 self, location: str, resource_sku: ResourceSku1308 ) -> schema.NodeSpace:1309 # fill in default values, in case no capability meet.1310 node_space = schema.NodeSpace(1311 node_count=1,1312 core_count=0,1313 memory_mb=0,1314 gpu_count=0,1315 )1316 node_space.name = f"{location}_{resource_sku.name}"1317 node_space.features = search_space.SetSpace[schema.FeatureSettings](1318 is_allow_set=True1319 )1320 node_space.disk = features.AzureDiskOptionSettings()1321 node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](1322 is_allow_set=True, items=[]1323 )1324 node_space.disk.data_disk_iops = search_space.IntRange(min=0)1325 node_space.disk.data_disk_size = search_space.IntRange(min=0)1326 node_space.network_interface = schema.NetworkInterfaceOptionSettings()1327 node_space.network_interface.data_path = search_space.SetSpace[1328 schema.NetworkDataPath1329 ](is_allow_set=True, items=[])1330 # fill supported features1331 azure_raw_capabilities: Dict[str, str] = {}1332 for sku_capability in resource_sku.capabilities:1333 # prevent to loop in every feature1334 azure_raw_capabilities[sku_capability.name] = sku_capability.value1335 # calculate cpu count. 
Some vm sizes, like Standard_HC44rs, doesn't have1336 # vCPUsAvailable, so use vCPUs.1337 vcpus_available = int(azure_raw_capabilities.get("vCPUsAvailable", "0"))1338 if vcpus_available:1339 node_space.core_count = vcpus_available1340 else:1341 node_space.core_count = int(azure_raw_capabilities.get("vCPUs", "0"))1342 memory_value = azure_raw_capabilities.get("MemoryGB", None)1343 if memory_value:1344 node_space.memory_mb = int(float(memory_value) * 1024)1345 max_disk_count = azure_raw_capabilities.get("MaxDataDiskCount", None)1346 if max_disk_count:1347 node_space.disk.max_data_disk_count = int(max_disk_count)1348 node_space.disk.data_disk_count = search_space.IntRange(1349 max=node_space.disk.max_data_disk_count1350 )1351 max_nic_count = azure_raw_capabilities.get("MaxNetworkInterfaces", None)1352 if max_nic_count:1353 # set a min value for nic_count work around for an azure python sdk bug1354 # nic_count is 0 when get capability for some sizes e.g. Standard_D8a_v31355 sku_nic_count = int(max_nic_count)1356 if sku_nic_count == 0:1357 sku_nic_count = 11358 node_space.network_interface.nic_count = search_space.IntRange(1359 min=1, max=sku_nic_count1360 )1361 node_space.network_interface.max_nic_count = sku_nic_count1362 premium_io_supported = azure_raw_capabilities.get("PremiumIO", None)1363 if premium_io_supported and eval(premium_io_supported) is True:1364 node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)1365 ephemeral_supported = azure_raw_capabilities.get(1366 "EphemeralOSDiskSupported", None1367 )1368 if ephemeral_supported and eval(ephemeral_supported) is True:1369 # Check if CachedDiskBytes is greater than 30GB1370 # We use diffdisk as cache disk for ephemeral OS disk1371 cached_disk_bytes = azure_raw_capabilities.get("CachedDiskBytes", 0)1372 cached_disk_bytes_gb = int(cached_disk_bytes) / 1024 / 1024 / 10241373 if cached_disk_bytes_gb >= 30:1374 node_space.disk.disk_type.add(schema.DiskType.Ephemeral)1375 # set AN1376 an_enabled = 
azure_raw_capabilities.get("AcceleratedNetworkingEnabled", None)1377 if an_enabled and eval(an_enabled) is True:1378 # refer1379 # https://docs.microsoft.com/en-us/azure/virtual-machines/dcv2-series#configuration1380 # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv2-series1381 # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv3-series1382 # https://docs.microsoft.com/en-us/azure/virtual-machines/nd-series1383 # below VM size families don't support `Accelerated Networking` but1384 # API return `True`, fix this issue temporarily will revert it till1385 # bug fixed.1386 if resource_sku.family not in [1387 "standardDCSv2Family",1388 "standardNCSv2Family",1389 "standardNCSv3Family",1390 "standardNDSFamily",1391 ]:1392 # update data path types if sriov feature is supported1393 node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)1394 # for some new sizes, there is no MaxNetworkInterfaces capability1395 # and we have to set a default value for max_nic_count1396 if not node_space.network_interface.max_nic_count:1397 node_space.network_interface.max_nic_count = 11398 # some vm size do not have resource disk present1399 # https://docs.microsoft.com/en-us/azure/virtual-machines/azure-vms-no-temp-disk1400 if resource_sku.family in [1401 "standardDv4Family",1402 "standardDSv4Family",1403 "standardEv4Family",1404 "standardESv4Family",1405 "standardEASv4Family",1406 "standardEASv5Family",1407 "standardESv5Family",1408 "standardEADSv5Family",1409 "standardDASv5Family",1410 "standardDSv5Family",1411 "standardFSv2Family",1412 "standardNCFamily",1413 "standardESv3Family",1414 "standardDPSv5Family",1415 "standardEBSv5Family",1416 "standardEv5Family",1417 ]:1418 node_space.disk.has_resource_disk = False1419 else:1420 node_space.disk.has_resource_disk = True1421 for supported_feature in self.supported_features():1422 if supported_feature.name() in [1423 features.Disk.name(),1424 features.NetworkInterface.name(),1425 ]:1426 # Skip the disk and 
network interfaces features. They will be1427 # handled by node_space directly.1428 continue1429 feature_setting = supported_feature.create_setting(1430 raw_capabilities=azure_raw_capabilities,1431 resource_sku=resource_sku,1432 node_space=node_space,1433 )1434 if feature_setting:1435 node_space.features.add(feature_setting)1436 node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)1437 node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)1438 node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)1439 return node_space1440 def get_sorted_vm_sizes(1441 self, capabilities: List[AzureCapability], log: Logger1442 ) -> List[AzureCapability]:1443 # sort vm size by predefined pattern1444 sorted_capabilities: List[AzureCapability] = []1445 found_vm_sizes: Set[str] = set()1446 # loop all fall back levels1447 for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:1448 level_capabilities: List[AzureCapability] = []1449 # loop all capabilities1450 for capability in capabilities:1451 vm_size = capability.vm_size1452 if fallback_pattern.match(vm_size) and vm_size not in found_vm_sizes:1453 level_capabilities.append(capability)1454 found_vm_sizes.add(vm_size)1455 # sort by rough cost1456 level_capabilities.sort(key=lambda x: (x.capability.cost))1457 sorted_capabilities.extend(level_capabilities)1458 return sorted_capabilities1459 def load_public_ip(self, node: Node, log: Logger) -> str:1460 node_context = get_node_context(node)1461 vm_name = node_context.vm_name1462 resource_group_name = node_context.resource_group_name1463 public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(1464 resource_group_name=resource_group_name, log=self._log1465 )1466 return public_ips_map[vm_name]1467 @lru_cache(maxsize=10) # noqa: B0191468 def _resolve_marketplace_image(1469 self, location: str, marketplace: AzureVmMarketplaceSchema1470 ) -> AzureVmMarketplaceSchema:1471 new_marketplace = copy.copy(marketplace)1472 # latest doesn't work, it needs a 
specified version.1473 if marketplace.version.lower() == "latest":1474 compute_client = get_compute_client(self)1475 with global_credential_access_lock:1476 versioned_images = compute_client.virtual_machine_images.list(1477 location=location,1478 publisher_name=marketplace.publisher,1479 offer=marketplace.offer,1480 skus=marketplace.sku,1481 )1482 if 0 == len(versioned_images):1483 raise LisaException(1484 f"cannot find any version of image {marketplace.publisher} "1485 f"{marketplace.offer} {marketplace.sku} in {location}"1486 )1487 # any one should be the same to get purchase plan1488 new_marketplace.version = versioned_images[-1].name1489 return new_marketplace1490 def _parse_shared_gallery_image(1491 self, location: str, shared_image: SharedImageGallerySchema1492 ) -> SharedImageGallerySchema:1493 new_shared_image = copy.copy(shared_image)1494 compute_client = get_compute_client(self)1495 if not shared_image.resource_group_name:1496 # /subscriptions/xxxx/resourceGroups/xxxx/providers/Microsoft.Compute/1497 # galleries/xxxx1498 rg_pattern = re.compile(r"resourceGroups/(.*)/providers", re.M)1499 galleries = compute_client.galleries.list()1500 rg_name = ""1501 for gallery in galleries:1502 if gallery.name.lower() == shared_image.image_gallery:1503 rg_name = get_matched_str(gallery.id, rg_pattern)1504 break1505 if not rg_name:1506 raise LisaException(1507 f"not find matched gallery {shared_image.image_gallery}"1508 )1509 new_shared_image.resource_group_name = rg_name1510 if shared_image.image_version.lower() == "latest":1511 gallery_images = (1512 compute_client.gallery_image_versions.list_by_gallery_image(1513 resource_group_name=new_shared_image.resource_group_name,1514 gallery_name=new_shared_image.image_gallery,1515 gallery_image_name=new_shared_image.image_definition,1516 )1517 )1518 image: GalleryImageVersion = None1519 time: Optional[datetime] = None1520 for image in gallery_images:1521 gallery_image = compute_client.gallery_image_versions.get(1522 
resource_group_name=new_shared_image.resource_group_name,1523 gallery_name=new_shared_image.image_gallery,1524 gallery_image_name=new_shared_image.image_definition,1525 gallery_image_version_name=image.name,1526 expand="ReplicationStatus",1527 )1528 if not time:1529 time = gallery_image.publishing_profile.published_date1530 if gallery_image.publishing_profile.published_date > time:1531 time = gallery_image.publishing_profile.published_date1532 new_shared_image.image_version = image.name1533 return new_shared_image1534 @lru_cache(maxsize=10) # noqa: B0191535 def _process_marketplace_image_plan(1536 self,1537 marketplace: AzureVmMarketplaceSchema,1538 plan_name: str,1539 plan_product: str,1540 plan_publisher: str,1541 ) -> Optional[PurchasePlan]:1542 """1543 this method to fill plan, if a VM needs it. If don't fill it, the deployment1544 will be failed.1545 1. Get image_info to check if there is a plan.1546 2. If there is a plan, it may need to check and accept terms.1547 """1548 plan: Optional[AzureVmPurchasePlanSchema] = None1549 # if there is a plan, it may need to accept term.1550 marketplace_client = get_marketplace_ordering_client(self)1551 term: Optional[AgreementTerms] = None1552 try:1553 with global_credential_access_lock:1554 term = marketplace_client.marketplace_agreements.get(1555 offer_type="virtualmachine",1556 publisher_id=marketplace.publisher,1557 offer_id=marketplace.offer,1558 plan_id=plan_name,1559 )1560 except Exception as identifier:1561 raise LisaException(f"error on getting marketplace agreement: {identifier}")1562 assert term1563 if term.accepted is False:1564 term.accepted = True1565 marketplace_client.marketplace_agreements.create(1566 offer_type="virtualmachine",1567 publisher_id=marketplace.publisher,1568 offer_id=marketplace.offer,1569 plan_id=plan_name,1570 parameters=term,1571 )1572 plan = AzureVmPurchasePlanSchema(1573 name=plan_name,1574 product=plan_product,1575 publisher=plan_publisher,1576 )1577 return plan1578 def 
_generate_max_capability(self, vm_size: str, location: str) -> AzureCapability:1579 # some vm size cannot be queried from API, so use default capability to1580 # run with best guess on capability.1581 node_space = schema.NodeSpace(1582 node_count=1,1583 core_count=search_space.IntRange(min=1),1584 memory_mb=search_space.IntRange(min=0),1585 gpu_count=search_space.IntRange(min=0),1586 )1587 node_space.disk = features.AzureDiskOptionSettings()1588 node_space.disk.data_disk_count = search_space.IntRange(min=0)1589 node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](1590 is_allow_set=True, items=[]1591 )1592 node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)1593 node_space.disk.disk_type.add(schema.DiskType.Ephemeral)1594 node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)1595 node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)1596 node_space.network_interface = schema.NetworkInterfaceOptionSettings()1597 node_space.network_interface.data_path = search_space.SetSpace[1598 schema.NetworkDataPath1599 ](is_allow_set=True, items=[])1600 node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)1601 node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)1602 node_space.network_interface.nic_count = search_space.IntRange(min=1)1603 # till now, the max nic number supported in Azure is 81604 node_space.network_interface.max_nic_count = 81605 azure_capability = AzureCapability(1606 location=location,1607 vm_size=vm_size,1608 capability=node_space,1609 resource_sku={},1610 )1611 node_space.name = f"{location}_{vm_size}"1612 node_space.features = search_space.SetSpace[schema.FeatureSettings](1613 is_allow_set=True1614 )1615 # all nodes support following features1616 all_features = self.supported_features()1617 node_space.features.update(1618 [schema.FeatureSettings.create(x.name()) for x in all_features]1619 )1620 _convert_to_azure_node_space(node_space)1621 return azure_capability1622 def 
_generate_min_capability(1623 self,1624 requirement: schema.NodeSpace,1625 azure_capability: AzureCapability,1626 location: str,1627 ) -> schema.NodeSpace:1628 min_cap: schema.NodeSpace = requirement.generate_min_capability(1629 azure_capability.capability1630 )1631 # Apply azure specified values. They will pass into arm template1632 azure_node_runbook = min_cap.get_extended_runbook(AzureNodeSchema, AZURE)1633 if azure_node_runbook.location:1634 assert location in azure_node_runbook.location, (1635 f"predefined location [{azure_node_runbook.location}] "1636 f"must be same as "1637 f"cap location [{location}]"1638 )1639 # the location may not be set1640 azure_node_runbook.location = location1641 azure_node_runbook.vm_size = azure_capability.vm_size1642 return min_cap1643 def _generate_sas_token(self, result_dict: Dict[str, str]) -> Any:1644 sc_name = result_dict["account_name"]1645 container_name = result_dict["container_name"]1646 rg = result_dict["resource_group_name"]1647 blob_name = result_dict["blob_name"]1648 source_container_client = get_or_create_storage_container(1649 credential=self.credential,1650 subscription_id=self.subscription_id,1651 account_name=sc_name,1652 container_name=container_name,1653 resource_group_name=rg,1654 )1655 source_blob = source_container_client.get_blob_client(blob_name)1656 sas_token = generate_sas_token(1657 credential=self.credential,1658 subscription_id=self.subscription_id,1659 account_name=sc_name,1660 resource_group_name=rg,1661 )1662 source_url = source_blob.url + "?" + sas_token1663 return source_url1664 @lru_cache(maxsize=10) # noqa: B0191665 def _get_deployable_vhd_path(1666 self, vhd_path: str, location: str, log: Logger1667 ) -> str:1668 """1669 The sas url is not able to create a vm directly, so this method check if1670 the vhd_path is a sas url. 
If so, copy it to a location in current1671 subscription, so it can be deployed.1672 """1673 matches = SAS_URL_PATTERN.match(vhd_path)1674 if not matches:1675 vhd_details = self._get_vhd_details(vhd_path)1676 vhd_location = vhd_details["location"]1677 if location == vhd_location:1678 return vhd_path1679 else:1680 vhd_path = self._generate_sas_token(vhd_details)1681 matches = SAS_URL_PATTERN.match(vhd_path)1682 assert matches, f"fail to generate sas url for {vhd_path}"1683 log.debug(1684 f"the vhd location {location} is not same with running case "1685 f"location {vhd_location}, generate a sas url for source vhd, "1686 f"it needs to be copied into location {location}."1687 )1688 else:1689 log.debug("found the vhd is a sas url, it may need to be copied.")1690 # get original vhd's hash key for comparing.1691 original_key: Optional[bytearray] = None1692 original_vhd_path = vhd_path1693 original_blob_client = BlobClient.from_blob_url(original_vhd_path)1694 properties = original_blob_client.get_blob_properties()1695 if properties.content_settings:1696 original_key = properties.content_settings.get(1697 "content_md5", None1698 ) # type: ignore1699 storage_name = get_storage_account_name(1700 subscription_id=self.subscription_id, location=location, type="t"1701 )1702 check_or_create_storage_account(1703 self.credential,1704 self.subscription_id,1705 storage_name,1706 self._azure_runbook.shared_resource_group_name,1707 location,1708 log,1709 )1710 container_client = get_or_create_storage_container(1711 credential=self.credential,1712 subscription_id=self.subscription_id,1713 account_name=storage_name,1714 container_name=SAS_COPIED_CONTAINER_NAME,1715 resource_group_name=self._azure_runbook.shared_resource_group_name,1716 )1717 normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)1718 year = matches["year"] if matches["year"] else "9999"1719 month = matches["month"] if matches["month"] else "01"1720 day = matches["day"] if matches["day"] else "01"1721 # use 
the expire date to generate the path. It's easy to identify when1722 # the cache can be removed.1723 vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"1724 full_vhd_path = f"{container_client.url}/{vhd_path}"1725 # lock here to prevent a vhd is copied in multi-thread1726 global _global_sas_vhd_copy_lock1727 cached_key: Optional[bytearray] = None1728 with _global_sas_vhd_copy_lock:1729 blobs = container_client.list_blobs(name_starts_with=vhd_path)1730 for blob in blobs:1731 if blob:1732 # check if hash key matched with original key.1733 if blob.content_settings:1734 cached_key = blob.content_settings.get("content_md5", None)1735 if original_key == cached_key:1736 # if it exists, return the link, not to copy again.1737 log.debug("the sas url is copied already, use it directly.")1738 return full_vhd_path1739 else:1740 log.debug("found cached vhd, but the hash key mismatched.")1741 blob_client = container_client.get_blob_client(vhd_path)1742 blob_client.start_copy_from_url(1743 original_vhd_path, metadata=None, incremental_copy=False1744 )1745 wait_copy_blob(blob_client, vhd_path, log)1746 return full_vhd_path1747 def _get_vhd_details(self, vhd_path: str) -> Any:1748 matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)1749 assert matched, f"fail to get matched info from {vhd_path}"1750 sc_name = matched.group("sc")1751 container_name = matched.group("container")1752 blob_name = matched.group("blob")1753 storage_client = get_storage_client(self.credential, self.subscription_id)1754 sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]1755 assert sc[1756 01757 ], f"fail to get storage account {sc_name} from {self.subscription_id}"1758 rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)1759 return {1760 "location": sc[0].location,1761 "resource_group_name": rg,1762 "account_name": sc_name,1763 "container_name": container_name,1764 "blob_name": blob_name,1765 }1766 def _generate_data_disks(1767 self,1768 node: Node,1769 
azure_node_runbook: AzureNodeArmParameter,1770 ) -> List[DataDiskSchema]:1771 data_disks: List[DataDiskSchema] = []1772 assert node.capability.disk1773 if azure_node_runbook.marketplace:1774 marketplace = self._get_image_info(1775 azure_node_runbook.location, azure_node_runbook.marketplace1776 )1777 # some images has data disks by default1778 # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.271779 # we have to inject below part when dataDisks section added in1780 # arm template,1781 # otherwise will see below exception:1782 # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun1783 # does not have required value(s) for image specified in1784 # storage profile.1785 for default_data_disk in marketplace.data_disk_images:1786 data_disks.append(1787 DataDiskSchema(1788 node.capability.disk.data_disk_caching_type,1789 default_data_disk.additional_properties["sizeInGb"],1790 azure_node_runbook.disk_type,1791 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,1792 )1793 )1794 assert isinstance(1795 node.capability.disk.data_disk_count, int1796 ), f"actual: {type(node.capability.disk.data_disk_count)}"1797 for _ in range(node.capability.disk.data_disk_count):1798 assert isinstance(node.capability.disk.data_disk_size, int)1799 data_disks.append(1800 DataDiskSchema(1801 node.capability.disk.data_disk_caching_type,1802 node.capability.disk.data_disk_size,1803 azure_node_runbook.disk_type,1804 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,1805 )1806 )1807 return data_disks1808 @lru_cache(maxsize=10) # noqa: B0191809 def _get_image_info(1810 self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]1811 ) -> VirtualMachineImage:1812 compute_client = get_compute_client(self)1813 assert isinstance(marketplace, AzureVmMarketplaceSchema)1814 with global_credential_access_lock:1815 image_info = compute_client.virtual_machine_images.get(1816 location=location,1817 publisher_name=marketplace.publisher,1818 offer=marketplace.offer,1819 
skus=marketplace.sku,1820 version=marketplace.version,1821 )1822 return image_info1823 def _get_location_key(self, location: str) -> str:1824 return f"{self.subscription_id}_{location}"1825 def _enable_ssh_on_windows(self, node: Node) -> None:1826 runbook = node.capability.get_extended_runbook(AzureNodeSchema)1827 if runbook.is_linux:1828 return1829 context = get_node_context(node)1830 remote_node = cast(RemoteNode, node)1831 log = node.log1832 log.debug(1833 f"checking if SSH port {remote_node.public_port} is reachable "1834 f"on {remote_node.name}..."1835 )1836 connected, _ = wait_tcp_port_ready(1837 address=remote_node.public_address,1838 port=remote_node.public_port,1839 log=log,1840 timeout=3,1841 )1842 if connected:1843 log.debug("SSH port is reachable.")1844 return1845 log.debug("SSH port is not open, enabling ssh on Windows ...")1846 # The SSH port is not opened, try to enable it.1847 public_key_data = get_public_key_data(self.runbook.admin_private_key_file)1848 with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:1849 script = f.read()1850 parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)1851 command = RunCommandInput(1852 command_id="RunPowerShellScript",1853 script=[script],1854 parameters=[parameters],1855 )1856 compute_client = get_compute_client(self)1857 operation = compute_client.virtual_machines.begin_run_command(1858 resource_group_name=context.resource_group_name,1859 vm_name=context.vm_name,1860 parameters=command,1861 )1862 result = wait_operation(operation=operation, failure_identity="enable ssh")1863 log.debug("SSH script result:")1864 log.dump_json(logging.DEBUG, result)1865 def _get_vhd_os_disk_size(self, blob_url: str) -> int:1866 result_dict = self._get_vhd_details(blob_url)1867 container_client = get_or_create_storage_container(1868 credential=self.credential,1869 subscription_id=self.subscription_id,1870 account_name=result_dict["account_name"],1871 container_name=result_dict["container_name"],1872 
resource_group_name=result_dict["resource_group_name"],1873 )1874 vhd_blob = container_client.get_blob_client(result_dict["blob_name"])1875 properties = vhd_blob.get_blob_properties()1876 assert properties.size, f"fail to get blob size of {blob_url}"1877 # Azure requires only megabyte alignment of vhds, round size up1878 # for cases where the size is megabyte aligned1879 return math.ceil(properties.size / 1024 / 1024 / 1024)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. The guides take you from setting up the prerequisites and running your first automation test, through following best practices, to diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful