How to use get_resource_management_client method in lisa

Best Python code snippet using lisa_python

platform_.py

Source:platform_.py Github

copy

Full Screen

...658 azure_runbook.shared_resource_group_name,659 RESOURCE_GROUP_LOCATION,660 self._log,661 )662 self._rm_client = get_resource_management_client(663 self.credential, self.subscription_id664 )665 def _initialize_credential(self) -> None:666 azure_runbook = self._azure_runbook667 credential_key = (668 f"{azure_runbook.service_principal_tenant_id}_"669 f"{azure_runbook.service_principal_client_id}"670 )671 credential = self._credentials.get(credential_key, None)672 if not credential:673 # set azure log to warn level only674 logging.getLogger("azure").setLevel(azure_runbook.log_level)675 if azure_runbook.service_principal_tenant_id:676 os.environ[...

Full Screen

Full Screen

common.py

Source:common.py Github

copy

Full Screen

...651 return StorageManagementClient(652 credential=credential,653 subscription_id=subscription_id,654 )655def get_resource_management_client(656 credential: Any, subscription_id: str657) -> ResourceManagementClient:658 return ResourceManagementClient(659 credential=credential, subscription_id=subscription_id660 )661def get_storage_account_name(662 subscription_id: str, location: str, type: str = "s"663) -> str:664 subscription_id_postfix = subscription_id[-8:]665 # name should be shorter than 24 character666 return f"lisa{type}{location[0:11]}{subscription_id_postfix}"667def get_marketplace_ordering_client(668 platform: "AzurePlatform",669) -> MarketplaceOrderingAgreements:670 return MarketplaceOrderingAgreements(671 credential=platform.credential,672 subscription_id=platform.subscription_id,673 )674def get_node_context(node: Node) -> NodeContext:675 return node.get_context(NodeContext)676def get_environment_context(environment: Environment) -> EnvironmentContext:677 return environment.get_context(EnvironmentContext)678def wait_operation(679 operation: Any, time_out: int = sys.maxsize, failure_identity: str = ""680) -> Any:681 timer = create_timer()682 wait_result: Any = None683 if failure_identity:684 failure_identity = f"{failure_identity} failed:"685 else:686 failure_identity = "Azure operation failed:"687 while time_out > timer.elapsed(False):688 check_cancelled()689 if operation.done():690 break691 wait_result = operation.wait(1)692 if wait_result:693 raise LisaException(f"{failure_identity} {wait_result}")694 if time_out < timer.elapsed():695 raise Exception(f"{failure_identity} timeout after {time_out} seconds.")696 result = operation.result()697 if result:698 result = result.as_dict()699 return result700def get_storage_credential(701 credential: Any, subscription_id: str, account_name: str, resource_group_name: str702) -> Any:703 """704 return a shared key credential. 
This credential doesn't need extra705 permissions to access blobs.706 """707 storage_client = get_storage_client(credential, subscription_id)708 key = storage_client.storage_accounts.list_keys(709 account_name=account_name, resource_group_name=resource_group_name710 ).keys[0]711 return {"account_name": account_name, "account_key": key.value}712def generate_sas_token(713 credential: Any,714 subscription_id: str,715 account_name: str,716 resource_group_name: str,717 expired_hours: int = 2,718) -> Any:719 shared_key_credential = get_storage_credential(720 credential=credential,721 subscription_id=subscription_id,722 account_name=account_name,723 resource_group_name=resource_group_name,724 )725 resource_types = ResourceTypes( # type: ignore726 service=True, container=True, object=True727 )728 sas_token = generate_account_sas(729 account_name=shared_key_credential["account_name"],730 account_key=shared_key_credential["account_key"],731 resource_types=resource_types,732 permission=AccountSasPermissions(read=True), # type: ignore733 expiry=datetime.utcnow() + timedelta(hours=expired_hours),734 )735 return sas_token736def get_blob_service_client(737 credential: Any,738 subscription_id: str,739 account_name: str,740 resource_group_name: str,741) -> BlobServiceClient:742 """743 Create a Azure Storage container if it does not exist.744 """745 shared_key_credential = get_storage_credential(746 credential=credential,747 subscription_id=subscription_id,748 account_name=account_name,749 resource_group_name=resource_group_name,750 )751 blob_service_client = BlobServiceClient(752 f"https://{account_name}.blob.core.windows.net", shared_key_credential753 )754 return blob_service_client755def get_or_create_storage_container(756 credential: Any,757 subscription_id: str,758 account_name: str,759 container_name: str,760 resource_group_name: str,761) -> ContainerClient:762 """763 Create a Azure Storage container if it does not exist.764 """765 blob_service_client = 
get_blob_service_client(766 credential, subscription_id, account_name, resource_group_name767 )768 container_client = blob_service_client.get_container_client(container_name)769 if not container_client.exists():770 container_client.create_container()771 return container_client772def check_or_create_storage_account(773 credential: Any,774 subscription_id: str,775 account_name: str,776 resource_group_name: str,777 location: str,778 log: Logger,779 sku: str = "Standard_LRS",780 kind: str = "StorageV2",781 enable_https_traffic_only: bool = True,782) -> None:783 # check and deploy storage account.784 # storage account can be deployed inside of arm template, but if the concurrent785 # is too big, Azure may not able to delete deployment script on time. so there786 # will be error like below787 # Creating the deployment 'name' would exceed the quota of '800'.788 storage_client = get_storage_client(credential, subscription_id)789 try:790 storage_client.storage_accounts.get_properties(791 account_name=account_name,792 resource_group_name=resource_group_name,793 )794 log.debug(f"found storage account: {account_name}")795 except Exception:796 log.debug(f"creating storage account: {account_name}")797 parameters = StorageAccountCreateParameters(798 sku=Sku(name=sku),799 kind=kind,800 location=location,801 enable_https_traffic_only=enable_https_traffic_only,802 )803 operation = storage_client.storage_accounts.begin_create(804 resource_group_name=resource_group_name,805 account_name=account_name,806 parameters=parameters,807 )808 wait_operation(operation)809def delete_storage_account(810 credential: Any,811 subscription_id: str,812 account_name: str,813 resource_group_name: str,814 log: Logger,815) -> None:816 storage_client = get_storage_client(credential, subscription_id)817 try:818 storage_client.storage_accounts.get_properties(819 account_name=account_name,820 resource_group_name=resource_group_name,821 )822 log.debug(f"found storage account: {account_name}")823 
storage_client.storage_accounts.delete(824 account_name=account_name,825 resource_group_name=resource_group_name,826 )827 log.debug(f"delete storage account: {account_name}")828 except Exception:829 log.debug(f"not find storage account: {account_name}")830def check_or_create_resource_group(831 credential: Any,832 subscription_id: str,833 resource_group_name: str,834 location: str,835 log: Logger,836) -> None:837 with get_resource_management_client(credential, subscription_id) as rm_client:838 global global_credential_access_lock839 with global_credential_access_lock:840 az_shared_rg_exists = rm_client.resource_groups.check_existence(841 resource_group_name842 )843 if not az_shared_rg_exists:844 log.info(f"Creating Resource group: '{resource_group_name}'")845 with global_credential_access_lock:846 rm_client.resource_groups.create_or_update(847 resource_group_name, {"location": location}848 )849def wait_copy_blob(850 blob_client: Any,851 vhd_path: str,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful