How to use _get_proxy_channel method in tempest

Best Python code snippet using tempest_python

ssh.py

Source: ssh.py (GitHub)

copy

Full Screen

...66 self.host, self.port, self.username, str(self.password))67 attempts = 068 while True:69 if self.proxy_client is not None:70 proxy_chan = self._get_proxy_channel()71 else:72 proxy_chan = None73 try:74 ssh.connect(self.host, port=self.port, username=self.username,75 password=self.password,76 look_for_keys=self.look_for_keys,77 key_filename=self.key_filename,78 timeout=self.channel_timeout, pkey=self.pkey,79 sock=proxy_chan)80 LOG.info("ssh connection to %s@%s successfully created",81 self.username, self.host)82 return ssh83 except (EOFError,84 socket.error, socket.timeout,85 paramiko.SSHException) as e:86 ssh.close()87 if self._is_timed_out(_start_time):88 LOG.exception("Failed to establish authenticated ssh"89 " connection to %s@%s after %d attempts",90 self.username, self.host, attempts)91 raise exceptions.SSHTimeout(host=self.host,92 user=self.username,93 password=self.password)94 bsleep += backoff95 attempts += 196 LOG.warning("Failed to establish authenticated ssh"97 " connection to %s@%s (%s). Number attempts: %s."98 " Retry after %d seconds.",99 self.username, self.host, e, attempts, bsleep)100 time.sleep(bsleep)101 def _is_timed_out(self, start_time):102 return (time.time() - self.timeout) > start_time103 @staticmethod104 def _can_system_poll():105 return hasattr(select, 'poll')106 def exec_command(self, cmd, encoding="utf-8"):107 """Execute the specified command on the server108 Note that this method is reading whole command outputs to memory, thus109 shouldn't be used for large outputs.110 :param str cmd: Command to run at remote server.111 :param str encoding: Encoding for result from paramiko.112 Result will not be decoded if None.113 :returns: data read from standard output of the command.114 :raises: SSHExecCommandFailed if command returns nonzero115 status. 
The exception contains command status stderr content.116 :raises: TimeoutException if cmd doesn't end when timeout expires.117 """118 ssh = self._get_ssh_connection()119 transport = ssh.get_transport()120 with transport.open_session() as channel:121 channel.fileno() # Register event pipe122 channel.exec_command(cmd)123 channel.shutdown_write()124 # If the executing host is linux-based, poll the channel125 if self._can_system_poll():126 out_data_chunks = []127 err_data_chunks = []128 poll = select.poll()129 poll.register(channel, select.POLLIN)130 start_time = time.time()131 while True:132 ready = poll.poll(self.channel_timeout)133 if not any(ready):134 if not self._is_timed_out(start_time):135 continue136 raise exceptions.SSHTimeout(host=self.host,137 user=self.username,138 password=self.password)139 # raise exceptions.TimeoutException(140 # "Command: '{0}' executed on host '{1}'.".format(141 # cmd, self.host))142 if not ready[0]: # If there is nothing to read.143 continue144 out_chunk = err_chunk = None145 if channel.recv_ready():146 out_chunk = channel.recv(self.buf_size)147 out_data_chunks += out_chunk,148 if channel.recv_stderr_ready():149 err_chunk = channel.recv_stderr(self.buf_size)150 err_data_chunks += err_chunk,151 if not err_chunk and not out_chunk:152 break153 out_data = b''.join(out_data_chunks)154 err_data = b''.join(err_data_chunks)155 # Just read from the channels156 else:157 out_file = channel.makefile('rb', self.buf_size)158 err_file = channel.makefile_stderr('rb', self.buf_size)159 out_data = out_file.read()160 err_data = err_file.read()161 if encoding:162 out_data = out_data.decode(encoding)163 err_data = err_data.decode(encoding)164 exit_status = channel.recv_exit_status()165 if 0 != exit_status:166 raise exceptions.SSHExecCommandFailed(167 command=cmd, exit_status=exit_status,168 stderr=err_data, stdout=out_data)169 transport.close()170 return out_data171 def test_connection_auth(self):172 """Raises an exception when we can not connect to 
server via ssh."""173 connection = self._get_ssh_connection()174 connection.close()175 def _get_proxy_channel(self):176 conn = self.proxy_client._get_ssh_connection()177 # Keep a reference to avoid g/c178 # https://github.com/paramiko/paramiko/issues/440179 self._proxy_conn = conn180 transport = conn.get_transport()181 chan = transport.open_session()182 cmd = 'nc %s %s' % (self.host, self.port)183 chan.exec_command(cmd)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful