Best Python code snippet using playwright-python
test_bus2.py
Source:test_bus2.py  
...59    def setUp(self):60        self.subscriber = BasicSubscriber("test")61        self.publisher = BasicPublisher("test")62    def tearDown(self):63        # This, along with code in minibus._cleanup(),  was found via trail 64        # and error using suggestions from http://blackjml.livejournal.com/23029.html65        s1 = self.subscriber._cleanup()66        p1 = self.publisher._cleanup()67        return defer.gatherResults([s1, p1])68    def examin(self):69        self.publisher.fini()70        self.subscriber.fini()71#         print "sent=", self.publisher.counter72#         print "received=", self.subscriber.counter73        self.assertEqual(self.publisher.counter, self.subscriber.counter)74    def test_test(self):75        d = task.deferLater(reactor, 0.5, self.examin)76        return d77class TestMultipleSeperateTopics(unittest.TestCase):78    """ Test two sets of publisher and subscriber pairs talking on different topics.79    There should not be any cross talk.80    """81    def setUp(self):82        self.subscriber1 = BasicSubscriber("test1")83        self.publisher1 = BasicPublisher("test1")84        self.subscriber2 = BasicSubscriber("test2")85        self.publisher2 = BasicPublisher("test2")86    def tearDown(self):87        a = self.subscriber1._cleanup()88        b = self.publisher1._cleanup()89        c = self.subscriber2._cleanup()90        d = self.publisher2._cleanup()91        return defer.gatherResults([a, b, c, d])92    def examin(self):93        self.publisher1.fini()94        self.publisher2.fini()95        self.subscriber1.fini()96        self.subscriber2.fini()97        self.assertEqual(self.publisher1.counter,self.subscriber1.counter)98        self.assertEqual(self.publisher2.counter,self.subscriber2.counter)99    def test_test(self):100        d = task.deferLater(reactor, 0.5, self.examin)101        return d102class TestMultipleSubscribersToTopic(unittest.TestCase):103    """ Tests a single publisher sending to multiple subscribers.104    Each subscriber should hear exactly the same set of messages.105    """106    def setUp(self):107        self.subscriber1 = BasicSubscriber("test")108        self.subscriber2 = BasicSubscriber("test")109        self.subscriber3 = BasicSubscriber("test")110        self.publisher = BasicPublisher("test")111    def tearDown(self):112        a = self.subscriber1._cleanup()113        b = self.subscriber2._cleanup()114        c = self.subscriber3._cleanup()115        d = self.publisher._cleanup()116        return defer.gatherResults([a, b, c, d])117    def examin(self):118        self.publisher.fini()119        self.subscriber1.fini()120        self.subscriber2.fini()121        self.subscriber3.fini()122#         print "sent=", self.publisher.counter123#         print "received=", self.subscriber1.counter124#         print "received=", self.subscriber2.counter125#         print "received=", self.subscriber3.counter126        self.assertEqual(self.publisher.counter,self.subscriber1.counter)127        self.assertEqual(self.publisher.counter,self.subscriber2.counter)128        self.assertEqual(self.publisher.counter,self.subscriber3.counter)129    def test_test(self):130        d = task.deferLater(reactor, 0.5, self.examin)131        return d132class TestMultiplePublishersToTopic(unittest.TestCase):133    """ Tests multiple publishers sending to a single subscriber.134    The subscriber should hear from all the publishers.135    """136    def setUp(self):137        self.subscriber = BasicSubscriber("test")138        self.publisher1 = BasicPublisher("test")139        self.publisher2 = BasicPublisher("test")140        self.publisher3 = BasicPublisher("test")141    def tearDown(self):142        a = self.subscriber._cleanup()143        b = self.publisher1._cleanup()144        c = self.publisher2._cleanup()145        d = self.publisher3._cleanup()146        return defer.gatherResults([a, b, c, d])147    def examin(self):148        self.publisher1.fini()149        self.publisher2.fini()150        self.publisher3.fini()151        self.subscriber.fini()152#         print "sent=", self.publisher1.counter153#         print "sent=", self.publisher2.counter154#         print "sent=", self.publisher3.counter155#         print "received=", self.subscriber.counter156        total_sent = self.publisher1.counter + self.publisher2.counter + self.publisher3.counter157        total_recv = self.subscriber.counter158        self.assertEqual(total_sent,total_recv)159    def test_test(self):160        d = task.deferLater(reactor, 0.5, self.examin)161        return d162class TestMixedTopics(unittest.TestCase):163    """ Test that the regular expressions which define topic names are working164    correctly to route data. """165    def setUp(self):166        self.pub1 = BasicPublisher("foo1")167        self.pub2 = BasicPublisher("foo2")168        self.pub3 = BasicPublisher("bar1")169        self.sub1 = BasicSubscriber(".*1")170        self.sub2 = BasicSubscriber("foo.")171        self.sub3 = BasicSubscriber("foo2")172        self.sub4 = BasicSubscriber("bar")173    def tearDown(self):174        self.pub1._cleanup()175        self.pub2._cleanup()176        self.pub3._cleanup()177        self.sub1._cleanup()178        self.sub2._cleanup()179        self.sub3._cleanup()180        self.sub4._cleanup()181    def test_test(self):182        def deferred_test():183            self.pub1.fini()184            self.pub2.fini()185            self.pub3.fini()186            self.assertEqual(self.pub1.counter + self.pub3.counter, self.sub1.counter)187            self.assertEqual(self.pub1.counter + self.pub2.counter, self.sub2.counter)188            self.assertEqual(self.pub2.counter, self.sub3.counter)189            self.assertEqual(0, self.sub4.counter)190        return task.deferLater(reactor, 0.5, deferred_test)191class TestSchema(unittest.TestCase):192    """ Tests multiple publishers sending to a single subscriber.193    The subscriber should hear from all the publishers.194    """195    def setUp(self):196        self.client = MiniBusTwistedClient()197        self.pub = self.client.publisher("test", {"type": "string"})198        self.client.subscribe("test", {"type": "string"}, self.callback)199        self.count = 0200    def tearDown(self):201        a = self.client._cleanup()202        return defer.gatherResults([a])203    def callback(self, data):204        self.count += 1205    def test_good_schema(self):206        def deferred_good_schema():207            self.pub("this is a test")208        d = task.deferLater(reactor, 0.5, deferred_good_schema)209        return d210    def test_good_schema_callback(self):211        def deferred_good_callback():212            if not self.count == 1:213                raise Exception("bad count")214        def deferred_good_callback_pub():215            self.pub("this is a test")216        d1 = task.deferLater(reactor, 0.5, deferred_good_callback_pub)217        d = task.deferLater(reactor, 1, deferred_good_callback)218        return d219    def test_bad_schema(self):220        def deferred_bad_schema():221            try:222                self.pub({"one": 1, "two": 2})223            except Exception:224                return225            raise Exception("I should not have passed")226        d = task.deferLater(reactor, 0.5, deferred_bad_schema)227        return d228    def test_conflict_schema(self):229        try: 230            self.client.subscribe("test", {"type": "number"}, self.callback)231        except Exception:232            return233        raise Exception("I should not have passed")234class Service1Test(unittest.TestCase):235    def add_two_ints(self, data):236        return sum(data)237    def setUp(self):238        self.srvreq = {'type': 'array' }239        self.srvrply = { 'type': 'integer' }240        self.node1 = MiniBusTwistedClient()241        self.node1.service("add_two_ints", self.srvreq, self.srvrply, self.add_two_ints)242        self.node2 = MiniBusTwistedClient()243    def tearDown(self):244        a = self.node1._cleanup()245        b = self.node2._cleanup()246    @MiniBusTwistedClient.inlineServiceCallbacks247    def test_call(self):248        adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)249        ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])250        self.assertEqual(ret, 15)251    @MiniBusTwistedClient.inlineServiceCallbacks252    def test_bad_input_call(self):253        adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)254        try:255            ret = yield task.deferLater(reactor, 0.5, adder, 4)256            raise Exception("I should have failed due to not being passed an array")257        except jsonschema.exceptions.ValidationError:258            pass259    @MiniBusTwistedClient.inlineServiceCallbacks260    def test_bad_output_call(self):261        adder = self.node2.service_client("add_two_ints", self.srvreq, {'type': 'string'})262        try:263            ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])264            raise Exception("I should have failed due to being returned a number instead of a string")265        except jsonschema.exceptions.ValidationError:266            pass267    @MiniBusTwistedClient.inlineServiceCallbacks268    def test_bad_input_data(self):269        adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)270        try:271            ret = yield task.deferLater(reactor, 0.5, adder, [4,'5',6])272            raise Exception("I should have thrown a RemoteServiceException")273        except RemoteServiceException:274            pass275class TestMultiService(unittest.TestCase):276    def add_ints(self, data):277        return sum(data)278    def double_ints(self, data):279        return [x*2 for x in data]280    def print_ints(self, data):281        return " ".join(data)282    def setUp(self):283        self.srvreq = {'type': 'array' }284        self.srvrply = { 'type': 'integer' }285        self.server1 = MiniBusTwistedClient()286        self.server1.service("add_ints", {'type': 'array'}, {'type': 'integer'}, self.add_ints)287        self.server1.service("double_ints", {'type': 'array'}, {'type': 'array'}, self.double_ints)288        self.server2 = MiniBusTwistedClient()289        self.server2.service("print_ints", {'type': 'array'}, {'type': 'string'}, self.print_ints)290        self.client1 = MiniBusTwistedClient()291        self.client2 = MiniBusTwistedClient()292    def tearDown(self):293        a = self.server1._cleanup()294        b = self.server2._cleanup()295        c = self.client1._cleanup()296        d = self.client2._cleanup()297    @MiniBusTwistedClient.inlineServiceCallbacks298    def test_calls(self):299        adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})300        printer = self.client1.service_client("print_ints", {'type': 'array'}, {'type': 'string'})301        double  = self.client1.service_client("double_ints", {'type': 'array'}, {'type': 'array'})302        ret1 = yield task.deferLater(reactor, 0.5, adder, [4,5,6])303        ret2 = yield task.deferLater(reactor, 0.5, printer, ['a','b','c'])304        ret3 = yield task.deferLater(reactor, 0.5, double, [1, 2, 3])305        self.assertEqual(ret1, 15)306        self.assertEqual(ret2, "a b c")307        self.assertEqual(ret3, [2, 4, 6])308    @MiniBusTwistedClient.inlineServiceCallbacks309    def test_multiclients(self):310        adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})311        adder2 = self.client2.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})312        ret1 = yield task.deferLater(reactor, 0.5, adder, [1,2,3])313        ret2 = yield task.deferLater(reactor, 0.5, adder2, [4,5,6])314        self.assertEqual(ret1, 6)315        self.assertEqual(ret2, 15)316# This is not working yet317# class TestChainedServices(unittest.TestCase):318#     @MiniBusTwistedClient.inlineServiceCallbacks319#     def service1(self, data):320#         data = data + " stuff"321#         call2val = yield self.call2(data)322#         defer.returnValue(call2val)323# 324#     def service2(self, data):325#         return data + " things"326# 327#     def setUp(self):328#         self.server1 = MiniBusTwistedClient()329#         self.server1.service("service1", {'type':'string'}, {}, self.service1)330#         self.client1 = MiniBusTwistedClient()331#         self.call1 = self.client1.service_client("service1",{'type':'string'}, {})332#         self.server2 = MiniBusTwistedClient()333#         self.server2.service("service2", {'type':'string'}, {'type':'string'}, self.service2)334#         self.client2 = MiniBusTwistedClient()335#         self.call2 = self.client2.service_client("service2",{'type':'string'}, {'type':'string'})336# 337# 338#     def tearDown(self):339#         a = self.server1._cleanup()340#         b = self.server2._cleanup()341#         c = self.client1._cleanup()342#         d = self.client2._cleanup()343# 344#     @MiniBusTwistedClient.inlineServiceCallbacks345#     def test_calls(self):346#         retval = yield task.deferLater(reactor, 0.5, self.call1, "mine")347#         self.assertEqual(retval, "mine stuff things")...test_bus.py
Source:test_bus.py  
...58    def setUp(self):59        self.subscriber = BasicSubscriber("test")60        self.publisher = BasicPublisher("test")61    def tearDown(self):62        # This, along with code in minibus._cleanup(),  was found via trail 63        # and error using suggestions from http://blackjml.livejournal.com/23029.html64        s1 = self.subscriber._cleanup()65        p1 = self.publisher._cleanup()66        return defer.gatherResults([s1, p1])67    def examin(self):68        self.publisher.fini()69        self.subscriber.fini()70#         print "sent=", self.publisher.counter71#         print "received=", self.subscriber.counter72        self.assertEqual(self.publisher.counter, self.subscriber.counter)73    def test_test(self):74        d = task.deferLater(reactor, 0.5, self.examin)75        return d76class TestMultipleSeperateTopics(unittest.TestCase):77    """ Test two sets of publisher and subscriber pairs talking on different topics.78    There should not be any cross talk.79    """80    def setUp(self):81        self.subscriber1 = BasicSubscriber("test1")82        self.publisher1 = BasicPublisher("test1")83        self.subscriber2 = BasicSubscriber("test2")84        self.publisher2 = BasicPublisher("test2")85    def tearDown(self):86        a = self.subscriber1._cleanup()87        b = self.publisher1._cleanup()88        c = self.subscriber2._cleanup()89        d = self.publisher2._cleanup()90        return defer.gatherResults([a, b, c, d])91    def examin(self):92        self.publisher1.fini()93        self.publisher2.fini()94        self.subscriber1.fini()95        self.subscriber2.fini()96        self.assertEqual(self.publisher1.counter,self.subscriber1.counter)97        self.assertEqual(self.publisher2.counter,self.subscriber2.counter)98    def test_test(self):99        d = task.deferLater(reactor, 0.5, self.examin)100        return d101class TestMultipleSubscribersToTopic(unittest.TestCase):102    """ Tests a single publisher sending to multiple subscribers.103    Each subscriber should hear exactly the same set of messages.104    """105    def setUp(self):106        self.subscriber1 = BasicSubscriber("test")107        self.subscriber2 = BasicSubscriber("test")108        self.subscriber3 = BasicSubscriber("test")109        self.publisher = BasicPublisher("test")110    def tearDown(self):111        a = self.subscriber1._cleanup()112        b = self.subscriber2._cleanup()113        c = self.subscriber3._cleanup()114        d = self.publisher._cleanup()115        return defer.gatherResults([a, b, c, d])116    def examin(self):117        self.publisher.fini()118        self.subscriber1.fini()119        self.subscriber2.fini()120        self.subscriber3.fini()121#         print "sent=", self.publisher.counter122#         print "received=", self.subscriber1.counter123#         print "received=", self.subscriber2.counter124#         print "received=", self.subscriber3.counter125        self.assertEqual(self.publisher.counter,self.subscriber1.counter)126        self.assertEqual(self.publisher.counter,self.subscriber2.counter)127        self.assertEqual(self.publisher.counter,self.subscriber3.counter)128    def test_test(self):129        d = task.deferLater(reactor, 0.5, self.examin)130        return d131class TestMultiplePublishersToTopic(unittest.TestCase):132    """ Tests multiple publishers sending to a single subscriber.133    The subscriber should hear from all the publishers.134    """135    def setUp(self):136        self.subscriber = BasicSubscriber("test")137        self.publisher1 = BasicPublisher("test")138        self.publisher2 = BasicPublisher("test")139        self.publisher3 = BasicPublisher("test")140    def tearDown(self):141        a = self.subscriber._cleanup()142        b = self.publisher1._cleanup()143        c = self.publisher2._cleanup()144        d = self.publisher3._cleanup()145        return defer.gatherResults([a, b, c, d])146    def examin(self):147        self.publisher1.fini()148        self.publisher2.fini()149        self.publisher3.fini()150        self.subscriber.fini()151#         print "sent=", self.publisher1.counter152#         print "sent=", self.publisher2.counter153#         print "sent=", self.publisher3.counter154#         print "received=", self.subscriber.counter155        total_sent = self.publisher1.counter + self.publisher2.counter + self.publisher3.counter156        total_recv = self.subscriber.counter157        self.assertEqual(total_sent,total_recv)158    def test_test(self):159        d = task.deferLater(reactor, 0.5, self.examin)160        return d161class TestMixedTopics(unittest.TestCase):162    """ Test that the regular expressions which define topic names are working163    correctly to route data. """164    def setUp(self):165        self.pub1 = BasicPublisher("foo1")166        self.pub2 = BasicPublisher("foo2")167        self.pub3 = BasicPublisher("bar1")168        self.sub1 = BasicSubscriber(".*1")169        self.sub2 = BasicSubscriber("foo.")170        self.sub3 = BasicSubscriber("foo2")171        self.sub4 = BasicSubscriber("bar")172    def tearDown(self):173        self.pub1._cleanup()174        self.pub2._cleanup()175        self.pub3._cleanup()176        self.sub1._cleanup()177        self.sub2._cleanup()178        self.sub3._cleanup()179        self.sub4._cleanup()180    def test_test(self):181        def deferred_test():182            self.pub1.fini()183            self.pub2.fini()184            self.pub3.fini()185            self.assertEqual(self.pub1.counter + self.pub3.counter, self.sub1.counter)186            self.assertEqual(self.pub1.counter + self.pub2.counter, self.sub2.counter)187            self.assertEqual(self.pub2.counter, self.sub3.counter)188            self.assertEqual(0, self.sub4.counter)189        return task.deferLater(reactor, 0.5, deferred_test)190class TestSchema(unittest.TestCase):191    """ Tests multiple publishers sending to a single subscriber.192    The subscriber should hear from all the publishers.193    """194    def setUp(self):195        self.client = MiniBusClient()196        self.pub = self.client.publisher("test", {"type": "string"})197        self.client.subscribe("test", {"type": "string"}, self.callback)198        self.count = 0199    def tearDown(self):200        a = self.client._cleanup()201        return defer.gatherResults([a])202    def callback(self, data):203        self.count += 1204    def test_good_schema(self):205        def deferred_good_schema():206            self.pub("this is a test")207        d = task.deferLater(reactor, 0.5, deferred_good_schema)208        return d209    def test_good_schema_callback(self):210        def deferred_good_callback():211            if not self.count == 1:212                raise Exception("bad count")213        def deferred_good_callback_pub():214            self.pub("this is a test")215        d1 = task.deferLater(reactor, 0.5, deferred_good_callback_pub)216        d = task.deferLater(reactor, 1, deferred_good_callback)217        return d218    def test_bad_schema(self):219        def deferred_bad_schema():220            try:221                self.pub({"one": 1, "two": 2})222            except Exception:223                return224            raise Exception("I should not have passed")225        d = task.deferLater(reactor, 0.5, deferred_bad_schema)226        return d227    def test_conflict_schema(self):228        try: 229            self.client.subscribe("test", {"type": "number"}, self.callback)230        except Exception:231            return232        raise Exception("I should not have passed")233class Service1Test(unittest.TestCase):234    def add_two_ints(self, data):235        return sum(data)236    def setUp(self):237        self.srvreq = {'type': 'array' }238        self.srvrply = { 'type': 'integer' }239        self.node1 = MiniBusClient()240        self.node1.service("add_two_ints", self.srvreq, self.srvrply, self.add_two_ints)241        self.node2 = MiniBusClient()242    def tearDown(self):243        a = self.node1._cleanup()244        b = self.node2._cleanup()245    @MiniBusClient.inlineServiceCallbacks246    def test_call(self):247        adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)248        ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])249        self.assertEqual(ret, 15)250    @MiniBusClient.inlineServiceCallbacks251    def test_bad_input_call(self):252        adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)253        try:254            ret = yield task.deferLater(reactor, 0.5, adder, 4)255            raise Exception("I should have failed due to not being passed an array")256        except jsonschema.exceptions.ValidationError:257            pass258    @MiniBusClient.inlineServiceCallbacks259    def test_bad_output_call(self):260        adder = self.node2.service_client("add_two_ints", self.srvreq, {'type': 'string'})261        try:262            ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])263            raise Exception("I should have failed due to being returned a number instead of a string")264        except jsonschema.exceptions.ValidationError:265            pass266    @MiniBusClient.inlineServiceCallbacks267    def test_bad_input_data(self):268        adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)269        try:270            ret = yield task.deferLater(reactor, 0.5, adder, [4,'5',6])271            raise Exception("I should have thrown a RemoteServiceException")272        except RemoteServiceException:273            pass274class TestMultiService(unittest.TestCase):275    def add_ints(self, data):276        return sum(data)277    def double_ints(self, data):278        return [x*2 for x in data]279    def print_ints(self, data):280        return " ".join(data)281    def setUp(self):282        self.srvreq = {'type': 'array' }283        self.srvrply = { 'type': 'integer' }284        self.server1 = MiniBusClient()285        self.server1.service("add_ints", {'type': 'array'}, {'type': 'integer'}, self.add_ints)286        self.server1.service("double_ints", {'type': 'array'}, {'type': 'array'}, self.double_ints)287        self.server2 = MiniBusClient()288        self.server2.service("print_ints", {'type': 'array'}, {'type': 'string'}, self.print_ints)289        self.client1 = MiniBusClient()290        self.client2 = MiniBusClient()291    def tearDown(self):292        a = self.server1._cleanup()293        b = self.server2._cleanup()294        c = self.client1._cleanup()295        d = self.client2._cleanup()296    @MiniBusClient.inlineServiceCallbacks297    def test_calls(self):298        adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})299        printer = self.client1.service_client("print_ints", {'type': 'array'}, {'type': 'string'})300        double  = self.client1.service_client("double_ints", {'type': 'array'}, {'type': 'array'})301        ret1 = yield task.deferLater(reactor, 0.5, adder, [4,5,6])302        ret2 = yield task.deferLater(reactor, 0.5, printer, ['a','b','c'])303        ret3 = yield task.deferLater(reactor, 0.5, double, [1, 2, 3])304        self.assertEqual(ret1, 15)305        self.assertEqual(ret2, "a b c")306        self.assertEqual(ret3, [2, 4, 6])307    @MiniBusClient.inlineServiceCallbacks308    def test_multiclients(self):309        adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})310        adder2 = self.client2.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})311        ret1 = yield task.deferLater(reactor, 0.5, adder, [1,2,3])312        ret2 = yield task.deferLater(reactor, 0.5, adder2, [4,5,6])313        self.assertEqual(ret1, 6)314        self.assertEqual(ret2, 15)315# This is not working yet316# class TestChainedServices(unittest.TestCase):317#     @MiniBusClient.inlineServiceCallbacks318#     def service1(self, data):319#         data = data + " stuff"320#         call2val = yield self.call2(data)321#         defer.returnValue(call2val)322# 323#     def service2(self, data):324#         return data + " things"325# 326#     def setUp(self):327#         self.server1 = MiniBusClient()328#         self.server1.service("service1", {'type':'string'}, {}, self.service1)329#         self.client1 = MiniBusClient()330#         self.call1 = self.client1.service_client("service1",{'type':'string'}, {})331#         self.server2 = MiniBusClient()332#         self.server2.service("service2", {'type':'string'}, {'type':'string'}, self.service2)333#         self.client2 = MiniBusClient()334#         self.call2 = self.client2.service_client("service2",{'type':'string'}, {'type':'string'})335# 336# 337#     def tearDown(self):338#         a = self.server1._cleanup()339#         b = self.server2._cleanup()340#         c = self.client1._cleanup()341#         d = self.client2._cleanup()342# 343#     @MiniBusClient.inlineServiceCallbacks344#     def test_calls(self):345#         retval = yield task.deferLater(reactor, 0.5, self.call1, "mine")346#         self.assertEqual(retval, "mine stuff things")...test_fops.py
Source:test_fops.py  
...25from unittest import mock26from pathlib import Path27from eljef.core import fops28logging.disable(logging.ERROR)29def _cleanup(*args) -> None:30    for p in args:31        if os.path.exists(p):32            fops.delete(p)33def _get_empty_file(suffix: str) -> str:34    path = os.path.join(tempfile.gettempdir(), "testFile.{0!s}".format(suffix))35    Path(path).touch()36    return path37def _get_file(data: str, extension: str) -> str:38    fd, path = tempfile.mkstemp(extension, None, tempfile.gettempdir(), True)39    os.write(fd, data.encode('UTF-8'))40    os.close(fd)41    return path42# noinspection PyBroadException43class TestBackupPath(unittest.TestCase):44    def test_backup_path(self):45        path = _get_empty_file('tmp')46        expected = "{0!s}.bak".format(path)47        fops.backup_path(path)48        try:49            self.assertTrue(os.path.isfile(expected))50            _cleanup(expected, path)51        except Exception:52            _cleanup(expected, path)53            raise54    def test_backup_path_multiple_files(self):55        path = _get_empty_file('tmp')56        path2 = _get_empty_file("tmp.bak")57        path3 = _get_empty_file("tmp.bak.1")58        expected = "{0!s}.bak.2".format(path)59        fops.backup_path(path)60        try:61            self.assertTrue(os.path.isfile(expected))62            _cleanup(expected, path, path2, path3)63        except Exception:64            _cleanup(expected, path, path2, path3)65            raise66# noinspection PyBroadException67class TestDelete(unittest.TestCase):68    def test_delete_exception(self):69        path = _get_empty_file('tmp')70        try:71            with mock.patch('os.remove', new=mock.Mock(side_effect=OSError(5, 'test'))):72                self.assertRaises(OSError, fops.delete, path)73        except Exception:74            _cleanup(path)75            raise76        _cleanup(path)77    def test_delete_path_does_not_exist(self):78        raised = False79        path = os.path.join(tempfile.gettempdir(), "hopefully_this_file_does_not_exist.tmp")80        try:81            fops.delete(path)82        except Exception:83            raised = True84        self.assertFalse(raised)85    def test_delete_symlink_only(self):86        path = _get_empty_file('tmp')87        link = "{0!s}.lnk".format(path)88        os.symlink(path, link)89        fops.delete(link)90        try:91            self.assertFalse(os.path.exists(link))92            _cleanup(link, path)93        except Exception:94            _cleanup(link, path)95            raise96    def test_delete_symlink_backup(self):97        path = _get_empty_file('tmp')98        link = "{0!s}.lnk".format(path)99        os.symlink(path, link)100        fops.delete(link, follow=True, backup=True)101        try:102            self.assertFalse(os.path.exists(link))103            self.assertTrue(os.path.isfile(path))104            _cleanup(link, path)105        except Exception:106            _cleanup(link, path)107            raise108    def test_delete_symlink_and_parent(self):109        path = _get_empty_file('tmp')110        link = "{0!s}.lnk".format(path)111        os.symlink(path, link)112        fops.delete(link, follow=True)113        try:114            self.assertFalse(os.path.exists(link))115            self.assertFalse(os.path.exists(path))116            _cleanup(link, path)117        except Exception:118            _cleanup(link, path)119            raise120    def test_delete_backup(self):121        path = _get_empty_file('tmp')122        expected = "{0!s}.bak".format(path)123        fops.delete(path, backup=True)124        try:125            self.assertFalse(os.path.exists(path))126            self.assertTrue(os.path.exists(expected))127            _cleanup(path, expected)128        except Exception:129            _cleanup(path, expected)130            raise131    def test_delete_directory(self):132        path = os.path.join(tempfile.gettempdir(), "testDir")133        os.mkdir(path)134        fops.delete(path)135        try:136            self.assertFalse(os.path.exists(path))137        except Exception:138            _cleanup(path)139            raise140    def test_delete_file(self):141        path = _get_empty_file('tmp')142        fops.delete(path)143        try:144            self.assertFalse(os.path.exists(path))145            _cleanup(path)146        except Exception:147            _cleanup(path)148            raise149class TestFileRead(unittest.TestCase):150    def test_file_read_no_strip(self):151        data = """test file data152        test file new line153        """154        path = _get_file(data, ".tmp")155        got = fops.file_read(path)156        os.remove(path)157        self.assertEqual(data, got)158    def test_file_read_strip(self):159        data = """test file data160        test file new line161        """162        path = _get_file(data, ".tmp")163        got = fops.file_read(path, True)164        os.remove(path)165        self.assertEqual(data.strip(), got)166class TestFileReadConvert(unittest.TestCase):167    def test_file_read_convert_unknown_type(self):168        self.assertRaises(ValueError, fops.file_read_convert, 'no_path', 'unknown_type')169    def test_file_read_convert_file_does_not_exist(self):170        self.assertRaises(FileNotFoundError, fops.file_read_convert, 'should_not_exist_hopefully_maybe', fops.JSON)171    def test_file_read_convert_not_file(self):172        self.assertRaises(IOError, fops.file_read_convert, tempfile.gettempdir(), fops.JSON)173    def test_file_read_convert_file_does_not_exist_get_default(self):174        want = dict()175        got = fops.file_read_convert("should_not_exist_hopefully_maybe", fops.JSON, default=True)176        self.assertDictEqual(got, want)177    def test_file_read_convert_json(self):178        data = """{179            "test": "test"180        }181        """182        want = {183            'test': 'test'184        }185        path = _get_file(data, ".json")186        got = fops.file_read_convert(path, fops.JSON)187        os.remove(path)188        self.assertDictEqual(got, want)189    def test_file_read_convert_kv(self):190        data = """test=test191        """192        want = {193            'test': 'test'194        }195        path = _get_file(data, ".tmp")196        got = fops.file_read_convert(path, fops.KV)197        os.remove(path)198        self.assertDictEqual(got, want)199    def test_file_read_convert_xml(self):200        data = """<?xml version="1.0"?>201            <test>test</test>202        """203        want = {204            'test': 'test'205        }206        path = _get_file(data, ".xml")207        got = fops.file_read_convert(path, fops.XML)208        os.remove(path)209        self.assertDictEqual(got, want)210    def test_file_read_convert_yaml(self):211        data = '''test: "test"'''212        want = {213            'test': 'test'214        }215        path = _get_file(data, ".yml")216        got = fops.file_read_convert(path, fops.YAML)217        os.remove(path)218        self.assertDictEqual(got, want)219# noinspection PyBroadException220class TestFileWrite(unittest.TestCase):221    def test_file_write_file_does_not_exist(self):222        data = """test data"""223        path = os.path.join(tempfile.gettempdir(), "testFile.tmp")224        fops.file_write(path, data, newline='\n')225        try:226            self.assertTrue(os.path.isfile(path))227        except Exception:228            _cleanup(path)229            raise230        got = fops.file_read(path, strip=True)231        _cleanup(path)232        self.assertEqual(data, got)233    def test_file_write_file_exists(self):234        data = """test data"""235        path = _get_file("testing data", ".tmp")236        fops.file_write(path, data, newline='\n')237        got = fops.file_read(path, strip=True)238        _cleanup(path)239        self.assertEqual(data, got)240    def test_file_write_with_backup(self):241        data = """test data"""242        path = _get_file("testing data", ".tmp")243        expected = "{0!s}.bak".format(path)244        fops.file_write(path, data, backup=True, newline='\n')245        try:246            self.assertTrue(os.path.isfile(expected))247        except Exception:248            _cleanup(path, expected)249            raise250        got = fops.file_read(path, strip=True)251        _cleanup(path, expected)252        self.assertEqual(data, got)253class TestFileWriteCovertDefaults(unittest.TestCase):254    def test_file_write_convert_defaults_unknown_type(self):255        self.assertRaises(ValueError, fops.file_write_convert_defaults, 'unknown')256    def test_file_write_convert_defaults(self):257        tests = {258            fops.JSON: {'indent': 4},259            fops.KV: {'spaced': False},260            fops.XML: {'pretty': True, 'full_document': True, 'indent': '    '},261            fops.YAML: {'default_flow_style': False}262        }263        for key, value in tests.items():264            got = fops.file_write_convert_defaults(key)265            self.assertDictEqual(value, got, msg=key)266# noinspection PyBroadException267class TestFileWriteConvert(unittest.TestCase):268    def test_file_write_convert(self):269        data = {"test": "test"}270        tests = {271            fops.JSON: '{\n    "test": "test"\n}',272            fops.KV: 'test=test',273            fops.XML: '<?xml version="1.0" encoding="utf-8"?>\n<test>test</test>',274            fops.YAML: 'test: test'275        }276        for key, value in tests.items():277            path = os.path.join(tempfile.gettempdir(), "tempFile.{0!s}".format(key))278            args = fops.file_write_convert_defaults(key)279            fops.file_write_convert(path, key, data, dumper_args=args)280            try:281                self.assertTrue(os.path.isfile(path))282            except Exception:283                _cleanup(path)284                raise285            got = fops.file_read(path, strip=True)286            _cleanup(path)287            self.assertEqual(value, got, msg=key)288class TestListDirsByExtension(unittest.TestCase):289    def test_list_dirs_by_extension(self):290        path = tempfile.mkdtemp(dir=tempfile.gettempdir())291        path1 = tempfile.mkdtemp(dir=path)292        path2 = tempfile.mkdtemp(dir=path)293        path3 = tempfile.mkdtemp(dir=path)294        path4 = tempfile.mkdtemp(dir=path)295        path5 = tempfile.mkdtemp(dir=path)296        data = {os.path.basename(path1), os.path.basename(path2), os.path.basename(path5)}297        Path(os.path.join(path1, 'test.txt')).touch()298        Path(os.path.join(path2, 'test.txt')).touch()299        Path(os.path.join(path3, 'test.json')).touch()300        Path(os.path.join(path4, 'test.not_txt')).touch()301        Path(os.path.join(path5, 'test.txt')).touch()302        got = fops.list_dirs_by_extension(path, 'txt')303        _cleanup(path)304        self.assertCountEqual(data, got)305        self.assertSetEqual(data, got)306class TestListFilesByExtension(unittest.TestCase):307    def test_list_files_by_extension(self):308        data = ['test1.txt', 'test2.txt']309        path = tempfile.mkdtemp(dir=tempfile.gettempdir())310        Path(os.path.join(path, 'test1.txt')).touch()311        Path(os.path.join(path, 'test2.txt')).touch()312        Path(os.path.join(path, 'test3.txt.nope')).touch()313        got = fops.list_files_by_extension(path, 'txt')314        _cleanup(path)315        self.assertCountEqual(data, got)316        self.assertListEqual(sorted(data), sorted(got))317class TestPushd(unittest.TestCase):318    def test_pushd_directory_does_not_exist(self):319        def child():320            with fops.pushd('something_that_should_not_exist'):321                print('')322        self.assertRaises(FileNotFoundError, child)323    def test_pushd(self):324        data = 'pushd test data'325        path = os.path.join(tempfile.gettempdir(), 'testFile.tmp')326        with fops.pushd(tempfile.gettempdir()):327            fops.file_write('testFile.tmp', data)328        got = fops.file_read(path, True)329        _cleanup(path)330        self.assertEqual(data, got)331# noinspection PyBroadException332class TestRequiredExecutables(unittest.TestCase):333    def test_required_executables_exist(self):334        try:335            fops.required_executables(['python'])336            test_var = True337        except Exception:338            test_var = False339        self.assertTrue(test_var)340    def test_required_executables_not_exist(self):341        def child():342            fops.required_executables(['this_definitely_should_not_exist_at_all'])343        self.assertRaises(SystemExit, child)...nmea.py
Source:nmea.py  
1############################################################################2#                                                                          #3# Copyright (c)2008, 2009, Digi International (Digi). All Rights Reserved. #4#                                                                          #5# Permission to use, copy, modify, and distribute this software and its    #6# documentation, without fee and without a signed licensing agreement, is  #7# hereby granted, provided that the software is used on Digi products only #8# and that the software contain this copyright notice,  and the following  #9# two paragraphs appear in all copies, modifications, and distributions as #10# well. Contact Product Management, Digi International, Inc., 11001 Bren   #11# Road East, Minnetonka, MN, +1 952-912-3444, for commercial licensing     #12# opportunities for non-Digi products.                                     #13#                                                                          #14# DIGI SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED   #15# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A          #16# PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION, IF ANY, #17# PROVIDED HEREUNDER IS PROVIDED "AS IS" AND WITHOUT WARRANTY OF ANY KIND. #18# DIGI HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,         #19# ENHANCEMENTS, OR MODIFICATIONS.                                          #20#                                                                          #21# IN NO EVENT SHALL DIGI BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,      #22# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS,   #23# ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF   #24# DIGI HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.                #25#                                                                          #26############################################################################27"""Parse NMEA data to extract useful GPS information28"""29# Module to accept an NMEA stream and provide asynchronous access to30# the contained data content.31import re as _re32# NMEA sentence, begins with a '$', ends with EOL, may have an33# optional checksum sequence of a '*' and two hex digits at the end.34# Group 1 is sentence proper, Group 2 is the optional checksum.35_sentence_def = r"\$(.*?)(?:\*([0-9a-fA-F]{2}))?[\n\r]+"36_sentence_re = _re.compile(_sentence_def, _re.MULTILINE)37_sentence_templates = {38    # GPS fix data39    "GGA": ["fix_time", "latitude_magnitude", "latitude_hemisphere",40            "longitude_magnitude", "longitude_hemisphere",41            "fix_quality", "num_satellites", "hdop",42            "altitude", "altitude_units",43            "geoid_height", "geoid_units", "DGPS_time", "DGPS_station_id"],44    # Latitude and Longitude45    "GLL": ["latitude_magnitude", "latitude_hemisphere",46            "longitude_magnitude", "longitude_hemisphere",47            "fix_time", "fix_good"],48    # Recommended minimum GPS info49    "RMC": ["fix_time", "fix_good",50            "latitude_magnitude", "latitude_hemisphere",51            "longitude_magnitude", "longitude_hemisphere",52            "speed_over_ground", "course_over_ground", "fix_date",53            "magnetic_variation", "variation_direction"],54    }55_position_items = set(['latitude_magnitude',56                       'longitude_magnitude',57                       'latitude_hemisphere',58                       'longitude_hemisphere'])59# Report unit information for items as appropriate60units = {"fix_time": "UTC",61         "latitude_magnitude": "degrees",62         "longitude_magnitude": "degrees",63         "speed_over_ground": "knots",64         "course_over_ground": "degrees",65         "altitude": "meters",66         "num_satellites": "satellites",67         }68# Convert string values to cleaner abstracted and typed values for consumption69_cleanup = dict()70_cleanup['latitude_magnitude'] = lambda val: int(val[:2]) + float(val[2:])/6071_cleanup['longitude_magnitude'] = lambda val: int(val[:3]) + float(val[3:])/6072_cleanup['fix_time'] = lambda val: "%s:%s:%s" % (73    val[0:2],val[2:4],val[4:6])74_cleanup['fix_date'] = lambda val: "%s/%s/%s" % (75    val[2:4],val[0:2],val[4:6])76_cleanup['speed_over_ground'] = lambda val: float(val)77_cleanup['course_over_ground'] = lambda val: float(val)78_cleanup['num_satellites'] = lambda val: int(val)79_cleanup['altitude'] = lambda val: float(val)80_cleanup['hdop'] = lambda val: float(val)81_cleanup['magnetic_variation'] = lambda val: float(val)82_cleanup['fix_good'] = lambda val: val == 'A'83class NMEA:84    """NMEA 0183 data stream parsing object85    86    feed(string) - Parse stream of NMEA protocol data87    position() -> (latitude, longitude)88    Once parsing has begun, raw NMEA data elements can be retrieved89    from the object using direct attribute access.90    """91    def __init__(self):92        self._position = (51.4772, 0) # Greenwich royal observatory93        self._working_sentence = ""94# _valid is passed a MatchObject from feed() that represents the95# sentence.  If the sentence contains a group(2), we will calculate96# and compare the check sequence and return appropriate truth values.97    def _valid(self, sentence):98        if not sentence.group(2): #have to believe it valid99            return True100        # Calculate check sequence101        #checkcalc = reduce(lambda x, y: x^ord(y), sentence.group(1), 0)102        checkcalc = 0103        s = sentence.group(1)104        for i in xrange(len(s)):105            checkcalc = checkcalc ^ ord(s[i])106            107        check = int(sentence.group(2), 16)108        if check != checkcalc:109            return False110        else:111            return True112    def set_position(self):113        try:114            lat = self.latitude_magnitude115            lon = self.longitude_magnitude116            if self.latitude_hemisphere == 'S': lat = -lat117            if self.longitude_hemisphere == 'W': lon = -lon118            self.latitude_degrees = lat119            self.longitude_degrees = lon120            self._position = (lat, lon)121        finally:122            return123    def position(self):124        """position() -> (latitude, longitude)125        Return the position tuple with coordinates represented in126        numeric decimal format.127        """128        self.set_position()129        return self._position130# Given a sentence w/ possible check sequence and header/footer data131# removed, process for interesting information132    def _extract(self, sentence, report):133        update_position = False134        135        sentence = sentence.split(",")136        try:137            template = _sentence_templates[sentence[0][2:5]]138        except KeyError:139            return # Don't understand this sentence140        141        for i in range(len(template)):142            # Sometimes a field is not populated.143            # If it is not, we need to just simply skip over it.144            if sentence[i + 1] == "":145                continue146            if template[i] in _cleanup:147                sentence[i+1] = _cleanup[template[i]](sentence[i+1])148            self.__dict__[template[i]] = sentence[i + 1]149            if report:150                report(template[i], sentence[i+1])151            if template[i] in _position_items:152                update_position = True153        return update_position154    def feed(self, stream, report=None):155        """feed(string) - Parse NMEA 0183 data stream156        As data from your NMEA source is received, provide it to the157        object with this routine.  This function updates the state of158        the object with extracted position information.159        """160        self._working_sentence += stream161        sentence = _sentence_re.search(self._working_sentence)162        end = 0163        while sentence:164            if self._valid(sentence):165                update_position = self._extract(sentence.group(1), report)166                if update_position:167                    self.set_position()168                    if report:169                        report('latitude_degrees', self.latitude_degrees)170                        report('longitude_degrees', self.longitude_degrees)171                172            end = sentence.end()173            sentence = _sentence_re.search(self._working_sentence,174                                           sentence.end())175        self._working_sentence = self._working_sentence[end:]176if __name__ == "__main__":177    def print_args(*args):178        print args179        try:180            print units[args[0]]181        except:182            pass183    def dummy(*args):184        pass185    for i in xrange(10000):186        gps = NMEA()187        #print "RMC"188        gps.feed(189            "$GPRMC,225446,A,4916.45,N,12311.12,W,000.5,054.7,191194,020.3,E*68\r",190            dummy)191        #print gps.position()192        #print '-'*70193        #print "GGA"194        gps.feed(195            "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47\r",196            dummy)197        #print gps.position()198        #print '-'*70199        #print "GLL"200        gps.feed("$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r",201                 dummy)202        #print gps.position()203        #print '-'*70204    from pprint import pprint...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
