How to use the _stop_port_forwarding method in lisa

Best Python code snippet using lisa_python

platform.py

Source:platform.py Github

copy

Full Screen

...131 self._deploy_nodes(environment, log)132 def _delete_environment(self, environment: Environment, log: Logger) -> None:133 self._delete_nodes(environment, log)134 if self.host_node.is_remote:135 self._stop_port_forwarding(environment, log)136 libvirt_log = self.host_node.tools[Journalctl].logs_for_unit(137 "libvirtd", sudo=self.host_node.is_remote138 )139 libvirt_log_path = self.host_node.local_log_path / "libvirtd.log"140 with open(str(libvirt_log_path), "w") as f:141 f.write(libvirt_log)142 def _configure_environment(self, environment: Environment, log: Logger) -> None:143 environment_context = get_environment_context(environment)144 if self.platform_runbook.network_boot_timeout:145 environment_context.network_boot_timeout = (146 self.platform_runbook.network_boot_timeout147 )148 environment_context.ssh_public_key = get_public_key_data(149 self.runbook.admin_private_key_file150 )151 def _configure_node_capabilities(152 self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect153 ) -> bool:154 if not environment.runbook.nodes_requirement:155 return True156 host_capabilities = self._get_host_capabilities(lv_conn, log)157 nodes_capabilities = self._create_node_capabilities(host_capabilities)158 nodes_requirement = []159 for node_space in environment.runbook.nodes_requirement:160 # Check that the general node capabilities are compatible with this node's161 # specific requirements.162 if not node_space.check(nodes_capabilities):163 return False164 # Rectify the general node capabilities with this node's specific165 # requirements.166 node_requirement = node_space.generate_min_capability(nodes_capabilities)167 nodes_requirement.append(node_requirement)168 if not self._check_host_capabilities(nodes_requirement, host_capabilities, log):169 return False170 environment.runbook.nodes_requirement = nodes_requirement171 return True172 def _get_host_capabilities(173 self, lv_conn: libvirt.virConnect, log: Logger174 ) -> _HostCapabilities:175 
host_capabilities = _HostCapabilities()176 capabilities_xml_str = lv_conn.getCapabilities()177 capabilities_xml = ET.fromstring(capabilities_xml_str)178 host_xml = capabilities_xml.find("host")179 assert host_xml180 topology_xml = host_xml.find("topology")181 assert topology_xml182 cells_xml = topology_xml.find("cells")183 assert cells_xml184 for cell in cells_xml.findall("cell"):185 cpus_xml = cell.find("cpus")186 assert cpus_xml187 host_capabilities.core_count += int(cpus_xml.attrib["num"])188 # Get free memory.189 # Include the disk cache size, as it will be freed if memory becomes limited.190 memory_stats = lv_conn.getMemoryStats(libvirt.VIR_NODE_MEMORY_STATS_ALL_CELLS)191 host_capabilities.free_memory_kib = (192 memory_stats[libvirt.VIR_NODE_MEMORY_STATS_FREE]193 + memory_stats[libvirt.VIR_NODE_MEMORY_STATS_CACHED]194 )195 log.debug(196 f"QEMU host: "197 f"CPU Cores = {host_capabilities.core_count}, "198 f"Free Memory = {host_capabilities.free_memory_kib} KiB"199 )200 return host_capabilities201 # Create the set of capabilities that are generally supported on QEMU nodes.202 def _create_node_capabilities(203 self, host_capabilities: _HostCapabilities204 ) -> schema.NodeSpace:205 node_capabilities = schema.NodeSpace()206 node_capabilities.name = "QEMU"207 node_capabilities.node_count = 1208 node_capabilities.core_count = search_space.IntRange(209 min=1, max=host_capabilities.core_count210 )211 node_capabilities.disk = schema.DiskOptionSettings(212 data_disk_count=search_space.IntRange(min=0),213 data_disk_size=search_space.IntRange(min=1),214 )215 node_capabilities.network_interface = schema.NetworkInterfaceOptionSettings()216 node_capabilities.network_interface.max_nic_count = 1217 node_capabilities.network_interface.nic_count = 1218 node_capabilities.gpu_count = 0219 node_capabilities.features = search_space.SetSpace[schema.FeatureSettings](220 is_allow_set=True,221 items=[222 schema.FeatureSettings.create(SerialConsole.name()),223 ],224 )225 return 
node_capabilities226 # Check that the VM requirements can be fulfilled by the host.227 def _check_host_capabilities(228 self,229 nodes_requirements: List[schema.NodeSpace],230 host_capabilities: _HostCapabilities,231 log: Logger,232 ) -> bool:233 total_required_memory_mib = 0234 for node_requirements in nodes_requirements:235 # Calculate the total amount of memory required for all the VMs.236 assert isinstance(node_requirements.memory_mb, int)237 total_required_memory_mib += node_requirements.memory_mb238 # Ensure host has enough memory for all the VMs.239 total_required_memory_kib = total_required_memory_mib * 1024240 if total_required_memory_kib > host_capabilities.free_memory_kib:241 log.error(242 f"Nodes require a total of {total_required_memory_kib} KiB memory. "243 f"Host only has {host_capabilities.free_memory_kib} KiB free."244 )245 return False246 return True247 # Get the minimum value for a node requirement with an interger type.248 # Note: Unlike other orchestrators, we don't want to fill up the capacity of249 # the host in case the test is running on a dev box.250 def _get_count_space_min(self, count_space: search_space.CountSpace) -> int:251 return search_space.generate_min_capability_countspace(count_space, count_space)252 def _deploy_nodes(self, environment: Environment, log: Logger) -> None:253 self._configure_nodes(environment, log)254 with libvirt.open(self.libvirt_conn_str) as lv_conn:255 try:256 self._create_nodes(environment, log, lv_conn)257 self._fill_nodes_metadata(environment, log, lv_conn)258 except Exception as ex:259 assert environment.platform260 if (261 environment.platform.runbook.keep_environment262 == constants.ENVIRONMENT_KEEP_NO263 ):264 self._delete_nodes(environment, log)265 raise ex266 # Pre-determine all the nodes' properties, including the name of all the resouces267 # to be created. 
This makes it easier to cleanup everything after the test is268 # finished (or fails).269 def _configure_nodes(self, environment: Environment, log: Logger) -> None:270 # Generate a random name for the VMs.271 test_suffix = "".join(random.choice(string.ascii_uppercase) for _ in range(5))272 vm_name_prefix = f"lisa-{test_suffix}"273 self.vm_disks_dir = os.path.join(274 self.platform_runbook.hosts[0].lisa_working_dir, vm_name_prefix275 )276 assert environment.runbook.nodes_requirement277 for i, node_space in enumerate(environment.runbook.nodes_requirement):278 assert isinstance(279 node_space, schema.NodeSpace280 ), f"actual: {type(node_space)}"281 node_runbook: BaseLibvirtNodeSchema = node_space.get_extended_runbook(282 self.__node_runbook_type(), type_name=type(self).type_name()283 )284 if not os.path.exists(node_runbook.disk_img):285 raise LisaException(f"file does not exist: {node_runbook.disk_img}")286 node = environment.create_node_from_requirement(node_space)287 self._configure_node(288 node,289 i,290 node_space,291 node_runbook,292 vm_name_prefix,293 )294 def _configure_node(295 self,296 node: Node,297 node_idx: int,298 node_space: schema.NodeSpace,299 node_runbook: BaseLibvirtNodeSchema,300 vm_name_prefix: str,301 ) -> None:302 node_context = get_node_context(node)303 if (304 not node_runbook.firmware_type305 or node_runbook.firmware_type == FIRMWARE_TYPE_UEFI306 ):307 node_context.use_bios_firmware = False308 elif node_runbook.firmware_type == FIRMWARE_TYPE_BIOS:309 node_context.use_bios_firmware = True310 if node_runbook.enable_secure_boot:311 raise LisaException("Secure-boot requires UEFI firmware.")312 else:313 raise LisaException(314 f"Unknown node firmware type: {node_runbook.firmware_type}."315 f"Expecting either {FIRMWARE_TYPE_UEFI} or {FIRMWARE_TYPE_BIOS}."316 )317 node_context.machine_type = node_runbook.machine_type or None318 node_context.enable_secure_boot = node_runbook.enable_secure_boot319 node_context.vm_name = 
f"{vm_name_prefix}-{node_idx}"320 if not node.name:321 node.name = node_context.vm_name322 node_context.cloud_init_file_path = os.path.join(323 self.vm_disks_dir, f"{node_context.vm_name}-cloud-init.iso"324 )325 if self.host_node.is_remote:326 node_context.os_disk_source_file_path = node_runbook.disk_img327 node_context.os_disk_base_file_path = os.path.join(328 self.vm_disks_dir, os.path.basename(node_runbook.disk_img)329 )330 else:331 node_context.os_disk_base_file_path = node_runbook.disk_img332 node_context.os_disk_base_file_fmt = DiskImageFormat(333 node_runbook.disk_img_format334 )335 node_context.os_disk_file_path = os.path.join(336 self.vm_disks_dir, f"{node_context.vm_name}-os.qcow2"337 )338 node_context.console_log_file_path = str(339 node.local_log_path / "qemu-console.log"340 )341 # Read extra cloud-init data.342 extra_user_data = (343 node_runbook.cloud_init and node_runbook.cloud_init.extra_user_data344 )345 if extra_user_data:346 node_context.extra_cloud_init_user_data = []347 if isinstance(extra_user_data, str):348 extra_user_data = [extra_user_data]349 for relative_file_path in extra_user_data:350 if not relative_file_path:351 continue352 file_path = constants.RUNBOOK_PATH.joinpath(relative_file_path)353 with open(file_path, "r") as file:354 node_context.extra_cloud_init_user_data.append(yaml.safe_load(file))355 # Configure data disks.356 if node_space.disk:357 assert isinstance(358 node_space.disk.data_disk_count, int359 ), f"actual: {type(node_space.disk.data_disk_count)}"360 assert isinstance(361 node_space.disk.data_disk_size, int362 ), f"actual: {type(node_space.disk.data_disk_size)}"363 for i in range(node_space.disk.data_disk_count):364 data_disk = DataDiskContext()365 data_disk.file_path = os.path.join(366 self.vm_disks_dir, f"{node_context.vm_name}-data-{i}.qcow2"367 )368 data_disk.size_gib = node_space.disk.data_disk_size369 node_context.data_disks.append(data_disk)370 def _create_domain_and_attach_logger(371 self,372 libvirt_conn: 
libvirt.virConnect,373 node_context: NodeContext,374 ) -> None:375 # Start the VM in the paused state.376 # This gives the console logger a chance to connect before the VM starts377 # for real.378 assert node_context.domain379 node_context.domain.createWithFlags(libvirt.VIR_DOMAIN_START_PAUSED)380 # Attach the console logger381 node_context.console_logger = QemuConsoleLogger()382 node_context.console_logger.attach(383 libvirt_conn, node_context.domain, node_context.console_log_file_path384 )385 # Start the VM.386 node_context.domain.resume()387 # Create all the VMs.388 def _create_nodes(389 self,390 environment: Environment,391 log: Logger,392 lv_conn: libvirt.virConnect,393 ) -> None:394 self.host_node.shell.mkdir(Path(self.vm_disks_dir), exist_ok=True)395 for node in environment.nodes.list():396 node_context = get_node_context(node)397 self._create_node(398 node,399 node_context,400 environment,401 log,402 lv_conn,403 )404 def _create_node(405 self,406 node: Node,407 node_context: NodeContext,408 environment: Environment,409 log: Logger,410 lv_conn: libvirt.virConnect,411 ) -> None:412 # Create required directories and copy the required files to the host413 # node.414 if node_context.os_disk_source_file_path:415 source_exists = self.host_node.tools[Ls].path_exists(416 path=node_context.os_disk_base_file_path, sudo=True417 )418 if not source_exists:419 self.host_node.shell.copy(420 Path(node_context.os_disk_source_file_path),421 Path(node_context.os_disk_base_file_path),422 )423 # Create cloud-init ISO file.424 self._create_node_cloud_init_iso(environment, log, node)425 # Create OS disk from the provided image.426 self._create_node_os_disk(environment, log, node)427 # Create data disks428 self._create_node_data_disks(node)429 # Create libvirt domain (i.e. 
VM).430 xml = self._create_node_domain_xml(environment, log, node, lv_conn)431 node_context.domain = lv_conn.defineXML(xml)432 self._create_domain_and_attach_logger(433 lv_conn,434 node_context,435 )436 # Delete all the VMs.437 def _delete_nodes(self, environment: Environment, log: Logger) -> None:438 # Delete nodes.439 for node in environment.nodes.list():440 self._delete_node(node, log)441 # Delete VM disks directory.442 try:443 self.host_node.shell.remove(Path(self.vm_disks_dir), True)444 except Exception as ex:445 log.warning(f"Failed to delete VM files directory: {ex}")446 def _delete_node_watchdog_callback(self) -> None:447 print("VM delete watchdog timer fired.\n", file=sys.__stderr__)448 faulthandler.dump_traceback(file=sys.__stderr__, all_threads=True)449 os._exit(1)450 def _delete_node(self, node: Node, log: Logger) -> None:451 node_context = get_node_context(node)452 watchdog = Timer(60.0, self._delete_node_watchdog_callback)453 watchdog.start()454 # Stop the VM.455 if node_context.domain:456 log.debug(f"Stop VM: {node_context.vm_name}")457 try:458 # In the libvirt API, "destroy" means "stop".459 node_context.domain.destroy()460 except libvirt.libvirtError as ex:461 log.warning(f"VM stop failed. {ex}")462 # Wait for console log to close.463 # Note: libvirt can deadlock if you try to undefine the VM while the stream464 # is trying to close.465 if node_context.console_logger:466 log.debug(f"Close VM console log: {node_context.vm_name}")467 node_context.console_logger.close()468 node_context.console_logger = None469 # Undefine the VM.470 if node_context.domain:471 log.debug(f"Delete VM: {node_context.vm_name}")472 try:473 node_context.domain.undefineFlags(self._get_domain_undefine_flags())474 except libvirt.libvirtError as ex:475 log.warning(f"VM delete failed. 
{ex}")476 node_context.domain = None477 watchdog.cancel()478 def _get_domain_undefine_flags(self) -> int:479 return int(480 libvirt.VIR_DOMAIN_UNDEFINE_MANAGED_SAVE481 | libvirt.VIR_DOMAIN_UNDEFINE_SNAPSHOTS_METADATA482 | libvirt.VIR_DOMAIN_UNDEFINE_NVRAM483 | libvirt.VIR_DOMAIN_UNDEFINE_CHECKPOINTS_METADATA484 )485 def _stop_port_forwarding(self, environment: Environment, log: Logger) -> None:486 log.debug(f"Clearing port forwarding rules for environment {environment.name}")487 environment_context = get_environment_context(environment)488 for (port, address) in environment_context.port_forwarding_list:489 self.host_node.tools[Iptables].stop_forwarding(port, address, 22)490 # Retrieve the VMs' dynamic properties (e.g. IP address).491 def _fill_nodes_metadata(492 self, environment: Environment, log: Logger, lv_conn: libvirt.virConnect493 ) -> None:494 environment_context = get_environment_context(environment)495 # Give all the VMs some time to boot and then acquire an IP address.496 timeout = time.time() + environment_context.network_boot_timeout497 if self.host_node.is_remote:498 remote_node = cast(RemoteNode, self.host_node)499 conn_info = remote_node.connection_info...

Full Screen

Full Screen

snippet_client_v2.py

Source:snippet_client_v2.py Github

copy

Full Screen

...486 self._conn.close()487 self._conn = None488 finally:489 # Always clear the host port as part of the close step490 self._stop_port_forwarding()491 def _stop_port_forwarding(self):492 """Stops the adb port forwarding used by this client."""493 if self.host_port:494 self._device.adb.forward(['--remove', f'tcp:{self.host_port}'])495 self.host_port = None496 def _stop_server(self):497 """Releases all the resources acquired in `start_server`.498 Raises:499 android_device_lib_errors.DeviceError: if the server exited with errors on500 the device side.501 """502 # Although killing the snippet server would abort this subprocess anyway, we503 # want to call stop_standing_subprocess() to perform a health check,504 # print the failure stack trace if there was any, and reap it from the505 # process table. Note that it's much more important to ensure releasing all...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful