Best Python code snippet using lisa_python
features.py
Source:features.py  
...266                websockets.connect(connection_str)  # type: ignore267            )268        return self._ws269    def _write(self, cmd: str) -> None:270        self._initialize_serial_console(id=self.DEFAULT_SERIAL_PORT_ID)271        # connect to websocket and send command272        ws = self._get_connection()273        self._get_event_loop().run_until_complete(ws.send(cmd))274    def _read(self) -> str:275        self._initialize_serial_console(id=self.DEFAULT_SERIAL_PORT_ID)276        # connect to websocket277        ws = self._get_connection()278        # read all the available messages279        output: str = ""280        while True:281            try:282                msg = self._get_event_loop().run_until_complete(283                    asyncio.wait_for(ws.recv(), timeout=10)284                )285                output += msg286            except asyncio.TimeoutError:287                # this implies that the buffer is empty288                break289        # assert isinstance(self._output_string, str)290        if self._output_string in output:291            # implies that the connection was reset292            diff = output[len(self._output_string) :]293            self._output_string: str = output294            return diff295        else:296            self._output_string += output297        return output298    def _get_console_log(self, saved_path: Optional[Path]) -> bytes:299        platform: AzurePlatform = self._platform  # type: ignore300        compute_client = get_compute_client(platform)301        with global_credential_access_lock:302            diagnostic_data = (303                compute_client.virtual_machines.retrieve_boot_diagnostics_data(304                    resource_group_name=self._resource_group_name, vm_name=self._vm_name305                )306            )307        if saved_path:308            screenshot_raw_name = saved_path.joinpath("serial_console.bmp")309            screenshot_name = 
saved_path.joinpath("serial_console.png")310            screenshot_response = requests.get(311                diagnostic_data.console_screenshot_blob_uri312            )313            with open(screenshot_raw_name, mode="wb") as f:314                f.write(screenshot_response.content)315            try:316                with Image.open(screenshot_raw_name) as image:317                    image.save(screenshot_name, "PNG", optimize=True)318            except UnidentifiedImageError:319                self._log.debug(320                    "The screenshot is not generated. "321                    "The reason may be the VM is not started."322                )323            unlink(screenshot_raw_name)324        log_response = requests.get(diagnostic_data.serial_console_log_blob_uri)325        return log_response.content326    def _get_connection_string(self) -> str:327        # setup connection string328        platform: AzurePlatform = self._platform  # type: ignore329        connection = self._serial_port_operations.connect(330            resource_group_name=self._resource_group_name,331            resource_provider_namespace=self.RESOURCE_PROVIDER_NAMESPACE,332            parent_resource_type=self.PARENT_RESOURCE_TYPE,333            parent_resource=self._vm_name,334            serial_port=self._serial_port.name,335        )336        access_token = platform.credential.get_token(337            "https://management.core.windows.net/.default"338        ).token339        serial_port_connection_str = (340            f"{connection.connection_string}?authorization={access_token}"341        )342        return serial_port_connection_str343    def _initialize_serial_console(self, id: int) -> None:344        if self._serial_console_initialized:345            return346        platform: AzurePlatform = self._platform  # type: ignore347        with global_credential_access_lock:348            self._serial_console_client = MicrosoftSerialConsoleClient(349                
credential=platform.credential, subscription_id=platform.subscription_id350            )351            self._serial_port_operations: FixedSerialPortsOperations = (352                FixedSerialPortsOperations(353                    self._serial_console_client._client,354                    self._serial_console_client._config,355                    self._serial_console_client._serialize,356                    self._serial_console_client._deserialize,357                )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
