Best Python code snippet using lisa_python
session_storage.py
Source:session_storage.py  
# ... excerpt begins mid-function; the enclosing `def` is not shown on this page.
# NOTE(review): presumably the tail of get_account_uri(type) -- confirm against the full file.
#     if (account_name == "devstoreaccount1"):  # this means we are a developer running locally & using Azurite
#         return f"http://127.0.0.1:{AZURITE_PORT[type]}/{account_name}"
#     else:
#         return f"https://{account_name}.{type}.core.windows.net"


def get_blob_service_client() -> BlobServiceClient:
    """
    Gets the Azure blob service client for interacting with Azure blob storage.

    A new client is built on every call from get_account_uri("blob") and
    get_storage_account_key().

    :returns BlobServiceClient: the blob service client
    """
    return BlobServiceClient(account_url=get_account_uri("blob"), credential=get_storage_account_key())


def get_table_service_client() -> TableServiceClient:
    """
    Gets the Azure table service client for interacting with Azure table storage.

    A new client is built on every call; the credential pairs the storage
    account name with the storage account key.

    :returns TableServiceClient: the table service client
    """
    credential = AzureNamedKeyCredential(get_storage_account_name(), get_storage_account_key())
    return TableServiceClient(endpoint=get_account_uri("table"), credential=credential)


class SessionStorage:
    """
    Storage for one session: a blob container and a table that both carry the session name.
    """

    def __init__(self, session_name: str):
        """
        Constructor.

        :param str session_name: The name of the session; used as both the
            blob container name and the table name.
        """
        self.blob_container        = session_name
        self.table                 = session_name
        # get_direct_link is defined in the part of the class not shown in this excerpt.
        self.direct_link_container = self.get_direct_link(RESOURCE_TYPE_BLOB_CONTAINER)
        self.direct_link_table     = self.get_direct_link(RESOURCE_TYPE_TABLE)

    def create_blob_container(self) -> bool:
        """
        Creates the Azure blob container for the session.

        :returns bool: True = container created, False = the container already exists
        """
        try:
            get_blob_service_client().create_container(self.blob_container)
            logging.info(f"Blob container created: {self.blob_container}")
            return True
        except ResourceExistsError:
            logging.error(f"Attempt to create a blob container \"{self.blob_container}\" that already exists")
            return False

    def create_table(self) -> bool:
        """
        Creates the Azure table for the session.

        Unlike create_blob_container, an already-existing table is reported
        only through the return value; nothing is logged for that case.

        :returns bool: True = table created, False = the table already exists
        """
        try:
            get_table_service_client().create_table(self.table)
            logging.info(f"Table created: {self.table}")
            return True
        except ResourceExistsError:
            return False

    def save_file(self, blob_path: str, file_bytes: bytes, overwrite: bool = False) -> None:
        """
        Saves the given data in the specified blob in this session's blob container.

        Any exception raised by the upload itself is not caught here.

        :param str blob_path: the path to the blob
        :param bytes file_bytes: the contents of the file
        :param bool overwrite: whether or not to overwrite the blob if it already exists; optional, default is False
        """
        blob_client = get_blob_service_client().get_blob_client(self.blob_container, blob_path)
        # Test `overwrite` first so the existence probe is skipped whenever its
        # answer cannot change the log suffix.
        overwritten: str = " (overwritten)" if (overwrite and blob_client.exists()) else ""
        blob_client.upload_blob(file_bytes, overwrite=overwrite)
        logging.info(f"File saved \"/{self.blob_container}/{blob_path}\"{overwritten}")

    def blob_exists(self, blob_path: str) -> bool:
        """
        Determines whether or not the specified blob already exists.

        :param str blob_path: the path to the blob
        :returns bool: True = blob already exists, False = blob does not already exist
        """
        return get_blob_service_client().get_blob_client(self.blob_container, blob_path).exists()

    def read_file(self, blob_path: str) -> bytes:
        """
        Reads the contents of the specified blob container file.

        The whole blob is read into memory before it is returned.

        :param str blob_path: The specific file to read.
        :returns bytes: The contents of the file.
        """
        blob_client = get_blob_service_client().get_blob_client(self.blob_container, blob_path)
        stream = blob_client.download_blob()
        return stream.readall()

    def get_container_sas_uri(self, duration_hours: int) -> str:
        """
        Gets a container SAS (Shared Access Signature) URI for this session's blob container.

        The SAS grants read, write and list permissions.

        :param int duration_hours: How many hours the SAS should be valid.
        :returns str: A limited-time SAS URI for the blob container.
        """
        permission = ContainerSasPermissions(read=True, write=True, list=True)
        # Timezone-aware UTC "now"; datetime.utcnow() returns a naive value and
        # is deprecated since Python 3.12.
        expiry = datetime.datetime.now(datetime.timezone.utc) + timedelta(hours=duration_hours)
        uri = f"{get_account_uri('blob')}/{self.blob_container}"
        sas = generate_container_sas(get_storage_account_name(), self.blob_container, get_storage_account_key(), permission=permission, expiry=expiry)
        return f"{uri}?{sas}"

    # The body of this method and the remainder of the class are elided in this excerpt.
    def get_table_sas_uri(self, duration_hours: int) -> str: ...

# (next excerpt on the page: test_blob_storage.py)
Source:test_blob_storage.py  
# ... excerpt begins inside a unittest test class whose header and setUp are not shown.
# The four methods below cannot stand alone without that class, so they are
# reproduced unchanged as comments.
#     def test_get_blob_client(self):
#         blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)
#         blob.get_blob_client()
#         self.assertTrue(blob.blob_client)
#
#     def test_get_blob_service_client(self):
#         blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)
#         blob.get_blob_client()
#         blob.get_blob_service_client()
#         self.assertTrue(blob.blob_service_client)
#
#     def test_get_storage_container_client(self):
#         blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)
#         blob.get_blob_client()
#         blob.get_blob_service_client()
#         blob.get_storage_container_client()
#         self.assertTrue(blob.storage_container)
#
#     def test_close_connection(self):
#         blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)
#         blob.get_blob_client()
#         blob.get_blob_service_client()
#         blob.get_storage_container_client()
#         blob.close_connection()
#         self.assertTrue(blob)


def _connect(storage):
    """Run the three client-setup calls in the order the tests use them and return *storage*."""
    storage.get_blob_client()
    storage.get_blob_service_client()
    storage.get_storage_container_client()
    return storage


def _delete_blob_if_present(storage_container_name, blob_name):
    """Delete the named blob if check_if_blob_exists reports it, then close the connection."""
    delete = _connect(DeleteBlob(storage_container_name=storage_container_name, blob_name=blob_name))
    delete.check_if_blob_exists()
    if delete.blob_exists:
        delete.delete_blob()
    delete.close_connection()


class TestDeleteBlob(unittest.TestCase):
    """Tests for DeleteBlob; each test starts with the sample picture already uploaded."""

    def setUp(self):
        """Upload static/Chlopig.jpg as the blob that the tests check for and delete."""
        self.api = ApiClient()
        self.storage_container_name = "pictures/profilePictures"
        self.blob_name = "Chlopig"
        with open(os.path.join(self.api.app.root_path, "static", "Chlopig.jpg"), "rb") as image_file:
            self.upload_data = BytesIO(image_file.read())
        upload = _connect(
            UploadBlob(storage_container_name=self.storage_container_name, upload_data=self.upload_data, blob_name=self.blob_name)
        )
        upload.upload_data_to_container()
        upload.get_blob_file_link()
        # Close the fixture's connection, as the other tests do for the clients they open.
        upload.close_connection()

    def tearDown(self):
        """Remove the uploaded blob if a test left it behind."""
        _delete_blob_if_present(self.storage_container_name, self.blob_name)

    def test_class_exists(self):
        self.assertTrue(DeleteBlob)

    def test_check_if_blob_exists(self):
        """The blob uploaded in setUp is reported as existing."""
        delete = _connect(DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name))
        delete.check_if_blob_exists()
        self.assertTrue(delete.blob_exists)
        delete.close_connection()

    def test_delete_blob(self):
        """After delete_blob, a fresh DeleteBlob instance reports the blob as absent."""
        delete = _connect(DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name))
        delete.check_if_blob_exists()
        delete.delete_blob()
        # Previously left open; close it like the second instance below.
        delete.close_connection()
        confirm_deleted = _connect(DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name))
        confirm_deleted.check_if_blob_exists()
        self.assertEqual(confirm_deleted.blob_exists, False)
        confirm_deleted.close_connection()


class TestUploadBlob(unittest.TestCase):
    """Tests for UploadBlob; any blob a test uploads is removed in tearDown."""

    def setUp(self):
        """Load static/Chlopig.jpg into memory as the upload payload."""
        self.api = ApiClient()
        self.storage_container_name = "pictures/profilePictures"
        self.blob_name = "Chlopig"
        with open(os.path.join(self.api.app.root_path, "static", "Chlopig.jpg"), "rb") as image_file:
            self.upload_data = BytesIO(image_file.read())

    def tearDown(self):
        """Remove the uploaded blob if a test created it."""
        _delete_blob_if_present(self.storage_container_name, self.blob_name)

    def test_class_exists(self):
        self.assertTrue(UploadBlob)

    def test_upload_data_to_container(self):
        """The blob exists after upload_data_to_container."""
        upload = _connect(
            UploadBlob(storage_container_name=self.storage_container_name, upload_data=self.upload_data, blob_name=self.blob_name)
        )
        upload.upload_data_to_container()
        self.assertTrue(upload.blob_client.exists())
        upload.close_connection()

    def test_get_blob_file_link(self):
        upload = UploadBlob(storage_container_name=self.storage_container_name, upload_data=self.upload_data, blob_name=self.blob_name)
        upload.get_blob_client()
        upload.get_blob_service_client()
        upload.get_storage_container_client()
        upload.upload_data_to_container()
        upload.get_blob_file_link()
        self.assertEqual(upload.blob_file_link, f"https://todoappblobstorage.blob.core.windows.net/pictures/profilePictures/{self.blob_name}")
        # ... the excerpt is cut off here; any remaining lines of this method are not shown.

# (next excerpt on the page: test_common.py)
Source:test_common.py  
# ... excerpt begins mid-function; the enclosing `def` and `try` are not shown on this page.
#     except Exception as e:
#         print(f"Error while comparing files: Exception: {repr(e)}")
#         return False
#     return result


def get_blob_service_client(account_name: str) -> BlobServiceClient:
    """Build a BlobServiceClient for *account_name* using DefaultAzureCredential.

    Any exception raised while building the client is printed and swallowed.

    :param account_name: storage account name, inserted into the
        ``https://<account>.blob.core.windows.net/`` URL.
    :returns: the client, or ``None`` if construction raised (despite the
        return annotation) -- callers must check for ``None``.
    """
    try:
        credentials = DefaultAzureCredential()
        blob_service_client = BlobServiceClient(
            account_url="https://{account}.blob.core.windows.net/".format(
                account=account_name
            ),
            credential=credentials,
        )
    except Exception as e:
        print(f"Error getting BlobServiceClient: Exception: {repr(e)}")
        return None
    return blob_service_client


def upload_file_to_azure_blob(
    local_file_path: str,
    account_name: str,
    container_name: str,
    blob_path: str,
) -> bool:
    """Upload a local file to *blob_path* in *container_name*, overwriting any existing blob.

    :param local_file_path: path of the file to read (opened in binary mode).
    :param account_name: storage account name.
    :param container_name: target container.
    :param blob_path: blob name inside the container.
    :returns: True on success; False if the client could not be created or
        any step raised (the exception is printed, not propagated).
    """
    try:
        blob_service_client = get_blob_service_client(account_name)
        if blob_service_client is None:
            # The failure was already reported by get_blob_service_client.
            return False
        container_client = blob_service_client.get_container_client(container_name)
        with open(local_file_path, "rb") as data:
            container_client.upload_blob(
                blob_path, data, blob_type="BlockBlob", overwrite=True
            )
    except Exception as e:
        print(f"Error while uploading files: Exception: {repr(e)}")
        return False
    return True


def download_file_from_azure_blob(
    local_file_path: str,
    account_name: str,
    container_name: str,
    blob_path: str,
) -> bool:
    """Download *blob_path* from *container_name* into a local file.

    The blob is fetched completely before the local file is opened, so a
    failed download no longer creates or truncates *local_file_path*.

    :param local_file_path: destination path (written in binary mode).
    :param account_name: storage account name.
    :param container_name: source container.
    :param blob_path: blob name inside the container.
    :returns: True on success; False if the client could not be created or
        any step raised (the exception is printed, not propagated).
    """
    try:
        blob_service_client = get_blob_service_client(account_name)
        if blob_service_client is None:
            # The failure was already reported by get_blob_service_client.
            return False
        blob_client = blob_service_client.get_blob_client(container_name, blob_path)
        # Download the file.
        content = blob_client.download_blob().readall()
        with open(local_file_path, "wb") as local_file:
            local_file.write(content)
    except Exception as e:
        print(f"Error while downloading files: Exception: {repr(e)}")
        return False
    return True


def azure_blob_exists(
    account_name: str,
    container_name: str,
    blob_path: str,
) -> bool:
    try:
        blob_service_client = get_blob_service_client(account_name)
        blob_client = blob_service_client.get_blob_client(container_name, blob_path)
        return blob_client.exists()
    except Exception as e:
        print(f"Error while checking if blob exists: Exception: {repr(e)}")
    # ... the excerpt is cut off here; the rest of this function is not shown.


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks, e.g. Selenium, Cypress,
# TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
