How to use _listen method in ATX

Best Python code snippet using ATX Github


Full Screen

1# This Source Code Form is subject to the terms of the Mozilla Public2# License, v. 2.0. If a copy of the MPL was not distributed with this3# file, You can obtain one at socket5import time6import sys7from Peach.Engine.engine import Engine8from Peach.Engine.common import PeachException9from Peach.publisher import Publisher10from Peach.publisher import Timeout11from Peach.publisher import PublisherSoftException12from Peach.Utilities.common import *13import Peach14def Debug(msg):15 if Peach.Engine.engine.Engine.debug:16 print(msg)17#class Timeout(SoftException):18# def __init__(self, msg):19# self.msg = msg20# 21# def __str__(self):22# return self.msg23class Tcp(Publisher):24 """25 A simple TCP client publisher.26 """27 def __init__(self, host, port, timeout=0.25, throttle=0):28 """29 @type host: string30 @param host: Remote host31 @type port: number32 @param port: Remote port33 @type timeout: number34 @param timeout: How long to wait for reponse35 @type throttle: number36 @param throttle: How long to wait between connections37 """38 Publisher.__init__(self)39 self._host = host40 try:41 self._port = int(port)42 except:43 raise PeachException("The Tcp publisher parameter for port was not a valid number.")44 try:45 self._timeout = float(timeout)46 except:47 raise PeachException("The Tcp publisher parameter for timeout was not a valid number.")48 try:49 self._throttle = float(throttle)50 except:51 raise PeachException("The Tcp publisher parameter for throttle was not a valid number.")52 self._socket = None53 def start(self):54 pass55 def stop(self):56 self.close()57 def connect(self):58 """59 Create connection.60 """61 self.close()62 if self._throttle > 0:63 time.sleep(self._throttle)64 # Try connecting many times65 # before we crash.66 for i in range(30):67 try:68 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)69 self._socket.connect((self._host, self._port))70 exception = None71 break72 except:73 self._socket = None74 exception = sys.exc_info()75 # Wait half sec and try again76 time.sleep(1)77 if self._socket is None:78 value = ""79 try:80 value = str(exception[1])81 except:82 pass83 raise PeachException("TCP onnection attempt failed: %s" % value)84 self.buff = ""85 self.pos = 086 def close(self):87 """88 Close connection if open.89 """90 try:91 if self._socket is not None:92 self._socket.close()93 finally:94 self._socket = None95 self.buff = ""96 self.pos = 097 def send(self, data):98 """99 Send data via sendall.100 @type data: string101 @param data: Data to send102 """103 if Peach.Engine.engine.Engine.debug:104 print(">>>>>>>>>>>>>>>>")105 print("tcp.Tcp.send():")106 printHex(data)107 try:108 self._socket.sendall(data)109 except:110 if Peach.Engine.engine.Engine.debug:111 print("Tcp: Sendall failed: " + str(sys.exc_info()[1]))112 raise PublisherSoftException("sendall failed: " + str(sys.exc_info()[1]))113 def _receiveBySize(self, size):114 """115 This is now a buffered receiver.116 @rtype: string117 @return: received data.118 """119 # Do we already a have it?120 if size + self.pos < len(self.buff):121 ret = self.buff[self.pos:self.pos + size]122 self.pos += size123 return ret124 # Only ask for the diff of what we don't already have125 diffSize = (self.pos + size) - len(self.buff)126 try:127 if Peach.Engine.engine.Engine.debug:128 print("Asking for %d, need %d, have %d" % (size, diffSize, len(self.buff) - self.pos))129 self._socket.settimeout(self._timeout)130 ret = self._socket.recv(diffSize)131 if not ret:132 # Socket was closed133 if Peach.Engine.engine.Engine.debug:134 print("Socket is closed")135 raise PublisherSoftException("Socket is closed")136 if Peach.Engine.engine.Engine.debug:137 print("<<<<<<<<<<<<<<<<<")138 print("tcp.Tcp.receive():")139 printHex(ret)140 self.buff += ret141 except socket.error as e:142 if str(e).find('The socket operation could not complete without blocking') != -1:143 if Peach.Engine.engine.Engine.debug:144 print("timed out waiting for data")145 raise Timeout(146 "Timed out waiting for data [%d:%d:%d:%d]" % (len(self.buff), (size + self.pos), size, diffSize))147 elif str(e).find('An existing connection was forcibly') != -1:148 if Peach.Engine.engine.Engine.debug:149 print("Socket was closed!")150 raise PublisherSoftException("Socket is closed")151 else:152 if Peach.Engine.engine.Engine.debug:153 print("recv failed: " + str(sys.exc_info()[1]))154 raise PublisherSoftException("recv failed: " + str(sys.exc_info()[1]))155 finally:156 self._socket.settimeout(None)157 ret = self.buff[self.pos:]158 self.pos = len(self.buff)159 return ret160 def _receiveByAvailable(self):161 """162 Receive as much as possible prior to timeout.163 @rtype: string164 @return: received data.165 """166 self._socket.settimeout(self._timeout)167 try:168 ret = self._socket.recv(4096)169 if not ret:170 raise PublisherSoftException("Socket is closed")171 if Peach.Engine.engine.Engine.debug:172 print("<<<<<<<<<<<<<<<<<")173 print("tcp.Tcp.receive():")174 printHex(ret)175 self.buff += ret176 except socket.error as e:177 if str(e).find('The socket operation could not complete without blocking') == -1:178 pass179 else:180 raise PublisherSoftException("recv failed: " + str(sys.exc_info()[1]))181 self._socket.settimeout(None)182 ret = self.buff[self.pos:]183 self.pos = len(self.buff)184 return ret185 def receive(self, size=None):186 if size is None:187 return self._receiveByAvailable()188 else:189 return self._receiveBySize(size)190class TcpListener(Tcp):191 """192 A TCP Listener publisher. This publisher193 supports the following state actions:194 * start - Start listening195 * stop - Stop listening196 * accept - Accept a client connection197 * close - Close a client connection198 """199 def __init__(self, host, port, timeout=0.25):200 Tcp.__init__(self, host, port, timeout)201 self._listen = None202 self._clientAddr = None203 def start(self):204 self.close()205 if self._listen is None:206 for i in range(3):207 try:208 self._listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)209 self._listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)210 self._listen.bind((self._host, self._port))211 self._listen.listen(1)212 exception = None213 break214 except:215 self._listen = None216 exception = sys.exc_info()217 time.sleep(0.5)218 if self._listen is None:219 value = ""220 try:221 value = str(exception[1])222 except:223 pass224 raise PeachException("TCP bind attempt failed: %s" % value)225 self.buff = ""226 self.pos = 0227 def stop(self):228 try:229 if self._socket is not None:230 self._socket.close()231 except:232 pass233 finally:234 self._socket = None235 # Avoid TIME_WAIT state by not closing our listener236 #try:237 # if self._listen != None:238 # self._listen.close()239 #except:240 # pass241 #242 #finally:243 # self._listen = None244 def accept(self):245 self.buff = ""246 self.pos = 0247 conn, addr = self._listen.accept()248 self._socket = conn249 self._clientAddr = addr250 def close(self):251 try:252 if self._socket is not None:253 self._socket.close()254 except:255 pass256 finally:257 self._socket = None258 def connect(self):259 raise PeachException("Action 'connect' not supported")260try:261 import win32gui, win32con, win32process262 import sys, time, os, signal263 class TcpListenerLaunchGui(TcpListener):264 """265 Does TcpListener goodness and also can laun a program. After266 some defined amount of time we will try and close the267 GUI application by sending WM_CLOSE than kill it.268 """269 def __init__(self, host, port, windowname, timeout=0.25):270 TcpListener.__init__(self, host, port, timeout)271 self._windowName = windowname272 def stop(self):273 self.closeApp(self._windowName)274 TcpListener.stop(self)275 def call(self, method, args):276 """277 Launch program to consume file278 @type method: string279 @param method: Command to execute280 @type args: array of objects281 @param args: Arguments to pass282 """283 realArgs = [method, "/c", method]284 for a in args:285 realArgs.append(a)286 #print "Spawning %s" % method287 os.spawnv(os.P_NOWAIT, os.path.join(os.getenv('SystemRoot'), 'system32', 'cmd.exe'), realArgs)288 @staticmethod289 def enumCallback(hwnd, windowName):290 """291 Will get called by win32gui.EnumWindows, once for each292 top level application window.293 """294 try:295 # Get window title296 title = win32gui.GetWindowText(hwnd)297 # Is this our guy?298 if title.find(windowName) == -1:299 return300 (threadId, processId) = win32process.GetWindowThreadProcessId(hwnd)301 # Send WM_CLOSE message302 try:303 win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)304 win32gui.PostQuitMessage(hwnd)305 except:306 pass307 # Give it upto 5 sec308 for i in range(100):309 if win32process.GetExitCodeProcess(processId) != win32con.STILL_ACTIVE:310 # Process exited already311 return312 time.sleep(0.25)313 try:314 # Kill application315 win32process.TerminateProcess(processId, 0)316 except:317 pass318 except:319 pass320 def closeApp(self, title):321 """322 Close Application by window title323 """324 #print "CloseApp: %s" % title325 win32gui.EnumWindows(TcpListenerLaunchGui.enumCallback, title)326except:327 pass328class TcpProxyB(TcpListener):329 def __init__(self, host, port):330 TcpListener.__init__(self, host, port)331class TcpProxyA(Tcp):332 def __init__(self, host, port):...

Full Screen

Full Screen Github


Full Screen

1# This Source Code Form is subject to the terms of the Mozilla Public2# License, v. 2.0. If a copy of the MPL was not distributed with this3# file, You can obtain one at sys5import time6import socket7from Peach.Engine.common import PeachException8from Peach.Publishers.tcp import TcpListener9#10# CAP REQ :multi-prefix11# NICK cdiehl12# USER cdiehl 0 * :Christoph Diehl13#14class FakeServer(TcpListener):15 def __init__(self, host, port):16 TcpListener.__init__(self, host, port)17 self._accepted = False18 def start(self):19 if self._listen is None:20 for i in range(3):21 try:22 self._listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)23 self._listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)24 self._listen.bind((self._host, self._port))25 self._listen.listen(1)26 exception = None27 break28 except:29 self._listen = None30 exception = sys.exc_info()31 time.sleep(0.5)32 if self._listen is None:33 value = ""34 try:35 value = str(exception[1])36 except:37 pass38 raise PeachException("TCP bind attempt failed: %s" % value)39 self.buff = ""40 self.pos = 041 def accept(self):42 if not self._accepted:43 self.buff = ""44 self.pos = 045 conn, addr = self._listen.accept()46 self._socket = conn47 self._clientAddr = addr48 self._accepted = True49 def close(self):50 pass51 def stop(self):52 pass53# connectedUsers = []54#55# request = client.recv(1024)56# initialized = True57#58# request = request.split("\r\n")59# for line in request:60# if line.startswith("NICK"):61# connectedUsers.append(line.split("NICK ")[1])62#63# hostname = client.gethostname()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?