How to use _convert_to_azure_node_space method in lisa

Best Python code snippet using lisa_python

platform_.py

Source:platform_.py Github

copy

Full Screen

...181 capability: schema.NodeSpace182 resource_sku: Dict[str, Any]183 def __post_init__(self, *args: Any, **kwargs: Any) -> None:184 # reload features settings with platform specified types.185 _convert_to_azure_node_space(self.capability)186@dataclass_json()187@dataclass188class AzureLocation:189 updated_time: datetime = field(190 default_factory=datetime.now,191 metadata=field_metadata(192 fields.DateTime,193 encoder=datetime.isoformat,194 decoder=datetime.fromisoformat,195 format="iso",196 ),197 )198 location: str = ""199 capabilities: Dict[str, AzureCapability] = field(default_factory=dict)200@dataclass_json()201@dataclass202class AzurePlatformSchema:203 service_principal_tenant_id: str = field(204 default="",205 metadata=field_metadata(206 validate=validate.Regexp(constants.GUID_REGEXP),207 ),208 )209 service_principal_client_id: str = field(210 default="",211 metadata=field_metadata(212 validate=validate.Regexp(constants.GUID_REGEXP),213 ),214 )215 service_principal_key: str = field(default="")216 subscription_id: str = field(217 default="",218 metadata=field_metadata(219 validate=validate.Regexp(constants.GUID_REGEXP),220 ),221 )222 shared_resource_group_name: str = AZURE_SHARED_RG_NAME223 resource_group_name: str = field(default="")224 availability_set_tags: Optional[Dict[str, str]] = field(default=None)225 availability_set_properties: Optional[Dict[str, Any]] = field(default=None)226 vm_tags: Optional[Dict[str, Any]] = field(default=None)227 locations: Optional[Union[str, List[str]]] = field(default=None)228 # Provisioning error causes by waagent is not ready or other reasons. In229 # smoke test, it can verify some points also. Other tests should use the230 # default False to raise errors to prevent unexpected behavior.231 ignore_provisioning_error: bool = False232 log_level: str = field(233 default=logging.getLevelName(logging.WARN),234 metadata=field_metadata(235 validate=validate.OneOf(236 [237 logging.getLevelName(logging.ERROR),238 logging.getLevelName(logging.WARN),239 logging.getLevelName(logging.INFO),240 logging.getLevelName(logging.DEBUG),241 ]242 ),243 ),244 )245 # do actual deployment, or pass through for troubleshooting246 dry_run: bool = False247 # do actual deployment, or try to retrieve existing vms248 deploy: bool = True249 # wait resource deleted or not250 wait_delete: bool = False251 def __post_init__(self, *args: Any, **kwargs: Any) -> None:252 strip_strs(253 self,254 [255 "service_principal_tenant_id",256 "service_principal_client_id",257 "service_principal_key",258 "subscription_id",259 "shared_resource_group_name",260 "resource_group_name",261 "locations",262 "log_level",263 ],264 )265 if self.service_principal_tenant_id:266 add_secret(self.service_principal_tenant_id, mask=PATTERN_GUID)267 if self.subscription_id:268 add_secret(self.subscription_id, mask=PATTERN_GUID)269 if self.service_principal_client_id or self.service_principal_key:270 add_secret(self.service_principal_client_id, mask=PATTERN_GUID)271 add_secret(self.service_principal_key)272 if not self.service_principal_client_id or not self.service_principal_key:273 raise LisaException(274 "service_principal_client_id and service_principal_key "275 "should be specified either both or not."276 )277 if not self.locations:278 self.locations = LOCATIONS279class AzurePlatform(Platform):280 _diagnostic_storage_container_pattern = re.compile(281 r"(https:\/\/)(?P<storage_name>.*)([.].*){4}\/" r"(?P<container_name>.*)\/",282 re.M,283 )284 _arm_template: Any = None285 _credentials: Dict[str, DefaultAzureCredential] = {}286 _locations_data_cache: Dict[str, AzureLocation] = {}287 def __init__(self, runbook: schema.Platform) -> None:288 super().__init__(runbook=runbook)289 # for type detection290 self.credential: DefaultAzureCredential291 # It has to be defined after the class definition is loaded. So it292 # cannot be a class level variable.293 self._environment_information_hooks = {294 KEY_HOST_VERSION: self._get_host_version,295 KEY_KERNEL_VERSION: self._get_kernel_version,296 KEY_WALA_VERSION: self._get_wala_version,297 KEY_WALA_DISTRO_VERSION: self._get_wala_distro_version,298 }299 @classmethod300 def type_name(cls) -> str:301 return AZURE302 @classmethod303 def supported_features(cls) -> List[Type[feature.Feature]]:304 return [305 features.Disk,306 features.Gpu,307 features.Nvme,308 features.NestedVirtualization,309 features.SerialConsole,310 features.NetworkInterface,311 features.Resize,312 features.StartStop,313 features.Infiniband,314 features.Hibernation,315 features.SecurityProfile,316 features.ACC,317 features.IsolatedResource,318 features.Nfs,319 ]320 def _prepare_environment(self, environment: Environment, log: Logger) -> bool:321 """322 Main flow323 1. load location, vm size patterns firstly.324 2. load available vm sizes for each location.325 3. match vm sizes by pattern.326 for each environment327 1. If predefined location exists on node level, check conflict and use it.328 2. If predefined vm size exists on node level, check exists and use it.329 3. check capability for each node by order of pattern.330 4. get min capability for each match331 """332 if not environment.runbook.nodes_requirement:333 return True334 # reload requirement to match335 environment.runbook.reload_requirements()336 nodes_requirement = environment.runbook.nodes_requirement337 # covert to azure node space, so the azure extensions can be loaded.338 for req in nodes_requirement:339 _convert_to_azure_node_space(req)340 is_success: bool = False341 # get eligible locations342 allowed_locations = _get_allowed_locations(nodes_requirement)343 log.debug(f"allowed locations: {allowed_locations}")344 # Any to wait for resource345 any_wait_for_resource: bool = False346 errors: List[str] = []347 for location in allowed_locations:348 caps, error = self._get_matched_capabilities(349 location=location, nodes_requirement=nodes_requirement, log=log350 )351 if error:352 errors.append(error)353 self._analyze_environment_availability(location, caps)354 # set all awaitable flag if nothing is False355 if all(x for x in caps):356 any_wait_for_resource = True357 # check to return value or raise WaitForMoreResource358 if all(isinstance(x, schema.NodeSpace) for x in caps):359 # with above condition, all types are NodeSpace. Ignore the mypy check.360 environment.runbook.nodes_requirement = caps # type: ignore361 environment.cost = sum(362 x.cost for x in caps if isinstance(x, schema.NodeSpace)363 )364 is_success = True365 log.debug(366 f"requirement meet, "367 f"cost: {environment.cost}, "368 f"cap: {environment.runbook.nodes_requirement}"369 )370 break371 if not is_success:372 if any_wait_for_resource:373 raise ResourceAwaitableException(374 "vm size", "No available quota, try to deploy later."375 )376 else:377 raise NotMeetRequirementException(378 f"{errors}, runbook: {environment.runbook}."379 )380 # resolve Latest to specified version381 if is_success:382 self._resolve_marketplace_image_version(nodes_requirement)383 return is_success384 def _deploy_environment(self, environment: Environment, log: Logger) -> None:385 assert self._rm_client386 assert self._azure_runbook387 environment_context = get_environment_context(environment=environment)388 if self._azure_runbook.resource_group_name:389 resource_group_name = self._azure_runbook.resource_group_name390 else:391 normalized_name = constants.NORMALIZE_PATTERN.sub("-", constants.RUN_NAME)392 # Take last chars to make sure the length is to exceed max 90 chars393 # allowed in resource group name.394 resource_group_name = truncate_keep_prefix(395 f"{normalized_name}-e{environment.id}", 80396 )397 environment_context.resource_group_is_specified = True398 environment_context.resource_group_name = resource_group_name399 if self._azure_runbook.dry_run:400 log.info(f"dry_run: {self._azure_runbook.dry_run}")401 else:402 try:403 if self._azure_runbook.deploy:404 log.info(405 f"creating or updating resource group: [{resource_group_name}]"406 )407 check_or_create_resource_group(408 self.credential,409 subscription_id=self.subscription_id,410 resource_group_name=resource_group_name,411 location=RESOURCE_GROUP_LOCATION,412 log=log,413 )414 else:415 log.info(f"reusing resource group: [{resource_group_name}]")416 location, deployment_parameters = self._create_deployment_parameters(417 resource_group_name, environment, log418 )419 if self._azure_runbook.deploy:420 self._validate_template(deployment_parameters, log)421 self._deploy(location, deployment_parameters, log)422 # Even skipped deploy, try best to initialize nodes423 self.initialize_environment(environment, log)424 except Exception as identifier:425 self._delete_environment(environment, log)426 raise identifier427 def _delete_environment(self, environment: Environment, log: Logger) -> None:428 environment_context = get_environment_context(environment=environment)429 resource_group_name = environment_context.resource_group_name430 # the resource group name is empty when it is not deployed for some reasons,431 # like capability doesn't meet case requirement.432 if not resource_group_name:433 return434 assert self._azure_runbook435 if not environment_context.resource_group_is_specified:436 log.info(437 f"skipped to delete resource group: {resource_group_name}, "438 f"as it's specified in runbook."439 )440 elif self._azure_runbook.dry_run:441 log.info(442 f"skipped to delete resource group: {resource_group_name}, "443 f"as it's a dry run."444 )445 else:446 assert self._rm_client447 az_rg_exists = self._rm_client.resource_groups.check_existence(448 resource_group_name449 )450 if not az_rg_exists:451 return452 log.info(453 f"deleting resource group: {resource_group_name}, "454 f"wait: {self._azure_runbook.wait_delete}"455 )456 try:457 self._delete_boot_diagnostic_container(resource_group_name, log)458 except Exception as identifier:459 log.debug(460 f"exception on deleting boot diagnostic container: {identifier}"461 )462 delete_operation: Any = None463 try:464 delete_operation = self._rm_client.resource_groups.begin_delete(465 resource_group_name466 )467 except Exception as identifier:468 log.debug(f"exception on delete resource group: {identifier}")469 if delete_operation and self._azure_runbook.wait_delete:470 wait_operation(471 delete_operation, failure_identity="delete resource group"472 )473 else:474 log.debug("not wait deleting")475 def _delete_boot_diagnostic_container(476 self, resource_group_name: str, log: Logger477 ) -> None:478 compute_client = get_compute_client(self)479 vms = compute_client.virtual_machines.list(resource_group_name)480 for vm in vms:481 diagnostic_data = (482 compute_client.virtual_machines.retrieve_boot_diagnostics_data(483 resource_group_name=resource_group_name, vm_name=vm.name484 )485 )486 if not diagnostic_data:487 continue488 # A sample url,489 # https://storageaccountname.blob.core.windows.net:443/490 # bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70/491 # node-0.30779088-9b10-4074-8c27-98b91f1d8b70.serialconsole.log492 # ?sv=2018-03-28&sr=b&sig=mJEsvk9WunbKHfBs1lo1jcIBe4owq1brP8Kw3qXTQJA%3d&493 # se=2021-09-14T08%3a55%3a38Z&sp=r494 blob_uri = diagnostic_data.console_screenshot_blob_uri495 if blob_uri:496 matched = self._diagnostic_storage_container_pattern.match(blob_uri)497 assert matched498 # => storageaccountname499 storage_name = matched.group("storage_name")500 # => bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70501 container_name = matched.group("container_name")502 container_client = get_or_create_storage_container(503 credential=self.credential,504 subscription_id=self.subscription_id,505 account_name=storage_name,506 container_name=container_name,507 resource_group_name=self._azure_runbook.shared_resource_group_name,508 )509 log.debug(510 f"deleting boot diagnostic container: {container_name}"511 f" under storage account {storage_name} of vm {vm.name}"512 )513 try:514 container_client.delete_container()515 except Exception as identifier:516 log.debug(517 f"exception on deleting boot diagnostic container:"518 f" {identifier}"519 )520 def _get_node_information(self, node: Node) -> Dict[str, str]:521 information: Dict[str, Any] = {}522 node.log.debug("detecting lis version...")523 modinfo = node.tools[Modinfo]524 information["lis_version"] = modinfo.get_version("hv_vmbus")525 node.log.debug("detecting vm generation...")526 information[KEY_VM_GENERATION] = node.tools[VmGeneration].get_generation()527 node.log.debug(f"vm generation: {information[KEY_VM_GENERATION]}")528 return information529 def _get_kernel_version(self, node: Node) -> str:530 result: str = ""531 if not result and hasattr(node, ATTRIBUTE_FEATURES):532 # try to get kernel version in Azure. use it, when uname doesn't work533 node.log.debug("detecting kernel version from serial log...")534 serial_console = node.features[features.SerialConsole]535 result = serial_console.get_matched_str(KERNEL_VERSION_PATTERN)536 return result537 def _get_host_version(self, node: Node) -> str:538 result: str = ""539 try:540 if node.is_connected and node.is_posix:541 node.log.debug("detecting host version from dmesg...")542 dmesg = node.tools[Dmesg]543 result = get_matched_str(544 dmesg.get_output(), HOST_VERSION_PATTERN, first_match=False545 )546 except Exception as identifier:547 # it happens on some error vms. Those error should be caught earlier in548 # test cases not here. So ignore any error here to collect information only.549 node.log.debug(f"error on run dmesg: {identifier}")550 # if not get, try again from serial console log.551 # skip if node is not initialized.552 if not result and hasattr(node, ATTRIBUTE_FEATURES):553 node.log.debug("detecting host version from serial log...")554 serial_console = node.features[features.SerialConsole]555 result = serial_console.get_matched_str(HOST_VERSION_PATTERN)556 return result557 def _get_wala_version(self, node: Node) -> str:558 result = ""559 try:560 if node.is_connected and node.is_posix:561 node.log.debug("detecting wala version from waagent...")562 waagent = node.tools[Waagent]563 result = waagent.get_version()564 except Exception as identifier:565 # it happens on some error vms. Those error should be caught earlier in566 # test cases not here. So ignore any error here to collect information only.567 node.log.debug(f"error on run waagent: {identifier}")568 if not result and hasattr(node, ATTRIBUTE_FEATURES):569 node.log.debug("detecting wala agent version from serial log...")570 serial_console = node.features[features.SerialConsole]571 result = serial_console.get_matched_str(WALA_VERSION_PATTERN)572 return result573 def _get_wala_distro_version(self, node: Node) -> str:574 result = "Unknown"575 try:576 if node.is_connected and node.is_posix:577 waagent = node.tools[Waagent]578 result = waagent.get_distro_version()579 except Exception as identifier:580 # it happens on some error vms. Those error should be caught earlier in581 # test cases not here. So ignore any error here to collect information only.582 node.log.debug(f"error on get waagent distro version: {identifier}")583 return result584 def _get_platform_information(self, environment: Environment) -> Dict[str, str]:585 result: Dict[str, str] = {}586 azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(587 AzurePlatformSchema588 )589 result[AZURE_RG_NAME_KEY] = get_environment_context(590 environment591 ).resource_group_name592 if azure_runbook.availability_set_properties:593 for (594 property_name,595 property_value,596 ) in azure_runbook.availability_set_properties.items():597 if property_name in [598 "platformFaultDomainCount",599 "platformUpdateDomainCount",600 ]:601 continue602 if isinstance(property_value, dict):603 for key, value in property_value.items():604 if value:605 result[key] = value606 if azure_runbook.availability_set_tags:607 for key, value in azure_runbook.availability_set_tags.items():608 if value:609 result[key] = value610 if azure_runbook.vm_tags:611 for key, value in azure_runbook.vm_tags.items():612 if value:613 result[key] = value614 return result615 def _get_environment_information(self, environment: Environment) -> Dict[str, str]:616 information: Dict[str, str] = {}617 node_runbook: Optional[AzureNodeSchema] = None618 if environment.nodes:619 node: Optional[Node] = environment.default_node620 else:621 node = None622 if node:623 node_runbook = node.capability.get_extended_runbook(AzureNodeSchema, AZURE)624 for key, method in self._environment_information_hooks.items():625 node.log.debug(f"detecting {key} ...")626 try:627 value = method(node)628 if value:629 information[key] = value630 except Exception as identifier:631 node.log.exception(f"error on get {key}.", exc_info=identifier)632 information.update(self._get_platform_information(environment))633 if node.is_connected and node.is_posix:634 information.update(self._get_node_information(node))635 elif environment.capability and environment.capability.nodes:636 # get deployment information, if failed on preparing phase637 node_space = environment.capability.nodes[0]638 node_runbook = node_space.get_extended_runbook(639 AzureNodeSchema, type_name=AZURE640 )641 if node_runbook:642 information["location"] = node_runbook.location643 information["vmsize"] = node_runbook.vm_size644 information["image"] = node_runbook.get_image_name()645 return information646 def _initialize(self, *args: Any, **kwargs: Any) -> None:647 # set needed environment variables for authentication648 azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(649 AzurePlatformSchema650 )651 assert azure_runbook, "platform runbook cannot be empty"652 self._azure_runbook = azure_runbook653 self.subscription_id = azure_runbook.subscription_id654 self._initialize_credential()655 check_or_create_resource_group(656 self.credential,657 self.subscription_id,658 azure_runbook.shared_resource_group_name,659 RESOURCE_GROUP_LOCATION,660 self._log,661 )662 self._rm_client = get_resource_management_client(663 self.credential, self.subscription_id664 )665 def _initialize_credential(self) -> None:666 azure_runbook = self._azure_runbook667 credential_key = (668 f"{azure_runbook.service_principal_tenant_id}_"669 f"{azure_runbook.service_principal_client_id}"670 )671 credential = self._credentials.get(credential_key, None)672 if not credential:673 # set azure log to warn level only674 logging.getLogger("azure").setLevel(azure_runbook.log_level)675 if azure_runbook.service_principal_tenant_id:676 os.environ[677 "AZURE_TENANT_ID"678 ] = azure_runbook.service_principal_tenant_id679 if azure_runbook.service_principal_client_id:680 os.environ[681 "AZURE_CLIENT_ID"682 ] = azure_runbook.service_principal_client_id683 if azure_runbook.service_principal_key:684 os.environ["AZURE_CLIENT_SECRET"] = azure_runbook.service_principal_key685 credential = DefaultAzureCredential()686 with SubscriptionClient(credential) as self._sub_client:687 # suppress warning message by search for different credential types688 azure_identity_logger = logging.getLogger("azure.identity")689 azure_identity_logger.setLevel(logging.ERROR)690 with global_credential_access_lock:691 subscription = self._sub_client.subscriptions.get(692 self.subscription_id693 )694 azure_identity_logger.setLevel(logging.WARN)695 if not subscription:696 raise LisaException(697 f"Cannot find subscription id: '{self.subscription_id}'. "698 f"Make sure it exists and current account can access it."699 )700 self._log.info(701 f"connected to subscription: "702 f"{subscription.id}, '{subscription.display_name}'"703 )704 self._credentials[credential_key] = credential705 self.credential = credential706 def _load_template(self) -> Any:707 if self._arm_template is None:708 template_file_path = Path(__file__).parent / "arm_template.json"709 with open(template_file_path, "r") as f:710 self._arm_template = json.load(f)711 return self._arm_template712 @retry(tries=10, delay=1, jitter=(0.5, 1))713 def _load_location_info_from_file(714 self, cached_file_name: Path, log: Logger715 ) -> Optional[AzureLocation]:716 loaded_obj: Optional[AzureLocation] = None717 if cached_file_name.exists():718 try:719 with open(cached_file_name, "r") as f:720 loaded_data: Dict[str, Any] = json.load(f)721 loaded_obj = schema.load_by_type(AzureLocation, loaded_data)722 except Exception as identifier:723 # if schema changed, There may be exception, remove cache and retry724 # Note: retry on this method depends on decorator725 log.debug(726 f"error on loading cache, delete cache and retry. {identifier}"727 )728 cached_file_name.unlink()729 raise identifier730 return loaded_obj731 def get_location_info(self, location: str, log: Logger) -> AzureLocation:732 cached_file_name = constants.CACHE_PATH.joinpath(733 f"azure_locations_{location}.json"734 )735 should_refresh: bool = True736 key = self._get_location_key(location)737 location_data = self._locations_data_cache.get(key, None)738 if not location_data:739 location_data = self._load_location_info_from_file(740 cached_file_name=cached_file_name, log=log741 )742 if location_data:743 delta = datetime.now() - location_data.updated_time744 # refresh cached locations every 1 day.745 if delta.days < 1:746 should_refresh = False747 else:748 log.debug(749 f"{key}: cache timeout: {location_data.updated_time},"750 f"sku count: {len(location_data.capabilities)}"751 )752 else:753 log.debug(f"{key}: no cache found")754 if should_refresh:755 compute_client = get_compute_client(self)756 log.debug(f"{key}: querying")757 all_skus: Dict[str, AzureCapability] = dict()758 paged_skus = compute_client.resource_skus.list(759 f"location eq '{location}'"760 ).by_page()761 for skus in paged_skus:762 for sku_obj in skus:763 try:764 if sku_obj.resource_type == "virtualMachines":765 if sku_obj.restrictions and any(766 restriction.type == "Location"767 for restriction in sku_obj.restrictions768 ):769 # restricted on this location770 continue771 resource_sku = sku_obj.as_dict()772 capability = self._resource_sku_to_capability(773 location, sku_obj774 )775 # estimate vm cost for priority776 assert isinstance(capability.core_count, int)777 assert isinstance(capability.gpu_count, int)778 azure_capability = AzureCapability(779 location=location,780 vm_size=sku_obj.name,781 capability=capability,782 resource_sku=resource_sku,783 )784 all_skus[azure_capability.vm_size] = azure_capability785 except Exception as identifier:786 log.error(f"unknown sku: {sku_obj}")787 raise identifier788 location_data = AzureLocation(location=location, capabilities=all_skus)789 log.debug(f"{location}: saving to disk")790 with open(cached_file_name, "w") as f:791 json.dump(location_data.to_dict(), f) # type: ignore792 log.debug(f"{key}: new data, " f"sku: {len(location_data.capabilities)}")793 assert location_data794 self._locations_data_cache[key] = location_data795 return location_data796 def _create_deployment_parameters(797 self, resource_group_name: str, environment: Environment, log: Logger798 ) -> Tuple[str, Dict[str, Any]]:799 assert environment.runbook, "env data cannot be None"800 assert environment.runbook.nodes_requirement, "node requirement cannot be None"801 log.debug("creating deployment")802 # construct parameters803 arm_parameters = AzureArmParameter()804 copied_fields = [805 "availability_set_tags",806 "availability_set_properties",807 "vm_tags",808 ]809 set_filtered_fields(self._azure_runbook, arm_parameters, copied_fields)810 is_windows: bool = False811 arm_parameters.admin_username = self.runbook.admin_username812 if self.runbook.admin_private_key_file:813 arm_parameters.admin_key_data = get_public_key_data(814 self.runbook.admin_private_key_file815 )816 else:817 arm_parameters.admin_password = self.runbook.admin_password818 environment_context = get_environment_context(environment=environment)819 arm_parameters.vm_tags["RG"] = environment_context.resource_group_name820 # get local lisa environment821 arm_parameters.vm_tags["lisa_username"] = local().tools[Whoami].get_username()822 arm_parameters.vm_tags["lisa_hostname"] = local().tools[Hostname].get_hostname()823 nodes_parameters: List[AzureNodeArmParameter] = []824 features_settings: Dict[str, schema.FeatureSettings] = {}825 for node_space in environment.runbook.nodes_requirement:826 assert isinstance(827 node_space, schema.NodeSpace828 ), f"actual: {type(node_space)}"829 azure_node_runbook = node_space.get_extended_runbook(830 AzureNodeSchema, type_name=AZURE831 )832 # Subscription Id is used by Shared Gallery images located833 # in subscription different from where LISA is run834 azure_node_runbook.subscription_id = self.subscription_id835 # init node836 node = environment.create_node_from_requirement(837 node_space,838 )839 azure_node_runbook = self._create_node_runbook(840 len(nodes_parameters), node_space, log, resource_group_name841 )842 # save parsed runbook back, for example, the version of marketplace may be843 # parsed from latest to a specified version.844 node.capability.set_extended_runbook(azure_node_runbook)845 node_arm_parameters = self._create_node_arm_parameters(node.capability, log)846 nodes_parameters.append(node_arm_parameters)847 # Set data disk array848 arm_parameters.data_disks = self._generate_data_disks(849 node, node_arm_parameters850 )851 if not arm_parameters.location:852 # take first one's location853 arm_parameters.location = azure_node_runbook.location854 # save vm's information into node855 node_context = get_node_context(node)856 node_context.resource_group_name = environment_context.resource_group_name857 # vm's name, use to find it from azure858 node_context.vm_name = azure_node_runbook.name859 # ssh related information will be filled back once vm is created. If860 # it's Windows, fill in the password always. If it's Linux, the861 # private key has higher priority.862 node_context.username = arm_parameters.admin_username863 if azure_node_runbook.is_linux:864 node_context.password = arm_parameters.admin_password865 else:866 is_windows = True867 if not self.runbook.admin_password:868 # password is required, if it doesn't present, generate one.869 password = generate_random_chars()870 add_secret(password)871 self.runbook.admin_password = password872 node_context.password = self.runbook.admin_password873 node_context.private_key_file = self.runbook.admin_private_key_file874 # collect all features to handle special deployment logic. If one875 # node has this, it needs to run.876 if node.capability.features:877 for f in node.capability.features:878 if f.type not in features_settings:879 features_settings[f.type] = f880 log.info(f"vm setting: {azure_node_runbook}")881 if is_windows:882 # set password for windows any time.883 arm_parameters.admin_password = self.runbook.admin_password884 arm_parameters.nodes = nodes_parameters885 arm_parameters.storage_name = get_storage_account_name(886 self.subscription_id, arm_parameters.location887 )888 if (889 self._azure_runbook.availability_set_properties890 or self._azure_runbook.availability_set_tags891 ):892 arm_parameters.use_availability_sets = True893 # In Azure, each VM should have only one nic in one subnet. So calculate894 # the max nic count, and set to subnet count.895 arm_parameters.subnet_count = max(x.nic_count for x in arm_parameters.nodes)896 arm_parameters.shared_resource_group_name = (897 self._azure_runbook.shared_resource_group_name898 )899 # the arm template may be updated by the hooks, so make a copy to avoid900 # the original template is modified.901 template = deepcopy(self._load_template())902 plugin_manager.hook.azure_update_arm_template(903 template=template, environment=environment904 )905 # change deployment for each feature.906 for f in features_settings.values():907 feature_type = next(908 x for x in self.supported_features() if x.name() == f.type909 )910 feature_type.on_before_deployment(911 arm_parameters=arm_parameters,912 template=template,913 settings=f,914 environment=environment,915 log=log,916 )917 # composite deployment properties918 parameters = arm_parameters.to_dict() # type:ignore919 parameters = {k: {"value": v} for k, v in parameters.items()}920 log.debug(f"parameters: {parameters}")921 deployment_properties = DeploymentProperties(922 mode=DeploymentMode.incremental,923 template=template,924 parameters=parameters,925 )926 # dump arm_template and arm_parameters to file927 template_dump_path = environment.log_path / "arm_template.json"928 param_dump_path = environment.log_path / "arm_template_parameters.json"929 dump_file(template_dump_path, json.dumps(template, indent=4))930 dump_file(param_dump_path, json.dumps(parameters, indent=4))931 return (932 arm_parameters.location,933 {934 AZURE_RG_NAME_KEY: resource_group_name,935 "deployment_name": AZURE_DEPLOYMENT_NAME,936 "parameters": Deployment(properties=deployment_properties),937 },938 )939 def _create_node_runbook(940 self,941 index: int,942 node_space: schema.NodeSpace,943 log: Logger,944 name_prefix: str,945 ) -> AzureNodeSchema:946 azure_node_runbook = node_space.get_extended_runbook(947 AzureNodeSchema, type_name=AZURE948 )949 if not azure_node_runbook.name:950 # the max length of vm name is 64 chars. Below logic takes last 45951 # chars in resource group name and keep the leading 5 chars.952 # name_prefix can contain any of customized (existing) or953 # generated (starts with "lisa-") resource group name,954 # so, pass the first 5 chars as prefix to truncate_keep_prefix955 # to handle both cases956 node_name = f"{name_prefix}-n{index}"957 azure_node_runbook.name = truncate_keep_prefix(node_name, 50, node_name[:5])958 # It's used as computer name only. Windows doesn't support name more959 # than 15 chars960 azure_node_runbook.short_name = truncate_keep_prefix(961 azure_node_runbook.name, 15, azure_node_runbook.name[:5]962 )963 if not azure_node_runbook.vm_size:964 raise LisaException("vm_size is not detected before deploy")965 if not azure_node_runbook.location:966 raise LisaException("location is not detected before deploy")967 if azure_node_runbook.hyperv_generation not in [1, 2]:968 raise LisaException(969 "hyperv_generation need value 1 or 2, "970 f"but {azure_node_runbook.hyperv_generation}",971 )972 if azure_node_runbook.vhd:973 # vhd is higher priority974 azure_node_runbook.vhd = self._get_deployable_vhd_path(975 azure_node_runbook.vhd, azure_node_runbook.location, log976 )977 azure_node_runbook.marketplace = None978 azure_node_runbook.shared_gallery = None979 elif azure_node_runbook.shared_gallery:980 azure_node_runbook.marketplace = None981 azure_node_runbook.shared_gallery = self._parse_shared_gallery_image(982 azure_node_runbook.location, azure_node_runbook.shared_gallery983 )984 elif not azure_node_runbook.marketplace:985 # set to default marketplace, if nothing specified986 azure_node_runbook.marketplace = AzureVmMarketplaceSchema()987 else:988 # marketplace value is already set in runbook989 ...990 if azure_node_runbook.marketplace:991 # resolve Latest to specified version992 azure_node_runbook.marketplace = self._resolve_marketplace_image(993 azure_node_runbook.location, azure_node_runbook.marketplace994 )995 image_info = self._get_image_info(996 azure_node_runbook.location, azure_node_runbook.marketplace997 )998 # HyperVGenerationTypes return "V1"/"V2", so we need to strip "V"999 if image_info.hyper_v_generation:1000 azure_node_runbook.hyperv_generation = int(1001 image_info.hyper_v_generation.strip("V")1002 )1003 # retrieve the os type for arm template.1004 if azure_node_runbook.is_linux is None:1005 if image_info.os_disk_image.operating_system == "Windows":1006 azure_node_runbook.is_linux = False1007 else:1008 azure_node_runbook.is_linux = True1009 if azure_node_runbook.is_linux is None:1010 # fill it default value1011 azure_node_runbook.is_linux = True1012 return azure_node_runbook1013 def _create_node_arm_parameters(1014 self, capability: schema.Capability, log: Logger1015 ) -> AzureNodeArmParameter:1016 runbook = capability.get_extended_runbook(AzureNodeSchema, type_name=AZURE)1017 arm_parameters = AzureNodeArmParameter.from_node_runbook(runbook)1018 os_disk_size = 301019 if arm_parameters.vhd:1020 # vhd is higher priority1021 arm_parameters.vhd = self._get_deployable_vhd_path(1022 arm_parameters.vhd, arm_parameters.location, log1023 )1024 os_disk_size = max(1025 os_disk_size, self._get_vhd_os_disk_size(arm_parameters.vhd)1026 )1027 elif arm_parameters.shared_gallery:1028 os_disk_size = max(1029 os_disk_size,1030 self._get_sig_os_disk_size(arm_parameters.shared_gallery),1031 )1032 else:1033 assert (1034 arm_parameters.marketplace1035 ), "not set one of marketplace, shared_gallery or vhd."1036 image_info = self._get_image_info(1037 arm_parameters.location, arm_parameters.marketplace1038 )1039 os_disk_size = max(1040 os_disk_size, image_info.os_disk_image.additional_properties["sizeInGb"]1041 )1042 if not arm_parameters.purchase_plan and image_info.plan:1043 # expand values for lru cache1044 plan_name = image_info.plan.name1045 plan_product = image_info.plan.product1046 plan_publisher = image_info.plan.publisher1047 # accept the default purchase plan automatically.1048 arm_parameters.purchase_plan = self._process_marketplace_image_plan(1049 marketplace=arm_parameters.marketplace,1050 plan_name=plan_name,1051 plan_product=plan_product,1052 plan_publisher=plan_publisher,1053 )1054 arm_parameters.osdisk_size_in_gb = os_disk_size1055 # Set disk type1056 assert capability.disk, "node space must have disk defined."1057 assert isinstance(capability.disk.disk_type, schema.DiskType)1058 arm_parameters.disk_type = features.get_azure_disk_type(1059 capability.disk.disk_type1060 )1061 assert capability.network_interface1062 assert isinstance(1063 capability.network_interface.nic_count, int1064 ), f"actual: {capability.network_interface.nic_count}"1065 arm_parameters.nic_count = capability.network_interface.nic_count1066 assert isinstance(1067 capability.network_interface.data_path, schema.NetworkDataPath1068 ), f"actual: {type(capability.network_interface.data_path)}"1069 if capability.network_interface.data_path == schema.NetworkDataPath.Sriov:1070 arm_parameters.enable_sriov = True1071 return arm_parameters1072 def _validate_template(1073 self, deployment_parameters: Dict[str, Any], log: Logger1074 ) -> None:1075 log.debug("validating deployment")1076 validate_operation: Any = None1077 try:1078 with global_credential_access_lock:1079 validate_operation = self._rm_client.deployments.begin_validate(1080 **deployment_parameters1081 )1082 wait_operation(validate_operation, failure_identity="validation")1083 except Exception as identifier:1084 error_messages: List[str] = [str(identifier)]1085 if isinstance(identifier, HttpResponseError) and identifier.error:1086 # no validate_operation returned, the message may include1087 # some errors, so check details1088 error_messages = self._parse_detail_errors(identifier.error)1089 error_message = "\n".join(error_messages)1090 plugin_manager.hook.azure_deploy_failed(error_message=error_message)1091 raise LisaException(error_message)1092 def _deploy(1093 self, location: str, deployment_parameters: Dict[str, Any], log: Logger1094 ) -> None:1095 resource_group_name = deployment_parameters[AZURE_RG_NAME_KEY]1096 storage_account_name = get_storage_account_name(self.subscription_id, location)1097 check_or_create_storage_account(1098 self.credential,1099 self.subscription_id,1100 storage_account_name,1101 self._azure_runbook.shared_resource_group_name,1102 location,1103 log,1104 )1105 log.info(f"resource group '{resource_group_name}' deployment is in progress...")1106 deployment_operation: Any = None1107 deployments = self._rm_client.deployments1108 try:1109 deployment_operation = deployments.begin_create_or_update(1110 **deployment_parameters1111 )1112 wait_operation(deployment_operation, failure_identity="deploy")1113 except HttpResponseError as identifier:1114 # Some errors happens underlying, so there is no detail errors from API.1115 # For example,1116 # azure.core.exceptions.HttpResponseError:1117 # Operation returned an invalid status 'OK'1118 assert identifier.error, f"HttpResponseError: {identifier}"1119 error_message = "\n".join(self._parse_detail_errors(identifier.error))1120 if (1121 self._azure_runbook.ignore_provisioning_error1122 and "OSProvisioningTimedOut: OS Provisioning for VM" in error_message1123 ):1124 # Provisioning timeout causes by waagent is not ready.1125 # In smoke test, it still can verify some information.1126 # Eat information here, to run test case any way.1127 #1128 # It may cause other cases fail on assumptions. In this case, we can1129 # define a flag in config, to mark this exception is ignorable or not.1130 log.error(1131 f"provisioning time out, try to run case. "1132 f"Exception: {error_message}"1133 )1134 elif self._azure_runbook.ignore_provisioning_error and get_matched_str(1135 error_message, AZURE_INTERNAL_ERROR_PATTERN1136 ):1137 # Similar situation with OSProvisioningTimedOut1138 # Some OSProvisioningInternalError caused by it doesn't support1139 # SSH key authentication1140 # e.g. hpe hpestoreoncevsa hpestoreoncevsa-3187 3.18.71141 # After passthrough this exception,1142 # actually the 22 port of this VM is open.1143 log.error(1144 f"provisioning failed for an internal error, try to run case. "1145 f"Exception: {error_message}"1146 )1147 else:1148 plugin_manager.hook.azure_deploy_failed(error_message=error_message)1149 raise LisaException(error_message)1150 def _parse_detail_errors(self, error: Any) -> List[str]:1151 # original message may be a summary, get lowest level details.1152 if hasattr(error, "details") and error.details:1153 errors: List[str] = []1154 for detail in error.details:1155 errors.extend(self._parse_detail_errors(detail))1156 else:1157 try:1158 # it returns serialized json string in message sometime1159 parsed_error = json.loads(1160 error.message, object_hook=lambda x: SimpleNamespace(**x)1161 )1162 errors = self._parse_detail_errors(parsed_error.error)1163 except Exception:1164 # load failed, it should be a real error message string1165 errors = [f"{error.code}: {error.message}"]1166 return errors1167 # the VM may not be queried after deployed. use retry to mitigate it.1168 @retry(exceptions=LisaException, tries=150, delay=2)1169 def _load_vms(1170 self, environment: Environment, log: Logger1171 ) -> Dict[str, VirtualMachine]:1172 compute_client = get_compute_client(self, api_version="2020-06-01")1173 environment_context = get_environment_context(environment=environment)1174 log.debug(1175 f"listing vm in resource group "1176 f"'{environment_context.resource_group_name}'"1177 )1178 vms_map: Dict[str, VirtualMachine] = {}1179 vms = compute_client.virtual_machines.list(1180 environment_context.resource_group_name1181 )1182 for vm in vms:1183 vms_map[vm.name] = vm1184 log.debug(f" found vm {vm.name}")1185 if not vms_map:1186 raise LisaException(1187 f"deployment succeeded, but VM not found in 5 minutes "1188 f"from '{environment_context.resource_group_name}'"1189 )1190 return vms_map1191 # Use Exception, because there may be credential conflict error. Make it1192 # retriable.1193 @retry(exceptions=Exception, tries=150, delay=2)1194 def _load_nics(1195 self, environment: Environment, log: Logger1196 ) -> Dict[str, NetworkInterface]:1197 network_client = get_network_client(self)1198 environment_context = get_environment_context(environment=environment)1199 log.debug(1200 f"listing network interfaces in resource group "1201 f"'{environment_context.resource_group_name}'"1202 )1203 # load nics1204 nics_map: Dict[str, NetworkInterface] = {}1205 network_interfaces = network_client.network_interfaces.list(1206 environment_context.resource_group_name1207 )1208 for nic in network_interfaces:1209 # nic name is like lisa-test-20220316-182126-985-e0-n0-nic-2, get vm1210 # name part for later pick only find primary nic, which is ended by1211 # -nic-01212 node_name_from_nic = RESOURCE_ID_NIC_PATTERN.findall(nic.name)1213 if node_name_from_nic:1214 name = node_name_from_nic[0]1215 nics_map[name] = nic1216 log.debug(f" found nic '{nic.name}', and saved for next step.")1217 else:1218 log.debug(1219 f" found nic '{nic.name}', but dropped, "1220 "because it's not primary nic."1221 )1222 if not nics_map:1223 raise LisaException(1224 f"deployment succeeded, but network interfaces not found in 5 minutes "1225 f"from '{environment_context.resource_group_name}'"1226 )1227 return nics_map1228 @retry(exceptions=LisaException, tries=150, delay=2)1229 def load_public_ips_from_resource_group(1230 self, resource_group_name: str, log: Logger1231 ) -> Dict[str, str]:1232 network_client = get_network_client(self)1233 log.debug(f"listing public ips in resource group '{resource_group_name}'")1234 # get public IP1235 public_ip_addresses = network_client.public_ip_addresses.list(1236 resource_group_name1237 )1238 public_ips_map: Dict[str, str] = {}1239 for ip_address in public_ip_addresses:1240 # nic name is like node-0-nic-2, get vm name part for later pick1241 # only find primary nic, which is ended by -nic-01242 node_name_from_public_ip = RESOURCE_ID_PUBLIC_IP_PATTERN.findall(1243 ip_address.name1244 )1245 assert (1246 ip_address1247 ), f"public IP address cannot be empty, ip_address object: {ip_address}"1248 if node_name_from_public_ip:1249 name = node_name_from_public_ip[0]1250 public_ips_map[name] = ip_address.ip_address1251 log.debug(1252 f" found public IP '{ip_address.name}', and saved for next step."1253 )1254 else:1255 log.debug(1256 f" found public IP '{ip_address.name}', but dropped "1257 "because it's not primary nic."1258 )1259 if not public_ips_map:1260 raise LisaException(1261 f"deployment succeeded, but public ips not found in 5 minutes "1262 f"from '{resource_group_name}'"1263 )1264 return public_ips_map1265 def initialize_environment(self, environment: Environment, log: Logger) -> None:1266 node_context_map: Dict[str, Node] = {}1267 for node in environment.nodes.list():1268 node_context = get_node_context(node)1269 node_context_map[node_context.vm_name] = node1270 vms_map: Dict[str, VirtualMachine] = self._load_vms(environment, log)1271 nics_map: Dict[str, NetworkInterface] = self._load_nics(environment, log)1272 environment_context = get_environment_context(environment=environment)1273 public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(1274 environment_context.resource_group_name, log1275 )1276 for vm_name, node in node_context_map.items():1277 node_context = get_node_context(node)1278 vm = vms_map.get(vm_name, None)1279 if not vm:1280 raise LisaException(1281 f"cannot find vm: '{vm_name}', make sure deployment is correct."1282 )1283 nic = nics_map[vm_name]1284 public_ip = public_ips_map[vm_name]1285 address = nic.ip_configurations[0].private_ip_address1286 if not node.name:1287 node.name = vm_name1288 assert isinstance(node, RemoteNode)1289 node.set_connection_info(1290 address=address,1291 port=22,1292 public_address=public_ip,1293 public_port=22,1294 username=node_context.username,1295 password=node_context.password,1296 private_key_file=node_context.private_key_file,1297 )1298 # enable ssh for windows, if it's not Windows, or SSH reachable, it will1299 # skip.1300 run_in_parallel(1301 [1302 partial(self._enable_ssh_on_windows, node=x)1303 for x in environment.nodes.list()1304 ]1305 )1306 def _resource_sku_to_capability( # noqa: C9011307 self, location: str, resource_sku: ResourceSku1308 ) -> schema.NodeSpace:1309 # fill in default values, in case no capability meet.1310 node_space = schema.NodeSpace(1311 node_count=1,1312 core_count=0,1313 memory_mb=0,1314 gpu_count=0,1315 )1316 node_space.name = f"{location}_{resource_sku.name}"1317 node_space.features = search_space.SetSpace[schema.FeatureSettings](1318 is_allow_set=True1319 )1320 node_space.disk = features.AzureDiskOptionSettings()1321 node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](1322 is_allow_set=True, items=[]1323 )1324 node_space.disk.data_disk_iops = search_space.IntRange(min=0)1325 node_space.disk.data_disk_size = search_space.IntRange(min=0)1326 node_space.network_interface = schema.NetworkInterfaceOptionSettings()1327 node_space.network_interface.data_path = search_space.SetSpace[1328 schema.NetworkDataPath1329 ](is_allow_set=True, items=[])1330 # fill supported features1331 azure_raw_capabilities: Dict[str, str] = {}1332 for sku_capability in resource_sku.capabilities:1333 # prevent to loop in every feature1334 azure_raw_capabilities[sku_capability.name] = sku_capability.value1335 # calculate cpu count. Some vm sizes, like Standard_HC44rs, doesn't have1336 # vCPUsAvailable, so use vCPUs.1337 vcpus_available = int(azure_raw_capabilities.get("vCPUsAvailable", "0"))1338 if vcpus_available:1339 node_space.core_count = vcpus_available1340 else:1341 node_space.core_count = int(azure_raw_capabilities.get("vCPUs", "0"))1342 memory_value = azure_raw_capabilities.get("MemoryGB", None)1343 if memory_value:1344 node_space.memory_mb = int(float(memory_value) * 1024)1345 max_disk_count = azure_raw_capabilities.get("MaxDataDiskCount", None)1346 if max_disk_count:1347 node_space.disk.max_data_disk_count = int(max_disk_count)1348 node_space.disk.data_disk_count = search_space.IntRange(1349 max=node_space.disk.max_data_disk_count1350 )1351 max_nic_count = azure_raw_capabilities.get("MaxNetworkInterfaces", None)1352 if max_nic_count:1353 # set a min value for nic_count work around for an azure python sdk bug1354 # nic_count is 0 when get capability for some sizes e.g. Standard_D8a_v31355 sku_nic_count = int(max_nic_count)1356 if sku_nic_count == 0:1357 sku_nic_count = 11358 node_space.network_interface.nic_count = search_space.IntRange(1359 min=1, max=sku_nic_count1360 )1361 node_space.network_interface.max_nic_count = sku_nic_count1362 premium_io_supported = azure_raw_capabilities.get("PremiumIO", None)1363 if premium_io_supported and eval(premium_io_supported) is True:1364 node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)1365 ephemeral_supported = azure_raw_capabilities.get(1366 "EphemeralOSDiskSupported", None1367 )1368 if ephemeral_supported and eval(ephemeral_supported) is True:1369 # Check if CachedDiskBytes is greater than 30GB1370 # We use diffdisk as cache disk for ephemeral OS disk1371 cached_disk_bytes = azure_raw_capabilities.get("CachedDiskBytes", 0)1372 cached_disk_bytes_gb = int(cached_disk_bytes) / 1024 / 1024 / 10241373 if cached_disk_bytes_gb >= 30:1374 node_space.disk.disk_type.add(schema.DiskType.Ephemeral)1375 # set AN1376 an_enabled = azure_raw_capabilities.get("AcceleratedNetworkingEnabled", None)1377 if an_enabled and eval(an_enabled) is True:1378 # refer1379 # https://docs.microsoft.com/en-us/azure/virtual-machines/dcv2-series#configuration1380 # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv2-series1381 # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv3-series1382 # https://docs.microsoft.com/en-us/azure/virtual-machines/nd-series1383 # below VM size families don't support `Accelerated Networking` but1384 # API return `True`, fix this issue temporarily will revert it till1385 # bug fixed.1386 if resource_sku.family not in [1387 "standardDCSv2Family",1388 "standardNCSv2Family",1389 "standardNCSv3Family",1390 "standardNDSFamily",1391 ]:1392 # update data path types if sriov feature is supported1393 node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)1394 # for some new sizes, there is no MaxNetworkInterfaces capability1395 # and we have to set a default value for max_nic_count1396 if not node_space.network_interface.max_nic_count:1397 node_space.network_interface.max_nic_count = 11398 # some vm size do not have resource disk present1399 # https://docs.microsoft.com/en-us/azure/virtual-machines/azure-vms-no-temp-disk1400 if resource_sku.family in [1401 "standardDv4Family",1402 "standardDSv4Family",1403 "standardEv4Family",1404 "standardESv4Family",1405 "standardEASv4Family",1406 "standardEASv5Family",1407 "standardESv5Family",1408 "standardEADSv5Family",1409 "standardDASv5Family",1410 "standardDSv5Family",1411 "standardFSv2Family",1412 "standardNCFamily",1413 "standardESv3Family",1414 "standardDPSv5Family",1415 "standardEBSv5Family",1416 "standardEv5Family",1417 ]:1418 node_space.disk.has_resource_disk = False1419 else:1420 node_space.disk.has_resource_disk = True1421 for supported_feature in self.supported_features():1422 if supported_feature.name() in [1423 features.Disk.name(),1424 features.NetworkInterface.name(),1425 ]:1426 # Skip the disk and network interfaces features. They will be1427 # handled by node_space directly.1428 continue1429 feature_setting = supported_feature.create_setting(1430 raw_capabilities=azure_raw_capabilities,1431 resource_sku=resource_sku,1432 node_space=node_space,1433 )1434 if feature_setting:1435 node_space.features.add(feature_setting)1436 node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)1437 node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)1438 node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)1439 return node_space1440 def get_sorted_vm_sizes(1441 self, capabilities: List[AzureCapability], log: Logger1442 ) -> List[AzureCapability]:1443 # sort vm size by predefined pattern1444 sorted_capabilities: List[AzureCapability] = []1445 found_vm_sizes: Set[str] = set()1446 # loop all fall back levels1447 for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:1448 level_capabilities: List[AzureCapability] = []1449 # loop all capabilities1450 for capability in capabilities:1451 vm_size = capability.vm_size1452 if fallback_pattern.match(vm_size) and vm_size not in found_vm_sizes:1453 level_capabilities.append(capability)1454 found_vm_sizes.add(vm_size)1455 # sort by rough cost1456 level_capabilities.sort(key=lambda x: (x.capability.cost))1457 sorted_capabilities.extend(level_capabilities)1458 return sorted_capabilities1459 def load_public_ip(self, node: Node, log: Logger) -> str:1460 node_context = get_node_context(node)1461 vm_name = node_context.vm_name1462 resource_group_name = node_context.resource_group_name1463 public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(1464 resource_group_name=resource_group_name, log=self._log1465 )1466 return public_ips_map[vm_name]1467 @lru_cache(maxsize=10) # noqa: B0191468 def _resolve_marketplace_image(1469 self, location: str, marketplace: AzureVmMarketplaceSchema1470 ) -> AzureVmMarketplaceSchema:1471 new_marketplace = copy.copy(marketplace)1472 # latest doesn't work, it needs a specified version.1473 if marketplace.version.lower() == "latest":1474 compute_client = get_compute_client(self)1475 with global_credential_access_lock:1476 versioned_images = compute_client.virtual_machine_images.list(1477 location=location,1478 publisher_name=marketplace.publisher,1479 offer=marketplace.offer,1480 skus=marketplace.sku,1481 )1482 if 0 == len(versioned_images):1483 raise LisaException(1484 f"cannot find any version of image {marketplace.publisher} "1485 f"{marketplace.offer} {marketplace.sku} in {location}"1486 )1487 # any one should be the same to get purchase plan1488 new_marketplace.version = versioned_images[-1].name1489 return new_marketplace1490 def _parse_shared_gallery_image(1491 self, location: str, shared_image: SharedImageGallerySchema1492 ) -> SharedImageGallerySchema:1493 new_shared_image = copy.copy(shared_image)1494 compute_client = get_compute_client(self)1495 if not shared_image.resource_group_name:1496 # /subscriptions/xxxx/resourceGroups/xxxx/providers/Microsoft.Compute/1497 # galleries/xxxx1498 rg_pattern = re.compile(r"resourceGroups/(.*)/providers", re.M)1499 galleries = compute_client.galleries.list()1500 rg_name = ""1501 for gallery in galleries:1502 if gallery.name.lower() == shared_image.image_gallery:1503 rg_name = get_matched_str(gallery.id, rg_pattern)1504 break1505 if not rg_name:1506 raise LisaException(1507 f"not find matched gallery {shared_image.image_gallery}"1508 )1509 new_shared_image.resource_group_name = rg_name1510 if shared_image.image_version.lower() == "latest":1511 gallery_images = (1512 compute_client.gallery_image_versions.list_by_gallery_image(1513 resource_group_name=new_shared_image.resource_group_name,1514 gallery_name=new_shared_image.image_gallery,1515 gallery_image_name=new_shared_image.image_definition,1516 )1517 )1518 image: GalleryImageVersion = None1519 time: Optional[datetime] = None1520 for image in gallery_images:1521 gallery_image = compute_client.gallery_image_versions.get(1522 resource_group_name=new_shared_image.resource_group_name,1523 gallery_name=new_shared_image.image_gallery,1524 gallery_image_name=new_shared_image.image_definition,1525 gallery_image_version_name=image.name,1526 expand="ReplicationStatus",1527 )1528 if not time:1529 time = gallery_image.publishing_profile.published_date1530 if gallery_image.publishing_profile.published_date > time:1531 time = gallery_image.publishing_profile.published_date1532 new_shared_image.image_version = image.name1533 return new_shared_image1534 @lru_cache(maxsize=10) # noqa: B0191535 def _process_marketplace_image_plan(1536 self,1537 marketplace: AzureVmMarketplaceSchema,1538 plan_name: str,1539 plan_product: str,1540 plan_publisher: str,1541 ) -> Optional[PurchasePlan]:1542 """1543 this method to fill plan, if a VM needs it. If don't fill it, the deployment1544 will be failed.1545 1. Get image_info to check if there is a plan.1546 2. If there is a plan, it may need to check and accept terms.1547 """1548 plan: Optional[AzureVmPurchasePlanSchema] = None1549 # if there is a plan, it may need to accept term.1550 marketplace_client = get_marketplace_ordering_client(self)1551 term: Optional[AgreementTerms] = None1552 try:1553 with global_credential_access_lock:1554 term = marketplace_client.marketplace_agreements.get(1555 offer_type="virtualmachine",1556 publisher_id=marketplace.publisher,1557 offer_id=marketplace.offer,1558 plan_id=plan_name,1559 )1560 except Exception as identifier:1561 raise LisaException(f"error on getting marketplace agreement: {identifier}")1562 assert term1563 if term.accepted is False:1564 term.accepted = True1565 marketplace_client.marketplace_agreements.create(1566 offer_type="virtualmachine",1567 publisher_id=marketplace.publisher,1568 offer_id=marketplace.offer,1569 plan_id=plan_name,1570 parameters=term,1571 )1572 plan = AzureVmPurchasePlanSchema(1573 name=plan_name,1574 product=plan_product,1575 publisher=plan_publisher,1576 )1577 return plan1578 def _generate_max_capability(self, vm_size: str, location: str) -> AzureCapability:1579 # some vm size cannot be queried from API, so use default capability to1580 # run with best guess on capability.1581 node_space = schema.NodeSpace(1582 node_count=1,1583 core_count=search_space.IntRange(min=1),1584 memory_mb=search_space.IntRange(min=0),1585 gpu_count=search_space.IntRange(min=0),1586 )1587 node_space.disk = features.AzureDiskOptionSettings()1588 node_space.disk.data_disk_count = search_space.IntRange(min=0)1589 node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](1590 is_allow_set=True, items=[]1591 )1592 node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)1593 node_space.disk.disk_type.add(schema.DiskType.Ephemeral)1594 node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)1595 node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)1596 node_space.network_interface = schema.NetworkInterfaceOptionSettings()1597 node_space.network_interface.data_path = search_space.SetSpace[1598 schema.NetworkDataPath1599 ](is_allow_set=True, items=[])1600 node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)1601 node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)1602 node_space.network_interface.nic_count = search_space.IntRange(min=1)1603 # till now, the max nic number supported in Azure is 81604 node_space.network_interface.max_nic_count = 81605 azure_capability = AzureCapability(1606 location=location,1607 vm_size=vm_size,1608 capability=node_space,1609 resource_sku={},1610 )1611 node_space.name = f"{location}_{vm_size}"1612 node_space.features = search_space.SetSpace[schema.FeatureSettings](1613 is_allow_set=True1614 )1615 # all nodes support following features1616 all_features = self.supported_features()1617 node_space.features.update(1618 [schema.FeatureSettings.create(x.name()) for x in all_features]1619 )1620 _convert_to_azure_node_space(node_space)1621 return azure_capability1622 def _generate_min_capability(1623 self,1624 requirement: schema.NodeSpace,1625 azure_capability: AzureCapability,1626 location: str,1627 ) -> schema.NodeSpace:1628 min_cap: schema.NodeSpace = requirement.generate_min_capability(1629 azure_capability.capability1630 )1631 # Apply azure specified values. They will pass into arm template1632 azure_node_runbook = min_cap.get_extended_runbook(AzureNodeSchema, AZURE)1633 if azure_node_runbook.location:1634 assert location in azure_node_runbook.location, (1635 f"predefined location [{azure_node_runbook.location}] "1636 f"must be same as "1637 f"cap location [{location}]"1638 )1639 # the location may not be set1640 azure_node_runbook.location = location1641 azure_node_runbook.vm_size = azure_capability.vm_size1642 return min_cap1643 def _generate_sas_token(self, result_dict: Dict[str, str]) -> Any:1644 sc_name = result_dict["account_name"]1645 container_name = result_dict["container_name"]1646 rg = result_dict["resource_group_name"]1647 blob_name = result_dict["blob_name"]1648 source_container_client = get_or_create_storage_container(1649 credential=self.credential,1650 subscription_id=self.subscription_id,1651 account_name=sc_name,1652 container_name=container_name,1653 resource_group_name=rg,1654 )1655 source_blob = source_container_client.get_blob_client(blob_name)1656 sas_token = generate_sas_token(1657 credential=self.credential,1658 subscription_id=self.subscription_id,1659 account_name=sc_name,1660 resource_group_name=rg,1661 )1662 source_url = source_blob.url + "?" + sas_token1663 return source_url1664 @lru_cache(maxsize=10) # noqa: B0191665 def _get_deployable_vhd_path(1666 self, vhd_path: str, location: str, log: Logger1667 ) -> str:1668 """1669 The sas url is not able to create a vm directly, so this method check if1670 the vhd_path is a sas url. If so, copy it to a location in current1671 subscription, so it can be deployed.1672 """1673 matches = SAS_URL_PATTERN.match(vhd_path)1674 if not matches:1675 vhd_details = self._get_vhd_details(vhd_path)1676 vhd_location = vhd_details["location"]1677 if location == vhd_location:1678 return vhd_path1679 else:1680 vhd_path = self._generate_sas_token(vhd_details)1681 matches = SAS_URL_PATTERN.match(vhd_path)1682 assert matches, f"fail to generate sas url for {vhd_path}"1683 log.debug(1684 f"the vhd location {location} is not same with running case "1685 f"location {vhd_location}, generate a sas url for source vhd, "1686 f"it needs to be copied into location {location}."1687 )1688 else:1689 log.debug("found the vhd is a sas url, it may need to be copied.")1690 # get original vhd's hash key for comparing.1691 original_key: Optional[bytearray] = None1692 original_vhd_path = vhd_path1693 original_blob_client = BlobClient.from_blob_url(original_vhd_path)1694 properties = original_blob_client.get_blob_properties()1695 if properties.content_settings:1696 original_key = properties.content_settings.get(1697 "content_md5", None1698 ) # type: ignore1699 storage_name = get_storage_account_name(1700 subscription_id=self.subscription_id, location=location, type="t"1701 )1702 check_or_create_storage_account(1703 self.credential,1704 self.subscription_id,1705 storage_name,1706 self._azure_runbook.shared_resource_group_name,1707 location,1708 log,1709 )1710 container_client = get_or_create_storage_container(1711 credential=self.credential,1712 subscription_id=self.subscription_id,1713 account_name=storage_name,1714 container_name=SAS_COPIED_CONTAINER_NAME,1715 resource_group_name=self._azure_runbook.shared_resource_group_name,1716 )1717 normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)1718 year = matches["year"] if matches["year"] else "9999"1719 month = matches["month"] if matches["month"] else "01"1720 day = matches["day"] if matches["day"] else "01"1721 # use the expire date to generate the path. It's easy to identify when1722 # the cache can be removed.1723 vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"1724 full_vhd_path = f"{container_client.url}/{vhd_path}"1725 # lock here to prevent a vhd is copied in multi-thread1726 global _global_sas_vhd_copy_lock1727 cached_key: Optional[bytearray] = None1728 with _global_sas_vhd_copy_lock:1729 blobs = container_client.list_blobs(name_starts_with=vhd_path)1730 for blob in blobs:1731 if blob:1732 # check if hash key matched with original key.1733 if blob.content_settings:1734 cached_key = blob.content_settings.get("content_md5", None)1735 if original_key == cached_key:1736 # if it exists, return the link, not to copy again.1737 log.debug("the sas url is copied already, use it directly.")1738 return full_vhd_path1739 else:1740 log.debug("found cached vhd, but the hash key mismatched.")1741 blob_client = container_client.get_blob_client(vhd_path)1742 blob_client.start_copy_from_url(1743 original_vhd_path, metadata=None, incremental_copy=False1744 )1745 wait_copy_blob(blob_client, vhd_path, log)1746 return full_vhd_path1747 def _get_vhd_details(self, vhd_path: str) -> Any:1748 matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)1749 assert matched, f"fail to get matched info from {vhd_path}"1750 sc_name = matched.group("sc")1751 container_name = matched.group("container")1752 blob_name = matched.group("blob")1753 storage_client = get_storage_client(self.credential, self.subscription_id)1754 sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]1755 assert sc[1756 01757 ], f"fail to get storage account {sc_name} from {self.subscription_id}"1758 rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)1759 return {1760 "location": sc[0].location,1761 "resource_group_name": rg,1762 "account_name": sc_name,1763 "container_name": container_name,1764 "blob_name": blob_name,1765 }1766 def _generate_data_disks(1767 self,1768 node: Node,1769 azure_node_runbook: AzureNodeArmParameter,1770 ) -> List[DataDiskSchema]:1771 data_disks: List[DataDiskSchema] = []1772 assert node.capability.disk1773 if azure_node_runbook.marketplace:1774 marketplace = self._get_image_info(1775 azure_node_runbook.location, azure_node_runbook.marketplace1776 )1777 # some images has data disks by default1778 # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.271779 # we have to inject below part when dataDisks section added in1780 # arm template,1781 # otherwise will see below exception:1782 # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun1783 # does not have required value(s) for image specified in1784 # storage profile.1785 for default_data_disk in marketplace.data_disk_images:1786 data_disks.append(1787 DataDiskSchema(1788 node.capability.disk.data_disk_caching_type,1789 default_data_disk.additional_properties["sizeInGb"],1790 azure_node_runbook.disk_type,1791 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,1792 )1793 )1794 assert isinstance(1795 node.capability.disk.data_disk_count, int1796 ), f"actual: {type(node.capability.disk.data_disk_count)}"1797 for _ in range(node.capability.disk.data_disk_count):1798 assert isinstance(node.capability.disk.data_disk_size, int)1799 data_disks.append(1800 DataDiskSchema(1801 node.capability.disk.data_disk_caching_type,1802 node.capability.disk.data_disk_size,1803 azure_node_runbook.disk_type,1804 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,1805 )1806 )1807 return data_disks1808 @lru_cache(maxsize=10) # noqa: B0191809 def _get_image_info(1810 self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]1811 ) -> VirtualMachineImage:1812 compute_client = get_compute_client(self)1813 assert isinstance(marketplace, AzureVmMarketplaceSchema)1814 with global_credential_access_lock:1815 image_info = compute_client.virtual_machine_images.get(1816 location=location,1817 publisher_name=marketplace.publisher,1818 offer=marketplace.offer,1819 skus=marketplace.sku,1820 version=marketplace.version,1821 )1822 return image_info1823 def _get_location_key(self, location: str) -> str:1824 return f"{self.subscription_id}_{location}"1825 def _enable_ssh_on_windows(self, node: Node) -> None:1826 runbook = node.capability.get_extended_runbook(AzureNodeSchema)1827 if runbook.is_linux:1828 return1829 context = get_node_context(node)1830 remote_node = cast(RemoteNode, node)1831 log = node.log1832 log.debug(1833 f"checking if SSH port {remote_node.public_port} is reachable "1834 f"on {remote_node.name}..."1835 )1836 connected, _ = wait_tcp_port_ready(1837 address=remote_node.public_address,1838 port=remote_node.public_port,1839 log=log,1840 timeout=3,1841 )1842 if connected:1843 log.debug("SSH port is reachable.")1844 return1845 log.debug("SSH port is not open, enabling ssh on Windows ...")1846 # The SSH port is not opened, try to enable it.1847 public_key_data = get_public_key_data(self.runbook.admin_private_key_file)1848 with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:1849 script = f.read()1850 parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)1851 command = RunCommandInput(1852 command_id="RunPowerShellScript",1853 script=[script],1854 parameters=[parameters],1855 )1856 compute_client = get_compute_client(self)1857 operation = compute_client.virtual_machines.begin_run_command(1858 resource_group_name=context.resource_group_name,1859 vm_name=context.vm_name,1860 parameters=command,1861 )1862 result = wait_operation(operation=operation, failure_identity="enable ssh")1863 log.debug("SSH script result:")1864 log.dump_json(logging.DEBUG, result)1865 def _get_vhd_os_disk_size(self, blob_url: str) -> int:1866 result_dict = self._get_vhd_details(blob_url)1867 container_client = get_or_create_storage_container(1868 credential=self.credential,1869 subscription_id=self.subscription_id,1870 account_name=result_dict["account_name"],1871 container_name=result_dict["container_name"],1872 resource_group_name=result_dict["resource_group_name"],1873 )1874 vhd_blob = container_client.get_blob_client(result_dict["blob_name"])1875 properties = vhd_blob.get_blob_properties()1876 assert properties.size, f"fail to get blob size of {blob_url}"1877 # Azure requires only megabyte alignment of vhds, round size up1878 # for cases where the size is megabyte aligned1879 return math.ceil(properties.size / 1024 / 1024 / 1024)1880 def _get_sig_info(1881 self, shared_image: SharedImageGallerySchema1882 ) -> GalleryImageVersion:1883 compute_client = get_compute_client(self)1884 return compute_client.gallery_image_versions.get(1885 resource_group_name=shared_image.resource_group_name,1886 gallery_name=shared_image.image_gallery,1887 gallery_image_name=shared_image.image_definition,1888 gallery_image_version_name=shared_image.image_version,1889 expand="ReplicationStatus",1890 )1891 def _get_sig_os_disk_size(self, shared_image: SharedImageGallerySchema) -> int:1892 found_image = self._get_sig_info(shared_image)1893 assert found_image.storage_profile.os_disk_image.size_in_gb1894 return int(found_image.storage_profile.os_disk_image.size_in_gb)1895 def _get_normalized_vm_sizes(1896 self, name: str, location: str, log: Logger1897 ) -> List[str]:1898 split_vm_sizes: List[str] = [x.strip() for x in name.split(",")]1899 for index, vm_size in enumerate(split_vm_sizes):1900 split_vm_sizes[index] = self._get_normalized_vm_size(vm_size, location, log)1901 return [x for x in split_vm_sizes if x]1902 def _get_normalized_vm_size(self, name: str, location: str, log: Logger) -> str:1903 # find predefined vm size on all available's.1904 location_info: AzureLocation = self.get_location_info(location, log)1905 matched_score: float = 01906 matched_name: str = ""1907 matcher = SequenceMatcher(None, name.lower(), "")1908 for vm_size in location_info.capabilities:1909 matcher.set_seq2(vm_size.lower())1910 if name.lower() in vm_size.lower() and matched_score < matcher.ratio():1911 matched_name = vm_size1912 matched_score = matcher.ratio()1913 return matched_name1914 def _get_capabilities(1915 self, vm_sizes: List[str], location: str, use_max_capability: bool, log: Logger1916 ) -> List[AzureCapability]:1917 candidate_caps: List[AzureCapability] = []1918 caps = self.get_location_info(location, log).capabilities1919 for vm_size in vm_sizes:1920 # force to use max capability to run test cases as much as possible,1921 # or force to support non-exists vm size.1922 if use_max_capability:1923 candidate_caps.append(self._generate_max_capability(vm_size, location))1924 continue1925 if vm_size in caps:1926 candidate_caps.append(caps[vm_size])1927 return candidate_caps1928 def _get_matched_capability(1929 self,1930 requirement: schema.NodeSpace,1931 candidate_capabilities: List[AzureCapability],1932 ) -> Optional[schema.NodeSpace]:1933 matched_cap: Optional[schema.NodeSpace] = None1934 # filter allowed vm sizes1935 for azure_cap in candidate_capabilities:1936 check_result = requirement.check(azure_cap.capability)1937 if check_result.result:1938 min_cap = self._generate_min_capability(1939 requirement, azure_cap, azure_cap.location1940 )1941 matched_cap = min_cap1942 break1943 return matched_cap1944 def _get_matched_capabilities(1945 self, location: str, nodes_requirement: List[schema.NodeSpace], log: Logger1946 ) -> Tuple[List[Union[schema.NodeSpace, bool]], str]:1947 # capability or if it's able to wait.1948 caps: List[Union[schema.NodeSpace, bool]] = [False] * len(nodes_requirement)1949 # one of errors for all requirements. It's enough for troubleshooting.1950 error: str = ""1951 # get allowed vm sizes. Either it's from the runbook defined, or1952 # from subscription supported .1953 for req_index, req in enumerate(nodes_requirement):1954 candidate_caps, sub_error = self._get_allowed_capabilities(1955 req, location, log1956 )1957 if sub_error:1958 # no candidate found, so try next one.1959 error = sub_error1960 continue1961 # filter vm sizes and return two list. 1st is deployable, 2nd is1962 # wait able for released resource.1963 (1964 available_capabilities,1965 awaitable_capabilities,1966 ) = self._parse_cap_availabilities(candidate_caps)1967 # sort vm sizes to match1968 available_capabilities = self.get_sorted_vm_sizes(1969 available_capabilities, log1970 )1971 # match vm sizes by capability or use the predefined vm sizes.1972 candidate_cap = self._get_matched_capability(req, available_capabilities)1973 if candidate_cap:1974 caps[req_index] = candidate_cap1975 else:1976 # the error will be overwritten, if there is vm sizes without1977 # quota.1978 error = f"no available vm size found on '{location}'."1979 if not candidate_cap:1980 # check if there is awaitable VMs1981 candidate_cap = self._get_matched_capability(1982 req, awaitable_capabilities1983 )1984 if candidate_cap:1985 # True means awaitable.1986 caps[req_index] = True1987 error = f"no quota found on '{location}'"1988 return caps, error1989 def _get_allowed_capabilities(1990 self, req: schema.NodeSpace, location: str, log: Logger1991 ) -> Tuple[List[AzureCapability], str]:1992 node_runbook = req.get_extended_runbook(AzureNodeSchema, AZURE)1993 error: str = ""1994 if node_runbook.vm_size:1995 # find the vm_size1996 allowed_vm_sizes = self._get_normalized_vm_sizes(1997 name=node_runbook.vm_size, location=location, log=log1998 )1999 # Some preview vm size may not be queried from the list.2000 # Force to add.2001 if not allowed_vm_sizes:2002 log.debug(2003 f"no vm size matched '{node_runbook.vm_size}' on location "2004 f"'{location}', using the raw string as vm size name."2005 )2006 allowed_vm_sizes = [node_runbook.vm_size]2007 else:2008 location_info = self.get_location_info(location, log)2009 allowed_vm_sizes = [key for key, _ in location_info.capabilities.items()]2010 # build the capability of vm sizes. The information is useful to2011 # check quota.2012 allowed_capabilities = self._get_capabilities(2013 vm_sizes=allowed_vm_sizes,2014 location=location,2015 use_max_capability=node_runbook.maximize_capability,2016 log=log,2017 )2018 if not allowed_capabilities:2019 error = f"no vm size found in '{location}' for {allowed_vm_sizes}."2020 return allowed_capabilities, error2021 def _parse_cap_availabilities(2022 self, capabilities: List[AzureCapability]2023 ) -> Tuple[List[AzureCapability], List[AzureCapability]]:2024 available_capabilities: List[AzureCapability] = []2025 awaitable_capabilities: List[AzureCapability] = []2026 if not capabilities:2027 return ([], [])2028 # skip because it needs call azure API.2029 if is_unittest():2030 return (capabilities, [])2031 # assume all vm sizes are in the same location.2032 location = capabilities[0].location2033 quotas = self._get_quotas(location=location)2034 for capability in capabilities:2035 quota = quotas.get(capability.vm_size, None)2036 if quota:2037 remaining, limit = quota2038 if limit == 0:2039 # no quota, doesn't need to wait2040 continue2041 if remaining > 0:2042 available_capabilities.append(capability)2043 else:2044 awaitable_capabilities.append(capability)2045 else:2046 # not trackable vm size, assume the capability is enough.2047 available_capabilities.append(capability)2048 return (available_capabilities, awaitable_capabilities)2049 def _analyze_environment_availability(2050 self, location: str, capabilities: List[Union[schema.NodeSpace, bool]]2051 ) -> None:2052 # Check if sum of the same capabilities over the cap. If so, mark2053 # the overflow cap as True.2054 if all(isinstance(x, schema.NodeSpace) for x in capabilities):2055 cap_calculator: Dict[str, Tuple[int, int]] = {}2056 for index, cap in enumerate(capabilities):2057 assert isinstance(cap, schema.NodeSpace), f"actual: {type(cap)}"2058 azure_runbook = cap.get_extended_runbook(AzureNodeSchema, AZURE)2059 vm_size = azure_runbook.vm_size2060 if vm_size not in cap_calculator:2061 cap_calculator[vm_size] = self._get_usage(location, vm_size)2062 remaining, limit = cap_calculator[vm_size]2063 remaining -= 12064 cap_calculator[vm_size] = (remaining, limit)2065 if remaining < 0 and limit > 0:2066 capabilities[index] = True2067 @cached(cache=TTLCache(maxsize=50, ttl=10))2068 def _get_quotas(self, location: str) -> Dict[str, Tuple[int, int]]:2069 """2070 The Dict item is: vm size name, Tuple(remaining vm count, limited vm count)2071 """2072 result: Dict[str, Tuple[int, int]] = dict()2073 client = get_compute_client(self)2074 usages = client.usage.list(location=location)2075 # named map2076 quotas_map: Dict[str, Any] = {value.name.value: value for value in usages}2077 # This method signature is used to generate cache. If pass in the log2078 # object, it makes the cache doesn't work. So create a logger in the2079 # method.2080 log = get_logger("azure")2081 location_info = self.get_location_info(location=location, log=log)2082 capabilities = location_info.capabilities2083 for vm_size, capability in capabilities.items():2084 # looking for quota for each vm size's family, and calculate2085 # remaining and limit by core count of vm size.2086 quota = quotas_map.get(capability.resource_sku["family"], None)2087 if quota:2088 limit = math.floor(quota.limit / capability.capability.core_count)2089 remaining = math.floor(2090 (quota.limit - quota.current_value)2091 / capability.capability.core_count2092 )2093 result[vm_size] = (remaining, limit)2094 log.debug(f"found {len(result)} vm sizes with quota in location '{location}'.")2095 return result2096 def _get_usage(self, location: str, vm_size: str) -> Tuple[int, int]:2097 """2098 The format of return value refer to _get_usages2099 """2100 if is_unittest():2101 return (sys.maxsize, sys.maxsize)2102 usages = self._get_quotas(location)2103 # The default value is to support force run for non-exists vm size.2104 return usages.get(vm_size, (sys.maxsize, sys.maxsize))2105 def _resolve_marketplace_image_version(2106 self, nodes_requirement: List[schema.NodeSpace]2107 ) -> None:2108 for req in nodes_requirement:2109 node_runbook = req.get_extended_runbook(AzureNodeSchema, AZURE)2110 if node_runbook.location and node_runbook.marketplace:2111 node_runbook.marketplace = self._resolve_marketplace_image(2112 node_runbook.location, node_runbook.marketplace2113 )2114def _convert_to_azure_node_space(node_space: schema.NodeSpace) -> None:2115 if node_space:2116 if node_space.features:2117 new_settings = search_space.SetSpace[schema.FeatureSettings](2118 is_allow_set=True2119 )2120 for current_settings in node_space.features.items:2121 # reload to type specified settings2122 try:2123 settings_type = feature.get_feature_settings_type_by_name(2124 current_settings.type, AzurePlatform.supported_features()2125 )2126 except NotMeetRequirementException as identifier:2127 raise LisaException(2128 f"platform doesn't support all features. {identifier}"...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful