Best Python code snippet using autotest_python
protocol.py
Source:protocol.py  
# NOTE(review): this listing is an excerpt that begins part-way through the
# class that owns ``_init_transport``.  That class is taken to be
# ``GreenTransportBase`` because the transports below inherit
# ``_init_transport`` from it.  The class header is reconstructed so that the
# listing parses (the ``object`` base is an assumption -- confirm).  The
# members used below but not visible in the excerpt (``_queue``,
# ``_write_event``, ``_disconnected_event``, ``transportBufferSize``,
# ``build_protocol``, ``_got_transport``, ``_got_data``, ``_connectionLost``,
# ``_wait``, ``resumeProducing``, ``pauseProducing``) must be restored from
# the full module.
class GreenTransportBase(object):

    def _init_transport_producer(self):
        """Pause the underlying transport and record the paused state."""
        self.transport.pauseProducing()
        self.paused = 1

    def _init_transport(self):
        """Block until the connection is established, then adopt its transport.

        The first item taken from ``self._queue`` is stored as
        ``self.transport``.  ``GreenInstanceFactory.clientConnectionFailed``
        sends the failure to the same queue, so a failed connection attempt
        presumably surfaces here as an exception raised by ``wait()``.
        """
        transport = self._queue.wait()
        self.transport = transport
        if self.transportBufferSize is not None:
            transport.bufferSize = self.transportBufferSize
        self._init_transport_producer()
        # Second argument is Twisted's ``streaming`` flag; False registers a
        # non-streaming (pull) producer.
        transport.registerProducer(Producer2Event(self._write_event), False)


class Protocol(twistedProtocol):
    """Twisted protocol that forwards every callback to a recipient object."""

    def __init__(self, recepient):
        # The parameter spelling is part of the existing signature; keep it.
        self._recepient = recepient

    def connectionMade(self):
        """Hand the freshly connected transport to the recipient."""
        self._recepient._got_transport(self.transport)

    def dataReceived(self, data):
        """Pass a received chunk to the recipient."""
        self._recepient._got_data(data)

    def connectionLost(self, reason):
        """Tell the recipient why the connection went away."""
        self._recepient._connectionLost(reason)


class UnbufferedTransport(GreenTransportBase):
    """A very simple implementation of a green transport without an additional buffer"""

    protocol_class = Protocol

    def recv(self):
        """Receive a single chunk of undefined size.

        Return '' if connection was closed cleanly, raise the exception if it
        was closed in a non clean fashion. After that all successive calls
        return ''.
        """
        if self._disconnected_event.ready():
            return ''
        try:
            return self._wait()
        except ConnectionDone:
            return ''

    def read(self):
        """Read the data from the socket until the connection is closed cleanly.

        If connection was closed in a non-clean fashion, the appropriate
        exception is raised. In that case already received data is lost.
        Next time read() is called it returns ''.
        """
        # Collect the chunks and join once; repeated ``+=`` on a growing
        # string copies everything received so far on every iteration.
        chunks = []
        while True:
            recvd = self.recv()
            if not recvd:
                break
            chunks.append(recvd)
        return ''.join(chunks)

    # iterator protocol:
    def __iter__(self):
        return self

    def next(self):
        """Return the next received chunk; stop iterating on an empty one."""
        result = self.recv()
        if not result:
            raise StopIteration
        return result


class GreenTransport(GreenTransportBase):
    """Green transport that keeps received data in ``_buffer`` between calls."""

    protocol_class = Protocol
    _buffer = ''
    _error = None

    def _raise_pending_disconnect_error(self):
        """Surface a recorded non-clean disconnect; ignore a clean one.

        Called once a read produced no data.  ``wait()`` on an event that
        holds an exception presumably re-raises it; ``ConnectionDone`` (clean
        close) is swallowed so that callers simply see an empty result.
        """
        if self._disconnected_event.has_exception():
            try:
                self._disconnected_event.wait()
            except ConnectionDone:
                pass

    def read(self, size=-1):
        """Read size bytes or until EOF.

        A negative ``size`` reads until the connection is closed.  Fewer than
        ``size`` bytes are returned if the connection ends first.
        """
        if not self._disconnected_event.ready():
            try:
                while len(self._buffer) < size or size < 0:
                    self._buffer += self._wait()
            except ConnectionDone:
                pass
            except:  # noqa: E722 -- deliberately broad, see below
                # When a disconnect failure has been recorded, hold the error
                # back so that already buffered data is returned first; it is
                # surfaced below once nothing is left to return.
                if not self._disconnected_event.has_exception():
                    raise
        if size >= 0:
            result, self._buffer = self._buffer[:size], self._buffer[size:]
        else:
            result, self._buffer = self._buffer, ''
        if not result:
            self._raise_pending_disconnect_error()
        return result

    def recv(self, buflen=None):
        """Receive a single chunk of undefined size but no bigger than buflen.

        Data left in the buffer by an earlier ``read(size)`` or
        ``recv(buflen)`` is returned immediately.  The network is only waited
        on when the buffer is empty, otherwise a caller could block for new
        data while unread bytes are already available.
        """
        if not self._buffer and not self._disconnected_event.ready():
            self.resumeProducing()
            try:
                try:
                    recvd = self._wait()
                    self._buffer += recvd
                except ConnectionDone:
                    pass
                except:  # noqa: E722 -- deliberately broad, see read()
                    if not self._disconnected_event.has_exception():
                        raise
            finally:
                self.pauseProducing()
        if buflen is None:
            result, self._buffer = self._buffer, ''
        else:
            result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
        if not result:
            self._raise_pending_disconnect_error()
        return result

    # iterator protocol:
    def __iter__(self):
        return self

    def next(self):
        """Return the next received chunk; stop iterating on an empty one."""
        res = self.recv()
        if not res:
            raise StopIteration
        return res


class GreenInstanceFactory(ClientFactory):
    """Client factory that always hands out one pre-built protocol instance."""

    def __init__(self, instance, event):
        self.instance = instance
        self.event = event

    def buildProtocol(self, addr):
        """Return the stored protocol instance regardless of ``addr``."""
        return self.instance

    def clientConnectionFailed(self, connector, reason):
        """Forward the connection failure to ``event`` as an exception."""
        self.event.send_exception(reason.type, reason.value, reason.tb)


class GreenClientCreator(object):
    """Connect to a remote host and return a connected green transport instance.
    """

    gtransport_class = GreenTransport

    def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):
        """Remember the reactor and how to build transports.

        ``reactor`` defaults to the global Twisted reactor.  ``args`` and
        ``kwargs`` are passed to ``gtransport_class`` for every connection.
        """
        if reactor is None:
            from twisted.internet import reactor
        self.reactor = reactor
        if gtransport_class is not None:
            self.gtransport_class = gtransport_class
        self.args = args
        self.kwargs = kwargs

    def _make_transport_and_factory(self):
        """Build a green transport and the factory that feeds its queue."""
        gtransport = self.gtransport_class(*self.args, **self.kwargs)
        protocol = gtransport.build_protocol()
        factory = GreenInstanceFactory(protocol, gtransport._queue)
        return gtransport, factory

    def _connect_with(self, start_connecting):
        """Shared body of the ``connect*`` methods.

        ``start_connecting`` is called with the factory and must initiate the
        connection attempt.  The transport is returned once
        ``_init_transport()`` has finished waiting for it.
        """
        gtransport, factory = self._make_transport_and_factory()
        start_connecting(factory)
        gtransport._init_transport()
        return gtransport

    def connectTCP(self, host, port, *args, **kwargs):
        """Connect via ``reactor.connectTCP``; extra arguments are passed on."""
        return self._connect_with(
            lambda factory: self.reactor.connectTCP(host, port, factory, *args, **kwargs))

    def connectSSL(self, host, port, *args, **kwargs):
        """Connect via ``reactor.connectSSL``; extra arguments are passed on."""
        return self._connect_with(
            lambda factory: self.reactor.connectSSL(host, port, factory, *args, **kwargs))

    def connectTLS(self, host, port, *args, **kwargs):
        """Connect via ``reactor.connectTLS``; extra arguments are passed on."""
        return self._connect_with(
            lambda factory: self.reactor.connectTLS(host, port, factory, *args, **kwargs))

    def connectUNIX(self, address, *args, **kwargs):
        """Connect via ``reactor.connectUNIX``; extra arguments are passed on."""
        return self._connect_with(
            lambda factory: self.reactor.connectUNIX(address, factory, *args, **kwargs))

    def connectSRV(self, service, domain, *args, **kwargs):
        """Connect through an SRV connector.

        The connector class may be supplied with the ``ConnectorClass``
        keyword; it defaults to ``twisted.names.srvconnect.SRVConnector``.
        """
        connector_class = kwargs.pop('ConnectorClass', None)
        if connector_class is None:
            from twisted.names.srvconnect import SRVConnector as connector_class

        def start_connecting(factory):
            connector = connector_class(self.reactor, service, domain, factory, *args, **kwargs)
            connector.connect()

        return self._connect_with(start_connecting)


