Best Python code snippet using pyatom_python
EventRouterHTTPC.py
Source:EventRouterHTTPC.py  
1# Copyright L.P.Klyne 2013 2# Licenced under 3 clause BSD licence 3# -*- coding: utf-8 -*-4# $Id: EventRouterHTTPC.py 3675 2010-07-30 11:54:36Z tombushby $5#6"""7HTTP client event router implementation.8HTTP server and client event routers are connected,  all events, and forwarding9subscriptions received by one party are forwarded to the other.  Further, all10local subscriptions and event publication are handled as for a standard event router.11"""12from Queue     import Queue13import threading        # Thread, Event14import httplib15import socket16import time17from MiscLib.Logging     import Trace, Info, Warn, logging18from Status              import StatusVal19from Event               import makeEvent20from EventEnvelope       import constructEnvelope21from EventSerializer     import ( makeEnvelopeData, makeClosedownData, makeIdleData,22                                  parseMessageData )23from QueueDeferred       import makeQueueDeferred24from SyncDeferred        import makeDeferred25from EventAgent          import EventAgent26from EventRouter         import EventRouter27_log = logging.getLogger( "EventLib.EventRouterHTTPC" )28class EventRouterHTTPC(EventRouter):29    """30    This is a derivation of the EventRouter class that prpovides the same basic31    interface, but which also sends and reveives events using an HTTP connection.32    The constructed class is an unmodified EventRouter, but the initializer also33    creates and HTTP client event relay (see below) and hooks it up to the34    EventRouter constructed here.35    """36    def __init__(self, uri=None, host='', port=8082, simplex=False):37        """38        Initialize a new HTTP client event router object39        uri     is a URI for this event router.  A new URI derived from this is created40                for the HTTP client event relay.41        host    is the IP address of host name to which the HTTP connection is made.42        port    is the TCP port number to which the HTTP connection is made.43        """44        super(EventRouterHTTPC, self).__init__(uri)45        relayuri = self.getUri()+"/HTTPC"46        self._relay = EventRelayHTTPC(self, relayuri, host, port, simplex)47        return48    def close(self):49        """50        Function called to close down event router.51        """52        self._relay.close()53        super(EventRouterHTTPC, self).close()54        return55class EventRelayHTTPC(EventAgent):56    """57    Implements an HTTP client event router that runs as a separate thread until58    explicitly closed, which runs in tandem with a simple event router and59    provides a tiny subset of the event router interface (receive).60    The HTTP connection operates as a half duplex channel for sending and61    receiving events, with the direction of flow being controlled by the62    client:  a GET request is implicitly a request for an event to be delivered63    and blocks until an event is available, the request timeout period expires,64    or the client cancels the request;  a POST request supplies an event to be65    delivered and/or forwarded.66    Incoming events are queued for the client process, and are handled by the67    HTTP client running in its separate thread.68    """69    def __init__(self, router, uri=None, host='', port=8082, simplex=False):70        """71        Initialize a new HTTP client event passing object72        An HTTP client is associated with an existing event router, and73        sends all messages received from that router to the HTTP connection,74        and forwards all messages received from the HTTP connection to the75        router.76        Interaction with the indicated EventRouter object takes place primarily77        through the 'receive' methods of this class and the supplied router.78        Because messages received from HTTP are sent onwards using the normal79        forwarding mechanisms, this class must perform loop-detection to stop80        events being bounced back to the HTTP connection.81        """82        super(EventRelayHTTPC, self).__init__(uri)83        self._router  = router84        self._queue   = Queue()85        self._event   = threading.Event()86        self._closing = False87        self._queueEvent = threading.Event()88        self._simplex = simplex89        # Have 'router' send all subscriptions events to this object90        router.routeEventFrom(None, None, self)91        router.doSubscribeRequest(self, -1, None, None)92        # Create HTTP "connection", and start thread to respond to new events from it.93        94        self._httpcon = httplib.HTTPConnection(host=host, port=port)95        96        self._thread  = threading.Thread(name=uri, target=self.processEvent)97        self._thread.start()98        return99    def receive(self, fromrouter, envelope):100        """101        This function receives messages from the associated router and queues102        them for transmission on the HTTP interface.103        NOTE: receive and forward here perform loop-check for outgoing events,104        and add the extra envelope hop for incoming.  The sole purpose of this105        loop-check is to prevent incoming HTTP events from being sent out again.106        """107        event = envelope.unWrap(self.getUri())108        if event:109            Trace("%s receive %s from %s"%(self.getUri(),event,fromrouter), "EventLib.EventRelayHTTPC")110            return self.queueItem(["forward",envelope])111        return makeDeferred(StatusVal.OK)112    def forward(self, event, env):113        """114        Internal function to process event received from HTTP connection:115        add new hop to envelope and pass it straight on to the associated router object.116        """117        Trace("%s forward %s"%(self.getUri(),event), "EventLib.EventRelayHTTPC")118        return self._router.receive(self, env.nextHop(self.getUri()))119    def close(self):120        """121        Shut down the event router thread122        """123        Trace("%s close"%(self.getUri()), "EventLib.EventRelayHTTPC")124        self._httpcon.close()125        self._closing = True126        self._event.set()127        self._queueEvent.set()128        self._queue.put(["closedown",[]])129        self._thread.join()130        Trace("%s closed"%(self.getUri()), "EventLib.EventRelayHTTPC")131        return132    def queueItem(self, item):133        """134        Add item to the queue, and return a deferred object that fires when an item is removed135        (or the queue is empty).136        """        137        Trace("%s queueItem (%s)"%(self.getUri(),item), "EventLib.EventRelayHTTPC")138        if not self._closing:139            self._queue.put(item)140            self._queueEvent.set()141            return makeQueueDeferred(StatusVal.OK, self._queue, self._event)142        return makeDeferred(StatusVal.OK)143    def getQueuedItem(self):144        """145        Wait for an item to be queued, then return it.146        """147        Trace("%s getQueuedItem ..."%(self.getUri()), context="EventLib.EventRelayHTTPC")148        item = self._queue.get()149        Trace("%s getQueuedItem (%s)"%(self.getUri(),item), context="EventLib.EventRelayHTTPC")150        self._event.set()151        return item152    # --- HTTP client worker thread function ---153    def processEvent(self):154        """155        This function is the HTTP client worker thread.156        """157        # Note: break out of event dispatch loop when closedown event is received158        # and closing flag is set.  This is to prevent DoS attack by faked closedown159        # event type, and to ensure that prior events received are all processed.160        delay_on_error_min = 0.125              # Back off retry interval on error..161        delay_on_error_max = 20.0               # ..162        delay_on_error     = delay_on_error_min # ..163        while True:164            if delay_on_error < delay_on_error_max:165                delay_on_error *= 2166            try:167                # PLEASE NOTE: In the event that the HTTPC is run as duplex, not simplex168                # then the post methods will be delayed if nothing is sent down to the client169                # from the server. This timeout is controlled by QUEUE_WAIT_TIMEOUT in EventRouterHTTPS.py170                if self._simplex == True:171                    self._queueEvent.wait()172                    self._queueEvent.clear()173    174                if not self._queue.empty():175                    Trace("%s queue.get ..."%(self.getUri()), "EventLib.EventRelayHTTPC")176                    ###msgbody = self._queue.get()177                    ###Trace("%s get msgbody: %s"%(self.getUri(),msgbody), "EventLib.EventRelayHTTPC")178                    ###self._event.set()179                    msgbody = self.getQueuedItem()180                    [typ,env] = msgbody181                    if typ == "closedown":182                        if self._closing: break183                    else:184                        # process request as an HTTP POST request185                        data    = makeEnvelopeData(env)186                        headers = { "Content-type": "text/plain",187                                    "Accept": "text/plain",188                                    "Content-length": str(len(data)) }189                        self._httpcon.request("POST", "/request_path_ignored", data, headers)190                        response = self._httpcon.getresponse()191                        delay_on_error = delay_on_error_min192                elif self._simplex == False:193                    # Nothing in queue:194                    # issue a GET for incoming events195                    _log.info("%s HTTP get ..."%(self.getUri()))196                    headers = { "Accept": "text/plain" }197                    self._httpcon.request("GET", "/request_path_ignored", None, headers)198                    response = self._httpcon.getresponse()199                    if response.status == 200:200                        delay_on_error = delay_on_error_min201                        msgbody = response.read()202                        Trace("%s get msgbody: %s"%(self.getUri(),msgbody), "EventLib.EventRelayHTTPC")203                        # Parse message and act accordingly204                        msgdata = parseMessageData(msgbody)205                        Trace("%s get msgdata: %s"%(self.getUri(),str(msgdata)), "EventLib.EventRelayHTTPC")206                        if msgdata == None:207                            #TODO: Log "Request body malformed"208                            pass209                        elif msgdata[0] == "forward":210                            # msgdata = ["forward", [['R1', 'R2', 'R3'], 'ev:typ', 'ev:src', 'payload']]211                            event = makeEvent(evtype=msgdata[1][1],source=msgdata[1][2],payload=msgdata[1][3])212                            env   = constructEnvelope(msgdata[1][0], event)213                            self.forward(event, env)214                        elif msgdata[0] == "idle":215                            # Idle response gives client a chance to send if anything is queued216                            pass217                        else:218                            #TODO: handle closedown message?219                            Warn( "%s Request body unrecognized option: %s"%(self.getUri(),msgdata[0]), "EventRelayHTTPC")220                            pass221                    elif response.status == 503:222                        Trace( "%s processEvent error response: %u, %s"%(self.getUri(),response.status,response.reason), "EventLib.EventRelayHTTPC")223                        # Remote end closed down224                        break225                    else:226                        # TODO: (log error response)227                        Warn( "%s processEvent error response: %u, %s"%(self.getUri(),response.status,response.reason), "EventLib.EventRelayHTTPC")228                        time.sleep(delay_on_error)229                    230            except httplib.BadStatusLine, e:231                # This can happen at closedown232                Info( "%s processEvent bad response: %s"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")233                time.sleep(delay_on_error)234            except httplib.CannotSendRequest, e:235                # This can happen at closedown236                Info( "%s Cannot send request: %s"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")237                time.sleep(delay_on_error)238            except httplib.ResponseNotReady, e:239                # This can happen at startup and sometimes other times:240                # maybe multiple requests on a single HTTP connection object?241                Info( "%s Response not ready: (%s)"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")242                time.sleep(delay_on_error)243            except socket.error, e:244                Warn( "%s Socket error: %s"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")245                time.sleep(delay_on_error)246        return...queue.py
Source:queue.py  
1import time23import jimi45class _queueEvent(jimi.db._document):6    queueTriggerID = str()7    queueEventData = dict()8    runTime = int()9    autoClear = bool()10    delay = int()1112    _dbCollection = jimi.db.db["queueEvent"]1314    def new(self, acl, queueTriggerID, queueEventData, autoClear=True, delay=0):15        self.acl = acl16        self.queueTriggerID = queueTriggerID17        self.queueEventData = queueEventData18        self.autoClear = autoClear19        self.delay = int(time.time() + delay )
...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
