Best Python code snippet using slash
test_udp.py
Source:test_udp.py  
# -*- test-case-name: twisted.test.test_udp -*-
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

#

from twisted.trial import unittest, util
from twisted.internet import protocol, reactor, error, defer, interfaces, address
from twisted.python import log, failure, components


class Mixin:
    """
    Shared bookkeeping for the test protocols.

    Records whether startProtocol/stopProtocol ran, collects received
    packets in C{self.packets}, and fires C{startedDeferred} (if one is set)
    the first time the protocol is started.
    """

    started = 0
    stopped = 0

    startedDeferred = None

    def __init__(self):
        self.packets = []

    def startProtocol(self):
        self.started = 1
        if self.startedDeferred is not None:
            # Clear the attribute before firing so the Deferred fires once.
            d, self.startedDeferred = self.startedDeferred, None
            d.callback(None)

    def stopProtocol(self):
        self.stopped = 1


class Server(Mixin, protocol.DatagramProtocol):
    """
    Unconnected datagram protocol which stores C{(data, addr)} pairs and
    fires C{packetReceived} (if set) on the next datagram.
    """

    packetReceived = None
    refused = 0

    def datagramReceived(self, data, addr):
        self.packets.append((data, addr))
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)


class Client(Mixin, protocol.ConnectedDatagramProtocol):
    """
    Connected datagram protocol which stores received payloads and reports
    connection failures/refusals through C{startedDeferred}.
    """

    packetReceived = None
    refused = 0

    def datagramReceived(self, data):
        self.packets.append(data)
        if self.packetReceived is not None:
            d, self.packetReceived = self.packetReceived, None
            d.callback(None)

    def connectionFailed(self, failure):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(failure)
        self.failure = failure

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        self.refused = 1


class GoodClient(Server):
    """
    A L{Server} which additionally turns a refused connection into an
    errback on C{startedDeferred}.
    """

    def connectionRefused(self):
        if self.startedDeferred is not None:
            d, self.startedDeferred = self.startedDeferred, None
            d.errback(error.ConnectionRefusedError("yup"))
        self.refused = 1


class PortCleanerUpper(unittest.TestCase):
    """
    Base test case which disconnects every port registered with
    L{_addPorts} during tearDown.
    """

    callToLoseCnx = 'loseConnection'

    def setUp(self):
        self.ports = []

    def tearDown(self):
        self.cleanPorts(*self.ports)

    def _addPorts(self, *ports):
        for p in ports:
            self.ports.append(p)

    def cleanPorts(self, *ports):
        """
        Disconnect each given port and wait until it reports being
        disconnected.

        @raise RuntimeError: if an object without a C{disconnected}
            attribute is passed.
        """
        for p in ports:
            if not hasattr(p, 'disconnected'):
                # Call form of raise: valid on both Python 2 and Python 3.
                raise RuntimeError("You handed something to cleanPorts that"
                                   " doesn't have a disconnected attribute, dummy!")
            if not p.disconnected:
                d = getattr(p, self.callToLoseCnx)()
                if isinstance(d, defer.Deferred):
                    # 'wait' was referenced as a bare name that this module
                    # never imports; it lives next to spinUntil in
                    # twisted.trial.util.
                    util.wait(d)
                else:
                    try:
                        util.spinUntil(lambda: p.disconnected)
                    except:
                        # Deliberately best-effort: report and keep cleaning
                        # the remaining ports.
                        failure.Failure().printTraceback()


class OldConnectedUDPTestCase(PortCleanerUpper):
    """
    Tests for the deprecated C{reactor.connectUDP} API.
    """

    def testStartStop(self):
        client = Client()
        d = client.startedDeferred = defer.Deferred()
        port2 = reactor.connectUDP("127.0.0.1", 8888, client)

        def assertName():
            self.failUnless(repr(port2).find('test_udp.Client') >= 0)

        def cbStarted(ignored):
            self.assertEquals(client.started, 1)
            self.assertEquals(client.stopped, 0)
            assertName()
            return defer.maybeDeferred(port2.stopListening).addCallback(lambda ign: assertName())

        return d.addCallback(cbStarted)
    testStartStop.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]

    def testDNSFailure(self):
        client = Client()
        d = client.startedDeferred = defer.Deferred()
        # if this domain exists, shoot your sysadmin
        reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client)

        def didNotConnect(ign):
            self.assertEquals(client.stopped, 0)
            self.assertEquals(client.started, 0)

        d = self.assertFailure(d, error.DNSLookupError)
        d.addCallback(didNotConnect)
        return d
    testDNSFailure.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]

    def testSendPackets(self):
        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()

        client = Client()
        clientStarted = client.startedDeferred = defer.Deferred()

        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbServerStarted(ignored):
            self.port2 = reactor.connectUDP("127.0.0.1", server.transport.getHost().port, client)
            return clientStarted

        d = serverStarted.addCallback(cbServerStarted)

        def cbClientStarted(ignored):
            clientSend = server.packetReceived = defer.Deferred()
            serverSend = client.packetReceived = defer.Deferred()

            cAddr = client.transport.getHost()
            server.transport.write("hello", (cAddr.host, cAddr.port))
            client.transport.write("world")

            # No one will ever call errback on either of these Deferreds,
            # otherwise I would pass fireOnOneErrback=True here.
            return defer.DeferredList([clientSend, serverSend])

        d.addCallback(cbClientStarted)

        def cbPackets(ignored):
            self.assertEquals(client.packets, ["hello"])
            self.assertEquals(server.packets, [("world", ("127.0.0.1", client.transport.getHost().port))])

            return defer.DeferredList([
                defer.maybeDeferred(port1.stopListening),
                defer.maybeDeferred(self.port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbPackets)
        return d
    testSendPackets.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]

    def testConnectionRefused(self):
        # assume no one listening on port 80 UDP
        client = Client()
        port = reactor.connectUDP("127.0.0.1", 80, client)
        server = Server()
        port2 = reactor.listenUDP(0, server, interface="127.0.0.1")
        reactor.iterate()
        reactor.iterate()
        reactor.iterate()
        client.transport.write("a")
        client.transport.write("b")
        server.transport.write("c", ("127.0.0.1", 80))
        server.transport.write("d", ("127.0.0.1", 80))
        server.transport.write("e", ("127.0.0.1", 80))
        server.transport.write("toserver", (port2.getHost().host,
                                            port2.getHost().port))
        server.transport.write("toclient", (port.getHost().host,
                                            port.getHost().port))
        reactor.iterate()
        reactor.iterate()
        self.assertEquals(client.refused, 1)
        port.stopListening()
        port2.stopListening()
        reactor.iterate()
        reactor.iterate()
    testConnectionRefused.suppress = [
        util.suppress(message='use listenUDP and then transport.connect',
                      category=DeprecationWarning)]


class UDPTestCase(unittest.TestCase):
    """
    Tests for C{reactor.listenUDP} and the resulting transport.
    """

    def testOldAddress(self):
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored):
            addr = p.getHost()
            self.assertEquals(addr, ('INET_UDP', addr.host, addr.port))
            return p.stopListening()

        return d.addCallback(cbStarted)
    testOldAddress.suppress = [
        util.suppress(message='IPv4Address.__getitem__',
                      category=DeprecationWarning)]

    def testStartStop(self):
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored):
            self.assertEquals(server.started, 1)
            self.assertEquals(server.stopped, 0)
            return port1.stopListening()

        def cbStopped(ignored):
            self.assertEquals(server.stopped, 1)

        return d.addCallback(cbStarted).addCallback(cbStopped)

    def testRebind(self):
        # Ensure binding the same DatagramProtocol repeatedly invokes all
        # the right callbacks.
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")

        def cbStarted(ignored, port):
            return port.stopListening()

        def cbStopped(ignored):
            d = server.startedDeferred = defer.Deferred()
            p = reactor.listenUDP(0, server, interface="127.0.0.1")
            return d.addCallback(cbStarted, p)

        # cbStopped used to be defined but never attached, so the second
        # bind -- the whole point of this test -- never happened.
        return d.addCallback(cbStarted, p).addCallback(cbStopped)

    def testBindError(self):
        server = Server()
        d = server.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, server, interface='127.0.0.1')

        def cbStarted(ignored):
            self.assertEquals(port.getHost(), server.transport.getHost())

            server2 = Server()
            self.assertRaises(
                error.CannotListenError,
                reactor.listenUDP, port.getHost().port, server2, interface='127.0.0.1')

        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return port.stopListening()

        d.addCallback(cbFinished)
        return d

    def testSendPackets(self):
        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()

        def cbServerStarted(ignored):
            self.port2 = reactor.listenUDP(0, client, interface="127.0.0.1")
            return clientStarted

        d = serverStarted.addCallback(cbServerStarted)

        def cbClientStarted(ignored):
            client.transport.connect("127.0.0.1", server.transport.getHost().port)
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()

            serverSend = client.packetReceived = defer.Deferred()
            server.transport.write("hello", (cAddr.host, cAddr.port))

            clientWrites = [
                ("a",),
                ("b", None),
                ("c", (sAddr.host, sAddr.port))]

            def cbClientSend(ignored):
                # Send the queued writes one at a time, each only after the
                # server has received the previous one.
                if clientWrites:
                    nextClientWrite = server.packetReceived = defer.Deferred()
                    nextClientWrite.addCallback(cbClientSend)
                    client.transport.write(*clientWrites.pop(0))
                    return nextClientWrite

            # No one will ever call .errback on either of these Deferreds,
            # but there is a non-trivial amount of test code which might
            # cause them to fail somehow.  So fireOnOneErrback=True.
            return defer.DeferredList([
                cbClientSend(None),
                serverSend],
                fireOnOneErrback=True)

        d.addCallback(cbClientStarted)

        def cbSendsFinished(ignored):
            cAddr = client.transport.getHost()
            sAddr = server.transport.getHost()
            self.assertEquals(
                client.packets,
                [("hello", (sAddr.host, sAddr.port))])
            clientAddr = (cAddr.host, cAddr.port)
            self.assertEquals(
                server.packets,
                [("a", clientAddr),
                 ("b", clientAddr),
                 ("c", clientAddr)])

        d.addCallback(cbSendsFinished)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port1.stopListening),
                defer.maybeDeferred(self.port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d

    def testConnectionRefused(self):
        # assume no one listening on port 80 UDP
        client = GoodClient()
        clientStarted = client.startedDeferred = defer.Deferred()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")

        server = Server()
        serverStarted = server.startedDeferred = defer.Deferred()
        port2 = reactor.listenUDP(0, server, interface="127.0.0.1")

        d = defer.DeferredList(
            [clientStarted, serverStarted],
            fireOnOneErrback=True)

        def cbStarted(ignored):
            # Reuse startedDeferred: GoodClient.connectionRefused errbacks it.
            connectionRefused = client.startedDeferred = defer.Deferred()
            client.transport.connect("127.0.0.1", 80)

            for i in range(10):
                client.transport.write(str(i))
                server.transport.write(str(i), ("127.0.0.1", 80))

            return self.assertFailure(
                connectionRefused,
                error.ConnectionRefusedError)

        d.addCallback(cbStarted)

        def cbFinished(ignored):
            return defer.DeferredList([
                defer.maybeDeferred(port.stopListening),
                defer.maybeDeferred(port2.stopListening)],
                fireOnOneErrback=True)

        d.addCallback(cbFinished)
        return d

    def testBadConnect(self):
        client = GoodClient()
        port = reactor.listenUDP(0, client, interface="127.0.0.1")
        self.assertRaises(ValueError, client.transport.connect, "localhost", 80)
        client.transport.connect("127.0.0.1", 80)
        self.assertRaises(RuntimeError, client.transport.connect, "127.0.0.1", 80)
        return port.stopListening()

    def testPortRepr(self):
        client = GoodClient()
        p = reactor.listenUDP(0, client)
        portNo = str(p.getHost().port)
        self.failIf(repr(p).find(portNo) == -1)

        def stoppedListening(ign):
            self.failIf(repr(p).find(portNo) != -1)

        return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)


class ReactorShutdownInteraction(unittest.TestCase):

    def testShutdownFromDatagramReceived(self):
        # udp.Port's doRead calls recvfrom() in a loop, as an optimization.
        # It is important this loop terminate under various conditions.
        # Previously, if datagramReceived synchronously invoked
        # reactor.stop(), under certain reactors, the Port's socket would
        # synchronously disappear, causing an AttributeError inside that
        # loop.  This was mishandled, causing the loop to spin forever.
        # This test is primarily to ensure that the loop never spins
        # forever.
        server = Server()
        finished = defer.Deferred()
        p = reactor.listenUDP(0, server, interface='127.0.0.1')
        pr = server.packetReceived = defer.Deferred()

        def pktRece(ignored):
            # Simulate reactor.stop() behavior :(
            server.transport.connectionLost()
            # Then delay this Deferred chain until the protocol has been
            # disconnected, as the reactor should do in an error condition
            # such as we are inducing.  This is very much a whitebox test.
            reactor.callLater(0, finished.callback, None)

        pr.addCallback(pktRece)

        def flushErrors(ignored):
            # We are breaking abstraction and calling private APIs, any
            # number of horrible errors might occur.  As long as the reactor
            # doesn't hang, this test is satisfied.  (There may be room for
            # another, stricter test.)
            log.flushErrors()

        finished.addCallback(flushErrors)
        server.transport.write('\0' * 64, ('127.0.0.1', server.transport.getHost().port))
        return finished


class MulticastTestCase(unittest.TestCase):
    """
    Tests for C{reactor.listenMulticast} and the multicast transport API.
    """

    def _resultSet(self, result, l):
        l.append(result)

    def runUntilSuccess(self, method, *args, **kwargs):
        """
        Call C{method}, spin the reactor until the Deferred it returns has a
        result, and re-raise the underlying exception if it failed.
        """
        l = []
        d = method(*args, **kwargs)
        d.addCallback(self._resultSet, l).addErrback(self._resultSet, l)
        while not l:
            reactor.iterate()
        if isinstance(l[0], failure.Failure):
            raise l[0].value

    def setUp(self):
        self.server = Server()
        self.client = Client()
        # multicast won't work if we listen over loopback, apparently
        self.port1 = reactor.listenMulticast(0, self.server)
        self.port2 = reactor.listenMulticast(0, self.client)
        reactor.iterate()
        reactor.iterate()
        self.client.transport.connect("127.0.0.1", self.server.transport.getHost().port)

    def tearDown(self):
        self.port1.stopListening()
        self.port2.stopListening()
        del self.server
        del self.client
        del self.port1
        del self.port2
        reactor.iterate()
        reactor.iterate()

    def testTTL(self):
        for o in self.client, self.server:
            self.assertEquals(o.transport.getTTL(), 1)
            o.transport.setTTL(2)
            self.assertEquals(o.transport.getTTL(), 2)

    def testLoopback(self):
        self.assertEquals(self.server.transport.getLoopbackMode(), 1)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        self.server.transport.write("hello", ("225.0.0.250", self.server.transport.getHost().port))
        reactor.iterate()
        self.assertEquals(len(self.server.packets), 1)
        self.server.transport.setLoopbackMode(0)
        self.assertEquals(self.server.transport.getLoopbackMode(), 0)
        self.server.transport.write("hello", ("225.0.0.250", self.server.transport.getHost().port))
        reactor.iterate()
        # Still one packet: the second write was sent with loopback disabled.
        self.assertEquals(len(self.server.packets), 1)

    def testInterface(self):
        for o in self.client, self.server:
            self.assertEquals(o.transport.getOutgoingInterface(), "0.0.0.0")
            self.runUntilSuccess(o.transport.setOutgoingInterface, "127.0.0.1")
            self.assertEquals(o.transport.getOutgoingInterface(), "127.0.0.1")

    def testJoinLeave(self):
        for o in self.client, self.server:
            self.runUntilSuccess(o.transport.joinGroup, "225.0.0.250")
            self.runUntilSuccess(o.transport.leaveGroup, "225.0.0.250")

    def testMulticast(self):
        c = Server()
        p = reactor.listenMulticast(0, c)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        c.transport.write("hello world", ("225.0.0.250", self.server.transport.getHost().port))

        # Poll the reactor (at most 100 iterations) until the packet arrives.
        iters = 0
        while iters < 100 and len(self.server.packets) == 0:
            reactor.iterate(0.05)
            iters += 1
        self.assertEquals(self.server.packets[0][0], "hello world")
        p.stopListening()

    def testMultiListen(self):
        c = Server()
        p = reactor.listenMulticast(0, c, listenMultiple=True)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        portno = p.getHost().port
        c2 = Server()
        p2 = reactor.listenMulticast(portno, c2, listenMultiple=True)
        self.runUntilSuccess(self.server.transport.joinGroup, "225.0.0.250")
        c.transport.write("hello world", ("225.0.0.250", portno))
        d = defer.Deferred()
        # Deferred.callback accepts exactly one result; the extra arguments
        # belong to the checking callback, which previously was never
        # attached at all.
        d.addCallback(self._cbTestMultiListen, c, c2, p, p2)
        reactor.callLater(0.4, d.callback, None)
        return d

    def _cbTestMultiListen(self, ignored, c, c2, p, p2):
        self.assertEquals(c.packets[0][0], "hello world")
        self.assertEquals(c2.packets[0][0], "hello world")
        p.stopListening()
        p2.stopListening()

    testMultiListen.skip = "on non-linux platforms it appears multiple processes can listen, but not multiple sockets in same process?"


if not interfaces.IReactorUDP(reactor, None):
    UDPTestCase.skip = "This reactor does not support UDP"
if not hasattr(reactor, "connectUDP"):
    OldConnectedUDPTestCase.skip = "This reactor does not support connectUDP"
if not interfaces.IReactorMulticast(reactor, None):
    MulticastTestCase.skip = "This reactor does not support multicast"


def checkForLinux22():
    """
    Mark L{MulticastTestCase.testInterface} as a todo when /proc/version
    reports a Linux 2.2 kernel.
    """
    import os
    if os.path.exists("/proc/version"):
        f = open("/proc/version")
        try:
            s = f.read()
        finally:
            # Close explicitly rather than relying on garbage collection.
            f.close()
        if s.startswith("Linux version"):
            s = s.split()[2]
            if s.split(".")[:2] == ["2", "2"]:
                MulticastTestCase.testInterface.im_func.todo = "figure out why this fails in linux 2.2"

# [listing truncated here; next snippet: orf.py]
Source:orf.py  
import itertools
import re

# Standard genetic code laid out in the conventional U, C, A, G order
# (first base = row of 16, second base = group of 4, third base = column).
# '*' marks the three stop codons UAA, UAG and UGA.
_STOP = "*"
_AMINO_ACIDS = (
    "FFLLSSSSYY**CC*W"
    "LLLLPPPPHHQQRRRR"
    "IIIMTTTTNNKKSSRR"
    "VVVVAAAADDEEGGGG"
)
_CODON_TABLE = {
    "".join(codon): amino_acid
    for codon, amino_acid in zip(itertools.product("UCAG", repeat=3), _AMINO_ACIDS)
}

_START_CODON = re.compile("AUG")

# DNA coding strand -> RNA (T becomes U).
_DNA_TO_RNA = str.maketrans("T", "U")
# DNA base -> complementary RNA base.
_DNA_TO_COMPLEMENT_RNA = str.maketrans("ATGC", "UACG")


def _translate_frame(rna, start):
    """
    Translate the reading frame that begins at index *start* of *rna*.

    Returns the protein string (starting with 'M') if a stop codon is reached,
    or None if the sequence ends first or a codon contains a character other
    than A, C, G or U.
    """
    residues = []
    # Only complete codons are considered; a 1-2 base tail is ignored.
    for pos in range(start, len(rna) - 2, 3):
        amino_acid = _CODON_TABLE.get(rna[pos:pos + 3])
        if amino_acid is None:
            return None
        if amino_acid == _STOP:
            return "".join(residues)
        residues.append(amino_acid)
    return None


def orf(inbases):
    """
    Return the set of proteins encoded by the open reading frames of an RNA
    string.

    Every occurrence of the start codon AUG opens a reading frame, which is
    translated codon by codon until a stop codon (UAA, UAG or UGA). Frames
    that run off the end of the sequence without a stop codon contribute
    nothing. Each frame is translated independently, so an unterminated frame
    can no longer leak its partial translation into the next one.

    inbases: RNA sequence made of the characters A, C, G and U.
    Returns: a set of protein strings in one-letter amino-acid code.
    """
    proteins = set()
    for match in _START_CODON.finditer(inbases):
        protein = _translate_frame(inbases, match.start())
        if protein is not None:
            proteins.add(protein)
    return proteins


def _transcribe(dna):
    """Return the RNA equivalent of a DNA coding strand (T replaced by U)."""
    return dna.translate(_DNA_TO_RNA)


def _reverse_complement_rna(dna):
    """Return the RNA form of the reverse complement of a DNA strand."""
    return dna[::-1].translate(_DNA_TO_COMPLEMENT_RNA)


def main():
    """
    Ask for a file name, read the first line of that file as a DNA string and
    print every protein found in its forward and reverse-complement strands.
    """
    out = set()
    filename = input('Please enter the filename that you would like to read DNA codons from:\n')
    with open(filename, 'r') as content:
        # Strip the line terminator so it is not mistaken for a base.
        bases = content.readline().strip()

    out.update(orf(_transcribe(bases)))
    # The complement is built as a new string rather than appended to the
    # reversed DNA, which used to create spurious matches across the seam.
    out.update(orf(_reverse_complement_rna(bases)))

    # all done
    print(out)


if __name__ == "__main__":
    # NOTE(review): the listing is truncated right after this guard; calling
    # main() is the presumed body -- confirm against the original file.
    main()

# [listing truncated here; next snippet: execution_info.py]
Source:execution_info.py  
import datetime
import pytz

# Start sentinel used by Trace when no start time is supplied: later than any
# real event, so min() against the first event replaces it.
_UNSET_TRACE_START = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, pytz.utc)
# Matching completion sentinel: earlier than any real event, so max() against
# the first completed event replaces it.
_UNSET_TRACE_END = datetime.datetime(1, 1, 1, 0, 0, 0, 0, pytz.utc)


class TaskEvent:
    """
    One execution of a task inside a trace.

    When ``resource_available_at`` is given the event is fully computed from
    the simulation environment ``bpm_env`` (start, durations, completion and
    the derived time metrics). Otherwise only the identifying fields and
    ``enabled_at`` are filled in and the remaining attributes stay ``None``
    until the caller sets them.
    """

    def __init__(self, p_case, task_id, resource_id, resource_available_at=None, enabled_at=None,
                 enabled_datetime=None, bpm_env=None):
        self.p_case = p_case  # ID of the current trace, i.e., index of the trace in log_info list
        self.task_id = task_id  # Name of the task related to the current event
        self.resource_id = resource_id  # ID of the resource performing to the event

        # Every attribute is defined up front so that either construction path
        # yields an object with the same set of attributes.
        self.task_name = None
        self.enabled_by = None
        self.waiting_time = None
        self.processing_time = None
        self.normalized_waiting = None
        self.normalized_processing = None
        self.started_at = None
        self.started_datetime = None
        self.ideal_duration = None
        self.real_duration = None
        self.completed_at = None
        self.completed_datetime = None
        self.idle_time = None
        self.idle_cycle_time = None
        self.idle_processing_time = None
        self.cycle_time = None

        # Time moment in seconds from beginning, i.e., first event has time = 0
        self.enabled_at = enabled_at
        # Datetime of the time-moment calculated from the starting simulation datetime
        self.enabled_datetime = enabled_datetime

        if resource_available_at is None:
            return

        # Time moment in seconds from beginning, i.e., first event has time = 0
        self.started_at = max(resource_available_at, enabled_at)
        # Datetime of the time-moment calculated from the starting simulation datetime
        self.started_datetime = bpm_env.simulation_datetime_from(self.started_at)

        # Ideal duration from the distribution-function if allocate resource doesn't rest
        self.ideal_duration = bpm_env.sim_setup.ideal_task_duration(task_id, resource_id)
        # Actual duration adding the resource resting-time according to their calendar
        self.real_duration = bpm_env.sim_setup.real_task_duration(self.ideal_duration, self.resource_id,
                                                                  self.started_datetime)

        # Time moment in seconds from beginning, i.e., first event has time = 0
        self.completed_at = self.started_at + self.real_duration
        # Datetime of the time-moment calculated from the starting simulation datetime
        self.completed_datetime = bpm_env.simulation_datetime_from(self.completed_at)

        # Time of a resource was resting while performing a task (in seconds)
        self.idle_time = self.real_duration - self.ideal_duration
        # Time from an event is enabled until it is started by any resource
        self.waiting_time = self.started_at - self.enabled_at
        self.idle_cycle_time = self.completed_at - self.enabled_at
        self.idle_processing_time = self.completed_at - self.started_at
        self.cycle_time = self.idle_cycle_time - self.idle_time
        self.processing_time = self.idle_processing_time - self.idle_time

    def update_enabling_times(self, enabled_at):
        """
        Set the enabling time and recompute waiting/processing times.

        Raises Exception if the event has no start time or ``enabled_at`` is
        later than the start.
        """
        if self.started_at is None or enabled_at > self.started_at:
            raise Exception("Task ENABLED after STARTED")
        self.enabled_at = enabled_at
        # NOTE(review): .total_seconds() implies started_at/completed_at/
        # enabled_at are datetimes on this path -- confirm against callers.
        self.waiting_time = (self.started_at - self.enabled_at).total_seconds()
        self.processing_time = (self.completed_at - self.started_at).total_seconds()


class LogEvent:
    """A started event read from a log; its completion time is filled in later."""

    # Was spelled ``__int__``, so the class could not be constructed with
    # arguments and never received its attributes.
    def __init__(self, task_id, started_datetime, resource_id):
        self.task_id = task_id
        self.started_datetime = started_datetime
        self.resource_id = resource_id
        self.completed_datetime = None


class Trace:
    """The events of one process case together with the trace-level time span."""

    def __init__(self, p_case, started_at=_UNSET_TRACE_START):
        self.p_case = p_case
        self.started_at = started_at
        # With an explicit start the trace cannot complete before it starts.
        # With the default (far-future) sentinel, completion must begin at the
        # far-past sentinel instead, otherwise max() in complete_event could
        # never move it.
        if started_at is _UNSET_TRACE_START:
            self.completed_at = _UNSET_TRACE_END
        else:
            self.completed_at = started_at
        self.event_list = list()

        self.cycle_time = None
        self.idle_cycle_time = None
        self.processing_time = None
        self.idle_processing_time = None
        self.waiting_time = None
        self.idle_time = None

    def start_event(self, task_id, task_name, started_at, resource_name):
        """Append a started event and return its index in ``event_list``."""
        event_info = TaskEvent(self.p_case, task_id, resource_name)
        event_info.task_name = task_name
        event_info.started_at = started_at
        event_index = len(self.event_list)
        self.event_list.append(event_info)
        self.started_at = min(self.started_at, started_at)
        return event_index

    def complete_event(self, event_index, completed_at, idle_time=0):
        """Record the completion of the event at ``event_index`` and return it."""
        event_info = self.event_list[event_index]
        event_info.completed_at = completed_at
        event_info.idle_time = idle_time
        self.completed_at = max(self.completed_at, event_info.completed_at)
        return event_info

    def sort_by_completion_date(self, completed_at=False):
        """
        Sort the events by completion time if ``completed_at`` is true, by
        start time otherwise, then refresh the trace start/completion from the
        first and last events. An empty trace is left unchanged.
        """
        if not self.event_list:
            return
        if completed_at:
            self.event_list.sort(key=lambda e_info: e_info.completed_at)
        else:
            self.event_list.sort(key=lambda e_info: e_info.started_at)
        self.started_at = self.event_list[0].started_at
        self.completed_at = self.event_list[-1].completed_at

    def filter_incomplete_events(self):
        """
        Drop events lacking a start or a completion time.

        Returns the running count, which grows by 2 per dropped event.
        """
        filtered_list = list()
        filtered_events = 0
        for ev_info in self.event_list:
            if ev_info.started_at is not None and ev_info.completed_at is not None:
                filtered_list.append(ev_info)
            else:
                # NOTE(review): counts 2 per dropped event, presumably one
                # start row plus one complete row -- confirm with callers.
                filtered_events += 2
        self.event_list = filtered_list
        return filtered_events


class EnabledEvent:
    """A task that became enabled in a given case and process state."""

    def __init__(self, p_case, p_state, task_id, enabled_at, enabled_datetime):
        self.p_case = p_case
        self.p_state = p_state
        self.task_id = task_id
        self.enabled_datetime = enabled_datetime
        self.enabled_at = enabled_at


class ProcessInfo:
    def __init__(self):
        self.traces = dict()

# [listing truncated here; next snippet: test_notification_template_serializers.py]
Source:test_notification_template_serializers.py  
1# -*- coding: utf-8 -*-2import pytest3from rest_framework.serializers import ValidationError4# AWX5from awx.api.serializers import NotificationTemplateSerializer6class StubNotificationTemplate():7    notification_type = 'email'8class TestNotificationTemplateSerializer():9    @pytest.mark.parametrize('valid_messages',10                             [None,11                              {'started': None},12                              {'started': {'message': None}},13                              {'started': {'message': 'valid'}},14                              {'started': {'body': 'valid'}},15                              {'started': {'message': 'valid', 'body': 'valid'}},16                              {'started': None, 'success': None, 'error': None},17                              {'started': {'message': None, 'body': None},18                               'success': {'message': None, 'body': None},19                               'error': {'message': None, 'body': None}},20                              {'started': {'message': '{{ job.id }}', 'body': '{{ job.status }}'},21                               'success': {'message': None, 'body': '{{ job_friendly_name }}'},22                               'error': {'message': '{{ url }}', 'body': None}},23                              {'started': {'body': '{{ job_metadata }}'}},24                              {'started': {'body': '{{ job.summary_fields.inventory.total_hosts }}'}},25                              {'started': {'body': u'Iñtërnâtiônàlizætiøn'}}26                              ])27    def test_valid_messages(self, valid_messages):28        serializer = NotificationTemplateSerializer()29        serializer.instance = StubNotificationTemplate()30        serializer.validate_messages(valid_messages)31    @pytest.mark.parametrize('invalid_messages',32                             [1,33                              [],34                              '',35                              {'invalid_event': ''},36          
                    {'started': 'should_be_dict'},37                              {'started': {'bad_message_type': ''}},38                              {'started': {'message': 1}},39                              {'started': {'message': []}},40                              {'started': {'message': {}}},41                              {'started': {'message': '{{ unclosed_braces'}},42                              {'started': {'message': '{{ undefined }}'}},43                              {'started': {'message': '{{ job.undefined }}'}},44                              {'started': {'message': '{{ job.id | bad_filter }}'}},45                              {'started': {'message': '{{ job.__class__ }}'}},46                              {'started': {'message': 'Newlines \n not allowed\n'}},47                              ])48    def test_invalid__messages(self, invalid_messages):49        serializer = NotificationTemplateSerializer()50        serializer.instance = StubNotificationTemplate()51        with pytest.raises(ValidationError):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
