How to use _generate_vhd_path method in lisa

Best Python code snippet using lisa_python

transformers.py

Source: transformers.py (GitHub)

copy

Full Screen

# ... (earlier lines of transformers.py are not shown in this listing)
from .tools import Waagent

DEFAULT_EXPORTED_VHD_CONTAINER_NAME = "lisa-vhd-exported"
DEFAULT_VHD_SUFFIX = "exported"


@retry(tries=10, jitter=(1, 2))
def _generate_vhd_path(container_client: Any, file_name_part: str = "") -> str:
    """
    Build a blob path for an exported VHD and make sure it is not taken yet.

    The path has the form
    "<get_date_str()>/<get_datetime_path()>_exported_<file_name_part>.vhd".

    Args:
        container_client: storage container client; only its ``list_blobs``
            method is used here.
        file_name_part: optional text placed before the ".vhd" suffix.

    Returns:
        The generated blob path as a string.

    Raises:
        LisaException: a blob whose name starts with the generated path
            already exists. The ``retry`` decorator re-invokes this function
            (up to 10 tries), presumably so that a later call yields a
            different time-based name -- TODO confirm against the helpers.
    """
    path = PurePosixPath(
        f"{get_date_str()}/{get_datetime_path()}_"
        f"{DEFAULT_VHD_SUFFIX}_{file_name_part}.vhd"
    )
    # list_blobs documents name_starts_with as a string prefix, so pass the
    # textual form instead of the PurePosixPath object.
    blob_name = str(path)
    blobs = container_client.list_blobs(name_starts_with=blob_name)
    for _ in blobs:
        # any hit means the name is already taken
        raise LisaException(f"blob exists already: {path}")
    return blob_name


@dataclass_json
@dataclass
class VhdTransformerSchema(schema.Transformer):
    # shared resource group name
    shared_resource_group_name: str = AZURE_SHARED_RG_NAME
    # resource group and vm name to be exported
    resource_group_name: str = field(default="", metadata=field_metadata(required=True))
    vm_name: str = ""

    # values for SSH connection. public_address is optional, because it can be
    # retrieved from vm_name. Others can be retrieved from platform.
    public_address: str = ""
    public_port: int = 22
    username: str = constants.DEFAULT_USER_NAME
    password: str = ""
    private_key_file: str = ""

    # values for exported vhd. storage_account_name is optional, because it can
    # be the default storage of LISA.
    storage_account_name: str = ""
    container_name: str = DEFAULT_EXPORTED_VHD_CONTAINER_NAME
    file_name_part: str = ""

    # restore environment or not
    restore: bool = False


@dataclass_json
@dataclass
class DeployTransformerSchema(schema.Transformer):
    requirement: schema.Capability = field(default_factory=schema.Capability)
    resource_group_name: str = ""


@dataclass_json
@dataclass
class DeleteTransformerSchema(schema.Transformer):
    resource_group_name: str = field(default="", metadata=field_metadata(required=True))


