platform_.py
Source: platform_.py — an excerpt from the Azure platform orchestrator in LISA (lisa_python)
...
        # whether any location is awaitable for more resource
        any_wait_for_resource: bool = False
        errors: List[str] = []
        for location in allowed_locations:
            caps, error = self._get_matched_capabilities(
                location=location, nodes_requirement=nodes_requirement, log=log
            )
            if error:
                errors.append(error)
            self._analyze_environment_availability(location, caps)
            # set the awaitable flag, if no capability is False
            if all(x for x in caps):
                any_wait_for_resource = True
            # check the return value, or raise WaitForMoreResource
            if all(isinstance(x, schema.NodeSpace) for x in caps):
                # with the above condition, all types are NodeSpace.
                # Ignore the mypy check.
                environment.runbook.nodes_requirement = caps  # type: ignore
                environment.cost = sum(
                    x.cost for x in caps if isinstance(x, schema.NodeSpace)
                )
                is_success = True
                log.debug(
                    f"requirement met, "
                    f"cost: {environment.cost}, "
                    f"cap: {environment.runbook.nodes_requirement}"
                )
                break
        if not is_success:
            if any_wait_for_resource:
                raise ResourceAwaitableException(
                    "vm size", "No available quota, try to deploy later."
                )
            else:
                raise NotMeetRequirementException(
                    f"{errors}, runbook: {environment.runbook}."
                )
        # resolve Latest to a specified version
        if is_success:
            self._resolve_marketplace_image_version(nodes_requirement)
        return is_success

    def _deploy_environment(self, environment: Environment, log: Logger) -> None:
        assert self._rm_client
        assert self._azure_runbook
        environment_context = get_environment_context(environment=environment)
        if self._azure_runbook.resource_group_name:
            resource_group_name = self._azure_runbook.resource_group_name
        else:
            normalized_name = constants.NORMALIZE_PATTERN.sub("-", constants.RUN_NAME)
            # Take the last chars to make sure the length does not exceed the
            # max 90 chars allowed in a resource group name.
            resource_group_name = truncate_keep_prefix(
                f"{normalized_name}-e{environment.id}", 80
            )
            environment_context.resource_group_is_specified = True
        environment_context.resource_group_name = resource_group_name
        if self._azure_runbook.dry_run:
            log.info(f"dry_run: {self._azure_runbook.dry_run}")
        else:
            try:
                if self._azure_runbook.deploy:
                    log.info(
                        f"creating or updating resource group: [{resource_group_name}]"
                    )
                    check_or_create_resource_group(
                        self.credential,
                        subscription_id=self.subscription_id,
                        resource_group_name=resource_group_name,
                        location=RESOURCE_GROUP_LOCATION,
                        log=log,
                    )
                else:
                    log.info(f"reusing resource group: [{resource_group_name}]")
                location, deployment_parameters = self._create_deployment_parameters(
                    resource_group_name, environment, log
                )
                if self._azure_runbook.deploy:
                    self._validate_template(deployment_parameters, log)
                    self._deploy(location, deployment_parameters, log)
                # Even if deploy is skipped, try best to initialize the nodes.
                self.initialize_environment(environment, log)
            except Exception as identifier:
                self._delete_environment(environment, log)
                raise identifier
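The generated name above relies on LISA's truncate_keep_prefix helper, which is defined outside this excerpt. A minimal standalone sketch of the behavior the comments describe, keeping a leading prefix while trimming from the front (the name and exact semantics here are assumptions for illustration, not LISA's implementation):

def truncate_keep_prefix_sketch(name: str, max_length: int, prefix: str = "") -> str:
    # Hypothetical stand-in for LISA's truncate_keep_prefix: keep the leading
    # prefix and fill the rest with the tail of the name, so the result never
    # exceeds max_length characters.
    assert len(prefix) <= max_length, "the prefix itself must fit"
    if len(name) <= max_length:
        return name
    keep = max_length - len(prefix)
    return prefix + (name[-keep:] if keep > 0 else "")


# an over-long generated resource group name keeps its leading prefix:
assert len(truncate_keep_prefix_sketch("lisa-" + "x" * 100, 80, "lisa-")) == 80
assert truncate_keep_prefix_sketch("lisa-short-e1", 80) == "lisa-short-e1"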
    def _delete_environment(self, environment: Environment, log: Logger) -> None:
        environment_context = get_environment_context(environment=environment)
        resource_group_name = environment_context.resource_group_name
        # the resource group name is empty when the environment isn't deployed
        # for some reason, e.g. the capability doesn't meet the case requirement.
        if not resource_group_name:
            return
        assert self._azure_runbook
        if not environment_context.resource_group_is_specified:
            log.info(
                f"skipped to delete resource group: {resource_group_name}, "
                f"as it's specified in runbook."
            )
        elif self._azure_runbook.dry_run:
            log.info(
                f"skipped to delete resource group: {resource_group_name}, "
                f"as it's a dry run."
            )
        else:
            assert self._rm_client
            az_rg_exists = self._rm_client.resource_groups.check_existence(
                resource_group_name
            )
            if not az_rg_exists:
                return
            log.info(
                f"deleting resource group: {resource_group_name}, "
                f"wait: {self._azure_runbook.wait_delete}"
            )
            try:
                self._delete_boot_diagnostic_container(resource_group_name, log)
            except Exception as identifier:
                log.debug(
                    f"exception on deleting boot diagnostic container: {identifier}"
                )
            delete_operation: Any = None
            try:
                delete_operation = self._rm_client.resource_groups.begin_delete(
                    resource_group_name
                )
            except Exception as identifier:
                log.debug(f"exception on deleting resource group: {identifier}")
            if delete_operation and self._azure_runbook.wait_delete:
                wait_operation(
                    delete_operation, failure_identity="delete resource group"
                )
            else:
                log.debug("not waiting for deletion")

    def _delete_boot_diagnostic_container(
        self, resource_group_name: str, log: Logger
    ) -> None:
        compute_client = get_compute_client(self)
        vms = compute_client.virtual_machines.list(resource_group_name)
        for vm in vms:
            diagnostic_data = (
                compute_client.virtual_machines.retrieve_boot_diagnostics_data(
                    resource_group_name=resource_group_name, vm_name=vm.name
                )
            )
            if not diagnostic_data:
                continue
            # A sample URL:
            # https://storageaccountname.blob.core.windows.net:443/
            # bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70/
            # node-0.30779088-9b10-4074-8c27-98b91f1d8b70.serialconsole.log
            # ?sv=2018-03-28&sr=b&sig=mJEsvk9WunbKHfBs1lo1jcIBe4owq1brP8Kw3qXTQJA%3d&
            # se=2021-09-14T08%3a55%3a38Z&sp=r
            blob_uri = diagnostic_data.console_screenshot_blob_uri
            if blob_uri:
                matched = self._diagnostic_storage_container_pattern.match(blob_uri)
                assert matched
                # => storageaccountname
                storage_name = matched.group("storage_name")
                # => bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70
                container_name = matched.group("container_name")
                container_client = get_or_create_storage_container(
                    credential=self.credential,
                    subscription_id=self.subscription_id,
                    account_name=storage_name,
                    container_name=container_name,
                    resource_group_name=self._azure_runbook.shared_resource_group_name,
                )
                log.debug(
                    f"deleting boot diagnostic container: {container_name}"
                    f" under storage account {storage_name} of vm {vm.name}"
                )
                try:
                    container_client.delete_container()
                except Exception as identifier:
                    log.debug(
                        f"exception on deleting boot diagnostic container:"
                        f" {identifier}"
                    )
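The cleanup above depends on self._diagnostic_storage_container_pattern, which is defined in a part of platform_.py not shown in this excerpt. A self-contained pattern with the same named groups, applied to the sample URL from the comment (the exact regex in LISA may differ; this one is an assumption for illustration):

import re

# Assumed shape of the pattern; the real one lives elsewhere in platform_.py.
diagnostic_pattern = re.compile(
    r"https://(?P<storage_name>[^.]+)\.blob\.core\.windows\.net[:0-9]*/"
    r"(?P<container_name>[^/]+)/"
)
sample = (
    "https://storageaccountname.blob.core.windows.net:443/"
    "bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70/"
    "node-0.30779088-9b10-4074-8c27-98b91f1d8b70.serialconsole.log?sv=..."
)
matched = diagnostic_pattern.match(sample)
assert matched
assert matched.group("storage_name") == "storageaccountname"
assert matched.group("container_name").startswith("bootdiagnostics-")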
    def _get_node_information(self, node: Node) -> Dict[str, str]:
        information: Dict[str, Any] = {}
        node.log.debug("detecting lis version...")
        modinfo = node.tools[Modinfo]
        information["lis_version"] = modinfo.get_version("hv_vmbus")
        node.log.debug("detecting vm generation...")
        information[KEY_VM_GENERATION] = node.tools[VmGeneration].get_generation()
        node.log.debug(f"vm generation: {information[KEY_VM_GENERATION]}")
        return information

    def _get_kernel_version(self, node: Node) -> str:
        result: str = ""
        if not result and hasattr(node, ATTRIBUTE_FEATURES):
            # try to get the kernel version from the Azure serial log. Use it
            # when uname doesn't work.
            node.log.debug("detecting kernel version from serial log...")
            serial_console = node.features[features.SerialConsole]
            result = serial_console.get_matched_str(KERNEL_VERSION_PATTERN)
        return result

    def _get_host_version(self, node: Node) -> str:
        result: str = ""
        try:
            if node.is_connected and node.is_posix:
                node.log.debug("detecting host version from dmesg...")
                dmesg = node.tools[Dmesg]
                result = get_matched_str(
                    dmesg.get_output(), HOST_VERSION_PATTERN, first_match=False
                )
        except Exception as identifier:
            # This happens on some broken VMs. Those errors should be caught
            # earlier, in test cases, not here. So ignore any error here; this
            # method only collects information.
            node.log.debug(f"error on running dmesg: {identifier}")
        # if nothing is found, try again from the serial console log.
        # skip if the node is not initialized.
        if not result and hasattr(node, ATTRIBUTE_FEATURES):
            node.log.debug("detecting host version from serial log...")
            serial_console = node.features[features.SerialConsole]
            result = serial_console.get_matched_str(HOST_VERSION_PATTERN)
        return result
    def _get_wala_version(self, node: Node) -> str:
        result = ""
        try:
            if node.is_connected and node.is_posix:
                node.log.debug("detecting wala version from waagent...")
                waagent = node.tools[Waagent]
                result = waagent.get_version()
        except Exception as identifier:
            # This happens on some broken VMs. Those errors should be caught
            # earlier, in test cases, not here. So ignore any error here; this
            # method only collects information.
            node.log.debug(f"error on running waagent: {identifier}")
        if not result and hasattr(node, ATTRIBUTE_FEATURES):
            node.log.debug("detecting wala agent version from serial log...")
            serial_console = node.features[features.SerialConsole]
            result = serial_console.get_matched_str(WALA_VERSION_PATTERN)
        return result

    def _get_wala_distro_version(self, node: Node) -> str:
        result = "Unknown"
        try:
            if node.is_connected and node.is_posix:
                waagent = node.tools[Waagent]
                result = waagent.get_distro_version()
        except Exception as identifier:
            # This happens on some broken VMs. Those errors should be caught
            # earlier, in test cases, not here. So ignore any error here; this
            # method only collects information.
            node.log.debug(f"error on getting waagent distro version: {identifier}")
        return result
    def _get_platform_information(self, environment: Environment) -> Dict[str, str]:
        result: Dict[str, str] = {}
        azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(
            AzurePlatformSchema
        )
        result[AZURE_RG_NAME_KEY] = get_environment_context(
            environment
        ).resource_group_name
        if azure_runbook.availability_set_properties:
            for (
                property_name,
                property_value,
            ) in azure_runbook.availability_set_properties.items():
                if property_name in [
                    "platformFaultDomainCount",
                    "platformUpdateDomainCount",
                ]:
                    continue
                if isinstance(property_value, dict):
                    for key, value in property_value.items():
                        if value:
                            result[key] = value
        if azure_runbook.availability_set_tags:
            for key, value in azure_runbook.availability_set_tags.items():
                if value:
                    result[key] = value
        if azure_runbook.vm_tags:
            for key, value in azure_runbook.vm_tags.items():
                if value:
                    result[key] = value
        return result

    def _get_environment_information(self, environment: Environment) -> Dict[str, str]:
        information: Dict[str, str] = {}
        node_runbook: Optional[AzureNodeSchema] = None
        if environment.nodes:
            node: Optional[Node] = environment.default_node
        else:
            node = None
        if node:
            node_runbook = node.capability.get_extended_runbook(AzureNodeSchema, AZURE)
            for key, method in self._environment_information_hooks.items():
                node.log.debug(f"detecting {key} ...")
                try:
                    value = method(node)
                    if value:
                        information[key] = value
                except Exception as identifier:
                    node.log.exception(f"error on getting {key}.", exc_info=identifier)
            information.update(self._get_platform_information(environment))
            if node.is_connected and node.is_posix:
                information.update(self._get_node_information(node))
        elif environment.capability and environment.capability.nodes:
            # get the deployment information, if it failed in the preparing phase
            node_space = environment.capability.nodes[0]
            node_runbook = node_space.get_extended_runbook(
                AzureNodeSchema, type_name=AZURE
            )
        if node_runbook:
            information["location"] = node_runbook.location
            information["vmsize"] = node_runbook.vm_size
            information["image"] = node_runbook.get_image_name()
        return information

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        # set the environment variables needed for authentication
        azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(
            AzurePlatformSchema
        )
        assert azure_runbook, "platform runbook cannot be empty"
        self._azure_runbook = azure_runbook
        self.subscription_id = azure_runbook.subscription_id
        self._initialize_credential()
        check_or_create_resource_group(
            self.credential,
            self.subscription_id,
            azure_runbook.shared_resource_group_name,
            RESOURCE_GROUP_LOCATION,
            self._log,
        )
        self._rm_client = get_resource_management_client(
            self.credential, self.subscription_id
        )
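_initialize_credential below works by exporting service principal settings as environment variables before constructing DefaultAzureCredential, which picks them up through its EnvironmentCredential chain. A minimal sketch of that flow, standalone (the values are placeholders; in LISA they come from the runbook):

import os

from azure.identity import DefaultAzureCredential

# Placeholders; real values come from the runbook in LISA.
os.environ["AZURE_TENANT_ID"] = "<tenant-id>"
os.environ["AZURE_CLIENT_ID"] = "<client-id>"
os.environ["AZURE_CLIENT_SECRET"] = "<client-secret>"

# DefaultAzureCredential tries several sources in order; with the variables
# above set, its EnvironmentCredential branch authenticates as the service
# principal.
credential = DefaultAzureCredential()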
    def _initialize_credential(self) -> None:
        azure_runbook = self._azure_runbook
        credential_key = (
            f"{azure_runbook.service_principal_tenant_id}_"
            f"{azure_runbook.service_principal_client_id}"
        )
        credential = self._credentials.get(credential_key, None)
        if not credential:
            # set the azure logger to warn level only
            logging.getLogger("azure").setLevel(azure_runbook.log_level)
            if azure_runbook.service_principal_tenant_id:
                os.environ[
                    "AZURE_TENANT_ID"
                ] = azure_runbook.service_principal_tenant_id
            if azure_runbook.service_principal_client_id:
                os.environ[
                    "AZURE_CLIENT_ID"
                ] = azure_runbook.service_principal_client_id
            if azure_runbook.service_principal_key:
                os.environ["AZURE_CLIENT_SECRET"] = azure_runbook.service_principal_key
            credential = DefaultAzureCredential()
            with SubscriptionClient(credential) as self._sub_client:
                # suppress the warning messages emitted while searching
                # different credential types
                azure_identity_logger = logging.getLogger("azure.identity")
                azure_identity_logger.setLevel(logging.ERROR)
                with global_credential_access_lock:
                    subscription = self._sub_client.subscriptions.get(
                        self.subscription_id
                    )
                azure_identity_logger.setLevel(logging.WARN)
            if not subscription:
                raise LisaException(
                    f"Cannot find subscription id: '{self.subscription_id}'. "
                    f"Make sure it exists and the current account can access it."
                )
            self._log.info(
                f"connected to subscription: "
                f"{subscription.id}, '{subscription.display_name}'"
            )
            self._credentials[credential_key] = credential
        self.credential = credential
"698                    f"Make sure it exists and current account can access it."699                )700            self._log.info(701                f"connected to subscription: "702                f"{subscription.id}, '{subscription.display_name}'"703            )704            self._credentials[credential_key] = credential705        self.credential = credential706    def _load_template(self) -> Any:707        if self._arm_template is None:708            template_file_path = Path(__file__).parent / "arm_template.json"709            with open(template_file_path, "r") as f:710                self._arm_template = json.load(f)711        return self._arm_template712    @retry(tries=10, delay=1, jitter=(0.5, 1))713    def _load_location_info_from_file(714        self, cached_file_name: Path, log: Logger715    ) -> Optional[AzureLocation]:716        loaded_obj: Optional[AzureLocation] = None717        if cached_file_name.exists():718            try:719                with open(cached_file_name, "r") as f:720                    loaded_data: Dict[str, Any] = json.load(f)721                loaded_obj = schema.load_by_type(AzureLocation, loaded_data)722            except Exception as identifier:723                # if schema changed, There may be exception, remove cache and retry724                # Note: retry on this method depends on decorator725                log.debug(726                    f"error on loading cache, delete cache and retry. {identifier}"727                )728                cached_file_name.unlink()729                raise identifier730        return loaded_obj731    def get_location_info(self, location: str, log: Logger) -> AzureLocation:732        cached_file_name = constants.CACHE_PATH.joinpath(733            f"azure_locations_{location}.json"734        )735        should_refresh: bool = True736        key = self._get_location_key(location)737        location_data = self._locations_data_cache.get(key, None)738        if not location_data:739            location_data = self._load_location_info_from_file(740                cached_file_name=cached_file_name, log=log741            )742        if location_data:743            delta = datetime.now() - location_data.updated_time744            # refresh cached locations every 1 day.745            if delta.days < 1:746                should_refresh = False747            else:748                log.debug(749                    f"{key}: cache timeout: {location_data.updated_time},"750                    f"sku count: {len(location_data.capabilities)}"751                )752        else:753            log.debug(f"{key}: no cache found")754        if should_refresh:755            compute_client = get_compute_client(self)756            log.debug(f"{key}: querying")757            all_skus: Dict[str, AzureCapability] = dict()758            paged_skus = compute_client.resource_skus.list(759                f"location eq '{location}'"760            ).by_page()761            for skus in paged_skus:762                for sku_obj in skus:763                    try:764                        if sku_obj.resource_type == "virtualMachines":765                            if sku_obj.restrictions and any(766                                restriction.type == "Location"767                                for restriction in sku_obj.restrictions768                            ):769                                # restricted on this location770                                continue771                            resource_sku = sku_obj.as_dict()772           
    def get_location_info(self, location: str, log: Logger) -> AzureLocation:
        cached_file_name = constants.CACHE_PATH.joinpath(
            f"azure_locations_{location}.json"
        )
        should_refresh: bool = True
        key = self._get_location_key(location)
        location_data = self._locations_data_cache.get(key, None)
        if not location_data:
            location_data = self._load_location_info_from_file(
                cached_file_name=cached_file_name, log=log
            )
        if location_data:
            delta = datetime.now() - location_data.updated_time
            # refresh cached locations every day.
            if delta.days < 1:
                should_refresh = False
            else:
                log.debug(
                    f"{key}: cache timeout: {location_data.updated_time}, "
                    f"sku count: {len(location_data.capabilities)}"
                )
        else:
            log.debug(f"{key}: no cache found")
        if should_refresh:
            compute_client = get_compute_client(self)
            log.debug(f"{key}: querying")
            all_skus: Dict[str, AzureCapability] = dict()
            paged_skus = compute_client.resource_skus.list(
                f"location eq '{location}'"
            ).by_page()
            for skus in paged_skus:
                for sku_obj in skus:
                    try:
                        if sku_obj.resource_type == "virtualMachines":
                            if sku_obj.restrictions and any(
                                restriction.type == "Location"
                                for restriction in sku_obj.restrictions
                            ):
                                # restricted in this location
                                continue
                            resource_sku = sku_obj.as_dict()
                            capability = self._resource_sku_to_capability(
                                location, sku_obj
                            )
                            # estimate the vm cost for priority
                            assert isinstance(capability.core_count, int)
                            assert isinstance(capability.gpu_count, int)
                            azure_capability = AzureCapability(
                                location=location,
                                vm_size=sku_obj.name,
                                capability=capability,
                                resource_sku=resource_sku,
                            )
                            all_skus[azure_capability.vm_size] = azure_capability
                    except Exception as identifier:
                        log.error(f"unknown sku: {sku_obj}")
                        raise identifier
            location_data = AzureLocation(location=location, capabilities=all_skus)
            log.debug(f"{location}: saving to disk")
            with open(cached_file_name, "w") as f:
                json.dump(location_data.to_dict(), f)  # type: ignore
            log.debug(f"{key}: new data, sku: {len(location_data.capabilities)}")
        assert location_data
        self._locations_data_cache[key] = location_data
        return location_data
    def _create_deployment_parameters(
        self, resource_group_name: str, environment: Environment, log: Logger
    ) -> Tuple[str, Dict[str, Any]]:
        assert environment.runbook, "env data cannot be None"
        assert environment.runbook.nodes_requirement, "node requirement cannot be None"
        log.debug("creating deployment")
        # construct the parameters
        arm_parameters = AzureArmParameter()
        copied_fields = [
            "availability_set_tags",
            "availability_set_properties",
            "vm_tags",
        ]
        set_filtered_fields(self._azure_runbook, arm_parameters, copied_fields)
        is_windows: bool = False
        arm_parameters.admin_username = self.runbook.admin_username
        if self.runbook.admin_private_key_file:
            arm_parameters.admin_key_data = get_public_key_data(
                self.runbook.admin_private_key_file
            )
        else:
            arm_parameters.admin_password = self.runbook.admin_password
        environment_context = get_environment_context(environment=environment)
        arm_parameters.vm_tags["RG"] = environment_context.resource_group_name
        # get the local lisa environment
        arm_parameters.vm_tags["lisa_username"] = local().tools[Whoami].get_username()
        arm_parameters.vm_tags["lisa_hostname"] = local().tools[Hostname].get_hostname()
        nodes_parameters: List[AzureNodeArmParameter] = []
        features_settings: Dict[str, schema.FeatureSettings] = {}
        for node_space in environment.runbook.nodes_requirement:
            assert isinstance(
                node_space, schema.NodeSpace
            ), f"actual: {type(node_space)}"
            azure_node_runbook = node_space.get_extended_runbook(
                AzureNodeSchema, type_name=AZURE
            )
            # The subscription id is used by Shared Gallery images located in a
            # subscription different from the one where LISA runs.
            azure_node_runbook.subscription_id = self.subscription_id
            # init node
            node = environment.create_node_from_requirement(
                node_space,
            )
            azure_node_runbook = self._create_node_runbook(
                len(nodes_parameters), node_space, log, resource_group_name
            )
            # save the parsed runbook back; for example, the marketplace version
            # may be resolved from latest to a specified version.
            node.capability.set_extended_runbook(azure_node_runbook)
            node_arm_parameters = self._create_node_arm_parameters(node.capability, log)
            nodes_parameters.append(node_arm_parameters)
            # set the data disk array
            arm_parameters.data_disks = self._generate_data_disks(
                node, node_arm_parameters
            )
            if not arm_parameters.location:
                # take the first node's location
                arm_parameters.location = azure_node_runbook.location
            # save the vm's information into the node
            node_context = get_node_context(node)
            node_context.resource_group_name = environment_context.resource_group_name
            # the vm's name, used to find it in Azure
            node_context.vm_name = azure_node_runbook.name
            # ssh related information will be filled back once the vm is created.
            # If it's Windows, always fill in the password. If it's Linux, the
            # private key has higher priority.
            node_context.username = arm_parameters.admin_username
            if azure_node_runbook.is_linux:
                node_context.password = arm_parameters.admin_password
            else:
                is_windows = True
                if not self.runbook.admin_password:
                    # a password is required; if it isn't present, generate one.
                    password = generate_random_chars()
                    add_secret(password)
                    self.runbook.admin_password = password
                node_context.password = self.runbook.admin_password
            node_context.private_key_file = self.runbook.admin_private_key_file
            # collect all features to handle special deployment logic. If one
            # node has a feature, it needs to run.
            if node.capability.features:
                for f in node.capability.features:
                    if f.type not in features_settings:
                        features_settings[f.type] = f
            log.info(f"vm setting: {azure_node_runbook}")
        if is_windows:
            # always set the password for windows.
            arm_parameters.admin_password = self.runbook.admin_password
        arm_parameters.nodes = nodes_parameters
        arm_parameters.storage_name = get_storage_account_name(
            self.subscription_id, arm_parameters.location
        )
        if (
            self._azure_runbook.availability_set_properties
            or self._azure_runbook.availability_set_tags
        ):
            arm_parameters.use_availability_sets = True
        # In Azure, each VM should have only one nic per subnet, so calculate
        # the max nic count and use it as the subnet count.
        arm_parameters.subnet_count = max(x.nic_count for x in arm_parameters.nodes)
        arm_parameters.shared_resource_group_name = (
            self._azure_runbook.shared_resource_group_name
        )
        # the arm template may be updated by the hooks, so make a copy to avoid
        # modifying the original template.
        template = deepcopy(self._load_template())
        plugin_manager.hook.azure_update_arm_template(
            template=template, environment=environment
        )
        # change the deployment for each feature.
        for f in features_settings.values():
            feature_type = next(
                x for x in self.supported_features() if x.name() == f.type
            )
            feature_type.on_before_deployment(
                arm_parameters=arm_parameters,
                template=template,
                settings=f,
                environment=environment,
                log=log,
            )
        # compose the deployment properties
        parameters = arm_parameters.to_dict()  # type: ignore
        parameters = {k: {"value": v} for k, v in parameters.items()}
        log.debug(f"parameters: {parameters}")
        deployment_properties = DeploymentProperties(
            mode=DeploymentMode.incremental,
            template=template,
            parameters=parameters,
        )
        # dump arm_template and arm_parameters to files
        template_dump_path = environment.log_path / "arm_template.json"
        param_dump_path = environment.log_path / "arm_template_parameters.json"
        dump_file(template_dump_path, json.dumps(template, indent=4))
        dump_file(param_dump_path, json.dumps(parameters, indent=4))
        return (
            arm_parameters.location,
            {
                AZURE_RG_NAME_KEY: resource_group_name,
                "deployment_name": AZURE_DEPLOYMENT_NAME,
                "parameters": Deployment(properties=deployment_properties),
            },
        )
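One detail worth calling out in _create_deployment_parameters: the flat parameter dict is rewrapped so every value sits under a "value" key, which is the shape ARM deployment parameters take. The same transformation in isolation (the parameter names and values here are made up):

parameters = {"location": "westus2", "nodeCount": 2}
arm_parameters = {k: {"value": v} for k, v in parameters.items()}
assert arm_parameters == {
    "location": {"value": "westus2"},
    "nodeCount": {"value": 2},
}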
    def _create_node_runbook(
        self,
        index: int,
        node_space: schema.NodeSpace,
        log: Logger,
        name_prefix: str,
    ) -> AzureNodeSchema:
        azure_node_runbook = node_space.get_extended_runbook(
            AzureNodeSchema, type_name=AZURE
        )
        if not azure_node_runbook.name:
            # the max length of a vm name is 64 chars. The logic below takes the
            # last 45 chars of the resource group name and keeps the leading
            # 5 chars. name_prefix can contain either a customized (existing) or
            # a generated (starting with "lisa-") resource group name, so pass
            # the first 5 chars as the prefix to truncate_keep_prefix to handle
            # both cases.
            node_name = f"{name_prefix}-n{index}"
            azure_node_runbook.name = truncate_keep_prefix(node_name, 50, node_name[:5])
        # It's used as the computer name only. Windows doesn't support names
        # longer than 15 chars.
        azure_node_runbook.short_name = truncate_keep_prefix(
            azure_node_runbook.name, 15, azure_node_runbook.name[:5]
        )
        if not azure_node_runbook.vm_size:
            raise LisaException("vm_size is not detected before deploy")
        if not azure_node_runbook.location:
            raise LisaException("location is not detected before deploy")
        if azure_node_runbook.hyperv_generation not in [1, 2]:
            raise LisaException(
                "hyperv_generation needs value 1 or 2, "
                f"but got {azure_node_runbook.hyperv_generation}",
            )
        if azure_node_runbook.vhd:
            # vhd has the highest priority
            azure_node_runbook.vhd = self._get_deployable_vhd_path(
                azure_node_runbook.vhd, azure_node_runbook.location, log
            )
            azure_node_runbook.marketplace = None
            azure_node_runbook.shared_gallery = None
        elif azure_node_runbook.shared_gallery:
            azure_node_runbook.marketplace = None
            azure_node_runbook.shared_gallery = self._parse_shared_gallery_image(
                azure_node_runbook.location, azure_node_runbook.shared_gallery
            )
        elif not azure_node_runbook.marketplace:
            # default to the marketplace image, if nothing is specified
            azure_node_runbook.marketplace = AzureVmMarketplaceSchema()
        else:
            # the marketplace value is already set in the runbook
            ...
        if azure_node_runbook.marketplace:
            # resolve Latest to a specified version
            azure_node_runbook.marketplace = self._resolve_marketplace_image(
                azure_node_runbook.location, azure_node_runbook.marketplace
            )
            image_info = self._get_image_info(
                azure_node_runbook.location, azure_node_runbook.marketplace
            )
            # HyperVGenerationTypes returns "V1"/"V2", so strip the "V"
            if image_info.hyper_v_generation:
                azure_node_runbook.hyperv_generation = int(
                    image_info.hyper_v_generation.strip("V")
                )
            # retrieve the os type for the arm template.
            if azure_node_runbook.is_linux is None:
                if image_info.os_disk_image.operating_system == "Windows":
                    azure_node_runbook.is_linux = False
                else:
                    azure_node_runbook.is_linux = True
        if azure_node_runbook.is_linux is None:
            # fill in the default value
            azure_node_runbook.is_linux = True
        return azure_node_runbook
    def _create_node_arm_parameters(
        self, capability: schema.Capability, log: Logger
    ) -> AzureNodeArmParameter:
        runbook = capability.get_extended_runbook(AzureNodeSchema, type_name=AZURE)
        arm_parameters = AzureNodeArmParameter.from_node_runbook(runbook)
        os_disk_size = 30
        if arm_parameters.vhd:
            # vhd has the highest priority
            arm_parameters.vhd = self._get_deployable_vhd_path(
                arm_parameters.vhd, arm_parameters.location, log
            )
            os_disk_size = max(
                os_disk_size, self._get_vhd_os_disk_size(arm_parameters.vhd)
            )
        elif arm_parameters.shared_gallery:
            os_disk_size = max(
                os_disk_size,
                self._get_sig_os_disk_size(arm_parameters.shared_gallery),
            )
        else:
            assert (
                arm_parameters.marketplace
            ), "none of marketplace, shared_gallery or vhd is set."
            image_info = self._get_image_info(
                arm_parameters.location, arm_parameters.marketplace
            )
            os_disk_size = max(
                os_disk_size, image_info.os_disk_image.additional_properties["sizeInGb"]
            )
            if not arm_parameters.purchase_plan and image_info.plan:
                # expand the values for the lru cache
                plan_name = image_info.plan.name
                plan_product = image_info.plan.product
                plan_publisher = image_info.plan.publisher
                # accept the default purchase plan automatically.
                arm_parameters.purchase_plan = self._process_marketplace_image_plan(
                    marketplace=arm_parameters.marketplace,
                    plan_name=plan_name,
                    plan_product=plan_product,
                    plan_publisher=plan_publisher,
                )
        arm_parameters.osdisk_size_in_gb = os_disk_size
        # set the disk type
        assert capability.disk, "node space must have disk defined."
        assert isinstance(capability.disk.disk_type, schema.DiskType)
        arm_parameters.disk_type = features.get_azure_disk_type(
            capability.disk.disk_type
        )
        assert capability.network_interface
        assert isinstance(
            capability.network_interface.nic_count, int
        ), f"actual: {capability.network_interface.nic_count}"
        arm_parameters.nic_count = capability.network_interface.nic_count
        assert isinstance(
            capability.network_interface.data_path, schema.NetworkDataPath
        ), f"actual: {type(capability.network_interface.data_path)}"
        if capability.network_interface.data_path == schema.NetworkDataPath.Sriov:
            arm_parameters.enable_sriov = True
        return arm_parameters
    def _validate_template(
        self, deployment_parameters: Dict[str, Any], log: Logger
    ) -> None:
        log.debug("validating deployment")
        validate_operation: Any = None
        try:
            with global_credential_access_lock:
                validate_operation = self._rm_client.deployments.begin_validate(
                    **deployment_parameters
                )
            wait_operation(validate_operation, failure_identity="validation")
        except Exception as identifier:
            error_messages: List[str] = [str(identifier)]
            if isinstance(identifier, HttpResponseError) and identifier.error:
                # no validate_operation is returned, but the message may include
                # some errors, so check the details
                error_messages = self._parse_detail_errors(identifier.error)
            error_message = "\n".join(error_messages)
            plugin_manager.hook.azure_deploy_failed(error_message=error_message)
            raise LisaException(error_message)

    def _deploy(
        self, location: str, deployment_parameters: Dict[str, Any], log: Logger
    ) -> None:
        resource_group_name = deployment_parameters[AZURE_RG_NAME_KEY]
        storage_account_name = get_storage_account_name(self.subscription_id, location)
        check_or_create_storage_account(
            self.credential,
            self.subscription_id,
            storage_account_name,
            self._azure_runbook.shared_resource_group_name,
            location,
            log,
        )
        log.info(f"resource group '{resource_group_name}' deployment is in progress...")
        deployment_operation: Any = None
        deployments = self._rm_client.deployments
        try:
            deployment_operation = deployments.begin_create_or_update(
                **deployment_parameters
            )
            wait_operation(deployment_operation, failure_identity="deploy")
        except HttpResponseError as identifier:
            # Some errors happen in underlying layers, so the API returns no
            # detailed errors. For example,
            # azure.core.exceptions.HttpResponseError:
            #    Operation returned an invalid status 'OK'
            assert identifier.error, f"HttpResponseError: {identifier}"
            error_message = "\n".join(self._parse_detail_errors(identifier.error))
            if (
                self._azure_runbook.ignore_provisioning_error
                and "OSProvisioningTimedOut: OS Provisioning for VM" in error_message
            ):
                # A provisioning timeout is caused by waagent not being ready.
                # In a smoke test, some information can still be verified.
                # Swallow the error here, to run the test case anyway.
                #
                # It may cause other cases to fail on their assumptions. In that
                # case, we can define a flag in the config to mark whether this
                # exception is ignorable.
                log.error(
                    f"provisioning timed out, try to run the case. "
                    f"Exception: {error_message}"
                )
            elif self._azure_runbook.ignore_provisioning_error and get_matched_str(
                error_message, AZURE_INTERNAL_ERROR_PATTERN
            ):
                # Similar situation to OSProvisioningTimedOut.
                # Some OSProvisioningInternalError cases are caused by images
                # that don't support SSH key authentication,
                # e.g. hpe hpestoreoncevsa hpestoreoncevsa-3187 3.18.7
                # After passing this exception through, port 22 of the VM is
                # actually open.
                log.error(
                    f"provisioning failed with an internal error, try to run "
                    f"the case. Exception: {error_message}"
                )
            else:
                plugin_manager.hook.azure_deploy_failed(error_message=error_message)
                raise LisaException(error_message)
    def _parse_detail_errors(self, error: Any) -> List[str]:
        # the original message may be a summary; get the lowest level details.
        if hasattr(error, "details") and error.details:
            errors: List[str] = []
            for detail in error.details:
                errors.extend(self._parse_detail_errors(detail))
        else:
            try:
                # the message is sometimes a serialized json string
                parsed_error = json.loads(
                    error.message, object_hook=lambda x: SimpleNamespace(**x)
                )
                errors = self._parse_detail_errors(parsed_error.error)
            except Exception:
                # loading failed, so it should be a real error message string
                errors = [f"{error.code}: {error.message}"]
        return errors
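_parse_detail_errors above recurses through ARM error payloads: nested "details" lists are flattened, and messages that are themselves serialized JSON are parsed with an object_hook and descended into. A self-contained sketch of the same recursion (the function and sample names here are illustrative, not LISA's):

import json
from types import SimpleNamespace
from typing import Any, List


def parse_detail_errors_sketch(error: Any) -> List[str]:
    # flatten nested `details` lists; otherwise try to parse the message as
    # JSON and recurse, falling back to "code: message" for plain strings.
    if getattr(error, "details", None):
        errors: List[str] = []
        for detail in error.details:
            errors.extend(parse_detail_errors_sketch(detail))
        return errors
    try:
        parsed = json.loads(error.message, object_hook=lambda x: SimpleNamespace(**x))
        return parse_detail_errors_sketch(parsed.error)
    except Exception:
        return [f"{error.code}: {error.message}"]


nested = SimpleNamespace(
    code="DeploymentFailed",
    message=json.dumps({"error": {"code": "VMStartFailed", "message": "boom"}}),
    details=None,
)
assert parse_detail_errors_sketch(nested) == ["VMStartFailed: boom"]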
"1145                    f"Exception: {error_message}"1146                )1147            else:1148                plugin_manager.hook.azure_deploy_failed(error_message=error_message)1149                raise LisaException(error_message)1150    def _parse_detail_errors(self, error: Any) -> List[str]:1151        # original message may be a summary, get lowest level details.1152        if hasattr(error, "details") and error.details:1153            errors: List[str] = []1154            for detail in error.details:1155                errors.extend(self._parse_detail_errors(detail))1156        else:1157            try:1158                # it returns serialized json string in message sometime1159                parsed_error = json.loads(1160                    error.message, object_hook=lambda x: SimpleNamespace(**x)1161                )1162                errors = self._parse_detail_errors(parsed_error.error)1163            except Exception:1164                # load failed, it should be a real error message string1165                errors = [f"{error.code}: {error.message}"]1166        return errors1167    # the VM may not be queried after deployed. use retry to mitigate it.1168    @retry(exceptions=LisaException, tries=150, delay=2)1169    def _load_vms(1170        self, environment: Environment, log: Logger1171    ) -> Dict[str, VirtualMachine]:1172        compute_client = get_compute_client(self, api_version="2020-06-01")1173        environment_context = get_environment_context(environment=environment)1174        log.debug(1175            f"listing vm in resource group "1176            f"'{environment_context.resource_group_name}'"1177        )1178        vms_map: Dict[str, VirtualMachine] = {}1179        vms = compute_client.virtual_machines.list(1180            environment_context.resource_group_name1181        )1182        for vm in vms:1183            vms_map[vm.name] = vm1184            log.debug(f"  found vm {vm.name}")1185        if not vms_map:1186            raise LisaException(1187                f"deployment succeeded, but VM not found in 5 minutes "1188                f"from '{environment_context.resource_group_name}'"1189            )1190        return vms_map1191    # Use Exception, because there may be credential conflict error. 
    # Use Exception, because there may be a credential conflict error; make it
    # retriable.
    @retry(exceptions=Exception, tries=150, delay=2)
    def _load_nics(
        self, environment: Environment, log: Logger
    ) -> Dict[str, NetworkInterface]:
        network_client = get_network_client(self)
        environment_context = get_environment_context(environment=environment)
        log.debug(
            f"listing network interfaces in resource group "
            f"'{environment_context.resource_group_name}'"
        )
        # load the nics
        nics_map: Dict[str, NetworkInterface] = {}
        network_interfaces = network_client.network_interfaces.list(
            environment_context.resource_group_name
        )
        for nic in network_interfaces:
            # a nic name looks like lisa-test-20220316-182126-985-e0-n0-nic-2;
            # extract the vm name part for the later pick. Keep only the primary
            # nic, whose name ends with -nic-0.
            node_name_from_nic = RESOURCE_ID_NIC_PATTERN.findall(nic.name)
            if node_name_from_nic:
                name = node_name_from_nic[0]
                nics_map[name] = nic
                log.debug(f"  found nic '{nic.name}', and saved for the next step.")
            else:
                log.debug(
                    f"  found nic '{nic.name}', but dropped it, "
                    "because it's not the primary nic."
                )
        if not nics_map:
            raise LisaException(
                f"deployment succeeded, but no network interfaces found in 5 minutes "
                f"from '{environment_context.resource_group_name}'"
            )
        return nics_map

    @retry(exceptions=LisaException, tries=150, delay=2)
    def load_public_ips_from_resource_group(
        self, resource_group_name: str, log: Logger
    ) -> Dict[str, str]:
        network_client = get_network_client(self)
        log.debug(f"listing public ips in resource group '{resource_group_name}'")
        # get the public IPs
        public_ip_addresses = network_client.public_ip_addresses.list(
            resource_group_name
        )
        public_ips_map: Dict[str, str] = {}
        for ip_address in public_ip_addresses:
            # a public ip name looks like node-0-nic-2; extract the vm name part
            # for the later pick. Keep only the primary nic, whose name ends
            # with -nic-0.
            node_name_from_public_ip = RESOURCE_ID_PUBLIC_IP_PATTERN.findall(
                ip_address.name
            )
            assert (
                ip_address
            ), f"public IP address cannot be empty, ip_address object: {ip_address}"
            if node_name_from_public_ip:
                name = node_name_from_public_ip[0]
                public_ips_map[name] = ip_address.ip_address
                log.debug(
                    f"  found public IP '{ip_address.name}', and saved for the "
                    f"next step."
                )
            else:
                log.debug(
                    f"  found public IP '{ip_address.name}', but dropped it "
                    "because it's not the primary nic."
                )
        if not public_ips_map:
            raise LisaException(
                f"deployment succeeded, but no public ips found in 5 minutes "
                f"from '{resource_group_name}'"
            )
        return public_ips_map
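Both loaders above keep only the primary NIC or public IP, recognized by a trailing -nic-0, and key the result map by the extracted VM name. The real RESOURCE_ID_NIC_PATTERN is defined outside this excerpt; a pattern with equivalent observable behavior might look like this (an assumption, for illustration only):

import re

# Assumed stand-in for RESOURCE_ID_NIC_PATTERN: capture the vm name from a
# primary nic name that ends with "-nic-0".
nic_pattern = re.compile(r"([\w-]+)-nic-0$")

assert nic_pattern.findall("lisa-test-20220316-182126-985-e0-n0-nic-0") == [
    "lisa-test-20220316-182126-985-e0-n0"
]
# non-primary nics produce no match and are dropped:
assert nic_pattern.findall("lisa-test-20220316-182126-985-e0-n0-nic-2") == []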
    def initialize_environment(self, environment: Environment, log: Logger) -> None:
        node_context_map: Dict[str, Node] = {}
        for node in environment.nodes.list():
            node_context = get_node_context(node)
            node_context_map[node_context.vm_name] = node
        vms_map: Dict[str, VirtualMachine] = self._load_vms(environment, log)
        nics_map: Dict[str, NetworkInterface] = self._load_nics(environment, log)
        environment_context = get_environment_context(environment=environment)
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            environment_context.resource_group_name, log
        )
        for vm_name, node in node_context_map.items():
            node_context = get_node_context(node)
            vm = vms_map.get(vm_name, None)
            if not vm:
                raise LisaException(
                    f"cannot find vm: '{vm_name}', make sure the deployment is correct."
                )
            nic = nics_map[vm_name]
            public_ip = public_ips_map[vm_name]
            address = nic.ip_configurations[0].private_ip_address
            if not node.name:
                node.name = vm_name
            assert isinstance(node, RemoteNode)
            node.set_connection_info(
                address=address,
                port=22,
                public_address=public_ip,
                public_port=22,
                username=node_context.username,
                password=node_context.password,
                private_key_file=node_context.private_key_file,
            )
        # enable ssh for Windows. If a node is not Windows, or is already
        # reachable over SSH, this is skipped.
        run_in_parallel(
            [
                partial(self._enable_ssh_on_windows, node=x)
                for x in environment.nodes.list()
            ]
        )
    def _resource_sku_to_capability(  # noqa: C901
        self, location: str, resource_sku: ResourceSku
    ) -> schema.NodeSpace:
        # fill in default values, in case no capability is matched.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=0,
            memory_mb=0,
            gpu_count=0,
        )
        node_space.name = f"{location}_{resource_sku.name}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        node_space.disk = features.AzureDiskOptionSettings()
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.data_disk_iops = search_space.IntRange(min=0)
        node_space.disk.data_disk_size = search_space.IntRange(min=0)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])
        # fill in the supported features
        azure_raw_capabilities: Dict[str, str] = {}
        for sku_capability in resource_sku.capabilities:
            # avoid looping in every feature
            azure_raw_capabilities[sku_capability.name] = sku_capability.value
        # calculate the cpu count. Some vm sizes, like Standard_HC44rs, don't
        # have vCPUsAvailable, so fall back to vCPUs.
        vcpus_available = int(azure_raw_capabilities.get("vCPUsAvailable", "0"))
        if vcpus_available:
            node_space.core_count = vcpus_available
        else:
            node_space.core_count = int(azure_raw_capabilities.get("vCPUs", "0"))
        memory_value = azure_raw_capabilities.get("MemoryGB", None)
        if memory_value:
            node_space.memory_mb = int(float(memory_value) * 1024)
        max_disk_count = azure_raw_capabilities.get("MaxDataDiskCount", None)
        if max_disk_count:
            node_space.disk.max_data_disk_count = int(max_disk_count)
            node_space.disk.data_disk_count = search_space.IntRange(
                max=node_space.disk.max_data_disk_count
            )
        max_nic_count = azure_raw_capabilities.get("MaxNetworkInterfaces", None)
        if max_nic_count:
            # set a min value for nic_count as a workaround for an Azure Python
            # SDK bug: nic_count is 0 when getting the capability for some
            # sizes, e.g. Standard_D8a_v3
            sku_nic_count = int(max_nic_count)
            if sku_nic_count == 0:
                sku_nic_count = 1
            node_space.network_interface.nic_count = search_space.IntRange(
                min=1, max=sku_nic_count
            )
            node_space.network_interface.max_nic_count = sku_nic_count
        premium_io_supported = azure_raw_capabilities.get("PremiumIO", None)
        if premium_io_supported and eval(premium_io_supported) is True:
            node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        ephemeral_supported = azure_raw_capabilities.get(
            "EphemeralOSDiskSupported", None
        )
        if ephemeral_supported and eval(ephemeral_supported) is True:
            # Check whether CachedDiskBytes is greater than 30GB.
            # The diff disk is used as the cache disk for the ephemeral OS disk.
            cached_disk_bytes = azure_raw_capabilities.get("CachedDiskBytes", 0)
            cached_disk_bytes_gb = int(cached_disk_bytes) / 1024 / 1024 / 1024
            if cached_disk_bytes_gb >= 30:
                node_space.disk.disk_type.add(schema.DiskType.Ephemeral)
        # set accelerated networking (AN)
        an_enabled = azure_raw_capabilities.get("AcceleratedNetworkingEnabled", None)
        if an_enabled and eval(an_enabled) is True:
            # refer to
            # https://docs.microsoft.com/en-us/azure/virtual-machines/dcv2-series#configuration
            # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv2-series
            # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv3-series
            # https://docs.microsoft.com/en-us/azure/virtual-machines/nd-series
            # The VM size families below don't support `Accelerated Networking`,
            # but the API returns `True`. Work around the issue temporarily;
            # revert once the bug is fixed.
            if resource_sku.family not in [
                "standardDCSv2Family",
                "standardNCSv2Family",
                "standardNCSv3Family",
                "standardNDSFamily",
            ]:
                # update the data path types if the sriov feature is supported
                node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)
        # for some new sizes, there is no MaxNetworkInterfaces capability,
        # so set a default value for max_nic_count
        if not node_space.network_interface.max_nic_count:
            node_space.network_interface.max_nic_count = 1
        # some vm sizes do not have a resource disk:
        # https://docs.microsoft.com/en-us/azure/virtual-machines/azure-vms-no-temp-disk
        if resource_sku.family in [
            "standardDv4Family",
            "standardDSv4Family",
            "standardEv4Family",
            "standardESv4Family",
            "standardEASv4Family",
            "standardEASv5Family",
            "standardESv5Family",
            "standardEADSv5Family",
            "standardDASv5Family",
            "standardDSv5Family",
            "standardFSv2Family",
            "standardNCFamily",
            "standardESv3Family",
            "standardDPSv5Family",
            "standardEBSv5Family",
            "standardEv5Family",
        ]:
            node_space.disk.has_resource_disk = False
        else:
            node_space.disk.has_resource_disk = True
        for supported_feature in self.supported_features():
            if supported_feature.name() in [
                features.Disk.name(),
                features.NetworkInterface.name(),
            ]:
                # Skip the disk and network interface features. They are
                # handled by node_space directly.
                continue
            feature_setting = supported_feature.create_setting(
                raw_capabilities=azure_raw_capabilities,
                resource_sku=resource_sku,
                node_space=node_space,
            )
            if feature_setting:
                node_space.features.add(feature_setting)
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        return node_space
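The raw SKU capabilities arrive as strings, so all the conversions above are manual: GB to MB for memory, bytes to GB for the cached disk, and "True"/"False" strings for flags (which the code evaluates with eval). The same conversions in isolation, with a safer string comparison for the boolean flags:

raw_capabilities = {
    "MemoryGB": "3.5",
    "CachedDiskBytes": str(32 * 1024 * 1024 * 1024),
    "PremiumIO": "True",
}

memory_mb = int(float(raw_capabilities["MemoryGB"]) * 1024)
assert memory_mb == 3584

cached_disk_gb = int(raw_capabilities["CachedDiskBytes"]) / 1024 / 1024 / 1024
assert cached_disk_gb >= 30  # large enough to offer an ephemeral OS disk

# a safer equivalent of eval("True") for these flags:
premium_io = raw_capabilities["PremiumIO"] == "True"
assert premium_io is True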
    def get_sorted_vm_sizes(
        self, capabilities: List[AzureCapability], log: Logger
    ) -> List[AzureCapability]:
        # sort the vm sizes by the predefined patterns
        sorted_capabilities: List[AzureCapability] = []
        found_vm_sizes: Set[str] = set()
        # loop over all fallback levels
        for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:
            level_capabilities: List[AzureCapability] = []
            # loop over all capabilities
            for capability in capabilities:
                vm_size = capability.vm_size
                if fallback_pattern.match(vm_size) and vm_size not in found_vm_sizes:
                    level_capabilities.append(capability)
                    found_vm_sizes.add(vm_size)
            # sort by rough cost
            level_capabilities.sort(key=lambda x: (x.capability.cost))
            sorted_capabilities.extend(level_capabilities)
        return sorted_capabilities

    def load_public_ip(self, node: Node, log: Logger) -> str:
        node_context = get_node_context(node)
        vm_name = node_context.vm_name
        resource_group_name = node_context.resource_group_name
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            resource_group_name=resource_group_name, log=self._log
        )
        return public_ips_map[vm_name]
    def load_public_ip(self, node: Node, log: Logger) -> str:
        node_context = get_node_context(node)
        vm_name = node_context.vm_name
        resource_group_name = node_context.resource_group_name
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            resource_group_name=resource_group_name, log=self._log
        )
        return public_ips_map[vm_name]

    @lru_cache(maxsize=10)  # noqa: B019
    def _resolve_marketplace_image(
        self, location: str, marketplace: AzureVmMarketplaceSchema
    ) -> AzureVmMarketplaceSchema:
        new_marketplace = copy.copy(marketplace)
        # "latest" doesn't work in a deployment; it needs a specific version.
        if marketplace.version.lower() == "latest":
            compute_client = get_compute_client(self)
            with global_credential_access_lock:
                versioned_images = compute_client.virtual_machine_images.list(
                    location=location,
                    publisher_name=marketplace.publisher,
                    offer=marketplace.offer,
                    skus=marketplace.sku,
                )
            if len(versioned_images) == 0:
                raise LisaException(
                    f"cannot find any version of image {marketplace.publisher} "
                    f"{marketplace.offer} {marketplace.sku} in {location}"
                )
            # any version shares the same purchase plan; take the newest one.
            new_marketplace.version = versioned_images[-1].name
        return new_marketplace

    def _parse_shared_gallery_image(
        self, location: str, shared_image: SharedImageGallerySchema
    ) -> SharedImageGallerySchema:
        new_shared_image = copy.copy(shared_image)
        compute_client = get_compute_client(self)
        if not shared_image.resource_group_name:
            # /subscriptions/xxxx/resourceGroups/xxxx/providers/Microsoft.Compute/
            # galleries/xxxx
            rg_pattern = re.compile(r"resourceGroups/(.*)/providers", re.M)
            galleries = compute_client.galleries.list()
            rg_name = ""
            for gallery in galleries:
                if gallery.name.lower() == shared_image.image_gallery:
                    rg_name = get_matched_str(gallery.id, rg_pattern)
                    break
            if not rg_name:
                raise LisaException(
                    f"cannot find a matched gallery {shared_image.image_gallery}"
                )
            new_shared_image.resource_group_name = rg_name
        if shared_image.image_version.lower() == "latest":
            gallery_images = (
                compute_client.gallery_image_versions.list_by_gallery_image(
                    resource_group_name=new_shared_image.resource_group_name,
                    gallery_name=new_shared_image.image_gallery,
                    gallery_image_name=new_shared_image.image_definition,
                )
            )
            image: Optional[GalleryImageVersion] = None
            time: Optional[datetime] = None
            for image in gallery_images:
                gallery_image = compute_client.gallery_image_versions.get(
                    resource_group_name=new_shared_image.resource_group_name,
                    gallery_name=new_shared_image.image_gallery,
                    gallery_image_name=new_shared_image.image_definition,
                    gallery_image_version_name=image.name,
                    expand="ReplicationStatus",
                )
                # keep the version with the latest published date
                if not time or gallery_image.publishing_profile.published_date > time:
                    time = gallery_image.publishing_profile.published_date
                    new_shared_image.image_version = image.name
        return new_shared_image
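    # For reference, a hypothetical gallery id and what rg_pattern extracts
    # from it (the id below is illustrative, not a real resource):
    #
    #   gallery_id = (
    #       "/subscriptions/xxxx/resourceGroups/my-rg"
    #       "/providers/Microsoft.Compute/galleries/my-gallery"
    #   )
    #   re.search(r"resourceGroups/(.*)/providers", gallery_id).group(1)
    #   # -> "my-rg"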
    @lru_cache(maxsize=10)  # noqa: B019
    def _process_marketplace_image_plan(
        self,
        marketplace: AzureVmMarketplaceSchema,
        plan_name: str,
        plan_product: str,
        plan_publisher: str,
    ) -> Optional[PurchasePlan]:
        """
        This method fills in the purchase plan when a VM image needs one; if
        the plan is not filled in, the deployment fails.
        1. Get image_info to check if there is a plan.
        2. If there is a plan, it may need to check and accept terms.
        """
        plan: Optional[AzureVmPurchasePlanSchema] = None
        # if there is a plan, its terms may need to be accepted.
        marketplace_client = get_marketplace_ordering_client(self)
        term: Optional[AgreementTerms] = None
        try:
            with global_credential_access_lock:
                term = marketplace_client.marketplace_agreements.get(
                    offer_type="virtualmachine",
                    publisher_id=marketplace.publisher,
                    offer_id=marketplace.offer,
                    plan_id=plan_name,
                )
        except Exception as identifier:
            raise LisaException(f"error on getting marketplace agreement: {identifier}")
        assert term
        if term.accepted is False:
            term.accepted = True
            marketplace_client.marketplace_agreements.create(
                offer_type="virtualmachine",
                publisher_id=marketplace.publisher,
                offer_id=marketplace.offer,
                plan_id=plan_name,
                parameters=term,
            )
        plan = AzureVmPurchasePlanSchema(
            name=plan_name,
            product=plan_product,
            publisher=plan_publisher,
        )
        return plan
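    # Note: lru_cache on methods is flagged by flake8-bugbear as B019 because
    # the cache holds a reference to self. It is accepted here (see the noqa
    # markers) so repeated lookups of the same plan or image version hit the
    # cache instead of the Azure API.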
    def _generate_max_capability(self, vm_size: str, location: str) -> AzureCapability:
        # some vm sizes cannot be queried from the API, so use a default
        # capability to run with the best guess.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=search_space.IntRange(min=1),
            memory_mb=search_space.IntRange(min=0),
            gpu_count=search_space.IntRange(min=0),
        )
        node_space.disk = features.AzureDiskOptionSettings()
        node_space.disk.data_disk_count = search_space.IntRange(min=0)
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        node_space.disk.disk_type.add(schema.DiskType.Ephemeral)
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)
        node_space.network_interface.nic_count = search_space.IntRange(min=1)
        # to date, the max nic count supported in Azure is 8
        node_space.network_interface.max_nic_count = 8
        azure_capability = AzureCapability(
            location=location,
            vm_size=vm_size,
            capability=node_space,
            resource_sku={},
        )
        node_space.name = f"{location}_{vm_size}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        # all nodes support the following features
        all_features = self.supported_features()
        node_space.features.update(
            [schema.FeatureSettings.create(x.name()) for x in all_features]
        )
        _convert_to_azure_node_space(node_space)
        return azure_capability

    def _generate_min_capability(
        self,
        requirement: schema.NodeSpace,
        azure_capability: AzureCapability,
        location: str,
    ) -> schema.NodeSpace:
        min_cap: schema.NodeSpace = requirement.generate_min_capability(
            azure_capability.capability
        )
        # Apply azure specified values. They are passed into the arm template.
        azure_node_runbook = min_cap.get_extended_runbook(AzureNodeSchema, AZURE)
        if azure_node_runbook.location:
            assert location in azure_node_runbook.location, (
                f"predefined location [{azure_node_runbook.location}] "
                f"must be same as "
                f"cap location [{location}]"
            )
        # the location may not be set
        azure_node_runbook.location = location
        azure_node_runbook.vm_size = azure_capability.vm_size
        return min_cap

    def _generate_sas_token(self, result_dict: Dict[str, str]) -> Any:
        sc_name = result_dict["account_name"]
        container_name = result_dict["container_name"]
        rg = result_dict["resource_group_name"]
        blob_name = result_dict["blob_name"]
        source_container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            container_name=container_name,
            resource_group_name=rg,
        )
        source_blob = source_container_client.get_blob_client(blob_name)
        sas_token = generate_sas_token(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=sc_name,
            resource_group_name=rg,
        )
        source_url = source_blob.url + "?" + sas_token
        return source_url
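    # The value returned above is the source blob url with the sas token
    # appended as a query string, e.g. (hypothetical):
    #   https://account.blob.core.windows.net/container/image.vhd?sv=...&sig=...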
    @lru_cache(maxsize=10)  # noqa: B019
    def _get_deployable_vhd_path(
        self, vhd_path: str, location: str, log: Logger
    ) -> str:
        """
        A sas url cannot be used to create a vm directly, so this method checks
        whether vhd_path is a sas url. If so, copy it to a location in the
        current subscription, so it can be deployed.
        """
        matches = SAS_URL_PATTERN.match(vhd_path)
        if not matches:
            vhd_details = self._get_vhd_details(vhd_path)
            vhd_location = vhd_details["location"]
            if location == vhd_location:
                return vhd_path
            else:
                vhd_path = self._generate_sas_token(vhd_details)
                matches = SAS_URL_PATTERN.match(vhd_path)
                assert matches, f"fail to generate sas url for {vhd_path}"
                log.debug(
                    f"the vhd location {vhd_location} is not the same as the "
                    f"running case location {location}; generated a sas url for "
                    f"the source vhd, so it can be copied into location {location}."
                )
        else:
            log.debug("found the vhd is a sas url; it may need to be copied.")
        # get the original vhd's hash key for comparison.
        original_key: Optional[bytearray] = None
        original_vhd_path = vhd_path
        original_blob_client = BlobClient.from_blob_url(original_vhd_path)
        properties = original_blob_client.get_blob_properties()
        if properties.content_settings:
            original_key = properties.content_settings.get(
                "content_md5", None
            )  # type: ignore
        storage_name = get_storage_account_name(
            subscription_id=self.subscription_id, location=location, type="t"
        )
        check_or_create_storage_account(
            self.credential,
            self.subscription_id,
            storage_name,
            self._azure_runbook.shared_resource_group_name,
            location,
            log,
        )
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=storage_name,
            container_name=SAS_COPIED_CONTAINER_NAME,
            resource_group_name=self._azure_runbook.shared_resource_group_name,
        )
        normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)
        year = matches["year"] if matches["year"] else "9999"
        month = matches["month"] if matches["month"] else "01"
        day = matches["day"] if matches["day"] else "01"
        # use the expiry date to generate the path, which makes it easy to
        # tell when a cached copy can be removed.
        vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"
        full_vhd_path = f"{container_client.url}/{vhd_path}"
        # lock here to prevent the same vhd being copied by multiple threads
        global _global_sas_vhd_copy_lock
        cached_key: Optional[bytearray] = None
        with _global_sas_vhd_copy_lock:
            blobs = container_client.list_blobs(name_starts_with=vhd_path)
            for blob in blobs:
                if blob:
                    # check if the hash key matches the original key.
                    if blob.content_settings:
                        cached_key = blob.content_settings.get("content_md5", None)
                    if original_key == cached_key:
                        # if it exists, return the link instead of copying again.
                        log.debug("the sas url is copied already, use it directly.")
                        return full_vhd_path
                    else:
                        log.debug("found a cached vhd, but the hash key mismatched.")
            blob_client = container_client.get_blob_client(vhd_path)
            blob_client.start_copy_from_url(
                original_vhd_path, metadata=None, incremental_copy=False
            )
            wait_copy_blob(blob_client, vhd_path, log)
        return full_vhd_path

    def _get_vhd_details(self, vhd_path: str) -> Any:
        matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)
        assert matched, f"fail to get matched info from {vhd_path}"
        sc_name = matched.group("sc")
        container_name = matched.group("container")
        blob_name = matched.group("blob")
        storage_client = get_storage_client(self.credential, self.subscription_id)
        sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]
        assert sc, f"fail to get storage account {sc_name} from {self.subscription_id}"
        rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)
        return {
            "location": sc[0].location,
            "resource_group_name": rg,
            "account_name": sc_name,
            "container_name": container_name,
            "blob_name": blob_name,
        }
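    # Illustrative cache layout (hypothetical values): a sas url expiring on
    # 2099-12-31 is copied to a blob named
    #   20991231/https-account-blob-core-windows-net-container-image-vhd.vhd
    # so stale copies can be spotted and cleaned up by their date prefix.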
    def _generate_data_disks(
        self,
        node: Node,
        azure_node_runbook: AzureNodeArmParameter,
    ) -> List[DataDiskSchema]:
        data_disks: List[DataDiskSchema] = []
        assert node.capability.disk
        if azure_node_runbook.marketplace:
            marketplace = self._get_image_info(
                azure_node_runbook.location, azure_node_runbook.marketplace
            )
            # some images have data disks by default,
            # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.27.
            # we have to inject the part below when the dataDisks section is
            # added in the arm template, otherwise the deployment fails with:
            # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun
            #  does not have required value(s) for image specified in
            #  storage profile.
            for default_data_disk in marketplace.data_disk_images:
                data_disks.append(
                    DataDiskSchema(
                        node.capability.disk.data_disk_caching_type,
                        default_data_disk.additional_properties["sizeInGb"],
                        azure_node_runbook.disk_type,
                        DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,
                    )
                )
        assert isinstance(
            node.capability.disk.data_disk_count, int
        ), f"actual: {type(node.capability.disk.data_disk_count)}"
        for _ in range(node.capability.disk.data_disk_count):
            assert isinstance(node.capability.disk.data_disk_size, int)
            data_disks.append(
                DataDiskSchema(
                    node.capability.disk.data_disk_caching_type,
                    node.capability.disk.data_disk_size,
                    azure_node_runbook.disk_type,
                    DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,
                )
            )
        return data_disks
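    # Illustrative outcome (hypothetical numbers): for a marketplace image that
    # ships one default data disk and a capability with data_disk_count == 2,
    # the returned list holds one FROM_IMAGE disk followed by two EMPTY disks,
    # so the arm template can assign a lun to every disk.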
    @lru_cache(maxsize=10)  # noqa: B019
    def _get_image_info(
        self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]
    ) -> VirtualMachineImage:
        compute_client = get_compute_client(self)
        assert isinstance(marketplace, AzureVmMarketplaceSchema)
        with global_credential_access_lock:
            image_info = compute_client.virtual_machine_images.get(
                location=location,
                publisher_name=marketplace.publisher,
                offer=marketplace.offer,
                skus=marketplace.sku,
                version=marketplace.version,
            )
        return image_info

    def _get_location_key(self, location: str) -> str:
        return f"{self.subscription_id}_{location}"

    def _enable_ssh_on_windows(self, node: Node) -> None:
        runbook = node.capability.get_extended_runbook(AzureNodeSchema)
        if runbook.is_linux:
            return
        context = get_node_context(node)
        remote_node = cast(RemoteNode, node)
        log = node.log
        log.debug(
            f"checking if SSH port {remote_node.public_port} is reachable "
            f"on {remote_node.name}..."
        )
        connected, _ = wait_tcp_port_ready(
            address=remote_node.public_address,
            port=remote_node.public_port,
            log=log,
            timeout=3,
        )
        if connected:
            log.debug("SSH port is reachable.")
            return
        log.debug("SSH port is not open, enabling ssh on Windows ...")
        # The SSH port is not opened, try to enable it.
        public_key_data = get_public_key_data(self.runbook.admin_private_key_file)
        with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:
            script = f.read()
        parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)
        command = RunCommandInput(
            command_id="RunPowerShellScript",
            script=[script],
            parameters=[parameters],
        )
        compute_client = get_compute_client(self)
        operation = compute_client.virtual_machines.begin_run_command(
            resource_group_name=context.resource_group_name,
            vm_name=context.vm_name,
            parameters=command,
        )
        result = wait_operation(operation=operation, failure_identity="enable ssh")
        log.debug("SSH script result:")
        log.dump_json(logging.DEBUG, result)

    def _get_vhd_os_disk_size(self, blob_url: str) -> int:
        result_dict = self._get_vhd_details(blob_url)
        container_client = get_or_create_storage_container(
            credential=self.credential,
            subscription_id=self.subscription_id,
            account_name=result_dict["account_name"],
            container_name=result_dict["container_name"],
            resource_group_name=result_dict["resource_group_name"],
        )
        vhd_blob = container_client.get_blob_client(result_dict["blob_name"])
        properties = vhd_blob.get_blob_properties()
        assert properties.size, f"fail to get blob size of {blob_url}"
        # Azure requires only megabyte alignment of vhds, so round the size up
        # when it is not gigabyte aligned.
        return math.ceil(properties.size / 1024 / 1024 / 1024)

    def _get_sig_info(
        self, shared_image: SharedImageGallerySchema
    ) -> GalleryImageVersion:
        compute_client = get_compute_client(self)
        return compute_client.gallery_image_versions.get(
            resource_group_name=shared_image.resource_group_name,
            gallery_name=shared_image.image_gallery,
            gallery_image_name=shared_image.image_definition,
            gallery_image_version_name=shared_image.image_version,
            expand="ReplicationStatus",
        )

    def _get_sig_os_disk_size(self, shared_image: SharedImageGallerySchema) -> int:
        found_image = self._get_sig_info(shared_image)
        assert found_image.storage_profile.os_disk_image.size_in_gb
        return int(found_image.storage_profile.os_disk_image.size_in_gb)

    def _get_normalized_vm_sizes(
        self, name: str, location: str, log: Logger
    ) -> List[str]:
        split_vm_sizes: List[str] = [x.strip() for x in name.split(",")]
        for index, vm_size in enumerate(split_vm_sizes):
            split_vm_sizes[index] = self._get_normalized_vm_size(vm_size, location, log)
        return [x for x in split_vm_sizes if x]

    def _get_normalized_vm_size(self, name: str, location: str, log: Logger) -> str:
        # find the predefined vm size among all available sizes.
        location_info: AzureLocation = self.get_location_info(location, log)
        matched_score: float = 0
        matched_name: str = ""
        matcher = SequenceMatcher(None, name.lower(), "")
        for vm_size in location_info.capabilities:
            matcher.set_seq2(vm_size.lower())
            if name.lower() in vm_size.lower() and matched_score < matcher.ratio():
                matched_name = vm_size
                matched_score = matcher.ratio()
        return matched_name
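    # difflib.SequenceMatcher scores similarity in [0.0, 1.0]; a sketch of the
    # fuzzy match above (the size names are illustrative):
    #
    #   matcher = SequenceMatcher(None, "d2s_v3", "standard_d2s_v3")
    #   matcher.ratio()  # ~0.57: 2 * 6 matching chars / 21 total chars
    #   # "d2s_v3" is a substring of "standard_d2s_v3", so it is a candidate,
    #   # and the candidate with the highest ratio wins.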
    def _get_capabilities(
        self, vm_sizes: List[str], location: str, use_max_capability: bool, log: Logger
    ) -> List[AzureCapability]:
        candidate_caps: List[AzureCapability] = []
        caps = self.get_location_info(location, log).capabilities
        for vm_size in vm_sizes:
            # force the max capability to run as many test cases as possible,
            # or force support for a vm size that doesn't exist in the location.
            if use_max_capability:
                candidate_caps.append(self._generate_max_capability(vm_size, location))
                continue
            if vm_size in caps:
                candidate_caps.append(caps[vm_size])
        return candidate_caps

    def _get_matched_capability(
        self,
        requirement: schema.NodeSpace,
        candidate_capabilities: List[AzureCapability],
    ) -> Optional[schema.NodeSpace]:
        matched_cap: Optional[schema.NodeSpace] = None
        # filter allowed vm sizes
        for azure_cap in candidate_capabilities:
            check_result = requirement.check(azure_cap.capability)
            if check_result.result:
                min_cap = self._generate_min_capability(
                    requirement, azure_cap, azure_cap.location
                )
                matched_cap = min_cap
                break
        return matched_cap

    def _get_matched_capabilities(
        self, location: str, nodes_requirement: List[schema.NodeSpace], log: Logger
    ) -> Tuple[List[Union[schema.NodeSpace, bool]], str]:
        # each element is a matched capability, or whether it's able to wait.
        caps: List[Union[schema.NodeSpace, bool]] = [False] * len(nodes_requirement)
        # keep one of the errors across all requirements; it's enough for
        # troubleshooting.
        error: str = ""
        # get allowed vm sizes, either defined in the runbook or supported by
        # the subscription.
        for req_index, req in enumerate(nodes_requirement):
            candidate_caps, sub_error = self._get_allowed_capabilities(
                req, location, log
            )
            if sub_error:
                # no candidate found, so try the next one.