How to use get_storage_account_name method in lisa

Best Python code snippet using lisa_python

session_storage.py

Source:session_storage.py Github

copy

Full Screen

...16RAW_SITES_BLOB_PATH = "setup/raw_sites.csv"17SITES_BLOB_PATH = "setup/sites.geojson"18RESOURCE_TYPE_BLOB_CONTAINER = "Azure.BlobContainer"19RESOURCE_TYPE_TABLE = "Azure.Table"20def get_storage_account_name() -> str:21 """22 Gets the name of the storage account.23 Requires the STORAGE_ACCOUNT_NAME environment variable to be set appropriately.24 :returns str: The name of the storage account, or None if the environment variable is not set.25 """26 return get_environment_variable("STORAGE_ACCOUNT_NAME") # this should be the only place where this env var name is used27def get_storage_account_key() -> str:28 """29 Gets the access key of the storage account.30 Requires the STORAGE_ACCOUNT_KEY environment variable to be set appropriately.31 :returns str: The access key of the storage account, or None if the environment variable is not set.32 """33 return get_environment_variable("STORAGE_ACCOUNT_KEY") # this should be the only place where this env var name is used34def get_account_uri(type: str) -> str:35 """36 Gets the URI of the storage account, either Azurite (local) or Azure (cloud).37 :param str type: the storage account type, either "blob" or "table"38 :returns str: the URI of the storage account39 """40 account_name = get_storage_account_name()41 if (account_name == "devstoreaccount1"): # this means we are a developer running locally & using Azurite42 return f"http://127.0.0.1:{AZURITE_PORT[type]}/{account_name}"43 else:44 return f"https://{account_name}.{type}.core.windows.net"45def get_blob_service_client() -> BlobServiceClient:46 """47 Gets the Azure blob service client for interating with Azure blob storage.48 :returns BlobServiceClient: the blob service client49 """50 return BlobServiceClient(account_url=get_account_uri("blob"), credential=get_storage_account_key())51def get_table_service_client() -> TableServiceClient:52 """53 Gets the Azure table service client for interacting with Azure table storage.54 :returns TableServiceClient: the table service client55 """56 credential = AzureNamedKeyCredential(get_storage_account_name(), get_storage_account_key())57 return TableServiceClient(endpoint=get_account_uri("table"), credential=credential)58class SessionStorage:59 def __init__(self, session_name: str):60 """61 Constructor.62 :param str session_name: The name of the session.63 """64 self.blob_container = session_name65 self.table = session_name66 self.direct_link_container = self.get_direct_link(RESOURCE_TYPE_BLOB_CONTAINER)67 self.direct_link_table = self.get_direct_link(RESOURCE_TYPE_TABLE)68 def create_blob_container(self) -> bool:69 """70 Creates the Azure blob container for the session.71 :returns bool: True = container created, False = the container already exists72 """73 try:74 get_blob_service_client().create_container(self.blob_container)75 logging.info(f"Blob container created: {self.blob_container}")76 return True77 except ResourceExistsError:78 logging.error(f"Attempt to create a blob container \"{self.blob_container}\" that already exists")79 return False80 def create_table(self) -> bool:81 """82 Creates the Azure table for the session.83 :returns bool: True = table created, False = the table already exists84 """85 try:86 get_table_service_client().create_table(self.table)87 logging.info(f"Table created: {self.table}")88 return True89 except ResourceExistsError:90 return False91 def save_file(self, blob_path: str, file_bytes: bytes, overwrite: bool = False) -> None:92 """93 Saves the given data in the specified blob in this session's blob container.94 :param str blob_path: the path to the blob95 :param bytes file_bytes: the contents of the file96 :param bool overwrite: whether or not to overwrite the blob if it already exists; optional, default is False97 """98 blob_client = get_blob_service_client().get_blob_client(self.blob_container, blob_path)99 overwritten: str = " (overwritten)" if (blob_client.exists() and overwrite) else ""100 blob_client.upload_blob(file_bytes, overwrite=overwrite)101 logging.info(f"File saved \"/{self.blob_container}/{blob_path}\"{overwritten}")102 def blob_exists(self, blob_path: str) -> bool:103 """104 Determines whether or not the specified blob already exists.105 :param str blob_path: the path to the blob106 :returns bool: True = blob already exists, False = blob does not already exist107 """108 return get_blob_service_client().get_blob_client(self.blob_container, blob_path).exists()109 def read_file(self, blob_path: str) -> bytes:110 """111 Reads the contents of the specified blob container file.112 :param str blob_path: The specific file to read.113 :returns bytes: The contents of the file.114 """115 blob_client = get_blob_service_client().get_blob_client(self.blob_container, blob_path)116 stream = blob_client.download_blob()117 return stream.readall()118 def get_container_sas_uri(self, duration_hours: int) -> str:119 """120 Gets a container SAS (Shared Access Signature) URI for this session's blob container.121 :param int duration_hours: How many hours the SAS should be valid.122 :returns str: A limited-time SAS URI for the blob container.123 """124 permission = ContainerSasPermissions(read=True, write=True, list=True)125 expiry = datetime.datetime.utcnow() + timedelta(hours=duration_hours)126 uri = f"{get_account_uri('blob')}/{self.blob_container}"127 sas = generate_container_sas(get_storage_account_name(), self.blob_container, get_storage_account_key(), permission=permission, expiry=expiry)128 return f"{uri}?{sas}"129 def get_table_sas_uri(self, duration_hours: int) -> str:130 """131 Gets a table SAS (Shared Access Signature) URI for this session's table.132 :param int duration_hours: How many hours the SAS should be valid.133 :returns str: A limited-time read-only SAS URI for the table.134 """135 permission = TableSasPermissions(read=True)136 expiry = datetime.datetime.utcnow() + timedelta(hours=duration_hours)137 uri = f"{get_account_uri('table')}/{self.table}"138 sas = generate_table_sas(AzureNamedKeyCredential(get_storage_account_name(), get_storage_account_key()), self.table, permission=permission, expiry=expiry)139 return f"{uri}?{sas}"140 def get_direct_link(self, resource_type: str) -> str:141 """142 Gets the Storage Explorer direct link to the specified Azure resource.143 :param str resource_type: The resource type, either "Azure.BlobContainer" or "Azure.Table".144 :returns str: The direct link to the resource.145 """146 subscription = get_environment_variable("AZURE_SUBSCRIPTION_ID")147 resource_group_name = get_environment_variable("STORAGE_ACCOUNT_RESOURCE_GROUP_NAME")148 params = {149 "accountid": f"/subscriptions/{subscription}/resourceGroups/{resource_group_name}/providers/Microsoft.Storage/storageAccounts/{get_storage_account_name()}",150 "subscriptionid": subscription,151 "resourcetype": resource_type,152 "resourcename": self.blob_container153 }154 return f"storageexplorer://v=1&{urlencode(params)}"155 def read_logs(self, task_id: str) -> object:156 """157 Returns the contents of the two log files, "stdout.txt" and "stderr.txt", produced and published by the batch activity tasks.158 :param str task_id: The task ID of the activity for which to retrieve its logs.159 :returns object: The contents of the two log files.160 """161 try:162 std_out = self.read_file(f"tasklogs/{task_id}/stdout.txt").decode("ascii", "replace")163 except ResourceNotFoundError:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful