How to use _spawn_ssh_process method in lisa

Best Python code snippet using lisa_python

shell.py

Source:shell.py Github

copy

Full Screen

...151# some images needs longer time to set up ssh connection.152# e.g. Oracle Oracle-Linux 7.5 7.5.20181207153# e.g. qubole-inc qubole-data-service default-img 0.7.4154@func_set_timeout(20) # type: ignore155def _spawn_ssh_process(shell: spur.ssh.SshShell, **kwargs: Any) -> spur.ssh.SshProcess:156 return shell.spawn(**kwargs)157class SshShell(InitializableMixin):158 def __init__(self, connection_info: schema.ConnectionInfo) -> None:159 super().__init__()160 self.is_remote = True161 self._connection_info = connection_info162 self._inner_shell: Optional[spur.SshShell] = None163 self._jump_boxes: List[Any] = []164 self._jump_box_sock: Any = None165 paramiko_logger = logging.getLogger("paramiko")166 paramiko_logger.setLevel(logging.WARN)167 def _initialize(self, *args: Any, **kwargs: Any) -> None:168 is_ready, tcp_error_code = wait_tcp_port_ready(169 self._connection_info.address, self._connection_info.port170 )171 if not is_ready:172 raise TcpConnectionException(173 self._connection_info.address,174 self._connection_info.port,175 tcp_error_code,176 )177 sock = self._establish_jump_boxes(178 address=self._connection_info.address,179 port=self._connection_info.port,180 )181 try:182 stdout = try_connect(self._connection_info, sock=sock)183 except Exception as identifier:184 raise LisaException(185 f"failed to connect SSH "186 f"[{self._connection_info.address}:{self._connection_info.port}], "187 f"{identifier.__class__.__name__}: {identifier}"188 )189 self._close_jump_boxes()190 # Some windows doesn't end the text stream, so read first line only.191 # it's enough to detect os.192 stdout_content = stdout.readline()193 stdout.close()194 if stdout_content and "Windows" in stdout_content:195 self.is_posix = False196 shell_type = WindowsShellType()197 else:198 self.is_posix = True199 shell_type = spur.ssh.ShellTypes.sh200 sock = self._establish_jump_boxes(201 address=self._connection_info.address,202 port=self._connection_info.port,203 )204 spur_kwargs = {205 "hostname": self._connection_info.address,206 "port": self._connection_info.port,207 "username": self._connection_info.username,208 "password": self._connection_info.password,209 "private_key_file": self._connection_info.private_key_file,210 "missing_host_key": spur.ssh.MissingHostKey.accept,211 # There are too many servers in cloud, and they may reuse the same212 # IP in different time. If so, there is host key conflict. So do not213 # load host keys to avoid this kind of error.214 "load_system_host_keys": False,215 "sock": sock,216 }217 spur_ssh_shell = spur.SshShell(shell_type=shell_type, **spur_kwargs)218 sftp = spurplus.sftp.ReconnectingSFTP(219 sftp_opener=spur_ssh_shell._open_sftp_client220 )221 self._inner_shell = spurplus.SshShell(spur_ssh_shell=spur_ssh_shell, sftp=sftp)222 def close(self) -> None:223 if self._inner_shell:224 self._inner_shell.close()225 # after closed, can be reconnect226 self._inner_shell = None227 self._is_initialized = False228 self._close_jump_boxes()229 @property230 def is_connected(self) -> bool:231 is_inner_shell_ready = False232 if self._inner_shell:233 is_inner_shell_ready = True234 return is_inner_shell_ready235 def spawn(236 self,237 command: Sequence[str],238 update_env: Optional[Mapping[str, str]] = None,239 store_pid: bool = False,240 cwd: Optional[Union[str, Path]] = None,241 stdout: Any = None,242 stderr: Any = None,243 encoding: str = "utf-8",244 use_pty: bool = True,245 allow_error: bool = True,246 ) -> spur.ssh.SshProcess:247 self.initialize()248 assert self._inner_shell249 try:250 process: spur.ssh.SshProcess = _spawn_ssh_process(251 self._inner_shell,252 command=command,253 update_env=update_env,254 store_pid=store_pid,255 cwd=cwd,256 stdout=stdout,257 stderr=stderr,258 encoding=encoding,259 use_pty=use_pty,260 allow_error=allow_error,261 )262 except FunctionTimedOut:263 raise LisaException(264 f"The remote node is timeout on execute {command}. "...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful