How to use the _create_node_os_disk method in LISA

The Python snippets below come from the LISA (Linux Integration Services Automation) libvirt orchestrator on GitHub. platform.py declares _create_node_os_disk as an abstract hook on the base platform class, while ch_platform.py and qemu_platform.py show two concrete implementations of it.

platform.py

Source: platform.py (GitHub)


...
        )
        # Create cloud-init ISO file.
        self._create_node_cloud_init_iso(environment, log, node)
        # Create OS disk from the provided image.
        self._create_node_os_disk(environment, log, node)
        # Create data disks
        self._create_node_data_disks(node)
        # Create libvirt domain (i.e. VM).
        xml = self._create_node_domain_xml(environment, log, node, lv_conn)
        node_context.domain = lv_conn.defineXML(xml)
        self._create_domain_and_attach_logger(
            lv_conn,
            node_context,
        )

    # Delete all the VMs.
    def _delete_nodes(self, environment: Environment, log: Logger) -> None:
        # Delete nodes.
        for node in environment.nodes.list():
            self._delete_node(node, log)
        # Delete VM disks directory.
        try:
            self.host_node.shell.remove(Path(self.vm_disks_dir), True)
        except Exception as ex:
            log.warning(f"Failed to delete VM files directory: {ex}")

    def _delete_node_watchdog_callback(self) -> None:
        print("VM delete watchdog timer fired.\n", file=sys.__stderr__)
        faulthandler.dump_traceback(file=sys.__stderr__, all_threads=True)
        os._exit(1)

    def _delete_node(self, node: Node, log: Logger) -> None:
        node_context = get_node_context(node)
        watchdog = Timer(60.0, self._delete_node_watchdog_callback)
        watchdog.start()
        # Stop the VM.
        if node_context.domain:
            log.debug(f"Stop VM: {node_context.vm_name}")
            try:
                # In the libvirt API, "destroy" means "stop".
                node_context.domain.destroy()
            except libvirt.libvirtError as ex:
                log.warning(f"VM stop failed. {ex}")
        # Wait for console log to close.
        # Note: libvirt can deadlock if you try to undefine the VM while the stream
        # is trying to close.
        if node_context.console_logger:
            log.debug(f"Close VM console log: {node_context.vm_name}")
            node_context.console_logger.close()
            node_context.console_logger = None
        # Undefine the VM.
        if node_context.domain:
            log.debug(f"Delete VM: {node_context.vm_name}")
            try:
                node_context.domain.undefineFlags(self._get_domain_undefine_flags())
            except libvirt.libvirtError as ex:
                log.warning(f"VM delete failed. {ex}")
            node_context.domain = None
        watchdog.cancel()

    def _get_domain_undefine_flags(self) -> int:
        return int(
            libvirt.VIR_DOMAIN_UNDEFINE_MANAGED_SAVE
            | libvirt.VIR_DOMAIN_UNDEFINE_SNAPSHOTS_METADATA
            | libvirt.VIR_DOMAIN_UNDEFINE_NVRAM
            | libvirt.VIR_DOMAIN_UNDEFINE_CHECKPOINTS_METADATA
        )

    def _stop_port_forwarding(self, environment: Environment, log: Logger) -> None:
        log.debug(f"Clearing port forwarding rules for environment {environment.name}")
        environment_context = get_environment_context(environment)
        for (port, address) in environment_context.port_forwarding_list:
            self.host_node.tools[Iptables].stop_forwarding(port, address, 22)

    # Retrieve the VMs' dynamic properties (e.g. IP address).
    def _fill_nodes_metadata(
        self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect
    ) -> None:
        environment_context = get_environment_context(environment)
        # Give all the VMs some time to boot and then acquire an IP address.
        timeout = time.time() + environment_context.network_boot_timeout
        if self.host_node.is_remote:
            remote_node = cast(RemoteNode, self.host_node)
            conn_info = remote_node.connection_info
            address = conn_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS]
        for node in environment.nodes.list():
            assert isinstance(node, RemoteNode)
            # Get the VM's IP address.
            local_address = self._get_node_ip_address(
                environment, log, lv_conn, node, timeout
            )
            node_port = 22
            if self.host_node.is_remote:
                with self._port_forwarding_lock:
                    port_not_found = True
                    while port_not_found:
                        if self._next_available_port > 65535:
                            raise LisaException(
                                "No available ports on the host to forward"
                            )
                        # check if the port is already in use
                        output = self.host_node.execute(
                            f"nc -vz 127.0.0.1 {self._next_available_port}"
                        )
                        if output.exit_code == 1:  # port not in use
                            node_port = self._next_available_port
                            port_not_found = False
                        self._next_available_port += 1
                self.host_node.tools[Iptables].start_forwarding(
                    node_port, local_address, 22
                )
                environment_context.port_forwarding_list.append(
                    (node_port, local_address)
                )
            else:
                address = local_address
            # Set SSH connection info for the node.
            node.set_connection_info(
                address=local_address,
                public_address=address,
                public_port=node_port,
                username=self.runbook.admin_username,
                private_key_file=self.runbook.admin_private_key_file,
            )
            # Ensure cloud-init completes its setup.
            node.execute(
                "cloud-init status --wait",
                sudo=True,
                expected_exit_code=0,
                expected_exit_code_failure_message="waiting on cloud-init",
            )

    # Create a cloud-init ISO for a VM.
    def _create_node_cloud_init_iso(
        self, environment: Environment, log: Logger, node: Node
    ) -> None:
        environment_context = get_environment_context(environment)
        node_context = get_node_context(node)
        user_data = {
            "users": [
                "default",
                {
                    "name": self.runbook.admin_username,
                    "shell": "/bin/bash",
                    "sudo": ["ALL=(ALL) NOPASSWD:ALL"],
                    "groups": ["sudo", "docker"],
                    "ssh_authorized_keys": [environment_context.ssh_public_key],
                },
            ],
        }
        # Iterate through all the top-level properties.
        for extra_user_data in node_context.extra_cloud_init_user_data:
            for key, value in extra_user_data.items():
                existing_value = user_data.get(key)
                if not existing_value:
                    # Property doesn't exist yet. So, add it.
                    user_data[key] = value
                elif isinstance(existing_value, dict) and isinstance(value, dict):
                    # Merge two dictionaries by adding properties from new value and
                    # replacing any existing properties.
                    # Examples: disk_setup, etc.
                    existing_value.update(value)
                elif isinstance(existing_value, list) and isinstance(value, list):
                    # Merge two lists by appending to the end of the existing list.
                    # Examples: write_files, runcmd, etc.
                    existing_value.extend(value)
                else:
                    # String, unknown type or mismatched type.
                    # Just replace the existing property.
                    user_data[key] = value
        meta_data = {
            "local-hostname": node_context.vm_name,
        }
        # Note: cloud-init requires the user-data file to be prefixed with
        # `#cloud-config`.
        user_data_string = "#cloud-config\n" + yaml.safe_dump(user_data)
        meta_data_string = yaml.safe_dump(meta_data)
        iso_path = node_context.cloud_init_file_path
        tmp_dir = tempfile.TemporaryDirectory()
        try:
            iso_path = os.path.join(tmp_dir.name, "cloud-init.iso")
            self._create_iso(
                iso_path,
                [("/user-data", user_data_string), ("/meta-data", meta_data_string)],
            )
            self.host_node.shell.copy(
                Path(iso_path), Path(node_context.cloud_init_file_path)
            )
        finally:
            tmp_dir.cleanup()

    # Create an ISO file.
    def _create_iso(self, file_path: str, files: List[Tuple[str, str]]) -> None:
        iso = pycdlib.PyCdlib()
        iso.new(joliet=3, vol_ident="cidata")
        for i, file in enumerate(files):
            path, contents = file
            contents_data = contents.encode()
            iso.add_fp(
                io.BytesIO(contents_data),
                len(contents_data),
                f"/{i}.;1",
                joliet_path=path,
            )
        iso.write(file_path)

    # Create the OS disk.
    def _create_node_os_disk(
        self, environment: Environment, log: Logger, node: Node
    ) -> None:
        raise NotImplementedError()

    def _create_node_data_disks(self, node: Node) -> None:
        node_context = get_node_context(node)
        qemu_img = self.host_node.tools[QemuImg]
        for disk in node_context.data_disks:
            qemu_img.create_new_qcow2(disk.file_path, disk.size_gib * 1024)

    # Create the XML definition for the VM.
    def _create_node_domain_xml(
        self,
        environment: Environment,
        log: Logger,
        node: Node,
...
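In the base platform above, _create_node_os_disk is only declared and raises NotImplementedError, so every concrete platform must override it to turn the base image into a per-VM OS disk before the domain XML is defined. Below is a minimal sketch of such an override; the class name, the plain-copy strategy, and the import paths are assumptions for illustration, not LISA's actual code (the real implementations are the ch_platform.py and qemu_platform.py snippets further down).

# Minimal sketch: a hypothetical platform that copies the base image verbatim.
# Import paths are best-effort guesses based on the snippets on this page.
from lisa.environment import Environment
from lisa.node import Node
from lisa.util.logger import Logger
from lisa.sut_orchestrator.libvirt.context import get_node_context
from lisa.sut_orchestrator.libvirt.platform import BaseLibvirtPlatform


class MyLibvirtPlatform(BaseLibvirtPlatform):
    # Create the OS disk by copying the base image, so the VM gets its own
    # writable disk and the shared base image stays untouched.
    def _create_node_os_disk(
        self, environment: Environment, log: Logger, node: Node
    ) -> None:
        node_context = get_node_context(node)
        self.host_node.execute(
            f"cp {node_context.os_disk_base_file_path}"
            f" {node_context.os_disk_file_path}",
            expected_exit_code=0,
        )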


ch_platform.py

Source: ch_platform.py (GitHub)


...
        node_context.console_logger.attach(
            libvirt_conn, node_context.domain, node_context.console_log_file_path
        )

    # Create the OS disk.
    def _create_node_os_disk(
        self, environment: Environment, log: Logger, node: Node
    ) -> None:
        node_context = get_node_context(node)
        if node_context.os_disk_base_file_fmt == DiskImageFormat.QCOW2:
            self.host_node.tools[QemuImg].convert(
                "qcow2",
                node_context.os_disk_base_file_path,
                "raw",
                node_context.os_disk_file_path,
            )
        else:
            self.host_node.execute(
                f"cp {node_context.os_disk_base_file_path}"
                f" {node_context.os_disk_file_path}",
...
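Here ch_platform.py converts a QCOW2 base image to a raw OS disk via the QemuImg tool, and falls back to a plain cp when the base image is not QCOW2. Outside of LISA, that conversion boils down to a single qemu-img invocation; the sketch below shows only that underlying operation (the paths are hypothetical, and the flags LISA's QemuImg wrapper passes may differ).

# Standalone sketch of the underlying image conversion.
import subprocess


def convert_base_image_to_raw(base_qcow2_path: str, os_disk_raw_path: str) -> None:
    # qemu-img convert -f <input format> -O <output format> <input> <output>
    subprocess.run(
        ["qemu-img", "convert", "-f", "qcow2", "-O", "raw",
         base_qcow2_path, os_disk_raw_path],
        check=True,
    )


# Example usage with hypothetical paths:
# convert_base_image_to_raw(
#     "/var/lib/lisa/images/ubuntu.qcow2", "/var/lib/lisa/vm-disks/node-0-os.raw"
# )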


qemu_platform.py

Source: qemu_platform.py (GitHub)


...
        return QemuNodeSchema

    def _libvirt_uri_schema(self) -> str:
        return "qemu"

    # Create the OS disk.
    def _create_node_os_disk(
        self, environment: Environment, log: Logger, node: Node
    ) -> None:
        node_context = get_node_context(node)
        self.host_node.tools[QemuImg].create_diff_qcow2(
            node_context.os_disk_file_path, node_context.os_disk_base_file_path
        )

    def _get_vmm_version(self) -> str:
        result = "Unknown"
        if self.host_node:
            output = self.host_node.execute(
                "qemu-system-x86_64 --version",
                shell=True,
            ).stdout
            output = filter_ansi_escape(output)
...
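The QEMU platform takes a different approach: create_diff_qcow2 gives each VM a copy-on-write QCOW2 overlay that uses the shared base image as its backing file, so the OS disk is created almost instantly and the base image is never modified. Roughly, that corresponds to the qemu-img create call sketched below; the exact flags used by LISA's QemuImg tool may differ.

# Standalone sketch of creating an overlay (diff) disk on top of a base image.
import subprocess


def create_overlay_disk(os_disk_path: str, base_image_path: str) -> None:
    subprocess.run(
        [
            "qemu-img", "create",
            "-f", "qcow2",          # format of the new overlay disk
            "-F", "qcow2",          # format of the backing (base) image
            "-b", base_image_path,  # base image shared by all VMs
            os_disk_path,
        ],
        check=True,
    )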


