How to use _init_transport method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...111 self.transport.pauseProducing()112 def _init_transport_producer(self):113 self.transport.pauseProducing()114 self.paused = 1115 def _init_transport(self):116 transport = self._queue.wait()117 self.transport = transport118 if self.transportBufferSize is not None:119 transport.bufferSize = self.transportBufferSize120 self._init_transport_producer()121 transport.registerProducer(Producer2Event(self._write_event), False)122class Protocol(twistedProtocol):123 def __init__(self, recepient):124 self._recepient = recepient125 def connectionMade(self):126 self._recepient._got_transport(self.transport)127 def dataReceived(self, data):128 self._recepient._got_data(data)129 def connectionLost(self, reason):130 self._recepient._connectionLost(reason)131class UnbufferedTransport(GreenTransportBase):132 """A very simple implementation of a green transport without an additional buffer"""133 protocol_class = Protocol134 def recv(self):135 """Receive a single chunk of undefined size.136 Return '' if connection was closed cleanly, raise the exception if it was closed137 in a non clean fashion. After that all successive calls return ''.138 """139 if self._disconnected_event.ready():140 return ''141 try:142 return self._wait()143 except ConnectionDone:144 return ''145 def read(self):146 """Read the data from the socket until the connection is closed cleanly.147 If connection was closed in a non-clean fashion, the appropriate exception148 is raised. 
In that case already received data is lost.149 Next time read() is called it returns ''.150 """151 result = ''152 while True:153 recvd = self.recv()154 if not recvd:155 break156 result += recvd157 return result158 # iterator protocol:159 def __iter__(self):160 return self161 def next(self):162 result = self.recv()163 if not result:164 raise StopIteration165 return result166class GreenTransport(GreenTransportBase):167 protocol_class = Protocol168 _buffer = ''169 _error = None170 def read(self, size=-1):171 """Read size bytes or until EOF"""172 if not self._disconnected_event.ready():173 try:174 while len(self._buffer) < size or size < 0:175 self._buffer += self._wait()176 except ConnectionDone:177 pass178 except:179 if not self._disconnected_event.has_exception():180 raise181 if size>=0:182 result, self._buffer = self._buffer[:size], self._buffer[size:]183 else:184 result, self._buffer = self._buffer, ''185 if not result and self._disconnected_event.has_exception():186 try:187 self._disconnected_event.wait()188 except ConnectionDone:189 pass190 return result191 def recv(self, buflen=None):192 """Receive a single chunk of undefined size but no bigger than buflen"""193 if not self._disconnected_event.ready():194 self.resumeProducing()195 try:196 try:197 recvd = self._wait()198 #print 'received %r' % recvd199 self._buffer += recvd200 except ConnectionDone:201 pass202 except:203 if not self._disconnected_event.has_exception():204 raise205 finally:206 self.pauseProducing()207 if buflen is None:208 result, self._buffer = self._buffer, ''209 else:210 result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]211 if not result and self._disconnected_event.has_exception():212 try:213 self._disconnected_event.wait()214 except ConnectionDone:215 pass216 return result217 # iterator protocol:218 def __iter__(self):219 return self220 def next(self):221 res = self.recv()222 if not res:223 raise StopIteration224 return res225class GreenInstanceFactory(ClientFactory):226 def 
__init__(self, instance, event):227 self.instance = instance228 self.event = event229 def buildProtocol(self, addr):230 return self.instance231 def clientConnectionFailed(self, connector, reason):232 self.event.send_exception(reason.type, reason.value, reason.tb)233class GreenClientCreator(object):234 """Connect to a remote host and return a connected green transport instance.235 """236 gtransport_class = GreenTransport237 def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):238 if reactor is None:239 from twisted.internet import reactor240 self.reactor = reactor241 if gtransport_class is not None:242 self.gtransport_class = gtransport_class243 self.args = args244 self.kwargs = kwargs245 def _make_transport_and_factory(self):246 gtransport = self.gtransport_class(*self.args, **self.kwargs)247 protocol = gtransport.build_protocol()248 factory = GreenInstanceFactory(protocol, gtransport._queue)249 return gtransport, factory250 def connectTCP(self, host, port, *args, **kwargs):251 gtransport, factory = self._make_transport_and_factory()252 self.reactor.connectTCP(host, port, factory, *args, **kwargs)253 gtransport._init_transport()254 return gtransport255 def connectSSL(self, host, port, *args, **kwargs):256 gtransport, factory = self._make_transport_and_factory()257 self.reactor.connectSSL(host, port, factory, *args, **kwargs)258 gtransport._init_transport()259 return gtransport260 def connectTLS(self, host, port, *args, **kwargs):261 gtransport, factory = self._make_transport_and_factory()262 self.reactor.connectTLS(host, port, factory, *args, **kwargs)263 gtransport._init_transport()264 return gtransport265 def connectUNIX(self, address, *args, **kwargs):266 gtransport, factory = self._make_transport_and_factory()267 self.reactor.connectUNIX(address, factory, *args, **kwargs)268 gtransport._init_transport()269 return gtransport270 def connectSRV(self, service, domain, *args, **kwargs):271 SRVConnector = kwargs.pop('ConnectorClass', None)272 if 
SRVConnector is None:273 from twisted.names.srvconnect import SRVConnector274 gtransport, factory = self._make_transport_and_factory()275 c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)276 c.connect()277 gtransport._init_transport()278 return gtransport279class SimpleSpawnFactory(Factory):280 """Factory that spawns a new greenlet for each incoming connection.281 For an incoming connection a new greenlet is created using the provided282 callback as a function and a connected green transport instance as an283 argument.284 """285 gtransport_class = GreenTransport286 def __init__(self, handler, gtransport_class=None, *args, **kwargs):287 if callable(handler):288 self.handler = handler289 else:290 self.handler = handler.send291 if hasattr(handler, 'send_exception'):292 self.exc_handler = handler.send_exception293 if gtransport_class is not None:294 self.gtransport_class = gtransport_class295 self.args = args296 self.kwargs = kwargs297 def exc_handler(self, *args):298 pass299 def buildProtocol(self, addr):300 gtransport = self.gtransport_class(*self.args, **self.kwargs)301 protocol = gtransport.build_protocol()302 protocol.factory = self303 self._do_spawn(gtransport, protocol)304 return protocol305 def _do_spawn(self, gtransport, protocol):306 greenthread.spawn(self._run_handler, gtransport, protocol)307 def _run_handler(self, gtransport, protocol):308 try:309 gtransport._init_transport()310 except Exception:311 self.exc_handler(*sys.exc_info())312 else:313 self.handler(gtransport)314class SpawnFactory(SimpleSpawnFactory):315 """An extension to SimpleSpawnFactory that provides some control over316 the greenlets it has spawned.317 """318 def __init__(self, handler, gtransport_class=None, *args, **kwargs):319 self.greenlets = set()320 SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)321 def _do_spawn(self, gtransport, protocol):322 g = greenthread.spawn(self._run_handler, gtransport, protocol)323 self.greenlets.add(g)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?