Best Python code snippet using lisa_python
platform_.py
Source:platform_.py  
...390        else:391            normalized_name = constants.NORMALIZE_PATTERN.sub("-", constants.RUN_NAME)392            # Take last chars to make sure the length is to exceed max 90 chars393            # allowed in resource group name.394            resource_group_name = truncate_keep_prefix(395                f"{normalized_name}-e{environment.id}", 80396            )397            environment_context.resource_group_is_specified = True398        environment_context.resource_group_name = resource_group_name399        if self._azure_runbook.dry_run:400            log.info(f"dry_run: {self._azure_runbook.dry_run}")401        else:402            try:403                if self._azure_runbook.deploy:404                    log.info(405                        f"creating or updating resource group: [{resource_group_name}]"406                    )407                    check_or_create_resource_group(408                        self.credential,409                        subscription_id=self.subscription_id,410                        resource_group_name=resource_group_name,411                        location=RESOURCE_GROUP_LOCATION,412                        log=log,413                    )414                else:415                    log.info(f"reusing resource group: [{resource_group_name}]")416                location, deployment_parameters = self._create_deployment_parameters(417                    resource_group_name, environment, log418                )419                if self._azure_runbook.deploy:420                    self._validate_template(deployment_parameters, log)421                    self._deploy(location, deployment_parameters, log)422                # Even skipped deploy, try best to initialize nodes423                self.initialize_environment(environment, log)424            except Exception as identifier:425                self._delete_environment(environment, log)426                raise identifier427    def _delete_environment(self, environment: 
Environment, log: Logger) -> None:428        environment_context = get_environment_context(environment=environment)429        resource_group_name = environment_context.resource_group_name430        # the resource group name is empty when it is not deployed for some reasons,431        # like capability doesn't meet case requirement.432        if not resource_group_name:433            return434        assert self._azure_runbook435        if not environment_context.resource_group_is_specified:436            log.info(437                f"skipped to delete resource group: {resource_group_name}, "438                f"as it's specified in runbook."439            )440        elif self._azure_runbook.dry_run:441            log.info(442                f"skipped to delete resource group: {resource_group_name}, "443                f"as it's a dry run."444            )445        else:446            assert self._rm_client447            az_rg_exists = self._rm_client.resource_groups.check_existence(448                resource_group_name449            )450            if not az_rg_exists:451                return452            log.info(453                f"deleting resource group: {resource_group_name}, "454                f"wait: {self._azure_runbook.wait_delete}"455            )456            try:457                self._delete_boot_diagnostic_container(resource_group_name, log)458            except Exception as identifier:459                log.debug(460                    f"exception on deleting boot diagnostic container: {identifier}"461                )462            delete_operation: Any = None463            try:464                delete_operation = self._rm_client.resource_groups.begin_delete(465                    resource_group_name466                )467            except Exception as identifier:468                log.debug(f"exception on delete resource group: {identifier}")469            if delete_operation and self._azure_runbook.wait_delete:470                
wait_operation(471                    delete_operation, failure_identity="delete resource group"472                )473            else:474                log.debug("not wait deleting")475    def _delete_boot_diagnostic_container(476        self, resource_group_name: str, log: Logger477    ) -> None:478        compute_client = get_compute_client(self)479        vms = compute_client.virtual_machines.list(resource_group_name)480        for vm in vms:481            diagnostic_data = (482                compute_client.virtual_machines.retrieve_boot_diagnostics_data(483                    resource_group_name=resource_group_name, vm_name=vm.name484                )485            )486            if not diagnostic_data:487                continue488            # A sample url,489            # https://storageaccountname.blob.core.windows.net:443/490            # bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70/491            # node-0.30779088-9b10-4074-8c27-98b91f1d8b70.serialconsole.log492            # ?sv=2018-03-28&sr=b&sig=mJEsvk9WunbKHfBs1lo1jcIBe4owq1brP8Kw3qXTQJA%3d&493            # se=2021-09-14T08%3a55%3a38Z&sp=r494            blob_uri = diagnostic_data.console_screenshot_blob_uri495            if blob_uri:496                matched = self._diagnostic_storage_container_pattern.match(blob_uri)497                assert matched498                # => storageaccountname499                storage_name = matched.group("storage_name")500                # => bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70501                container_name = matched.group("container_name")502                container_client = get_or_create_storage_container(503                    credential=self.credential,504                    subscription_id=self.subscription_id,505                    account_name=storage_name,506                    container_name=container_name,507                    resource_group_name=self._azure_runbook.shared_resource_group_name,508                
)509                log.debug(510                    f"deleting boot diagnostic container: {container_name}"511                    f" under storage account {storage_name} of vm {vm.name}"512                )513                try:514                    container_client.delete_container()515                except Exception as identifier:516                    log.debug(517                        f"exception on deleting boot diagnostic container:"518                        f" {identifier}"519                    )520    def _get_node_information(self, node: Node) -> Dict[str, str]:521        information: Dict[str, Any] = {}522        node.log.debug("detecting lis version...")523        modinfo = node.tools[Modinfo]524        information["lis_version"] = modinfo.get_version("hv_vmbus")525        node.log.debug("detecting vm generation...")526        information[KEY_VM_GENERATION] = node.tools[VmGeneration].get_generation()527        node.log.debug(f"vm generation: {information[KEY_VM_GENERATION]}")528        return information529    def _get_kernel_version(self, node: Node) -> str:530        result: str = ""531        if not result and hasattr(node, ATTRIBUTE_FEATURES):532            # try to get kernel version in Azure. 
use it, when uname doesn't work533            node.log.debug("detecting kernel version from serial log...")534            serial_console = node.features[features.SerialConsole]535            result = serial_console.get_matched_str(KERNEL_VERSION_PATTERN)536        return result537    def _get_host_version(self, node: Node) -> str:538        result: str = ""539        try:540            if node.is_connected and node.is_posix:541                node.log.debug("detecting host version from dmesg...")542                dmesg = node.tools[Dmesg]543                result = get_matched_str(544                    dmesg.get_output(), HOST_VERSION_PATTERN, first_match=False545                )546        except Exception as identifier:547            # it happens on some error vms. Those error should be caught earlier in548            # test cases not here. So ignore any error here to collect information only.549            node.log.debug(f"error on run dmesg: {identifier}")550        # if not get, try again from serial console log.551        # skip if node is not initialized.552        if not result and hasattr(node, ATTRIBUTE_FEATURES):553            node.log.debug("detecting host version from serial log...")554            serial_console = node.features[features.SerialConsole]555            result = serial_console.get_matched_str(HOST_VERSION_PATTERN)556        return result557    def _get_wala_version(self, node: Node) -> str:558        result = ""559        try:560            if node.is_connected and node.is_posix:561                node.log.debug("detecting wala version from waagent...")562                waagent = node.tools[Waagent]563                result = waagent.get_version()564        except Exception as identifier:565            # it happens on some error vms. Those error should be caught earlier in566            # test cases not here. 
So ignore any error here to collect information only.567            node.log.debug(f"error on run waagent: {identifier}")568        if not result and hasattr(node, ATTRIBUTE_FEATURES):569            node.log.debug("detecting wala agent version from serial log...")570            serial_console = node.features[features.SerialConsole]571            result = serial_console.get_matched_str(WALA_VERSION_PATTERN)572        return result573    def _get_wala_distro_version(self, node: Node) -> str:574        result = "Unknown"575        try:576            if node.is_connected and node.is_posix:577                waagent = node.tools[Waagent]578                result = waagent.get_distro_version()579        except Exception as identifier:580            # it happens on some error vms. Those error should be caught earlier in581            # test cases not here. So ignore any error here to collect information only.582            node.log.debug(f"error on get waagent distro version: {identifier}")583        return result584    def _get_platform_information(self, environment: Environment) -> Dict[str, str]:585        result: Dict[str, str] = {}586        azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(587            AzurePlatformSchema588        )589        result[AZURE_RG_NAME_KEY] = get_environment_context(590            environment591        ).resource_group_name592        if azure_runbook.availability_set_properties:593            for (594                property_name,595                property_value,596            ) in azure_runbook.availability_set_properties.items():597                if property_name in [598                    "platformFaultDomainCount",599                    "platformUpdateDomainCount",600                ]:601                    continue602                if isinstance(property_value, dict):603                    for key, value in property_value.items():604                        if value:605                            result[key] 
= value606        if azure_runbook.availability_set_tags:607            for key, value in azure_runbook.availability_set_tags.items():608                if value:609                    result[key] = value610        if azure_runbook.vm_tags:611            for key, value in azure_runbook.vm_tags.items():612                if value:613                    result[key] = value614        return result615    def _get_environment_information(self, environment: Environment) -> Dict[str, str]:616        information: Dict[str, str] = {}617        node_runbook: Optional[AzureNodeSchema] = None618        if environment.nodes:619            node: Optional[Node] = environment.default_node620        else:621            node = None622        if node:623            node_runbook = node.capability.get_extended_runbook(AzureNodeSchema, AZURE)624            for key, method in self._environment_information_hooks.items():625                node.log.debug(f"detecting {key} ...")626                try:627                    value = method(node)628                    if value:629                        information[key] = value630                except Exception as identifier:631                    node.log.exception(f"error on get {key}.", exc_info=identifier)632            information.update(self._get_platform_information(environment))633            if node.is_connected and node.is_posix:634                information.update(self._get_node_information(node))635        elif environment.capability and environment.capability.nodes:636            # get deployment information, if failed on preparing phase637            node_space = environment.capability.nodes[0]638            node_runbook = node_space.get_extended_runbook(639                AzureNodeSchema, type_name=AZURE640            )641        if node_runbook:642            information["location"] = node_runbook.location643            information["vmsize"] = node_runbook.vm_size644            information["image"] = 
node_runbook.get_image_name()645        return information646    def _initialize(self, *args: Any, **kwargs: Any) -> None:647        # set needed environment variables for authentication648        azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(649            AzurePlatformSchema650        )651        assert azure_runbook, "platform runbook cannot be empty"652        self._azure_runbook = azure_runbook653        self.subscription_id = azure_runbook.subscription_id654        self._initialize_credential()655        check_or_create_resource_group(656            self.credential,657            self.subscription_id,658            azure_runbook.shared_resource_group_name,659            RESOURCE_GROUP_LOCATION,660            self._log,661        )662        self._rm_client = get_resource_management_client(663            self.credential, self.subscription_id664        )665    def _initialize_credential(self) -> None:666        azure_runbook = self._azure_runbook667        credential_key = (668            f"{azure_runbook.service_principal_tenant_id}_"669            f"{azure_runbook.service_principal_client_id}"670        )671        credential = self._credentials.get(credential_key, None)672        if not credential:673            # set azure log to warn level only674            logging.getLogger("azure").setLevel(azure_runbook.log_level)675            if azure_runbook.service_principal_tenant_id:676                os.environ[677                    "AZURE_TENANT_ID"678                ] = azure_runbook.service_principal_tenant_id679            if azure_runbook.service_principal_client_id:680                os.environ[681                    "AZURE_CLIENT_ID"682                ] = azure_runbook.service_principal_client_id683            if azure_runbook.service_principal_key:684                os.environ["AZURE_CLIENT_SECRET"] = azure_runbook.service_principal_key685            credential = DefaultAzureCredential()686            with 
SubscriptionClient(credential) as self._sub_client:687                # suppress warning message by search for different credential types688                azure_identity_logger = logging.getLogger("azure.identity")689                azure_identity_logger.setLevel(logging.ERROR)690                with global_credential_access_lock:691                    subscription = self._sub_client.subscriptions.get(692                        self.subscription_id693                    )694                azure_identity_logger.setLevel(logging.WARN)695            if not subscription:696                raise LisaException(697                    f"Cannot find subscription id: '{self.subscription_id}'. "698                    f"Make sure it exists and current account can access it."699                )700            self._log.info(701                f"connected to subscription: "702                f"{subscription.id}, '{subscription.display_name}'"703            )704            self._credentials[credential_key] = credential705        self.credential = credential706    def _load_template(self) -> Any:707        if self._arm_template is None:708            template_file_path = Path(__file__).parent / "arm_template.json"709            with open(template_file_path, "r") as f:710                self._arm_template = json.load(f)711        return self._arm_template712    @retry(tries=10, delay=1, jitter=(0.5, 1))713    def _load_location_info_from_file(714        self, cached_file_name: Path, log: Logger715    ) -> Optional[AzureLocation]:716        loaded_obj: Optional[AzureLocation] = None717        if cached_file_name.exists():718            try:719                with open(cached_file_name, "r") as f:720                    loaded_data: Dict[str, Any] = json.load(f)721                loaded_obj = schema.load_by_type(AzureLocation, loaded_data)722            except Exception as identifier:723                # if schema changed, There may be exception, remove cache and retry724            
    # Note: retry on this method depends on decorator725                log.debug(726                    f"error on loading cache, delete cache and retry. {identifier}"727                )728                cached_file_name.unlink()729                raise identifier730        return loaded_obj731    def get_location_info(self, location: str, log: Logger) -> AzureLocation:732        cached_file_name = constants.CACHE_PATH.joinpath(733            f"azure_locations_{location}.json"734        )735        should_refresh: bool = True736        key = self._get_location_key(location)737        location_data = self._locations_data_cache.get(key, None)738        if not location_data:739            location_data = self._load_location_info_from_file(740                cached_file_name=cached_file_name, log=log741            )742        if location_data:743            delta = datetime.now() - location_data.updated_time744            # refresh cached locations every 1 day.745            if delta.days < 1:746                should_refresh = False747            else:748                log.debug(749                    f"{key}: cache timeout: {location_data.updated_time},"750                    f"sku count: {len(location_data.capabilities)}"751                )752        else:753            log.debug(f"{key}: no cache found")754        if should_refresh:755            compute_client = get_compute_client(self)756            log.debug(f"{key}: querying")757            all_skus: Dict[str, AzureCapability] = dict()758            paged_skus = compute_client.resource_skus.list(759                f"location eq '{location}'"760            ).by_page()761            for skus in paged_skus:762                for sku_obj in skus:763                    try:764                        if sku_obj.resource_type == "virtualMachines":765                            if sku_obj.restrictions and any(766                                restriction.type == "Location"767                                for 
restriction in sku_obj.restrictions768                            ):769                                # restricted on this location770                                continue771                            resource_sku = sku_obj.as_dict()772                            capability = self._resource_sku_to_capability(773                                location, sku_obj774                            )775                            # estimate vm cost for priority776                            assert isinstance(capability.core_count, int)777                            assert isinstance(capability.gpu_count, int)778                            azure_capability = AzureCapability(779                                location=location,780                                vm_size=sku_obj.name,781                                capability=capability,782                                resource_sku=resource_sku,783                            )784                            all_skus[azure_capability.vm_size] = azure_capability785                    except Exception as identifier:786                        log.error(f"unknown sku: {sku_obj}")787                        raise identifier788            location_data = AzureLocation(location=location, capabilities=all_skus)789            log.debug(f"{location}: saving to disk")790            with open(cached_file_name, "w") as f:791                json.dump(location_data.to_dict(), f)  # type: ignore792            log.debug(f"{key}: new data, " f"sku: {len(location_data.capabilities)}")793        assert location_data794        self._locations_data_cache[key] = location_data795        return location_data796    def _create_deployment_parameters(797        self, resource_group_name: str, environment: Environment, log: Logger798    ) -> Tuple[str, Dict[str, Any]]:799        assert environment.runbook, "env data cannot be None"800        assert environment.runbook.nodes_requirement, "node requirement cannot be None"801        log.debug("creating 
deployment")802        # construct parameters803        arm_parameters = AzureArmParameter()804        copied_fields = [805            "availability_set_tags",806            "availability_set_properties",807            "vm_tags",808        ]809        set_filtered_fields(self._azure_runbook, arm_parameters, copied_fields)810        is_windows: bool = False811        arm_parameters.admin_username = self.runbook.admin_username812        if self.runbook.admin_private_key_file:813            arm_parameters.admin_key_data = get_public_key_data(814                self.runbook.admin_private_key_file815            )816        else:817            arm_parameters.admin_password = self.runbook.admin_password818        environment_context = get_environment_context(environment=environment)819        arm_parameters.vm_tags["RG"] = environment_context.resource_group_name820        # get local lisa environment821        arm_parameters.vm_tags["lisa_username"] = local().tools[Whoami].get_username()822        arm_parameters.vm_tags["lisa_hostname"] = local().tools[Hostname].get_hostname()823        nodes_parameters: List[AzureNodeArmParameter] = []824        features_settings: Dict[str, schema.FeatureSettings] = {}825        for node_space in environment.runbook.nodes_requirement:826            assert isinstance(827                node_space, schema.NodeSpace828            ), f"actual: {type(node_space)}"829            azure_node_runbook = node_space.get_extended_runbook(830                AzureNodeSchema, type_name=AZURE831            )832            # Subscription Id is used by Shared Gallery images located833            # in subscription different from where LISA is run834            azure_node_runbook.subscription_id = self.subscription_id835            # init node836            node = environment.create_node_from_requirement(837                node_space,838            )839            azure_node_runbook = self._create_node_runbook(840                len(nodes_parameters), 
node_space, log, resource_group_name841            )842            # save parsed runbook back, for example, the version of marketplace may be843            # parsed from latest to a specified version.844            node.capability.set_extended_runbook(azure_node_runbook)845            node_arm_parameters = self._create_node_arm_parameters(node.capability, log)846            nodes_parameters.append(node_arm_parameters)847            # Set data disk array848            arm_parameters.data_disks = self._generate_data_disks(849                node, node_arm_parameters850            )851            if not arm_parameters.location:852                # take first one's location853                arm_parameters.location = azure_node_runbook.location854            # save vm's information into node855            node_context = get_node_context(node)856            node_context.resource_group_name = environment_context.resource_group_name857            # vm's name, use to find it from azure858            node_context.vm_name = azure_node_runbook.name859            # ssh related information will be filled back once vm is created. If860            # it's Windows, fill in the password always. 
If it's Linux, the861            # private key has higher priority.862            node_context.username = arm_parameters.admin_username863            if azure_node_runbook.is_linux:864                node_context.password = arm_parameters.admin_password865            else:866                is_windows = True867                if not self.runbook.admin_password:868                    # password is required, if it doesn't present, generate one.869                    password = generate_random_chars()870                    add_secret(password)871                    self.runbook.admin_password = password872                node_context.password = self.runbook.admin_password873            node_context.private_key_file = self.runbook.admin_private_key_file874            # collect all features to handle special deployment logic. If one875            # node has this, it needs to run.876            if node.capability.features:877                for f in node.capability.features:878                    if f.type not in features_settings:879                        features_settings[f.type] = f880            log.info(f"vm setting: {azure_node_runbook}")881        if is_windows:882            # set password for windows any time.883            arm_parameters.admin_password = self.runbook.admin_password884        arm_parameters.nodes = nodes_parameters885        arm_parameters.storage_name = get_storage_account_name(886            self.subscription_id, arm_parameters.location887        )888        if (889            self._azure_runbook.availability_set_properties890            or self._azure_runbook.availability_set_tags891        ):892            arm_parameters.use_availability_sets = True893        # In Azure, each VM should have only one nic in one subnet. 
So calculate894        # the max nic count, and set to subnet count.895        arm_parameters.subnet_count = max(x.nic_count for x in arm_parameters.nodes)896        arm_parameters.shared_resource_group_name = (897            self._azure_runbook.shared_resource_group_name898        )899        # the arm template may be updated by the hooks, so make a copy to avoid900        # the original template is modified.901        template = deepcopy(self._load_template())902        plugin_manager.hook.azure_update_arm_template(903            template=template, environment=environment904        )905        # change deployment for each feature.906        for f in features_settings.values():907            feature_type = next(908                x for x in self.supported_features() if x.name() == f.type909            )910            feature_type.on_before_deployment(911                arm_parameters=arm_parameters,912                template=template,913                settings=f,914                environment=environment,915                log=log,916            )917        # composite deployment properties918        parameters = arm_parameters.to_dict()  # type:ignore919        parameters = {k: {"value": v} for k, v in parameters.items()}920        log.debug(f"parameters: {parameters}")921        deployment_properties = DeploymentProperties(922            mode=DeploymentMode.incremental,923            template=template,924            parameters=parameters,925        )926        # dump arm_template and arm_parameters to file927        template_dump_path = environment.log_path / "arm_template.json"928        param_dump_path = environment.log_path / "arm_template_parameters.json"929        dump_file(template_dump_path, json.dumps(template, indent=4))930        dump_file(param_dump_path, json.dumps(parameters, indent=4))931        return (932            arm_parameters.location,933            {934                AZURE_RG_NAME_KEY: resource_group_name,935                
"deployment_name": AZURE_DEPLOYMENT_NAME,936                "parameters": Deployment(properties=deployment_properties),937            },938        )939    def _create_node_runbook(940        self,941        index: int,942        node_space: schema.NodeSpace,943        log: Logger,944        name_prefix: str,945    ) -> AzureNodeSchema:946        azure_node_runbook = node_space.get_extended_runbook(947            AzureNodeSchema, type_name=AZURE948        )949        if not azure_node_runbook.name:950            # the max length of vm name is 64 chars. Below logic takes last 45951            # chars in resource group name and keep the leading 5 chars.952            # name_prefix can contain any of customized (existing) or953            # generated (starts with "lisa-") resource group name,954            # so, pass the first 5 chars as prefix to truncate_keep_prefix955            # to handle both cases956            node_name = f"{name_prefix}-n{index}"957            azure_node_runbook.name = truncate_keep_prefix(node_name, 50, node_name[:5])958        # It's used as computer name only. Windows doesn't support name more959        # than 15 chars960        azure_node_runbook.short_name = truncate_keep_prefix(961            azure_node_runbook.name, 15, azure_node_runbook.name[:5]962        )963        if not azure_node_runbook.vm_size:964            raise LisaException("vm_size is not detected before deploy")965        if not azure_node_runbook.location:966            raise LisaException("location is not detected before deploy")967        if azure_node_runbook.hyperv_generation not in [1, 2]:968            raise LisaException(969                "hyperv_generation need value 1 or 2, "970                f"but {azure_node_runbook.hyperv_generation}",971            )972        if azure_node_runbook.vhd:973            # vhd is higher priority974            azure_node_runbook.vhd = self._get_deployable_vhd_path(...__init__.py
Source:__init__.py  
...
        mm_field=field_function(*args, **kwargs),
    )


def is_unittest() -> bool:
    """Return True when ``sys.argv[0]`` contains the text "unittest"."""
    return "unittest" in sys.argv[0]


def truncate_keep_prefix(content: str, kept_len: int, prefix: str = "lisa-") -> str:
    """
    This method is used to truncate names, when some resource has length
    limitation. It keeps meaningful part and the defined prefix.
    To support custom names. if the string size doesn't exceed the limitation,
    there is no any validation and truncate.
    The last chars include the datetime pattern, it's more unique than leading
    project/test pass names. The name is used to identify lisa deployed
    environment too, so it needs to keep the leading "lisa-" after truncated.
    This makes the name from lisa-long-name... to lisa-name...
    """
    # do nothing, if the string is shorter than required.
    if len(content) <= kept_len:
        return content
    if not content.startswith(prefix):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