class VhdTransformer(Transformer):
    """
    convert an azure VM to VHD, which is ready to deploy.
    """

    # name of the single output value produced by this transformer
    __url_name = "url"

    @classmethod
    def type_name(cls) -> str:
        return "azure_vhd"

    @classmethod
    def type_schema(cls) -> Type[schema.TypedSchema]:
        return VhdTransformerSchema

    @property
    def _output_names(self) -> List[str]:
        return [self.__url_name]

    def _internal_run(self) -> Dict[str, Any]:
        """
        Prepare the selected VM, export its OS disk and restore the VM.

        Returns:
            A dict mapping the output name "url" to the exported VHD location.

        Raises:
            LisaException: the requested VM, or any VM at all, cannot be
                found in the loaded environment.
        """
        runbook: VhdTransformerSchema = self.runbook
        platform = _load_platform(self._runbook_builder, self.type_name())
        environment = load_environment(platform, runbook.resource_group_name, self._log)

        # Use next() with a default, so a missing node produces a clear error
        # instead of a bare StopIteration.
        if runbook.vm_name:
            node = next(
                (x for x in environment.nodes.list() if x.name == runbook.vm_name),
                None,
            )
            if node is None:
                raise LisaException(
                    f"cannot find vm '{runbook.vm_name}' in resource group "
                    f"'{runbook.resource_group_name}'"
                )
        else:
            # if no vm_name specified, use the first vm
            node = next(iter(environment.nodes.list()), None)
            if node is None:
                raise LisaException(
                    f"no vm found in resource group '{runbook.resource_group_name}'"
                )
        assert isinstance(node, RemoteNode)

        self._prepare_virtual_machine(node)
        virtual_machine = get_vm(platform, node)
        vhd_location = self._export_vhd(platform, virtual_machine)
        self._restore_vm(platform, virtual_machine, node)
        return {self.__url_name: vhd_location}

    def _prepare_virtual_machine(self, node: RemoteNode) -> None:
        """Deprovision the node with Waagent and stop the VM before export."""
        runbook: VhdTransformerSchema = self.runbook
        if not runbook.public_address:
            runbook.public_address = node.public_address

        # prepare vm for exporting
        wa = node.tools[Waagent]
        node.execute("export HISTSIZE=0", shell=True)
        wa.deprovision()

        # stop the vm
        startstop = node.features[StartStop]
        startstop.stop()

    def _export_vhd(self, platform: AzurePlatform, virtual_machine: Any) -> str:
        """
        Copy the VM's OS disk into a storage container as a VHD blob.

        Returns:
            The URL of the copied blob: "<container url>/<generated path>".
        """
        runbook: VhdTransformerSchema = self.runbook
        compute_client = get_compute_client(platform)

        # generate sas url from os disk, so it can be copied.
        self._log.debug("generating sas url...")
        location = virtual_machine.location
        os_disk_name = virtual_machine.storage_profile.os_disk.name
        # read access is granted for 86400 seconds, i.e. 24 hours
        operation = compute_client.disks.begin_grant_access(
            resource_group_name=runbook.resource_group_name,
            disk_name=os_disk_name,
            grant_access_data=GrantAccessData(access="Read", duration_in_seconds=86400),
        )
        wait_operation(operation)
        sas_url = operation.result().access_sas
        assert sas_url, "cannot get sas_url from os disk"

        self._log.debug("getting or creating storage account and container...")
        # get vhd container
        if not runbook.storage_account_name:
            runbook.storage_account_name = get_storage_account_name(
                subscription_id=platform.subscription_id, location=location, type="t"
            )
        check_or_create_storage_account(
            credential=platform.credential,
            subscription_id=platform.subscription_id,
            account_name=runbook.storage_account_name,
            resource_group_name=runbook.shared_resource_group_name,
            location=location,
            log=self._log,
        )
        container_client = get_or_create_storage_container(
            credential=platform.credential,
            subscription_id=platform.subscription_id,
            account_name=runbook.storage_account_name,
            container_name=runbook.container_name,
            resource_group_name=runbook.shared_resource_group_name,
        )

        path = _generate_vhd_path(container_client, runbook.file_name_part)
        vhd_path = f"{container_client.url}/{path}"
        blob_client = container_client.get_blob_client(path)
        blob_client.start_copy_from_url(sas_url, metadata=None, incremental_copy=False)
        wait_copy_blob(blob_client, vhd_path, self._log)
        return vhd_path

    def _restore_vm(
        self, platform: AzurePlatform, virtual_machine: Any, node: Node
    ) -> None:
        runbook: VhdTransformerSchema = self.runbook
        self._log.debug("restoring vm...")
        # release the vhd export lock, so it can be started back
        compute_client = get_compute_client(platform)
        os_disk_name = virtual_machine.storage_profile.os_disk.name
        # NOTE: the source listing is truncated from here on; the call
        # arguments and the rest of this method are not shown.
        operation = compute_client.disks.begin_revoke_access(...)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful