Best Python code snippet using lisa_python
platform_.py
Source:platform_.py
...403 if self._azure_runbook.deploy:404 log.info(405 f"creating or updating resource group: [{resource_group_name}]"406 )407 check_or_create_resource_group(408 self.credential,409 subscription_id=self.subscription_id,410 resource_group_name=resource_group_name,411 location=RESOURCE_GROUP_LOCATION,412 log=log,413 )414 else:415 log.info(f"reusing resource group: [{resource_group_name}]")416 location, deployment_parameters = self._create_deployment_parameters(417 resource_group_name, environment, log418 )419 if self._azure_runbook.deploy:420 self._validate_template(deployment_parameters, log)421 self._deploy(location, deployment_parameters, log)422 # Even skipped deploy, try best to initialize nodes423 self.initialize_environment(environment, log)424 except Exception as identifier:425 self._delete_environment(environment, log)426 raise identifier427 def _delete_environment(self, environment: Environment, log: Logger) -> None:428 environment_context = get_environment_context(environment=environment)429 resource_group_name = environment_context.resource_group_name430 # the resource group name is empty when it is not deployed for some reasons,431 # like capability doesn't meet case requirement.432 if not resource_group_name:433 return434 assert self._azure_runbook435 if not environment_context.resource_group_is_specified:436 log.info(437 f"skipped to delete resource group: {resource_group_name}, "438 f"as it's specified in runbook."439 )440 elif self._azure_runbook.dry_run:441 log.info(442 f"skipped to delete resource group: {resource_group_name}, "443 f"as it's a dry run."444 )445 else:446 assert self._rm_client447 az_rg_exists = self._rm_client.resource_groups.check_existence(448 resource_group_name449 )450 if not az_rg_exists:451 return452 log.info(453 f"deleting resource group: {resource_group_name}, "454 f"wait: {self._azure_runbook.wait_delete}"455 )456 try:457 self._delete_boot_diagnostic_container(resource_group_name, log)458 except Exception as identifier:459 log.debug(460 f"exception on deleting boot diagnostic container: {identifier}"461 )462 delete_operation: Any = None463 try:464 delete_operation = self._rm_client.resource_groups.begin_delete(465 resource_group_name466 )467 except Exception as identifier:468 log.debug(f"exception on delete resource group: {identifier}")469 if delete_operation and self._azure_runbook.wait_delete:470 wait_operation(471 delete_operation, failure_identity="delete resource group"472 )473 else:474 log.debug("not wait deleting")475 def _delete_boot_diagnostic_container(476 self, resource_group_name: str, log: Logger477 ) -> None:478 compute_client = get_compute_client(self)479 vms = compute_client.virtual_machines.list(resource_group_name)480 for vm in vms:481 diagnostic_data = (482 compute_client.virtual_machines.retrieve_boot_diagnostics_data(483 resource_group_name=resource_group_name, vm_name=vm.name484 )485 )486 if not diagnostic_data:487 continue488 # A sample url,489 # https://storageaccountname.blob.core.windows.net:443/490 # bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70/491 # node-0.30779088-9b10-4074-8c27-98b91f1d8b70.serialconsole.log492 # ?sv=2018-03-28&sr=b&sig=mJEsvk9WunbKHfBs1lo1jcIBe4owq1brP8Kw3qXTQJA%3d&493 # se=2021-09-14T08%3a55%3a38Z&sp=r494 blob_uri = diagnostic_data.console_screenshot_blob_uri495 if blob_uri:496 matched = self._diagnostic_storage_container_pattern.match(blob_uri)497 assert matched498 # => storageaccountname499 storage_name = matched.group("storage_name")500 # => bootdiagnostics-node0-30779088-9b10-4074-8c27-98b91f1d8b70501 container_name = matched.group("container_name")502 container_client = get_or_create_storage_container(503 credential=self.credential,504 subscription_id=self.subscription_id,505 account_name=storage_name,506 container_name=container_name,507 resource_group_name=self._azure_runbook.shared_resource_group_name,508 )509 log.debug(510 f"deleting boot diagnostic container: {container_name}"511 f" under storage account {storage_name} of vm {vm.name}"512 )513 try:514 container_client.delete_container()515 except Exception as identifier:516 log.debug(517 f"exception on deleting boot diagnostic container:"518 f" {identifier}"519 )520 def _get_node_information(self, node: Node) -> Dict[str, str]:521 information: Dict[str, Any] = {}522 node.log.debug("detecting lis version...")523 modinfo = node.tools[Modinfo]524 information["lis_version"] = modinfo.get_version("hv_vmbus")525 node.log.debug("detecting vm generation...")526 information[KEY_VM_GENERATION] = node.tools[VmGeneration].get_generation()527 node.log.debug(f"vm generation: {information[KEY_VM_GENERATION]}")528 return information529 def _get_kernel_version(self, node: Node) -> str:530 result: str = ""531 if not result and hasattr(node, ATTRIBUTE_FEATURES):532 # try to get kernel version in Azure. use it, when uname doesn't work533 node.log.debug("detecting kernel version from serial log...")534 serial_console = node.features[features.SerialConsole]535 result = serial_console.get_matched_str(KERNEL_VERSION_PATTERN)536 return result537 def _get_host_version(self, node: Node) -> str:538 result: str = ""539 try:540 if node.is_connected and node.is_posix:541 node.log.debug("detecting host version from dmesg...")542 dmesg = node.tools[Dmesg]543 result = get_matched_str(544 dmesg.get_output(), HOST_VERSION_PATTERN, first_match=False545 )546 except Exception as identifier:547 # it happens on some error vms. Those error should be caught earlier in548 # test cases not here. So ignore any error here to collect information only.549 node.log.debug(f"error on run dmesg: {identifier}")550 # if not get, try again from serial console log.551 # skip if node is not initialized.552 if not result and hasattr(node, ATTRIBUTE_FEATURES):553 node.log.debug("detecting host version from serial log...")554 serial_console = node.features[features.SerialConsole]555 result = serial_console.get_matched_str(HOST_VERSION_PATTERN)556 return result557 def _get_wala_version(self, node: Node) -> str:558 result = ""559 try:560 if node.is_connected and node.is_posix:561 node.log.debug("detecting wala version from waagent...")562 waagent = node.tools[Waagent]563 result = waagent.get_version()564 except Exception as identifier:565 # it happens on some error vms. Those error should be caught earlier in566 # test cases not here. So ignore any error here to collect information only.567 node.log.debug(f"error on run waagent: {identifier}")568 if not result and hasattr(node, ATTRIBUTE_FEATURES):569 node.log.debug("detecting wala agent version from serial log...")570 serial_console = node.features[features.SerialConsole]571 result = serial_console.get_matched_str(WALA_VERSION_PATTERN)572 return result573 def _get_wala_distro_version(self, node: Node) -> str:574 result = "Unknown"575 try:576 if node.is_connected and node.is_posix:577 waagent = node.tools[Waagent]578 result = waagent.get_distro_version()579 except Exception as identifier:580 # it happens on some error vms. Those error should be caught earlier in581 # test cases not here. So ignore any error here to collect information only.582 node.log.debug(f"error on get waagent distro version: {identifier}")583 return result584 def _get_platform_information(self, environment: Environment) -> Dict[str, str]:585 result: Dict[str, str] = {}586 azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(587 AzurePlatformSchema588 )589 result[AZURE_RG_NAME_KEY] = get_environment_context(590 environment591 ).resource_group_name592 if azure_runbook.availability_set_properties:593 for (594 property_name,595 property_value,596 ) in azure_runbook.availability_set_properties.items():597 if property_name in [598 "platformFaultDomainCount",599 "platformUpdateDomainCount",600 ]:601 continue602 if isinstance(property_value, dict):603 for key, value in property_value.items():604 if value:605 result[key] = value606 if azure_runbook.availability_set_tags:607 for key, value in azure_runbook.availability_set_tags.items():608 if value:609 result[key] = value610 if azure_runbook.vm_tags:611 for key, value in azure_runbook.vm_tags.items():612 if value:613 result[key] = value614 return result615 def _get_environment_information(self, environment: Environment) -> Dict[str, str]:616 information: Dict[str, str] = {}617 node_runbook: Optional[AzureNodeSchema] = None618 if environment.nodes:619 node: Optional[Node] = environment.default_node620 else:621 node = None622 if node:623 node_runbook = node.capability.get_extended_runbook(AzureNodeSchema, AZURE)624 for key, method in self._environment_information_hooks.items():625 node.log.debug(f"detecting {key} ...")626 try:627 value = method(node)628 if value:629 information[key] = value630 except Exception as identifier:631 node.log.exception(f"error on get {key}.", exc_info=identifier)632 information.update(self._get_platform_information(environment))633 if node.is_connected and node.is_posix:634 information.update(self._get_node_information(node))635 elif environment.capability and environment.capability.nodes:636 # get deployment information, if failed on preparing phase637 node_space = environment.capability.nodes[0]638 node_runbook = node_space.get_extended_runbook(639 AzureNodeSchema, type_name=AZURE640 )641 if node_runbook:642 information["location"] = node_runbook.location643 information["vmsize"] = node_runbook.vm_size644 information["image"] = node_runbook.get_image_name()645 return information646 def _initialize(self, *args: Any, **kwargs: Any) -> None:647 # set needed environment variables for authentication648 azure_runbook: AzurePlatformSchema = self.runbook.get_extended_runbook(649 AzurePlatformSchema650 )651 assert azure_runbook, "platform runbook cannot be empty"652 self._azure_runbook = azure_runbook653 self.subscription_id = azure_runbook.subscription_id654 self._initialize_credential()655 check_or_create_resource_group(656 self.credential,657 self.subscription_id,658 azure_runbook.shared_resource_group_name,659 RESOURCE_GROUP_LOCATION,660 self._log,661 )662 self._rm_client = get_resource_management_client(663 self.credential, self.subscription_id664 )665 def _initialize_credential(self) -> None:666 azure_runbook = self._azure_runbook667 credential_key = (668 f"{azure_runbook.service_principal_tenant_id}_"669 f"{azure_runbook.service_principal_client_id}"...
common.py
Source:common.py
...826 )827 log.debug(f"delete storage account: {account_name}")828 except Exception:829 log.debug(f"not find storage account: {account_name}")830def check_or_create_resource_group(831 credential: Any,832 subscription_id: str,833 resource_group_name: str,834 location: str,835 log: Logger,836) -> None:837 with get_resource_management_client(credential, subscription_id) as rm_client:838 global global_credential_access_lock839 with global_credential_access_lock:840 az_shared_rg_exists = rm_client.resource_groups.check_existence(841 resource_group_name842 )843 if not az_shared_rg_exists:844 log.info(f"Creating Resource group: '{resource_group_name}'")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!