How to use _resource_sku_to_capability method in lisa

Best Python code snippet using lisa_python

platform_.py

Source:platform_.py Github

copy

Full Screen

...768 ):769 # restricted on this location770 continue771 resource_sku = sku_obj.as_dict()772 capability = self._resource_sku_to_capability(773 location, sku_obj774 )775 # estimate vm cost for priority776 assert isinstance(capability.core_count, int)777 assert isinstance(capability.gpu_count, int)778 azure_capability = AzureCapability(779 location=location,780 vm_size=sku_obj.name,781 capability=capability,782 resource_sku=resource_sku,783 )784 all_skus[azure_capability.vm_size] = azure_capability785 except Exception as identifier:786 log.error(f"unknown sku: {sku_obj}")787 raise identifier788 location_data = AzureLocation(location=location, capabilities=all_skus)789 log.debug(f"{location}: saving to disk")790 with open(cached_file_name, "w") as f:791 json.dump(location_data.to_dict(), f) # type: ignore792 log.debug(f"{key}: new data, " f"sku: {len(location_data.capabilities)}")793 assert location_data794 self._locations_data_cache[key] = location_data795 return location_data796 def _create_deployment_parameters(797 self, resource_group_name: str, environment: Environment, log: Logger798 ) -> Tuple[str, Dict[str, Any]]:799 assert environment.runbook, "env data cannot be None"800 assert environment.runbook.nodes_requirement, "node requirement cannot be None"801 log.debug("creating deployment")802 # construct parameters803 arm_parameters = AzureArmParameter()804 copied_fields = [805 "availability_set_tags",806 "availability_set_properties",807 "vm_tags",808 ]809 set_filtered_fields(self._azure_runbook, arm_parameters, copied_fields)810 is_windows: bool = False811 arm_parameters.admin_username = self.runbook.admin_username812 if self.runbook.admin_private_key_file:813 arm_parameters.admin_key_data = get_public_key_data(814 self.runbook.admin_private_key_file815 )816 else:817 arm_parameters.admin_password = self.runbook.admin_password818 environment_context = get_environment_context(environment=environment)819 arm_parameters.vm_tags["RG"] = environment_context.resource_group_name820 # get local lisa environment821 arm_parameters.vm_tags["lisa_username"] = local().tools[Whoami].get_username()822 arm_parameters.vm_tags["lisa_hostname"] = local().tools[Hostname].get_hostname()823 nodes_parameters: List[AzureNodeArmParameter] = []824 features_settings: Dict[str, schema.FeatureSettings] = {}825 for node_space in environment.runbook.nodes_requirement:826 assert isinstance(827 node_space, schema.NodeSpace828 ), f"actual: {type(node_space)}"829 azure_node_runbook = node_space.get_extended_runbook(830 AzureNodeSchema, type_name=AZURE831 )832 # Subscription Id is used by Shared Gallery images located833 # in subscription different from where LISA is run834 azure_node_runbook.subscription_id = self.subscription_id835 # init node836 node = environment.create_node_from_requirement(837 node_space,838 )839 azure_node_runbook = self._create_node_runbook(840 len(nodes_parameters), node_space, log, resource_group_name841 )842 # save parsed runbook back, for example, the version of marketplace may be843 # parsed from latest to a specified version.844 node.capability.set_extended_runbook(azure_node_runbook)845 node_arm_parameters = self._create_node_arm_parameters(node.capability, log)846 nodes_parameters.append(node_arm_parameters)847 # Set data disk array848 arm_parameters.data_disks = self._generate_data_disks(849 node, node_arm_parameters850 )851 if not arm_parameters.location:852 # take first one's location853 arm_parameters.location = azure_node_runbook.location854 # save vm's information into node855 node_context = get_node_context(node)856 node_context.resource_group_name = environment_context.resource_group_name857 # vm's name, use to find it from azure858 node_context.vm_name = azure_node_runbook.name859 # ssh related information will be filled back once vm is created. If860 # it's Windows, fill in the password always. If it's Linux, the861 # private key has higher priority.862 node_context.username = arm_parameters.admin_username863 if azure_node_runbook.is_linux:864 node_context.password = arm_parameters.admin_password865 else:866 is_windows = True867 if not self.runbook.admin_password:868 # password is required, if it doesn't present, generate one.869 password = generate_random_chars()870 add_secret(password)871 self.runbook.admin_password = password872 node_context.password = self.runbook.admin_password873 node_context.private_key_file = self.runbook.admin_private_key_file874 # collect all features to handle special deployment logic. If one875 # node has this, it needs to run.876 if node.capability.features:877 for f in node.capability.features:878 if f.type not in features_settings:879 features_settings[f.type] = f880 log.info(f"vm setting: {azure_node_runbook}")881 if is_windows:882 # set password for windows any time.883 arm_parameters.admin_password = self.runbook.admin_password884 arm_parameters.nodes = nodes_parameters885 arm_parameters.storage_name = get_storage_account_name(886 self.subscription_id, arm_parameters.location887 )888 if (889 self._azure_runbook.availability_set_properties890 or self._azure_runbook.availability_set_tags891 ):892 arm_parameters.use_availability_sets = True893 # In Azure, each VM should have only one nic in one subnet. So calculate894 # the max nic count, and set to subnet count.895 arm_parameters.subnet_count = max(x.nic_count for x in arm_parameters.nodes)896 arm_parameters.shared_resource_group_name = (897 self._azure_runbook.shared_resource_group_name898 )899 # the arm template may be updated by the hooks, so make a copy to avoid900 # the original template is modified.901 template = deepcopy(self._load_template())902 plugin_manager.hook.azure_update_arm_template(903 template=template, environment=environment904 )905 # change deployment for each feature.906 for f in features_settings.values():907 feature_type = next(908 x for x in self.supported_features() if x.name() == f.type909 )910 feature_type.on_before_deployment(911 arm_parameters=arm_parameters,912 template=template,913 settings=f,914 environment=environment,915 log=log,916 )917 # composite deployment properties918 parameters = arm_parameters.to_dict() # type:ignore919 parameters = {k: {"value": v} for k, v in parameters.items()}920 log.debug(f"parameters: {parameters}")921 deployment_properties = DeploymentProperties(922 mode=DeploymentMode.incremental,923 template=template,924 parameters=parameters,925 )926 # dump arm_template and arm_parameters to file927 template_dump_path = environment.log_path / "arm_template.json"928 param_dump_path = environment.log_path / "arm_template_parameters.json"929 dump_file(template_dump_path, json.dumps(template, indent=4))930 dump_file(param_dump_path, json.dumps(parameters, indent=4))931 return (932 arm_parameters.location,933 {934 AZURE_RG_NAME_KEY: resource_group_name,935 "deployment_name": AZURE_DEPLOYMENT_NAME,936 "parameters": Deployment(properties=deployment_properties),937 },938 )939 def _create_node_runbook(940 self,941 index: int,942 node_space: schema.NodeSpace,943 log: Logger,944 name_prefix: str,945 ) -> AzureNodeSchema:946 azure_node_runbook = node_space.get_extended_runbook(947 AzureNodeSchema, type_name=AZURE948 )949 if not azure_node_runbook.name:950 # the max length of vm name is 64 chars. Below logic takes last 45951 # chars in resource group name and keep the leading 5 chars.952 # name_prefix can contain any of customized (existing) or953 # generated (starts with "lisa-") resource group name,954 # so, pass the first 5 chars as prefix to truncate_keep_prefix955 # to handle both cases956 node_name = f"{name_prefix}-n{index}"957 azure_node_runbook.name = truncate_keep_prefix(node_name, 50, node_name[:5])958 # It's used as computer name only. Windows doesn't support name more959 # than 15 chars960 azure_node_runbook.short_name = truncate_keep_prefix(961 azure_node_runbook.name, 15, azure_node_runbook.name[:5]962 )963 if not azure_node_runbook.vm_size:964 raise LisaException("vm_size is not detected before deploy")965 if not azure_node_runbook.location:966 raise LisaException("location is not detected before deploy")967 if azure_node_runbook.hyperv_generation not in [1, 2]:968 raise LisaException(969 "hyperv_generation need value 1 or 2, "970 f"but {azure_node_runbook.hyperv_generation}",971 )972 if azure_node_runbook.vhd:973 # vhd is higher priority974 azure_node_runbook.vhd = self._get_deployable_vhd_path(975 azure_node_runbook.vhd, azure_node_runbook.location, log976 )977 azure_node_runbook.marketplace = None978 azure_node_runbook.shared_gallery = None979 elif azure_node_runbook.shared_gallery:980 azure_node_runbook.marketplace = None981 azure_node_runbook.shared_gallery = self._parse_shared_gallery_image(982 azure_node_runbook.location, azure_node_runbook.shared_gallery983 )984 elif not azure_node_runbook.marketplace:985 # set to default marketplace, if nothing specified986 azure_node_runbook.marketplace = AzureVmMarketplaceSchema()987 else:988 # marketplace value is already set in runbook989 ...990 if azure_node_runbook.marketplace:991 # resolve Latest to specified version992 azure_node_runbook.marketplace = self._resolve_marketplace_image(993 azure_node_runbook.location, azure_node_runbook.marketplace994 )995 image_info = self._get_image_info(996 azure_node_runbook.location, azure_node_runbook.marketplace997 )998 # HyperVGenerationTypes return "V1"/"V2", so we need to strip "V"999 if image_info.hyper_v_generation:1000 azure_node_runbook.hyperv_generation = int(1001 image_info.hyper_v_generation.strip("V")1002 )1003 # retrieve the os type for arm template.1004 if azure_node_runbook.is_linux is None:1005 if image_info.os_disk_image.operating_system == "Windows":1006 azure_node_runbook.is_linux = False1007 else:1008 azure_node_runbook.is_linux = True1009 if azure_node_runbook.is_linux is None:1010 # fill it default value1011 azure_node_runbook.is_linux = True1012 return azure_node_runbook1013 def _create_node_arm_parameters(1014 self, capability: schema.Capability, log: Logger1015 ) -> AzureNodeArmParameter:1016 runbook = capability.get_extended_runbook(AzureNodeSchema, type_name=AZURE)1017 arm_parameters = AzureNodeArmParameter.from_node_runbook(runbook)1018 os_disk_size = 301019 if arm_parameters.vhd:1020 # vhd is higher priority1021 arm_parameters.vhd = self._get_deployable_vhd_path(1022 arm_parameters.vhd, arm_parameters.location, log1023 )1024 os_disk_size = max(1025 os_disk_size, self._get_vhd_os_disk_size(arm_parameters.vhd)1026 )1027 elif arm_parameters.shared_gallery:1028 os_disk_size = max(1029 os_disk_size,1030 self._get_sig_os_disk_size(arm_parameters.shared_gallery),1031 )1032 else:1033 assert (1034 arm_parameters.marketplace1035 ), "not set one of marketplace, shared_gallery or vhd."1036 image_info = self._get_image_info(1037 arm_parameters.location, arm_parameters.marketplace1038 )1039 os_disk_size = max(1040 os_disk_size, image_info.os_disk_image.additional_properties["sizeInGb"]1041 )1042 if not arm_parameters.purchase_plan and image_info.plan:1043 # expand values for lru cache1044 plan_name = image_info.plan.name1045 plan_product = image_info.plan.product1046 plan_publisher = image_info.plan.publisher1047 # accept the default purchase plan automatically.1048 arm_parameters.purchase_plan = self._process_marketplace_image_plan(1049 marketplace=arm_parameters.marketplace,1050 plan_name=plan_name,1051 plan_product=plan_product,1052 plan_publisher=plan_publisher,1053 )1054 arm_parameters.osdisk_size_in_gb = os_disk_size1055 # Set disk type1056 assert capability.disk, "node space must have disk defined."1057 assert isinstance(capability.disk.disk_type, schema.DiskType)1058 arm_parameters.disk_type = features.get_azure_disk_type(1059 capability.disk.disk_type1060 )1061 assert capability.network_interface1062 assert isinstance(1063 capability.network_interface.nic_count, int1064 ), f"actual: {capability.network_interface.nic_count}"1065 arm_parameters.nic_count = capability.network_interface.nic_count1066 assert isinstance(1067 capability.network_interface.data_path, schema.NetworkDataPath1068 ), f"actual: {type(capability.network_interface.data_path)}"1069 if capability.network_interface.data_path == schema.NetworkDataPath.Sriov:1070 arm_parameters.enable_sriov = True1071 return arm_parameters1072 def _validate_template(1073 self, deployment_parameters: Dict[str, Any], log: Logger1074 ) -> None:1075 log.debug("validating deployment")1076 validate_operation: Any = None1077 try:1078 with global_credential_access_lock:1079 validate_operation = self._rm_client.deployments.begin_validate(1080 **deployment_parameters1081 )1082 wait_operation(validate_operation, failure_identity="validation")1083 except Exception as identifier:1084 error_messages: List[str] = [str(identifier)]1085 if isinstance(identifier, HttpResponseError) and identifier.error:1086 # no validate_operation returned, the message may include1087 # some errors, so check details1088 error_messages = self._parse_detail_errors(identifier.error)1089 error_message = "\n".join(error_messages)1090 plugin_manager.hook.azure_deploy_failed(error_message=error_message)1091 raise LisaException(error_message)1092 def _deploy(1093 self, location: str, deployment_parameters: Dict[str, Any], log: Logger1094 ) -> None:1095 resource_group_name = deployment_parameters[AZURE_RG_NAME_KEY]1096 storage_account_name = get_storage_account_name(self.subscription_id, location)1097 check_or_create_storage_account(1098 self.credential,1099 self.subscription_id,1100 storage_account_name,1101 self._azure_runbook.shared_resource_group_name,1102 location,1103 log,1104 )1105 log.info(f"resource group '{resource_group_name}' deployment is in progress...")1106 deployment_operation: Any = None1107 deployments = self._rm_client.deployments1108 try:1109 deployment_operation = deployments.begin_create_or_update(1110 **deployment_parameters1111 )1112 wait_operation(deployment_operation, failure_identity="deploy")1113 except HttpResponseError as identifier:1114 # Some errors happens underlying, so there is no detail errors from API.1115 # For example,1116 # azure.core.exceptions.HttpResponseError:1117 # Operation returned an invalid status 'OK'1118 assert identifier.error, f"HttpResponseError: {identifier}"1119 error_message = "\n".join(self._parse_detail_errors(identifier.error))1120 if (1121 self._azure_runbook.ignore_provisioning_error1122 and "OSProvisioningTimedOut: OS Provisioning for VM" in error_message1123 ):1124 # Provisioning timeout causes by waagent is not ready.1125 # In smoke test, it still can verify some information.1126 # Eat information here, to run test case any way.1127 #1128 # It may cause other cases fail on assumptions. In this case, we can1129 # define a flag in config, to mark this exception is ignorable or not.1130 log.error(1131 f"provisioning time out, try to run case. "1132 f"Exception: {error_message}"1133 )1134 elif self._azure_runbook.ignore_provisioning_error and get_matched_str(1135 error_message, AZURE_INTERNAL_ERROR_PATTERN1136 ):1137 # Similar situation with OSProvisioningTimedOut1138 # Some OSProvisioningInternalError caused by it doesn't support1139 # SSH key authentication1140 # e.g. hpe hpestoreoncevsa hpestoreoncevsa-3187 3.18.71141 # After passthrough this exception,1142 # actually the 22 port of this VM is open.1143 log.error(1144 f"provisioning failed for an internal error, try to run case. "1145 f"Exception: {error_message}"1146 )1147 else:1148 plugin_manager.hook.azure_deploy_failed(error_message=error_message)1149 raise LisaException(error_message)1150 def _parse_detail_errors(self, error: Any) -> List[str]:1151 # original message may be a summary, get lowest level details.1152 if hasattr(error, "details") and error.details:1153 errors: List[str] = []1154 for detail in error.details:1155 errors.extend(self._parse_detail_errors(detail))1156 else:1157 try:1158 # it returns serialized json string in message sometime1159 parsed_error = json.loads(1160 error.message, object_hook=lambda x: SimpleNamespace(**x)1161 )1162 errors = self._parse_detail_errors(parsed_error.error)1163 except Exception:1164 # load failed, it should be a real error message string1165 errors = [f"{error.code}: {error.message}"]1166 return errors1167 # the VM may not be queried after deployed. use retry to mitigate it.1168 @retry(exceptions=LisaException, tries=150, delay=2)1169 def _load_vms(1170 self, environment: Environment, log: Logger1171 ) -> Dict[str, VirtualMachine]:1172 compute_client = get_compute_client(self, api_version="2020-06-01")1173 environment_context = get_environment_context(environment=environment)1174 log.debug(1175 f"listing vm in resource group "1176 f"'{environment_context.resource_group_name}'"1177 )1178 vms_map: Dict[str, VirtualMachine] = {}1179 vms = compute_client.virtual_machines.list(1180 environment_context.resource_group_name1181 )1182 for vm in vms:1183 vms_map[vm.name] = vm1184 log.debug(f" found vm {vm.name}")1185 if not vms_map:1186 raise LisaException(1187 f"deployment succeeded, but VM not found in 5 minutes "1188 f"from '{environment_context.resource_group_name}'"1189 )1190 return vms_map1191 # Use Exception, because there may be credential conflict error. Make it1192 # retriable.1193 @retry(exceptions=Exception, tries=150, delay=2)1194 def _load_nics(1195 self, environment: Environment, log: Logger1196 ) -> Dict[str, NetworkInterface]:1197 network_client = get_network_client(self)1198 environment_context = get_environment_context(environment=environment)1199 log.debug(1200 f"listing network interfaces in resource group "1201 f"'{environment_context.resource_group_name}'"1202 )1203 # load nics1204 nics_map: Dict[str, NetworkInterface] = {}1205 network_interfaces = network_client.network_interfaces.list(1206 environment_context.resource_group_name1207 )1208 for nic in network_interfaces:1209 # nic name is like lisa-test-20220316-182126-985-e0-n0-nic-2, get vm1210 # name part for later pick only find primary nic, which is ended by1211 # -nic-01212 node_name_from_nic = RESOURCE_ID_NIC_PATTERN.findall(nic.name)1213 if node_name_from_nic:1214 name = node_name_from_nic[0]1215 nics_map[name] = nic1216 log.debug(f" found nic '{nic.name}', and saved for next step.")1217 else:1218 log.debug(1219 f" found nic '{nic.name}', but dropped, "1220 "because it's not primary nic."1221 )1222 if not nics_map:1223 raise LisaException(1224 f"deployment succeeded, but network interfaces not found in 5 minutes "1225 f"from '{environment_context.resource_group_name}'"1226 )1227 return nics_map1228 @retry(exceptions=LisaException, tries=150, delay=2)1229 def load_public_ips_from_resource_group(1230 self, resource_group_name: str, log: Logger1231 ) -> Dict[str, str]:1232 network_client = get_network_client(self)1233 log.debug(f"listing public ips in resource group '{resource_group_name}'")1234 # get public IP1235 public_ip_addresses = network_client.public_ip_addresses.list(1236 resource_group_name1237 )1238 public_ips_map: Dict[str, str] = {}1239 for ip_address in public_ip_addresses:1240 # nic name is like node-0-nic-2, get vm name part for later pick1241 # only find primary nic, which is ended by -nic-01242 node_name_from_public_ip = RESOURCE_ID_PUBLIC_IP_PATTERN.findall(1243 ip_address.name1244 )1245 assert (1246 ip_address1247 ), f"public IP address cannot be empty, ip_address object: {ip_address}"1248 if node_name_from_public_ip:1249 name = node_name_from_public_ip[0]1250 public_ips_map[name] = ip_address.ip_address1251 log.debug(1252 f" found public IP '{ip_address.name}', and saved for next step."1253 )1254 else:1255 log.debug(1256 f" found public IP '{ip_address.name}', but dropped "1257 "because it's not primary nic."1258 )1259 if not public_ips_map:1260 raise LisaException(1261 f"deployment succeeded, but public ips not found in 5 minutes "1262 f"from '{resource_group_name}'"1263 )1264 return public_ips_map1265 def initialize_environment(self, environment: Environment, log: Logger) -> None:1266 node_context_map: Dict[str, Node] = {}1267 for node in environment.nodes.list():1268 node_context = get_node_context(node)1269 node_context_map[node_context.vm_name] = node1270 vms_map: Dict[str, VirtualMachine] = self._load_vms(environment, log)1271 nics_map: Dict[str, NetworkInterface] = self._load_nics(environment, log)1272 environment_context = get_environment_context(environment=environment)1273 public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(1274 environment_context.resource_group_name, log1275 )1276 for vm_name, node in node_context_map.items():1277 node_context = get_node_context(node)1278 vm = vms_map.get(vm_name, None)1279 if not vm:1280 raise LisaException(1281 f"cannot find vm: '{vm_name}', make sure deployment is correct."1282 )1283 nic = nics_map[vm_name]1284 public_ip = public_ips_map[vm_name]1285 address = nic.ip_configurations[0].private_ip_address1286 if not node.name:1287 node.name = vm_name1288 assert isinstance(node, RemoteNode)1289 node.set_connection_info(1290 address=address,1291 port=22,1292 public_address=public_ip,1293 public_port=22,1294 username=node_context.username,1295 password=node_context.password,1296 private_key_file=node_context.private_key_file,1297 )1298 # enable ssh for windows, if it's not Windows, or SSH reachable, it will1299 # skip.1300 run_in_parallel(1301 [1302 partial(self._enable_ssh_on_windows, node=x)1303 for x in environment.nodes.list()1304 ]1305 )1306 def _resource_sku_to_capability( # noqa: C9011307 self, location: str, resource_sku: ResourceSku1308 ) -> schema.NodeSpace:1309 # fill in default values, in case no capability meet.1310 node_space = schema.NodeSpace(1311 node_count=1,1312 core_count=0,1313 memory_mb=0,1314 gpu_count=0,1315 )1316 node_space.name = f"{location}_{resource_sku.name}"1317 node_space.features = search_space.SetSpace[schema.FeatureSettings](1318 is_allow_set=True1319 )1320 node_space.disk = features.AzureDiskOptionSettings()...

Full Screen

Full Screen

test_prepare.py

Source:test_prepare.py Github

copy

Full Screen

...54 ],55 "restrictions": [],56 }57 )58 node = self._platform._resource_sku_to_capability("eastus2", resource_sku)59 self.assertEqual(48, node.core_count)60 self.assertEqual(458752, node.memory_mb)61 assert node.network_interface62 self.assertEqual(63 search_space.IntRange(min=1, max=8), node.network_interface.nic_count64 )65 assert node.disk66 self.assertEqual(67 search_space.IntRange(min=0, max=32), node.disk.data_disk_count68 )69 self.assertEqual(4, node.gpu_count)70 def test_not_eligible_dropped(self) -> None:71 # if a vm size doesn't exists, it should be dropped.72 # if a location is not eligible, it should be dropped....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful