How to use the get_blob_service_client method in lisa

Best Python code snippet using lisa_python

session_storage.py

Source:session_storage.py Github

copy

Full Screen

...41 if (account_name == "devstoreaccount1"): # this means we are a developer running locally & using Azurite42 return f"http://127.0.0.1:{AZURITE_PORT[type]}/{account_name}"43 else:44 return f"https://{account_name}.{type}.core.windows.net"45def get_blob_service_client() -> BlobServiceClient:46 """47 Gets the Azure blob service client for interating with Azure blob storage.48 :returns BlobServiceClient: the blob service client49 """50 return BlobServiceClient(account_url=get_account_uri("blob"), credential=get_storage_account_key())51def get_table_service_client() -> TableServiceClient:52 """53 Gets the Azure table service client for interacting with Azure table storage.54 :returns TableServiceClient: the table service client55 """56 credential = AzureNamedKeyCredential(get_storage_account_name(), get_storage_account_key())57 return TableServiceClient(endpoint=get_account_uri("table"), credential=credential)58class SessionStorage:59 def __init__(self, session_name: str):60 """61 Constructor.62 :param str session_name: The name of the session.63 """64 self.blob_container = session_name65 self.table = session_name66 self.direct_link_container = self.get_direct_link(RESOURCE_TYPE_BLOB_CONTAINER)67 self.direct_link_table = self.get_direct_link(RESOURCE_TYPE_TABLE)68 def create_blob_container(self) -> bool:69 """70 Creates the Azure blob container for the session.71 :returns bool: True = container created, False = the container already exists72 """73 try:74 get_blob_service_client().create_container(self.blob_container)75 logging.info(f"Blob container created: {self.blob_container}")76 return True77 except ResourceExistsError:78 logging.error(f"Attempt to create a blob container \"{self.blob_container}\" that already exists")79 return False80 def create_table(self) -> bool:81 """82 Creates the Azure table for the session.83 :returns bool: True = table created, False = the table already exists84 """85 try:86 get_table_service_client().create_table(self.table)87 
logging.info(f"Table created: {self.table}")88 return True89 except ResourceExistsError:90 return False91 def save_file(self, blob_path: str, file_bytes: bytes, overwrite: bool = False) -> None:92 """93 Saves the given data in the specified blob in this session's blob container.94 :param str blob_path: the path to the blob95 :param bytes file_bytes: the contents of the file96 :param bool overwrite: whether or not to overwrite the blob if it already exists; optional, default is False97 """98 blob_client = get_blob_service_client().get_blob_client(self.blob_container, blob_path)99 overwritten: str = " (overwritten)" if (blob_client.exists() and overwrite) else ""100 blob_client.upload_blob(file_bytes, overwrite=overwrite)101 logging.info(f"File saved \"/{self.blob_container}/{blob_path}\"{overwritten}")102 def blob_exists(self, blob_path: str) -> bool:103 """104 Determines whether or not the specified blob already exists.105 :param str blob_path: the path to the blob106 :returns bool: True = blob already exists, False = blob does not already exist107 """108 return get_blob_service_client().get_blob_client(self.blob_container, blob_path).exists()109 def read_file(self, blob_path: str) -> bytes:110 """111 Reads the contents of the specified blob container file.112 :param str blob_path: The specific file to read.113 :returns bytes: The contents of the file.114 """115 blob_client = get_blob_service_client().get_blob_client(self.blob_container, blob_path)116 stream = blob_client.download_blob()117 return stream.readall()118 def get_container_sas_uri(self, duration_hours: int) -> str:119 """120 Gets a container SAS (Shared Access Signature) URI for this session's blob container.121 :param int duration_hours: How many hours the SAS should be valid.122 :returns str: A limited-time SAS URI for the blob container.123 """124 permission = ContainerSasPermissions(read=True, write=True, list=True)125 expiry = datetime.datetime.utcnow() + timedelta(hours=duration_hours)126 uri = 
f"{get_account_uri('blob')}/{self.blob_container}"127 sas = generate_container_sas(get_storage_account_name(), self.blob_container, get_storage_account_key(), permission=permission, expiry=expiry)128 return f"{uri}?{sas}"129 def get_table_sas_uri(self, duration_hours: int) -> str:...

Full Screen

Full Screen

test_blob_storage.py

Source:test_blob_storage.py Github

copy

Full Screen

...21 def test_get_blob_client(self):22 blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)23 blob.get_blob_client()24 self.assertTrue(blob.blob_client)25 def test_get_blob_service_client(self):26 blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)27 blob.get_blob_client()28 blob.get_blob_service_client()29 self.assertTrue(blob.blob_service_client)30 def test_get_storage_container_client(self):31 blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)32 blob.get_blob_client()33 blob.get_blob_service_client()34 blob.get_storage_container_client()35 self.assertTrue(blob.storage_container)36 def test_close_connection(self):37 blob = BlobStorage(storage_container_name=self.storage_container_name, blob_name=self.blob_name)38 blob.get_blob_client()39 blob.get_blob_service_client()40 blob.get_storage_container_client()41 blob.close_connection()42 self.assertTrue(blob)43class TestDeleteBlob(unittest.TestCase):44 def setUp(self):45 self.api = ApiClient()46 self.storage_container_name = "pictures/profilePictures"47 self.blob_name = "Chlopig"48 with open(os.path.join(self.api.app.root_path, "static", "Chlopig.jpg"), "rb") as image:49 image = BytesIO(image.read())50 self.upload_data = image51 upload = UploadBlob(storage_container_name=self.storage_container_name, upload_data=self.upload_data, blob_name=self.blob_name)52 upload.get_blob_client()53 upload.get_blob_service_client()54 upload.get_storage_container_client()55 upload.upload_data_to_container()56 upload.get_blob_file_link()57 def tearDown(self):58 delete = DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name)59 delete.get_blob_client()60 delete.get_blob_service_client()61 delete.get_storage_container_client()62 delete.check_if_blob_exists()63 if delete.blob_exists:64 delete.delete_blob()65 delete.close_connection()66 def test_class_exists(self):67 
self.assertTrue(DeleteBlob)68 def test_check_if_blob_exists(self):69 delete = DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name)70 delete.get_blob_client()71 delete.get_blob_service_client()72 delete.get_storage_container_client()73 delete.check_if_blob_exists()74 self.assertTrue(delete.blob_exists)75 delete.close_connection()76 def test_delete_blob(self):77 delete = DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name)78 delete.get_blob_client()79 delete.get_blob_service_client()80 delete.get_storage_container_client()81 delete.check_if_blob_exists()82 delete.delete_blob()83 confirm_deleted = DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name)84 confirm_deleted.get_blob_client()85 confirm_deleted.get_blob_service_client()86 confirm_deleted.get_storage_container_client()87 confirm_deleted.check_if_blob_exists()88 self.assertEqual(confirm_deleted.blob_exists, False)89 confirm_deleted.close_connection()90class TestUploadBlob(unittest.TestCase):91 def setUp(self):92 self.api = ApiClient()93 self.storage_container_name = "pictures/profilePictures"94 self.blob_name = "Chlopig"95 with open(os.path.join(self.api.app.root_path, "static", "Chlopig.jpg"), "rb") as image:96 image = BytesIO(image.read())97 self.upload_data = image98 def tearDown(self):99 delete = DeleteBlob(storage_container_name=self.storage_container_name, blob_name=self.blob_name)100 delete.get_blob_client()101 delete.get_blob_service_client()102 delete.get_storage_container_client()103 delete.check_if_blob_exists()104 if delete.blob_exists:105 delete.delete_blob()106 delete.close_connection()107 def test_class_exists(self):108 self.assertTrue(UploadBlob)109 def test_upload_data_to_container(self):110 upload = UploadBlob(storage_container_name=self.storage_container_name, upload_data=self.upload_data, blob_name=self.blob_name)111 upload.get_blob_client()112 upload.get_blob_service_client()113 
upload.get_storage_container_client()114 upload.upload_data_to_container()115 self.assertTrue(upload.blob_client.exists())116 upload.close_connection()117 def test_get_blob_file_link(self):118 upload = UploadBlob(storage_container_name=self.storage_container_name, upload_data=self.upload_data, blob_name=self.blob_name)119 upload.get_blob_client()120 upload.get_blob_service_client()121 upload.get_storage_container_client()122 upload.upload_data_to_container()123 upload.get_blob_file_link()124 self.assertEqual(upload.blob_file_link, f"https://todoappblobstorage.blob.core.windows.net/pictures/profilePictures/{self.blob_name}")...

Full Screen

Full Screen

test_common.py

Source:test_common.py Github

copy

Full Screen

...46 except Exception as e:47 print(f"Error while comparing files: Exception: {repr(e)}")48 return False49 return result50def get_blob_service_client(account_name: str) -> BlobServiceClient:51 try:52 credentials = DefaultAzureCredential()53 blob_service_client = BlobServiceClient(54 account_url="https://{account}.blob.core.windows.net/".format(55 account=account_name56 ),57 credential=credentials,58 )59 except Exception as e:60 print(f"Error getting BlobServiceClient: Exception: {repr(e)}")61 return None62 return blob_service_client63def upload_file_to_azure_blob(64 local_file_path: str,65 account_name: str,66 container_name: str,67 blob_path: str,68) -> bool:69 try:70 blob_service_client = get_blob_service_client(account_name)71 container_client = blob_service_client.get_container_client(container_name)72 with open(local_file_path, "rb") as data:73 container_client.upload_blob(74 blob_path, data, blob_type="BlockBlob", overwrite=True75 )76 except Exception as e:77 print(f"Error while uploading files: Exception: {repr(e)}")78 return False79 return True80def download_file_from_azure_blob(81 local_file_path: str,82 account_name: str,83 container_name: str,84 blob_path: str,85) -> bool:86 try:87 blob_service_client = get_blob_service_client(account_name)88 blob_client = blob_service_client.get_blob_client(container_name, blob_path)89 # Download the file.90 with open(local_file_path, "wb") as blob:91 download_stream = blob_client.download_blob()92 blob.write(download_stream.readall())93 except Exception as e:94 print(f"Error while downloading files: Exception: {repr(e)}")95 return False96 return True97def azure_blob_exists(98 account_name: str,99 container_name: str,100 blob_path: str,101) -> bool:102 try:103 blob_service_client = get_blob_service_client(account_name)104 blob_client = blob_service_client.get_blob_client(container_name, blob_path)105 return blob_client.exists()106 except Exception as e:107 print(f"Error while checking if blob exists: Exception: 
{repr(e)}")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful