How to use started method in Slash

Best Python code snippet using slash

test_udp.py

Source:test_udp.py Github

copy

Full Screen

# -*- test-case-name: twisted.test.test_udp -*-
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

#

from twisted.trial import unittest, util
from twisted.internet import protocol, reactor, error, defer, interfaces, address
from twisted.python import log, failure, components


class Mixin:
    """Record protocol start/stop events and collect received packets.

    ``startedDeferred``, when set by a test, is fired once (and then cleared)
    the first time ``startProtocol`` runs.
    """

    started = 0
    stopped = 0
    startedDeferred = None

    def __init__(self):
        self.packets = []

    def startProtocol(self):
        self.started = 1
        if self.startedDeferred is not None:
            # Clear the attribute before firing so the Deferred fires once.
            d, self.startedDeferred = self.startedDeferred, None
            d.callback(None)

    def stopProtocol(self):
        self.stopped = 1


class Server(Mixin, protocol.DatagramProtocol):
    """Datagram protocol that stores every ``(data, addr)`` pair it receives.

    ``packetReceived``, when set, is fired once on the next datagram.
    """

    packetReceived = None
    refused = 0

    def datagramReceived(self, data, addr):
        self.packets.append((data, addr))
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)


class Client(Mixin, protocol.ConnectedDatagramProtocol):
    """Connected datagram protocol that stores the payloads it receives."""

    packetReceived = None
    refused = 0

    def datagramReceived(self, data):
        self.packets.append(data)
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)

    def connectionFailed(self, failure):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(failure)
        self.failure = failure

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        # Set unconditionally: testConnectionRefused checks it without ever
        # installing a startedDeferred.
        self.refused = 1


class GoodClient(Server):
    """A ``Server`` that also reports refused connections."""

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        self.refused = 1


class PortCleanerUpper(unittest.TestCase):
    """Base test case that disconnects every port registered via ``_addPorts``."""

    callToLoseCnx = 'loseConnection'

    def setUp(self):
        self.ports = []

    def tearDown(self):
        self.cleanPorts(*self.ports)

    def _addPorts(self, *ports):
        for p in ports:
            self.ports.append(p)

    def cleanPorts(self, *ports):
        """Call ``callToLoseCnx`` on each still-connected port and wait for it.

        Raises RuntimeError if an object without a ``disconnected`` attribute
        is passed in.
        """
        for p in ports:
            if not hasattr(p, 'disconnected'):
                # Call form of raise: valid on both Python 2 and Python 3.
                raise RuntimeError("You handed something to cleanPorts that"
                                   " doesn't have a disconnected attribute, dummy!")
            if not p.disconnected:
                d = getattr(p, self.callToLoseCnx)()
                if isinstance(d, defer.Deferred):
                    # The original called a bare ``wait`` that is not defined
                    # or imported in this module (NameError).
                    # NOTE(review): assumes trial's ``util.wait`` is the
                    # intended helper, like ``util.spinUntil`` below - confirm
                    # against the Twisted version in use.
                    util.wait(d)
                else:
                    try:
                        util.spinUntil(lambda: p.disconnected)
                    except Exception:
                        # Best effort: report the problem, keep cleaning up.
                        failure.Failure().printTraceback()


class OldConnectedUDPTestCase(PortCleanerUpper):
    """Tests for the deprecated ``reactor.connectUDP`` API."""

    def testStartStop(self):
        client = Client()
        d = client.startedDeferred = defer.Deferred()
        port2 = reactor.connectUDP("127.0.0.1", 8888, client)

        def assertName():
            self.failUnless(repr(port2).find('test_udp.Client') >= 0)

        def cbStarted(ignored):
            self.assertEquals(client.started, 1)
            self.assertEquals(client.stopped, 0)
            assertName()
            return defer.maybeDeferred(port2.stopListening).addCallback(lambda ign: assertName())

        return d.addCallback(cbStarted)
    testStartStop.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]

    def testDNSFailure(self):
        client = Client()
        d = client.startedDeferred = defer.Deferred()
        # if this domain exists, shoot your sysadmin
        reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client)

        def didNotConnect(ign):
            self.assertEquals(client.stopped, 0)
            self.assertEquals(client.started, 0)

        d = self.assertFailure(d, error.DNSLookupError)
        d.addCallback(didNotConnect)
        return d
    testDNSFailure.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]

    def testSendPackets(self):
        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()

        client = Client()
        clientStarted = client.startedDeferred = defer.Deferred()

        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbServerStarted(ignored):
            self.port2 = reactor.connectUDP("127.0.0.1", server.transport.getHost().port, client)
            return clientStarted

        d = serverStarted.addCallback(cbServerStarted)

        def cbClientStarted(ignored):
            clientSend = server.packetReceived = defer.Deferred()
            serverSend = client.packetReceived = defer.Deferred()

            cAddr = client.transport.getHost()
            server.transport.write("hello", (cAddr.host, cAddr.port))
            client.transport.write("world")

            # No one will ever call errback on either of these Deferreds,
            # otherwise I would pass fireOnOneErrback=True here.
            return defer.DeferredList([clientSend, serverSend])

        d.addCallback(cbClientStarted)

        def cbPackets(ignored):
            self.assertEquals(client.packets, ["hello"])
            self.assertEquals(server.packets, [("world", ("127.0.0.1", client.transport.getHost().port))])

            return defer.DeferredList([
                defer.maybeDeferred(port1.stopListening),
                defer.maybeDeferred(self.port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbPackets)
        return d
    testSendPackets.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]

    def testConnectionRefused(self):
        # assume no one listening on port 80 UDP
        client = Client()
        port = reactor.connectUDP("127.0.0.1", 80, client)
        server = Server()
        port2 = reactor.listenUDP(0, server, interface="127.0.0.1")
        reactor.iterate()
        reactor.iterate()
        reactor.iterate()
        client.transport.write("a")
        client.transport.write("b")
        server.transport.write("c", ("127.0.0.1", 80))
        server.transport.write("d", ("127.0.0.1", 80))
        server.transport.write("e", ("127.0.0.1", 80))
        server.transport.write("toserver", (port2.getHost().host,
                                            port2.getHost().port))
        server.transport.write("toclient", (port.getHost().host,
                                            port.getHost().port))
        reactor.iterate(); reactor.iterate()
        self.assertEquals(client.refused, 1)
        port.stopListening()
        port2.stopListening()
        reactor.iterate(); reactor.iterate()
    testConnectionRefused.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]


class UDPTestCase(unittest.TestCase):
    """Tests for ``reactor.listenUDP`` and the transport's ``connect``."""

    def testOldAddress(self):
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored):
            addr = p.getHost()
            self.assertEquals(addr, ('INET_UDP', addr.host, addr.port))
            return p.stopListening()

        return d.addCallback(cbStarted)
    testOldAddress.suppress = [
        util.suppress(message='IPv4Address.__getitem__',
                      category=DeprecationWarning)]

    def testStartStop(self):
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored):
            self.assertEquals(server.started, 1)
            self.assertEquals(server.stopped, 0)
            return port1.stopListening()

        def cbStopped(ignored):
            self.assertEquals(server.stopped, 1)

        return d.addCallback(cbStarted).addCallback(cbStopped)

    def testRebind(self):
        # Ensure binding the same DatagramProtocol repeatedly invokes all
        # the right callbacks.
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored, port):
            return port.stopListening()

        def cbStopped(ignored):
            d = server.startedDeferred = defer.Deferred()
            p = reactor.listenUDP(0, server, interface="127.0.0.1")
            return d.addCallback(cbStarted, p)

        d.addCallback(cbStarted, p)
        # The original defined cbStopped but never chained it, so the second
        # bind this test describes never happened.
        d.addCallback(cbStopped)
        return d

    def testBindError(self):
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, server, interface='127.0.0.1')

        def cbStarted(ignored):
            self.assertEquals(port.getHost(), server.transport.getHost())

            server2 = Server()
            self.assertRaises(
                error.CannotListenError,
                reactor.listenUDP, port.getHost().port, server2, interface='127.0.0.1')

        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return port.stopListening()

        d.addCallback(cbFinished)
        return d

    def testSendPackets(self):
        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()

        def cbServerStarted(ignored):
            self.port2 = reactor.listenUDP(0, client, interface="127.0.0.1")
            return clientStarted

        d = serverStarted.addCallback(cbServerStarted)

        def cbClientStarted(ignored):
            client.transport.connect("127.0.0.1", server.transport.getHost().port)
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()

            serverSend = client.packetReceived = defer.Deferred()
            server.transport.write("hello", (cAddr.host, cAddr.port))

            clientWrites = [
                ("a",),
                ("b", None),
                ("c", (sAddr.host, sAddr.port))]

            def cbClientSend(ignored):
                # Send one write per received packet until the list is empty.
                if clientWrites:
                    nextClientWrite = server.packetReceived = defer.Deferred()
                    nextClientWrite.addCallback(cbClientSend)
                    client.transport.write(*clientWrites.pop(0))
                    return nextClientWrite

            # No one will ever call .errback on either of these Deferreds,
            # but there is a non-trivial amount of test code which might
            # cause them to fail somehow. So fireOnOneErrback=True.
            return defer.DeferredList([
                cbClientSend(None),
                serverSend],
                fireOnOneErrback=True)

        d.addCallback(cbClientStarted)

        def cbSendsFinished(ignored):
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()
            self.assertEquals(
                client.packets,
                [("hello", (sAddr.host, sAddr.port))])
            clientAddr = (cAddr.host, cAddr.port)
            self.assertEquals(
                server.packets,
                [("a", clientAddr),
                 ("b", clientAddr),
                 ("c", clientAddr)])

        d.addCallback(cbSendsFinished)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port1.stopListening),
                defer.maybeDeferred(self.port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d

    def testConnectionRefused(self):
        # assume no one listening on port 80 UDP
        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")

        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port2 = reactor.listenUDP(0, server, interface="127.0.0.1")

        d = defer.DeferredList(
            [clientStarted, serverStarted],
            fireOnOneErrback=True)

        def cbStarted(ignored):
            connectionRefused = client.startedDeferred = defer.Deferred()
            client.transport.connect("127.0.0.1", 80)

            for i in range(10):
                client.transport.write(str(i))
                server.transport.write(str(i), ("127.0.0.1", 80))

            return self.assertFailure(
                connectionRefused,
                error.ConnectionRefusedError)

        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port.stopListening),
                defer.maybeDeferred(port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d

    def testBadConnect(self):
        client = GoodClient()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")
        self.assertRaises(ValueError, client.transport.connect, "localhost", 80)
        client.transport.connect("127.0.0.1", 80)
        self.assertRaises(RuntimeError, client.transport.connect, "127.0.0.1", 80)
        return port.stopListening()

    def testPortRepr(self):
        client = GoodClient()
        p = reactor.listenUDP(0, client)
        portNo = str(p.getHost().port)
        self.failIf(repr(p).find(portNo) == -1)

        def stoppedListening(ign):
            self.failIf(repr(p).find(portNo) != -1)

        return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)


class ReactorShutdownInteraction(unittest.TestCase):

    def testShutdownFromDatagramReceived(self):
        # udp.Port's doRead calls recvfrom() in a loop, as an optimization.
        # It is important this loop terminate under various conditions.
        # Previously, if datagramReceived synchronously invoked
        # reactor.stop(), under certain reactors, the Port's socket would
        # synchronously disappear, causing an AttributeError inside that
        # loop.  This was mishandled, causing the loop to spin forever.
        # This test is primarily to ensure that the loop never spins
        # forever.
        server = Server()
        finished = defer.Deferred()
        p = reactor.listenUDP(0, server, interface='127.0.0.1')
        pr = server.packetReceived = defer.Deferred()

        def pktRece(ignored):
            # Simulate reactor.stop() behavior :(
            server.transport.connectionLost()
            # Then delay this Deferred chain until the protocol has been
            # disconnected, as the reactor should do in an error condition
            # such as we are inducing.  This is very much a whitebox test.
            reactor.callLater(0, finished.callback, None)

        pr.addCallback(pktRece)

        def flushErrors(ignored):
            # We are breaking abstraction and calling private APIs, any
            # number of horrible errors might occur.  As long as the reactor
            # doesn't hang, this test is satisfied.  (There may be room for
            # another, stricter test.)
            log.flushErrors()

        finished.addCallback(flushErrors)
        server.transport.write('\0' * 64, ('127.0.0.1', server.transport.getHost().port))
        return finished


class MulticastTestCase(unittest.TestCase):
    """Tests for ``reactor.listenMulticast`` transports."""

    def _resultSet(self, result, l):
        l.append(result)

    def runUntilSuccess(self, method, *args, **kwargs):
        """Call ``method`` and iterate the reactor until its Deferred fires.

        Re-raises the wrapped exception if the Deferred failed.
        """
        l = []
        d = method(*args, **kwargs)
        d.addCallback(self._resultSet, l).addErrback(self._resultSet, l)
        while not l:
            reactor.iterate()
        if isinstance(l[0], failure.Failure):
            raise l[0].value

    def setUp(self):
        self.server = Server()
        self.client = Client()
        # multicast won't work if we listen over loopback, apparently
        self.port1 = reactor.listenMulticast(0, self.server)
        self.port2 = reactor.listenMulticast(0, self.client)
        reactor.iterate()
        reactor.iterate()
        self.client.transport.connect("127.0.0.1", self.server.transport.getHost().port)

    def tearDown(self):
        self.port1.stopListening()
        self.port2.stopListening()
        del self.server
        del self.client
        del self.port1
        del self.port2
        reactor.iterate()
        reactor.iterate()

    def testTTL(self):
        for o in self.client, self.server:
            self.assertEquals(o.transport.getTTL(), 1)
            o.transport.setTTL(2)
            self.assertEquals(o.transport.getTTL(), 2)

    def testLoopback(self):
        self.assertEquals(self.server.transport.getLoopbackMode(), 1)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        self.server.transport.write("hello", ("225.0.0.250", self.server.transport.getHost().port))
        reactor.iterate()
        self.assertEquals(len(self.server.packets), 1)
        self.server.transport.setLoopbackMode(0)
        self.assertEquals(self.server.transport.getLoopbackMode(), 0)
        self.server.transport.write("hello", ("225.0.0.250", self.server.transport.getHost().port))
        reactor.iterate()
        self.assertEquals(len(self.server.packets), 1)

    def testInterface(self):
        for o in self.client, self.server:
            self.assertEquals(o.transport.getOutgoingInterface(), "0.0.0.0")
            self.runUntilSuccess(o.transport.setOutgoingInterface, "127.0.0.1")
            self.assertEquals(o.transport.getOutgoingInterface(), "127.0.0.1")

    def testJoinLeave(self):
        for o in self.client, self.server:
            self.runUntilSuccess(o.transport.joinGroup, "225.0.0.250")
            self.runUntilSuccess(o.transport.leaveGroup, "225.0.0.250")

    def testMulticast(self):
        c = Server()
        p = reactor.listenMulticast(0, c)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        c.transport.write("hello world", ("225.0.0.250", self.server.transport.getHost().port))

        iters = 0
        while iters < 100 and len(self.server.packets) == 0:
            reactor.iterate(0.05)
            iters += 1
        self.assertEquals(self.server.packets[0][0], "hello world")
        p.stopListening()

    def testMultiListen(self):
        c = Server()
        p = reactor.listenMulticast(0, c, listenMultiple=True)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        portno = p.getHost().port
        c2 = Server()
        p2 = reactor.listenMulticast(portno, c2, listenMultiple=True)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        c.transport.write("hello world", ("225.0.0.250", portno))
        d = defer.Deferred()
        # The original passed (None, c, c2, p, p2) straight to d.callback,
        # which accepts a single result, and never attached the checker below.
        d.addCallback(self._cbTestMultiListen, c, c2, p, p2)
        reactor.callLater(0.4, d.callback, None)
        return d

    def _cbTestMultiListen(self, ignored, c, c2, p, p2):
        self.assertEquals(c.packets[0][0], "hello world")
        self.assertEquals(c2.packets[0][0], "hello world")
        p.stopListening()
        p2.stopListening()

    testMultiListen.skip = "on non-linux platforms it appears multiple processes can listen, but not multiple sockets in same process?"


if not interfaces.IReactorUDP(reactor, None):
    UDPTestCase.skip = "This reactor does not support UDP"
if not hasattr(reactor, "connectUDP"):
    OldConnectedUDPTestCase.skip = "This reactor does not support connectUDP"
if not interfaces.IReactorMulticast(reactor, None):
    MulticastTestCase.skip = "This reactor does not support multicast"


def checkForLinux22():
    """Mark ``MulticastTestCase.testInterface`` as todo on Linux 2.2 kernels."""
    import os
    if os.path.exists("/proc/version"):
        f = open("/proc/version")
        try:
            # try/finally, not ``with``, to stay valid on the old Python this
            # module targets while still closing the handle.
            s = f.read()
        finally:
            f.close()
        if s.startswith("Linux version"):
            s = s.split()[2]
            if s.split(".")[:2] == ["2", "2"]:
                MulticastTestCase.testInterface.im_func.todo = "figure out why this fails in linux 2.2"

# NOTE: the source listing is truncated at this point ("...").

Full Screen

Full Screen

orf.py

Source:orf.py Github

copy

Full Screen

import re

# Standard genetic code, enumerated in U, C, A, G order for each codon
# position; '*' marks the three stop codons (UAA, UAG, UGA).
_BASES = "UCAG"
_AMINO_ACIDS = (
    "FFLLSSSSYY**CC*W"
    "LLLLPPPPHHQQRRRR"
    "IIIMTTTTNNKKSSRR"
    "VVVVAAAADDEEGGGG"
)
_CODON_TABLE = dict(zip(
    [a + b + c for a in _BASES for b in _BASES for c in _BASES],
    _AMINO_ACIDS,
))
_STOP = "*"
_UNKNOWN = "X"

# DNA base -> RNA base of the same strand / of the complementary strand.
_DNA_TO_RNA = {"A": "A", "T": "U", "G": "G", "C": "C"}
_DNA_TO_RNA_COMPLEMENT = {"A": "U", "T": "A", "G": "C", "C": "G"}


def orf(inbases):
    """
    Return the set of proteins encoded by the open reading frames of an RNA
    string.

    Every occurrence of the start codon 'AUG' opens a reading frame. Codons
    are translated in steps of three from that position until the first
    in-frame stop codon (UAA, UAG or UGA); the translated string, without the
    stop, is then added to the result. A frame that reaches the end of the
    sequence without a stop codon contributes nothing.

    Each frame is translated independently. The original kept one shared
    'translation'/'started' state across start codons, so an unterminated
    frame leaked its amino acids into the next one.

    inbases -- RNA sequence made of the characters A, U, C and G. A codon
               containing any other character is translated as 'X'.

    Returns a set of one-letter amino-acid strings, each starting with 'M'.
    """
    proteins = set()
    last_codon_start = len(inbases) - 2
    for match in re.finditer('AUG', inbases):
        residues = []
        # Index arithmetic avoids re-slicing the whole tail on every codon.
        for pos in range(match.start(), last_codon_start, 3):
            amino = _CODON_TABLE.get(inbases[pos:pos + 3], _UNKNOWN)
            if amino == _STOP:
                proteins.add(''.join(residues))
                break
            residues.append(amino)
    return proteins


def _transcribe(dna):
    """Return the RNA form of a DNA strand (T becomes U).

    Characters other than A, T and G map to 'C', as the original code did.
    """
    return ''.join(_DNA_TO_RNA.get(base, 'C') for base in dna)


def _reverse_complement(dna):
    """Return the RNA form of the reverse complement of a DNA strand.

    Characters other than A, T and G map to 'G', as the original code did.
    """
    return ''.join(_DNA_TO_RNA_COMPLEMENT.get(base, 'G') for base in reversed(dna))


def main():
    """
    Ask for a filename, read the first line of that file as a DNA sequence,
    and print the set of proteins found on both strands.
    """
    filename = input('Please enter the filename that you would like to read DNA codons from:\n')
    with open(filename, 'r') as content:
        # Only the first line is read. Strip the line terminator so it is
        # not translated as if it were a base.
        bases = content.readline().strip()

    out = set()
    #forward strand: replace Ts with Us
    out.update(orf(_transcribe(bases)))
    #reverse strand: reverse, then complement. The original appended the
    #complement to the reversed DNA string and searched the concatenation.
    out.update(orf(_reverse_complement(bases)))
    #all done
    print(out)


if __name__ == "__main__":
    # The source listing is cut off after this guard; calling main() is the
    # presumed body.
    main()

Full Screen

Full Screen

execution_info.py

Source:execution_info.py Github

copy

Full Screen

import datetime
import pytz


class TaskEvent:
    """One execution of a task inside a process case.

    When ``resource_available_at`` is given, every timing attribute is
    computed at construction time using ``bpm_env``. Otherwise only the
    identifiers and ``enabled_at`` are stored, and the start and completion
    fields are left as ``None`` to be filled in later.
    """

    def __init__(self, p_case, task_id, resource_id, resource_available_at=None, enabled_at=None, enabled_datetime=None, bpm_env=None):
        self.p_case = p_case  # ID of the current trace, i.e., index of the trace in log_info list
        self.task_id = task_id  # Name of the task related to the current event
        self.resource_id = resource_id  # ID of the resource performing to the event
        self.waiting_time = None
        self.processing_time = None
        self.normalized_waiting = None
        self.normalized_processing = None

        if resource_available_at is not None:
            # Time moment in seconds from beginning, i.e., first event has time = 0
            self.enabled_at = enabled_at
            # Datetime of the time-moment calculated from the starting simulation datetime
            self.enabled_datetime = enabled_datetime

            # Time moment in seconds from beginning, i.e., first event has time = 0
            self.started_at = max(resource_available_at, enabled_at)
            # Datetime of the time-moment calculated from the starting simulation datetime
            self.started_datetime = bpm_env.simulation_datetime_from(self.started_at)

            # Ideal duration from the distribution-function if allocate resource doesn't rest
            self.ideal_duration = bpm_env.sim_setup.ideal_task_duration(task_id, resource_id)
            # Actual duration adding the resource resting-time according to their calendar
            self.real_duration = bpm_env.sim_setup.real_task_duration(self.ideal_duration, self.resource_id,
                                                                      self.started_datetime)

            # Time moment in seconds from beginning, i.e., first event has time = 0
            self.completed_at = self.started_at + self.real_duration
            # Datetime of the time-moment calculated from the starting simulation datetime
            self.completed_datetime = bpm_env.simulation_datetime_from(self.completed_at)

            # Time of a resource was resting while performing a task (in seconds)
            self.idle_time = self.real_duration - self.ideal_duration
            # Time from an event is enabled until it is started by any resource
            self.waiting_time = self.started_at - self.enabled_at
            self.idle_cycle_time = self.completed_at - self.enabled_at
            self.idle_processing_time = self.completed_at - self.started_at
            self.cycle_time = self.idle_cycle_time - self.idle_time
            self.processing_time = self.idle_processing_time - self.idle_time
        else:
            self.task_name = None
            self.enabled_at = enabled_at
            self.enabled_by = None
            self.started_at = None
            self.completed_at = None
            self.idle_time = None

    def update_enabling_times(self, enabled_at):
        """Set ``enabled_at`` and recompute waiting and processing time.

        Raises Exception if the event has no start time yet or if
        ``enabled_at`` is later than ``started_at``.
        """
        if self.started_at is None or enabled_at > self.started_at:
            raise Exception("Task ENABLED after STARTED")
        self.enabled_at = enabled_at
        # .total_seconds() means the differences must be timedelta objects
        # here, i.e. the three time fields hold datetimes on this code path.
        self.waiting_time = (self.started_at - self.enabled_at).total_seconds()
        self.processing_time = (self.completed_at - self.started_at).total_seconds()


class LogEvent:
    """Start record of a task: task id, start datetime and resource id."""

    # The original spelled this method ``__int__``, so it never ran as the
    # constructor and ``LogEvent(task_id, ...)`` raised TypeError.
    def __init__(self, task_id, started_datetime, resource_id):
        self.task_id = task_id
        self.started_datetime = started_datetime
        self.resource_id = resource_id
        self.completed_datetime = None


class Trace:
    """Ordered list of the ``TaskEvent`` objects belonging to one case."""

    def __init__(self, p_case, started_at=datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, pytz.utc)):
        self.p_case = p_case
        self.started_at = started_at
        # NOTE(review): with the default (far-future) started_at, the max()
        # in complete_event cannot lower completed_at; it is only corrected
        # by sort_by_completion_date. Confirm whether that is intended.
        self.completed_at = started_at
        self.event_list = list()

        self.cycle_time = None
        self.idle_cycle_time = None
        self.processing_time = None
        self.idle_processing_time = None
        self.waiting_time = None
        self.idle_time = None

    def start_event(self, task_id, task_name, started_at, resource_name):
        """Append a started event and return its index in ``event_list``."""
        event_info = TaskEvent(self.p_case, task_id, resource_name)
        event_info.task_name = task_name
        event_info.started_at = started_at
        event_index = len(self.event_list)
        self.event_list.append(event_info)
        self.started_at = min(self.started_at, started_at)
        return event_index

    def complete_event(self, event_index, completed_at, idle_time=0):
        """Record completion data on the event at ``event_index`` and return it."""
        event_info = self.event_list[event_index]
        event_info.completed_at = completed_at
        event_info.idle_time = idle_time
        self.completed_at = max(self.completed_at, event_info.completed_at)
        return event_info

    def sort_by_completion_date(self, completed_at=False):
        """Sort events by completion time if ``completed_at`` is true, else
        by start time, then refresh the trace bounds from the first and last
        events.

        Raises IndexError when the trace has no events (as before).
        """
        if completed_at:
            self.event_list.sort(key=lambda e_info: e_info.completed_at)
        else:
            self.event_list.sort(key=lambda e_info: e_info.started_at)
        self.started_at = self.event_list[0].started_at
        self.completed_at = self.event_list[-1].completed_at

    def filter_incomplete_events(self):
        """Drop events lacking a start or completion time; return the counter."""
        filtered_list = list()
        filtered_events = 0
        for ev_info in self.event_list:
            if ev_info.started_at is not None and ev_info.completed_at is not None:
                filtered_list.append(ev_info)
            else:
                # NOTE(review): the counter grows by 2 per dropped event,
                # presumably counting a start and a completion record each;
                # confirm against the callers.
                filtered_events += 2
        self.event_list = filtered_list
        return filtered_events


class EnabledEvent:
    """A task that became enabled in a given case and process state."""

    def __init__(self, p_case, p_state, task_id, enabled_at, enabled_datetime):
        self.p_case = p_case
        self.p_state = p_state
        self.task_id = task_id
        self.enabled_datetime = enabled_datetime
        self.enabled_at = enabled_at


class ProcessInfo:
    def __init__(self):
        self.traces = dict()

# NOTE: the source listing is truncated at this point ("...").

Full Screen

Full Screen

test_notification_template_serializers.py

Source:test_notification_template_serializers.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
import pytest
from rest_framework.serializers import ValidationError

# AWX
from awx.api.serializers import NotificationTemplateSerializer


class StubNotificationTemplate():
    """Minimal stand-in for a notification template instance.

    It only carries the ``notification_type`` attribute, set to 'email'.
    """
    notification_type = 'email'


class TestNotificationTemplateSerializer():
    """Exercise ``NotificationTemplateSerializer.validate_messages``."""

    # Each parameter is one ``messages`` payload that must pass through
    # validate_messages without raising.
    @pytest.mark.parametrize('valid_messages',
                             [None,
                              {'started': None},
                              {'started': {'message': None}},
                              {'started': {'message': 'valid'}},
                              {'started': {'body': 'valid'}},
                              {'started': {'message': 'valid', 'body': 'valid'}},
                              {'started': None, 'success': None, 'error': None},
                              {'started': {'message': None, 'body': None},
                               'success': {'message': None, 'body': None},
                               'error': {'message': None, 'body': None}},
                              {'started': {'message': '{{ job.id }}', 'body': '{{ job.status }}'},
                               'success': {'message': None, 'body': '{{ job_friendly_name }}'},
                               'error': {'message': '{{ url }}', 'body': None}},
                              {'started': {'body': '{{ job_metadata }}'}},
                              {'started': {'body': '{{ job.summary_fields.inventory.total_hosts }}'}},
                              {'started': {'body': u'Iñtërnâtiônàlizætiøn'}}
                              ])
    def test_valid_messages(self, valid_messages):
        """validate_messages accepts every payload in the list above."""
        serializer = NotificationTemplateSerializer()
        # The serializer is given a stub instance whose notification_type
        # is 'email'.
        serializer.instance = StubNotificationTemplate()
        serializer.validate_messages(valid_messages)

    # Each parameter is one ``messages`` payload expected to be rejected.
    @pytest.mark.parametrize('invalid_messages',
                             [1,
                              [],
                              '',
                              {'invalid_event': ''},
                              {'started': 'should_be_dict'},
                              {'started': {'bad_message_type': ''}},
                              {'started': {'message': 1}},
                              {'started': {'message': []}},
                              {'started': {'message': {}}},
                              {'started': {'message': '{{ unclosed_braces'}},
                              {'started': {'message': '{{ undefined }}'}},
                              {'started': {'message': '{{ job.undefined }}'}},
                              {'started': {'message': '{{ job.id | bad_filter }}'}},
                              {'started': {'message': '{{ job.__class__ }}'}},
                              {'started': {'message': 'Newlines \n not allowed\n'}},
                              ])
    def test_invalid__messages(self, invalid_messages):
        """A ValidationError is expected for every payload in the list above."""
        serializer = NotificationTemplateSerializer()
        serializer.instance = StubNotificationTemplate()
        # NOTE: the source listing is truncated inside this ``with`` block;
        # the statement under test is not visible.
        with pytest.raises(ValidationError):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful