Best Python code snippet using ATX
tcp.py
Source:tcp.py  
1# This Source Code Form is subject to the terms of the Mozilla Public2# License, v. 2.0. If a copy of the MPL was not distributed with this3# file, You can obtain one at http://mozilla.org/MPL/2.0/.4import socket5import time6import sys7from Peach.Engine.engine import Engine8from Peach.Engine.common import PeachException9from Peach.publisher import Publisher10from Peach.publisher import Timeout11from Peach.publisher import PublisherSoftException12from Peach.Utilities.common import *13import Peach14def Debug(msg):15    if Peach.Engine.engine.Engine.debug:16        print(msg)17#class Timeout(SoftException):18#	def __init__(self, msg):19#		self.msg = msg20#	21#	def __str__(self):22#		return self.msg23class Tcp(Publisher):24    """25    A simple TCP client publisher.26    """27    def __init__(self, host, port, timeout=0.25, throttle=0):28        """29        @type	host: string30        @param	host: Remote host31        @type	port: number32        @param	port: Remote port33        @type	timeout: number34        @param	timeout: How long to wait for reponse35        @type	throttle: number36        @param	throttle: How long to wait between connections37        """38        Publisher.__init__(self)39        self._host = host40        try:41            self._port = int(port)42        except:43            raise PeachException("The Tcp publisher parameter for port was not a valid number.")44        try:45            self._timeout = float(timeout)46        except:47            raise PeachException("The Tcp publisher parameter for timeout was not a valid number.")48        try:49            self._throttle = float(throttle)50        except:51            raise PeachException("The Tcp publisher parameter for throttle was not a valid number.")52        self._socket = None53    def start(self):54        pass55    def stop(self):56        self.close()57    def connect(self):58        """59        Create connection.60        """61        self.close()62        if self._throttle > 0:63            time.sleep(self._throttle)64        # Try connecting many times65        # before we crash.66        for i in range(30):67            try:68                self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)69                self._socket.connect((self._host, self._port))70                exception = None71                break72            except:73                self._socket = None74                exception = sys.exc_info()75            # Wait half sec and try again76            time.sleep(1)77        if self._socket is None:78            value = ""79            try:80                value = str(exception[1])81            except:82                pass83            raise PeachException("TCP onnection attempt failed: %s" % value)84        self.buff = ""85        self.pos = 086    def close(self):87        """88        Close connection if open.89        """90        try:91            if self._socket is not None:92                self._socket.close()93        finally:94            self._socket = None95        self.buff = ""96        self.pos = 097    def send(self, data):98        """99        Send data via sendall.100        @type	data: string101        @param	data: Data to send102        """103        if Peach.Engine.engine.Engine.debug:104            print(">>>>>>>>>>>>>>>>")105            print("tcp.Tcp.send():")106            printHex(data)107        try:108            self._socket.sendall(data)109        except:110            if Peach.Engine.engine.Engine.debug:111                print("Tcp: Sendall failed: " + str(sys.exc_info()[1]))112            raise PublisherSoftException("sendall failed: " + str(sys.exc_info()[1]))113    def _receiveBySize(self, size):114        """115        This is now a buffered receiver.116        @rtype: string117        @return: received data.118        """119        # Do we already a have it?120        if size + self.pos < len(self.buff):121            ret = self.buff[self.pos:self.pos + size]122            self.pos += size123            return ret124        # Only ask for the diff of what we don't already have125        diffSize = (self.pos + size) - len(self.buff)126        try:127            if Peach.Engine.engine.Engine.debug:128                print("Asking for %d, need %d, have %d" % (size, diffSize, len(self.buff) - self.pos))129            self._socket.settimeout(self._timeout)130            ret = self._socket.recv(diffSize)131            if not ret:132                # Socket was closed133                if Peach.Engine.engine.Engine.debug:134                    print("Socket is closed")135                raise PublisherSoftException("Socket is closed")136            if Peach.Engine.engine.Engine.debug:137                print("<<<<<<<<<<<<<<<<<")138                print("tcp.Tcp.receive():")139                printHex(ret)140            self.buff += ret141        except socket.error as e:142            if str(e).find('The socket operation could not complete without blocking') != -1:143                if Peach.Engine.engine.Engine.debug:144                    print("timed out waiting for data")145                raise Timeout(146                    "Timed out waiting for data [%d:%d:%d:%d]" % (len(self.buff), (size + self.pos), size, diffSize))147            elif str(e).find('An existing connection was forcibly') != -1:148                if Peach.Engine.engine.Engine.debug:149                    print("Socket was closed!")150                raise PublisherSoftException("Socket is closed")151            else:152                if Peach.Engine.engine.Engine.debug:153                    print("recv failed: " + str(sys.exc_info()[1]))154                raise PublisherSoftException("recv failed: " + str(sys.exc_info()[1]))155        finally:156            self._socket.settimeout(None)157        ret = self.buff[self.pos:]158        self.pos = len(self.buff)159        return ret160    def _receiveByAvailable(self):161        """162        Receive as much as possible prior to timeout.163        @rtype: string164        @return: received data.165        """166        self._socket.settimeout(self._timeout)167        try:168            ret = self._socket.recv(4096)169            if not ret:170                raise PublisherSoftException("Socket is closed")171            if Peach.Engine.engine.Engine.debug:172                print("<<<<<<<<<<<<<<<<<")173                print("tcp.Tcp.receive():")174                printHex(ret)175            self.buff += ret176        except socket.error as e:177            if str(e).find('The socket operation could not complete without blocking') == -1:178                pass179            else:180                raise PublisherSoftException("recv failed: " + str(sys.exc_info()[1]))181        self._socket.settimeout(None)182        ret = self.buff[self.pos:]183        self.pos = len(self.buff)184        return ret185    def receive(self, size=None):186        if size is None:187            return self._receiveByAvailable()188        else:189            return self._receiveBySize(size)190class TcpListener(Tcp):191    """192    A TCP Listener publisher.  This publisher193    supports the following state actions:194     * start - Start listening195     * stop - Stop listening196     * accept - Accept a client connection197     * close - Close a client connection198    """199    def __init__(self, host, port, timeout=0.25):200        Tcp.__init__(self, host, port, timeout)201        self._listen = None202        self._clientAddr = None203    def start(self):204        self.close()205        if self._listen is None:206            for i in range(3):207                try:208                    self._listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)209                    self._listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)210                    self._listen.bind((self._host, self._port))211                    self._listen.listen(1)212                    exception = None213                    break214                except:215                    self._listen = None216                    exception = sys.exc_info()217                time.sleep(0.5)218            if self._listen is None:219                value = ""220                try:221                    value = str(exception[1])222                except:223                    pass224                raise PeachException("TCP bind attempt failed: %s" % value)225        self.buff = ""226        self.pos = 0227    def stop(self):228        try:229            if self._socket is not None:230                self._socket.close()231        except:232            pass233        finally:234            self._socket = None235        # Avoid TIME_WAIT state by not closing our listener236        #try:237        #	if self._listen != None:238        #		self._listen.close()239        #except:240        #	pass241        #242        #finally:243        #	self._listen = None244    def accept(self):245        self.buff = ""246        self.pos = 0247        conn, addr = self._listen.accept()248        self._socket = conn249        self._clientAddr = addr250    def close(self):251        try:252            if self._socket is not None:253                self._socket.close()254        except:255            pass256        finally:257            self._socket = None258    def connect(self):259        raise PeachException("Action 'connect' not supported")260try:261    import win32gui, win32con, win32process262    import sys, time, os, signal263    class TcpListenerLaunchGui(TcpListener):264        """265        Does TcpListener goodness and also can laun a program.  After266        some defined amount of time we will try and close the267        GUI application by sending WM_CLOSE than kill it.268        """269        def __init__(self, host, port, windowname, timeout=0.25):270            TcpListener.__init__(self, host, port, timeout)271            self._windowName = windowname272        def stop(self):273            self.closeApp(self._windowName)274            TcpListener.stop(self)275        def call(self, method, args):276            """277            Launch program to consume file278            @type	method: string279            @param	method: Command to execute280            @type	args: array of objects281            @param	args: Arguments to pass282            """283            realArgs = [method, "/c", method]284            for a in args:285                realArgs.append(a)286            #print "Spawning %s" % method287            os.spawnv(os.P_NOWAIT, os.path.join(os.getenv('SystemRoot'), 'system32', 'cmd.exe'), realArgs)288        @staticmethod289        def enumCallback(hwnd, windowName):290            """291            Will get called by win32gui.EnumWindows, once for each292            top level application window.293            """294            try:295                # Get window title296                title = win32gui.GetWindowText(hwnd)297                # Is this our guy?298                if title.find(windowName) == -1:299                    return300                (threadId, processId) = win32process.GetWindowThreadProcessId(hwnd)301                # Send WM_CLOSE message302                try:303                    win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)304                    win32gui.PostQuitMessage(hwnd)305                except:306                    pass307                # Give it upto 5 sec308                for i in range(100):309                    if win32process.GetExitCodeProcess(processId) != win32con.STILL_ACTIVE:310                        # Process exited already311                        return312                    time.sleep(0.25)313                try:314                    # Kill application315                    win32process.TerminateProcess(processId, 0)316                except:317                    pass318            except:319                pass320        def closeApp(self, title):321            """322            Close Application by window title323            """324            #print "CloseApp: %s" % title325            win32gui.EnumWindows(TcpListenerLaunchGui.enumCallback, title)326except:327    pass328class TcpProxyB(TcpListener):329    def __init__(self, host, port):330        TcpListener.__init__(self, host, port)331class TcpProxyA(Tcp):332    def __init__(self, host, port):...irc.py
Source:irc.py  
1# This Source Code Form is subject to the terms of the Mozilla Public2# License, v. 2.0. If a copy of the MPL was not distributed with this3# file, You can obtain one at http://mozilla.org/MPL/2.0/.4import sys5import time6import socket7from Peach.Engine.common import PeachException8from Peach.Publishers.tcp import TcpListener9#10# CAP REQ :multi-prefix11# NICK cdiehl12# USER cdiehl 0 * :Christoph Diehl13#14class FakeServer(TcpListener):15    def __init__(self, host, port):16        TcpListener.__init__(self, host, port)17        self._accepted = False18    def start(self):19        if self._listen is None:20            for i in range(3):21                try:22                    self._listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)23                    self._listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)24                    self._listen.bind((self._host, self._port))25                    self._listen.listen(1)26                    exception = None27                    break28                except:29                    self._listen = None30                    exception = sys.exc_info()31                time.sleep(0.5)32            if self._listen is None:33                value = ""34                try:35                    value = str(exception[1])36                except:37                    pass38                raise PeachException("TCP bind attempt failed: %s" % value)39        self.buff = ""40        self.pos = 041    def accept(self):42        if not self._accepted:43            self.buff = ""44            self.pos = 045            conn, addr = self._listen.accept()46            self._socket = conn47            self._clientAddr = addr48            self._accepted = True49    def close(self):50        pass51    def stop(self):52        pass53# connectedUsers = []54#55# request = client.recv(1024)56# initialized = True57#58# request = request.split("\r\n")59# for line in request:60# 	if line.startswith("NICK"):61# 		connectedUsers.append(line.split("NICK ")[1])62#63# hostname = client.gethostname()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