class SimpleSpawnFactory(Factory):
    """Factory that spawns a new greenlet for each incoming connection.

    For an incoming connection a new greenlet is created using the provided
    callback as a function and a connected green transport instance as an
    argument.
    """

    gtransport_class = GreenTransport

    def __init__(self, handler, gtransport_class=None, *args, **kwargs):
        """Store the handler and the transport construction arguments.

        ``handler`` is either a callable or an object with a ``send`` method.
        If it also has ``send_exception``, that becomes the error handler and
        shadows the no-op ``exc_handler`` method on this instance.
        """
        if callable(handler):
            self.handler = handler
        else:
            self.handler = handler.send
        if hasattr(handler, 'send_exception'):
            self.exc_handler = handler.send_exception
        if gtransport_class is not None:
            self.gtransport_class = gtransport_class
        self.args = args
        self.kwargs = kwargs

    def exc_handler(self, *args):
        """Default error handler: ignore the error."""
        pass

    def buildProtocol(self, addr):
        """Create a transport/protocol pair and spawn its handler greenlet."""
        gtransport = self.gtransport_class(*self.args, **self.kwargs)
        protocol = gtransport.build_protocol()
        protocol.factory = self
        self._do_spawn(gtransport, protocol)
        return protocol

    def _do_spawn(self, gtransport, protocol):
        """Start the greenlet that runs ``_run_handler``."""
        greenthread.spawn(self._run_handler, gtransport, protocol)

    def _run_handler(self, gtransport, protocol):
        """Wait for the transport, then call the handler with it.

        A failure while initialising the transport is reported to
        ``exc_handler`` as ``sys.exc_info()`` and the handler is not called.
        """
        try:
            gtransport._init_transport()
        except Exception:
            self.exc_handler(*sys.exc_info())
        else:
            self.handler(gtransport)


class SpawnFactory(SimpleSpawnFactory):
    """An extension to SimpleSpawnFactory that provides some control over
    the greenlets it has spawned.
    """

    def __init__(self, handler, gtransport_class=None, *args, **kwargs):
        # Create the set before the base initialiser runs.
        self.greenlets = set()
        SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)

    def _do_spawn(self, gtransport, protocol):
        """Spawn the handler greenlet and remember it in ``self.greenlets``."""
        g = greenthread.spawn(self._run_handler, gtransport, protocol)
        self.greenlets.add(g)

    # NOTE(review): the listing is cut off here; any further members of this
    # class are outside the excerpt.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
