How to use _get_vhd_details method in lisa

Best Python code snippet using lisa_python

platform_.py

Source:platform_.py Github

copy

Full Screen

...1671 subscription, so it can be deployed.1672 """1673 matches = SAS_URL_PATTERN.match(vhd_path)1674 if not matches:1675 vhd_details = self._get_vhd_details(vhd_path)1676 vhd_location = vhd_details["location"]1677 if location == vhd_location:1678 return vhd_path1679 else:1680 vhd_path = self._generate_sas_token(vhd_details)1681 matches = SAS_URL_PATTERN.match(vhd_path)1682 assert matches, f"fail to generate sas url for {vhd_path}"1683 log.debug(1684 f"the vhd location {location} is not same with running case "1685 f"location {vhd_location}, generate a sas url for source vhd, "1686 f"it needs to be copied into location {location}."1687 )1688 else:1689 log.debug("found the vhd is a sas url, it may need to be copied.")1690 # get original vhd's hash key for comparing.1691 original_key: Optional[bytearray] = None1692 original_vhd_path = vhd_path1693 original_blob_client = BlobClient.from_blob_url(original_vhd_path)1694 properties = original_blob_client.get_blob_properties()1695 if properties.content_settings:1696 original_key = properties.content_settings.get(1697 "content_md5", None1698 ) # type: ignore1699 storage_name = get_storage_account_name(1700 subscription_id=self.subscription_id, location=location, type="t"1701 )1702 check_or_create_storage_account(1703 self.credential,1704 self.subscription_id,1705 storage_name,1706 self._azure_runbook.shared_resource_group_name,1707 location,1708 log,1709 )1710 container_client = get_or_create_storage_container(1711 credential=self.credential,1712 subscription_id=self.subscription_id,1713 account_name=storage_name,1714 container_name=SAS_COPIED_CONTAINER_NAME,1715 resource_group_name=self._azure_runbook.shared_resource_group_name,1716 )1717 normalized_vhd_name = constants.NORMALIZE_PATTERN.sub("-", vhd_path)1718 year = matches["year"] if matches["year"] else "9999"1719 month = matches["month"] if matches["month"] else "01"1720 day = matches["day"] if matches["day"] else "01"1721 # use the expire date to generate the path. 
It's easy to identify when1722 # the cache can be removed.1723 vhd_path = f"{year}{month}{day}/{normalized_vhd_name}.vhd"1724 full_vhd_path = f"{container_client.url}/{vhd_path}"1725 # lock here to prevent a vhd is copied in multi-thread1726 global _global_sas_vhd_copy_lock1727 cached_key: Optional[bytearray] = None1728 with _global_sas_vhd_copy_lock:1729 blobs = container_client.list_blobs(name_starts_with=vhd_path)1730 for blob in blobs:1731 if blob:1732 # check if hash key matched with original key.1733 if blob.content_settings:1734 cached_key = blob.content_settings.get("content_md5", None)1735 if original_key == cached_key:1736 # if it exists, return the link, not to copy again.1737 log.debug("the sas url is copied already, use it directly.")1738 return full_vhd_path1739 else:1740 log.debug("found cached vhd, but the hash key mismatched.")1741 blob_client = container_client.get_blob_client(vhd_path)1742 blob_client.start_copy_from_url(1743 original_vhd_path, metadata=None, incremental_copy=False1744 )1745 wait_copy_blob(blob_client, vhd_path, log)1746 return full_vhd_path1747 def _get_vhd_details(self, vhd_path: str) -> Any:1748 matched = STORAGE_CONTAINER_BLOB_PATTERN.match(vhd_path)1749 assert matched, f"fail to get matched info from {vhd_path}"1750 sc_name = matched.group("sc")1751 container_name = matched.group("container")1752 blob_name = matched.group("blob")1753 storage_client = get_storage_client(self.credential, self.subscription_id)1754 sc = [x for x in storage_client.storage_accounts.list() if x.name == sc_name]1755 assert sc[1756 01757 ], f"fail to get storage account {sc_name} from {self.subscription_id}"1758 rg = get_matched_str(sc[0].id, RESOURCE_GROUP_PATTERN)1759 return {1760 "location": sc[0].location,1761 "resource_group_name": rg,1762 "account_name": sc_name,1763 "container_name": container_name,1764 "blob_name": blob_name,1765 }1766 def _generate_data_disks(1767 self,1768 node: Node,1769 azure_node_runbook: AzureNodeArmParameter,1770 ) -> 
List[DataDiskSchema]:1771 data_disks: List[DataDiskSchema] = []1772 assert node.capability.disk1773 if azure_node_runbook.marketplace:1774 marketplace = self._get_image_info(1775 azure_node_runbook.location, azure_node_runbook.marketplace1776 )1777 # some images has data disks by default1778 # e.g. microsoft-ads linux-data-science-vm linuxdsvm 21.05.271779 # we have to inject below part when dataDisks section added in1780 # arm template,1781 # otherwise will see below exception:1782 # deployment failed: InvalidParameter: StorageProfile.dataDisks.lun1783 # does not have required value(s) for image specified in1784 # storage profile.1785 for default_data_disk in marketplace.data_disk_images:1786 data_disks.append(1787 DataDiskSchema(1788 node.capability.disk.data_disk_caching_type,1789 default_data_disk.additional_properties["sizeInGb"],1790 azure_node_runbook.disk_type,1791 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,1792 )1793 )1794 assert isinstance(1795 node.capability.disk.data_disk_count, int1796 ), f"actual: {type(node.capability.disk.data_disk_count)}"1797 for _ in range(node.capability.disk.data_disk_count):1798 assert isinstance(node.capability.disk.data_disk_size, int)1799 data_disks.append(1800 DataDiskSchema(1801 node.capability.disk.data_disk_caching_type,1802 node.capability.disk.data_disk_size,1803 azure_node_runbook.disk_type,1804 DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,1805 )1806 )1807 return data_disks1808 @lru_cache(maxsize=10) # noqa: B0191809 def _get_image_info(1810 self, location: str, marketplace: Optional[AzureVmMarketplaceSchema]1811 ) -> VirtualMachineImage:1812 compute_client = get_compute_client(self)1813 assert isinstance(marketplace, AzureVmMarketplaceSchema)1814 with global_credential_access_lock:1815 image_info = compute_client.virtual_machine_images.get(1816 location=location,1817 publisher_name=marketplace.publisher,1818 offer=marketplace.offer,1819 skus=marketplace.sku,1820 
version=marketplace.version,1821 )1822 return image_info1823 def _get_location_key(self, location: str) -> str:1824 return f"{self.subscription_id}_{location}"1825 def _enable_ssh_on_windows(self, node: Node) -> None:1826 runbook = node.capability.get_extended_runbook(AzureNodeSchema)1827 if runbook.is_linux:1828 return1829 context = get_node_context(node)1830 remote_node = cast(RemoteNode, node)1831 log = node.log1832 log.debug(1833 f"checking if SSH port {remote_node.public_port} is reachable "1834 f"on {remote_node.name}..."1835 )1836 connected, _ = wait_tcp_port_ready(1837 address=remote_node.public_address,1838 port=remote_node.public_port,1839 log=log,1840 timeout=3,1841 )1842 if connected:1843 log.debug("SSH port is reachable.")1844 return1845 log.debug("SSH port is not open, enabling ssh on Windows ...")1846 # The SSH port is not opened, try to enable it.1847 public_key_data = get_public_key_data(self.runbook.admin_private_key_file)1848 with open(Path(__file__).parent / "Enable-SSH.ps1", "r") as f:1849 script = f.read()1850 parameters = RunCommandInputParameter(name="PublicKey", value=public_key_data)1851 command = RunCommandInput(1852 command_id="RunPowerShellScript",1853 script=[script],1854 parameters=[parameters],1855 )1856 compute_client = get_compute_client(self)1857 operation = compute_client.virtual_machines.begin_run_command(1858 resource_group_name=context.resource_group_name,1859 vm_name=context.vm_name,1860 parameters=command,1861 )1862 result = wait_operation(operation=operation, failure_identity="enable ssh")1863 log.debug("SSH script result:")1864 log.dump_json(logging.DEBUG, result)1865 def _get_vhd_os_disk_size(self, blob_url: str) -> int:1866 result_dict = self._get_vhd_details(blob_url)1867 container_client = get_or_create_storage_container(1868 credential=self.credential,1869 subscription_id=self.subscription_id,1870 account_name=result_dict["account_name"],1871 container_name=result_dict["container_name"],1872 
resource_group_name=result_dict["resource_group_name"],1873 )1874 vhd_blob = container_client.get_blob_client(result_dict["blob_name"])1875 properties = vhd_blob.get_blob_properties()1876 assert properties.size, f"fail to get blob size of {blob_url}"1877 # Azure requires only megabyte alignment of vhds, round size up1878 # for cases where the size is megabyte aligned1879 return math.ceil(properties.size / 1024 / 1024 / 1024)1880 def _get_sig_info(...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful