How to use _create_node_cloud_init_iso method in lisa

Best Python code snippet using lisa_python

platform.py

Source:platform.py Github

copy

Full Screen

...420 Path(node_context.os_disk_source_file_path),421 Path(node_context.os_disk_base_file_path),422 )423 # Create cloud-init ISO file.424 self._create_node_cloud_init_iso(environment, log, node)425 # Create OS disk from the provided image.426 self._create_node_os_disk(environment, log, node)427 # Create data disks428 self._create_node_data_disks(node)429 # Create libvirt domain (i.e. VM).430 xml = self._create_node_domain_xml(environment, log, node, lv_conn)431 node_context.domain = lv_conn.defineXML(xml)432 self._create_domain_and_attach_logger(433 lv_conn,434 node_context,435 )436 # Delete all the VMs.437 def _delete_nodes(self, environment: Environment, log: Logger) -> None:438 # Delete nodes.439 for node in environment.nodes.list():440 self._delete_node(node, log)441 # Delete VM disks directory.442 try:443 self.host_node.shell.remove(Path(self.vm_disks_dir), True)444 except Exception as ex:445 log.warning(f"Failed to delete VM files directory: {ex}")446 def _delete_node_watchdog_callback(self) -> None:447 print("VM delete watchdog timer fired.\n", file=sys.__stderr__)448 faulthandler.dump_traceback(file=sys.__stderr__, all_threads=True)449 os._exit(1)450 def _delete_node(self, node: Node, log: Logger) -> None:451 node_context = get_node_context(node)452 watchdog = Timer(60.0, self._delete_node_watchdog_callback)453 watchdog.start()454 # Stop the VM.455 if node_context.domain:456 log.debug(f"Stop VM: {node_context.vm_name}")457 try:458 # In the libvirt API, "destroy" means "stop".459 node_context.domain.destroy()460 except libvirt.libvirtError as ex:461 log.warning(f"VM stop failed. {ex}")462 # Wait for console log to close.463 # Note: libvirt can deadlock if you try to undefine the VM while the stream464 # is trying to close.465 if node_context.console_logger:466 log.debug(f"Close VM console log: {node_context.vm_name}")467 node_context.console_logger.close()468 node_context.console_logger = None469 # Undefine the VM.470 if node_context.domain:471 log.debug(f"Delete VM: {node_context.vm_name}")472 try:473 node_context.domain.undefineFlags(self._get_domain_undefine_flags())474 except libvirt.libvirtError as ex:475 log.warning(f"VM delete failed. {ex}")476 node_context.domain = None477 watchdog.cancel()478 def _get_domain_undefine_flags(self) -> int:479 return int(480 libvirt.VIR_DOMAIN_UNDEFINE_MANAGED_SAVE481 | libvirt.VIR_DOMAIN_UNDEFINE_SNAPSHOTS_METADATA482 | libvirt.VIR_DOMAIN_UNDEFINE_NVRAM483 | libvirt.VIR_DOMAIN_UNDEFINE_CHECKPOINTS_METADATA484 )485 def _stop_port_forwarding(self, environment: Environment, log: Logger) -> None:486 log.debug(f"Clearing port forwarding rules for environment {environment.name}")487 environment_context = get_environment_context(environment)488 for (port, address) in environment_context.port_forwarding_list:489 self.host_node.tools[Iptables].stop_forwarding(port, address, 22)490 # Retrieve the VMs' dynamic properties (e.g. IP address).491 def _fill_nodes_metadata(492 self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect493 ) -> None:494 environment_context = get_environment_context(environment)495 # Give all the VMs some time to boot and then acquire an IP address.496 timeout = time.time() + environment_context.network_boot_timeout497 if self.host_node.is_remote:498 remote_node = cast(RemoteNode, self.host_node)499 conn_info = remote_node.connection_info500 address = conn_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS]501 for node in environment.nodes.list():502 assert isinstance(node, RemoteNode)503 # Get the VM's IP address.504 local_address = self._get_node_ip_address(505 environment, log, lv_conn, node, timeout506 )507 node_port = 22508 if self.host_node.is_remote:509 with self._port_forwarding_lock:510 port_not_found = True511 while port_not_found:512 if self._next_available_port > 65535:513 raise LisaException(514 "No available ports on the host to forward"515 )516 # check if the port is already in use517 output = self.host_node.execute(518 f"nc -vz 127.0.0.1 {self._next_available_port}"519 )520 if output.exit_code == 1: # port not in use521 node_port = self._next_available_port522 port_not_found = False523 self._next_available_port += 1524 self.host_node.tools[Iptables].start_forwarding(525 node_port, local_address, 22526 )527 environment_context.port_forwarding_list.append(528 (node_port, local_address)529 )530 else:531 address = local_address532 # Set SSH connection info for the node.533 node.set_connection_info(534 address=local_address,535 public_address=address,536 public_port=node_port,537 username=self.runbook.admin_username,538 private_key_file=self.runbook.admin_private_key_file,539 )540 # Ensure cloud-init completes its setup.541 node.execute(542 "cloud-init status --wait",543 sudo=True,544 expected_exit_code=0,545 expected_exit_code_failure_message="waiting on cloud-init",546 )547 # Create a cloud-init ISO for a VM.548 def _create_node_cloud_init_iso(549 self, environment: Environment, log: Logger, node: Node550 ) -> None:551 environment_context = get_environment_context(environment)552 node_context = get_node_context(node)553 user_data = {554 "users": [555 "default",556 {557 "name": self.runbook.admin_username,558 "shell": "/bin/bash",559 "sudo": ["ALL=(ALL) NOPASSWD:ALL"],560 "groups": ["sudo", "docker"],561 "ssh_authorized_keys": [environment_context.ssh_public_key],562 },...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful