How to use _queueEvent method in pyatom

Best Python code snippet using pyatom_python

EventRouterHTTPC.py

Source:EventRouterHTTPC.py Github

copy

Full Screen

# Copyright L.P.Klyne 2013
# Licenced under 3 clause BSD licence
# -*- coding: utf-8 -*-
# $Id: EventRouterHTTPC.py 3675 2010-07-30 11:54:36Z tombushby $
#
"""
HTTP client event router implementation.

HTTP server and client event routers are connected, all events, and forwarding
subscriptions received by one party are forwarded to the other. Further, all
local subscriptions and event publication are handled as for a standard event router.
"""

from Queue import Queue
import threading # Thread, Event
import httplib
import socket
import time

from MiscLib.Logging import Trace, Info, Warn, logging

from Status import StatusVal
from Event import makeEvent
from EventEnvelope import constructEnvelope
from EventSerializer import ( makeEnvelopeData, makeClosedownData, makeIdleData,
                              parseMessageData )
from QueueDeferred import makeQueueDeferred
from SyncDeferred import makeDeferred
from EventAgent import EventAgent
from EventRouter import EventRouter

_log = logging.getLogger( "EventLib.EventRouterHTTPC" )


class EventRouterHTTPC(EventRouter):
    """
    This is a derivation of the EventRouter class that provides the same basic
    interface, but which also sends and receives events using an HTTP connection.

    The constructed class is an unmodified EventRouter, but the initializer also
    creates an HTTP client event relay (see below) and hooks it up to the
    EventRouter constructed here.
    """

    def __init__(self, uri=None, host='', port=8082, simplex=False):
        """
        Initialize a new HTTP client event router object

        uri     is a URI for this event router. A new URI derived from this is
                created for the HTTP client event relay.
        host    is the IP address or host name to which the HTTP connection is made.
        port    is the TCP port number to which the HTTP connection is made.
        simplex is passed through to the relay; when true the relay only POSTs
                queued events and never issues GET requests for incoming events.
        """
        super(EventRouterHTTPC, self).__init__(uri)
        relayuri = self.getUri()+"/HTTPC"
        self._relay = EventRelayHTTPC(self, relayuri, host, port, simplex)
        return

    def close(self):
        """
        Function called to close down event router.

        The relay (and its worker thread) is closed first, then the base router.
        """
        self._relay.close()
        super(EventRouterHTTPC, self).close()
        return


class EventRelayHTTPC(EventAgent):
    """
    Implements an HTTP client event router that runs as a separate thread until
    explicitly closed, which runs in tandem with a simple event router and
    provides a tiny subset of the event router interface (receive).

    The HTTP connection operates as a half duplex channel for sending and
    receiving events, with the direction of flow being controlled by the
    client: a GET request is implicitly a request for an event to be delivered
    and blocks until an event is available, the request timeout period expires,
    or the client cancels the request; a POST request supplies an event to be
    delivered and/or forwarded.

    Incoming events are queued for the client process, and are handled by the
    HTTP client running in its separate thread.
    """

    def __init__(self, router, uri=None, host='', port=8082, simplex=False):
        """
        Initialize a new HTTP client event passing object

        An HTTP client is associated with an existing event router, and
        sends all messages received from that router to the HTTP connection,
        and forwards all messages received from the HTTP connection to the
        router.

        Interaction with the indicated EventRouter object takes place primarily
        through the 'receive' methods of this class and the supplied router.

        Because messages received from HTTP are sent onwards using the normal
        forwarding mechanisms, this class must perform loop-detection to stop
        events being bounced back to the HTTP connection.
        """
        super(EventRelayHTTPC, self).__init__(uri)
        self._router = router
        self._queue = Queue()
        # Set each time an item is taken off the queue (see getQueuedItem)
        self._event = threading.Event()
        self._closing = False
        # Set each time an item is added to the queue; the worker waits on
        # this in simplex mode
        self._queueEvent = threading.Event()
        self._simplex = simplex
        # Have 'router' send all subscriptions events to this object
        router.routeEventFrom(None, None, self)
        router.doSubscribeRequest(self, -1, None, None)
        # Create HTTP "connection", and start thread to respond to new events from it.
        self._httpcon = httplib.HTTPConnection(host=host, port=port)
        self._thread = threading.Thread(name=uri, target=self.processEvent)
        self._thread.start()
        return

    def receive(self, fromrouter, envelope):
        """
        This function receives messages from the associated router and queues
        them for transmission on the HTTP interface.

        NOTE: receive and forward here perform loop-check for outgoing events,
        and add the extra envelope hop for incoming. The sole purpose of this
        loop-check is to prevent incoming HTTP events from being sent out again.

        Returns the deferred from queueItem when the envelope is queued, or an
        immediately-completed OK deferred when unWrap yields nothing.
        """
        event = envelope.unWrap(self.getUri())
        if event:
            Trace("%s receive %s from %s"%(self.getUri(),event,fromrouter), "EventLib.EventRelayHTTPC")
            return self.queueItem(["forward",envelope])
        return makeDeferred(StatusVal.OK)

    def forward(self, event, env):
        """
        Internal function to process event received from HTTP connection:
        add new hop to envelope and pass it straight on to the associated router object.
        """
        Trace("%s forward %s"%(self.getUri(),event), "EventLib.EventRelayHTTPC")
        return self._router.receive(self, env.nextHop(self.getUri()))

    def close(self):
        """
        Shut down the event router thread

        Closes the HTTP connection, flags closedown, wakes the worker thread
        and queues a "closedown" item, then waits for the worker to exit.
        """
        Trace("%s close"%(self.getUri()), "EventLib.EventRelayHTTPC")
        self._httpcon.close()
        self._closing = True
        self._event.set()
        self._queueEvent.set()
        self._queue.put(["closedown",[]])
        self._thread.join()
        Trace("%s closed"%(self.getUri()), "EventLib.EventRelayHTTPC")
        return

    def queueItem(self, item):
        """
        Add item to the queue, and return a deferred object that fires when an item is removed
        (or the queue is empty).

        Once closedown has started the item is not queued, and an OK deferred
        is returned instead.
        """
        Trace("%s queueItem (%s)"%(self.getUri(),item), "EventLib.EventRelayHTTPC")
        if not self._closing:
            self._queue.put(item)
            self._queueEvent.set()
            return makeQueueDeferred(StatusVal.OK, self._queue, self._event)
        return makeDeferred(StatusVal.OK)

    def getQueuedItem(self):
        """
        Wait for an item to be queued, then return it.
        """
        Trace("%s getQueuedItem ..."%(self.getUri()), context="EventLib.EventRelayHTTPC")
        item = self._queue.get()
        Trace("%s getQueuedItem (%s)"%(self.getUri(),item), context="EventLib.EventRelayHTTPC")
        # Signal that an item has been removed from the queue
        self._event.set()
        return item

    # --- HTTP client worker thread function ---

    def processEvent(self):
        """
        This function is the HTTP client worker thread.

        Each pass through the loop either POSTs one queued event to the server
        or (when not simplex and the queue is empty) issues a GET and forwards
        any event returned to the associated router.  The loop ends when a
        "closedown" item is dequeued while the closing flag is set, or when
        the server answers a GET with status 503.
        """
        # Note: break out of event dispatch loop when closedown event is received
        # and closing flag is set.  This is to prevent DoS attack by faked closedown
        # event type, and to ensure that prior events received are all processed.
        delay_on_error_min = 0.125              # Back off retry interval on error..
        delay_on_error_max = 20.0               # ..
        delay_on_error     = delay_on_error_min # ..
        while True:
            # The delay is doubled on every pass and reset to the minimum
            # after each successful exchange, so it only grows across
            # consecutive failures.
            if delay_on_error < delay_on_error_max:
                delay_on_error *= 2
            try:
                # PLEASE NOTE: In the event that the HTTPC is run as duplex, not simplex
                # then the post methods will be delayed if nothing is sent down to the client
                # from the server. This timeout is controlled by QUEUE_WAIT_TIMEOUT in EventRouterHTTPS.py
                if self._simplex:
                    self._queueEvent.wait()
                    self._queueEvent.clear()

                if not self._queue.empty():
                    Trace("%s queue.get ..."%(self.getUri()), "EventLib.EventRelayHTTPC")
                    msgbody = self.getQueuedItem()
                    [typ,env] = msgbody
                    if typ == "closedown":
                        if self._closing: break
                    else:
                        # process request as an HTTP POST request
                        data    = makeEnvelopeData(env)
                        headers = { "Content-type": "text/plain",
                                    "Accept": "text/plain",
                                    "Content-length": str(len(data)) }
                        self._httpcon.request("POST", "/request_path_ignored", data, headers)
                        response = self._httpcon.getresponse()
                        # Drain the response body: httplib refuses to hand out
                        # the next response on this connection (raising
                        # ResponseNotReady) while a previous one is unread.
                        response.read()
                        delay_on_error = delay_on_error_min
                elif not self._simplex:
                    # Nothing in queue:
                    # issue a GET for incoming events
                    _log.info("%s HTTP get ..."%(self.getUri()))
                    headers = { "Accept": "text/plain" }
                    self._httpcon.request("GET", "/request_path_ignored", None, headers)
                    response = self._httpcon.getresponse()
                    if response.status == 200:
                        delay_on_error = delay_on_error_min
                        msgbody = response.read()
                        Trace("%s get msgbody: %s"%(self.getUri(),msgbody), "EventLib.EventRelayHTTPC")
                        # Parse message and act accordingly
                        msgdata = parseMessageData(msgbody)
                        Trace("%s get msgdata: %s"%(self.getUri(),str(msgdata)), "EventLib.EventRelayHTTPC")
                        if msgdata is None:
                            #TODO: Log "Request body malformed"
                            pass
                        elif msgdata[0] == "forward":
                            # msgdata = ["forward", [['R1', 'R2', 'R3'], 'ev:typ', 'ev:src', 'payload']]
                            event = makeEvent(evtype=msgdata[1][1],source=msgdata[1][2],payload=msgdata[1][3])
                            env   = constructEnvelope(msgdata[1][0], event)
                            self.forward(event, env)
                        elif msgdata[0] == "idle":
                            # Idle response gives client a chance to send if anything is queued
                            pass
                        else:
                            #TODO: handle closedown message?
                            Warn( "%s Request body unrecognized option: %s"%(self.getUri(),msgdata[0]), "EventRelayHTTPC")
                    elif response.status == 503:
                        Trace( "%s processEvent error response: %u, %s"%(self.getUri(),response.status,response.reason), "EventLib.EventRelayHTTPC")
                        # Remote end closed down
                        break
                    else:
                        # TODO: (log error response)
                        Warn( "%s processEvent error response: %u, %s"%(self.getUri(),response.status,response.reason), "EventLib.EventRelayHTTPC")
                        # Drain the error body so the connection can be reused
                        # for the retry (see note on the POST branch).
                        response.read()
                        time.sleep(delay_on_error)

            except httplib.BadStatusLine as e:
                # This can happen at closedown
                Info( "%s processEvent bad response: %s"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")
                time.sleep(delay_on_error)
            except httplib.CannotSendRequest as e:
                # This can happen at closedown
                Info( "%s Cannot send request: %s"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")
                time.sleep(delay_on_error)
            except httplib.ResponseNotReady as e:
                # This can happen at startup and sometimes other times:
                # maybe multiple requests on a single HTTP connection object?
                Info( "%s Response not ready: (%s)"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")
                time.sleep(delay_on_error)
            except socket.error as e:
                Warn( "%s Socket error: %s"%(self.getUri(), str(e)), "EventLib.EventRelayHTTPC")
                time.sleep(delay_on_error)
        return

Full Screen

Full Screen

queue.py

Source:queue.py Github

copy

Full Screen

1import time23import jimi45class _queueEvent(jimi.db._document):6 queueTriggerID = str()7 queueEventData = dict()8 runTime = int()9 autoClear = bool()10 delay = int()1112 _dbCollection = jimi.db.db["queueEvent"]1314 def new(self, acl, queueTriggerID, queueEventData, autoClear=True, delay=0):15 self.acl = acl16 self.queueTriggerID = queueTriggerID17 self.queueEventData = queueEventData18 self.autoClear = autoClear19 self.delay = int(time.time() + delay ) ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run pyatom automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful