Best Python code snippet using molotov_python
test_server.py
Source:test_server.py  
...57    async def aclose(self):58        self.closed = True59async def mock_wait_for(coro, timeout):60    await coro61def run_coro(coro):62    # Mock wait_for() function with simple dummy63    asyncio.wait_for = (lambda c, t: await c)64    """Simple helper to run coroutine"""65    for i in coro:66        pass67# Tests68class Utils(unittest.TestCase):69    def testUrldecode(self):70        runs = [('abc%20def', 'abc def'),71                ('abc%%20def', 'abc% def'),72                ('%%%', '%%%'),73                ('%20%20', '  '),74                ('abc', 'abc'),75                ('a%25%25%25c', 'a%%%c'),76                ('a++b', 'a  b'),77                ('+%25+', ' % '),78                ('+%2B+', ' + '),79                ('%20+%2B+%41', '  + A'),80                ]81        for r in runs:82            self.assertEqual(urldecode_plus(r[0]), r[1])83    def testParseQueryString(self):84        runs = [('k1=v2', {'k1': 'v2'}),85                ('k1=v2&k11=v11', {'k1': 'v2',86                                   'k11': 'v11'}),87                ('k1=v2&k11=', {'k1': 'v2',88                                'k11': ''}),89                ('k1=+%20', {'k1': '  '}),90                ('%6b1=+%20', {'k1': '  '}),91                ('k1=%3d1', {'k1': '=1'}),92                ('11=22%26&%3d=%3d', {'11': '22&',93                                      '=': '='}),94                ]95        for r in runs:96            self.assertEqual(parse_query_string(r[0]), r[1])97class ServerParts(unittest.TestCase):98    def testRequestLine(self):99        runs = [('GETT / HTTP/1.1', 'GETT', '/'),100                ('TTEG\t/blah\tHTTP/1.1', 'TTEG', '/blah'),101                ('POST /qq/?q=q HTTP', 'POST', '/qq/', 'q=q'),102                ('POST /?q=q BSHT', 'POST', '/', 'q=q'),103                ('POST /?q=q&a=a JUNK', 'POST', '/', 'q=q&a=a')]104        for r in runs:105            try:106                req = request(mockReader(r[0]))107                
run_coro(req.read_request_line())108                self.assertEqual(r[1].encode(), req.method)109                self.assertEqual(r[2].encode(), req.path)110                if len(r) > 3:111                    self.assertEqual(r[3].encode(), req.query_string)112            except Exception:113                self.fail('exception on payload --{}--'.format(r[0]))114    def testRequestLineEmptyLinesBefore(self):115        req = request(mockReader(['\n', '\r\n', 'GET /?a=a HTTP/1.1']))116        run_coro(req.read_request_line())117        self.assertEqual(b'GET', req.method)118        self.assertEqual(b'/', req.path)119        self.assertEqual(b'a=a', req.query_string)120    def testRequestLineNegative(self):121        runs = ['',122                '\t\t',123                '  ',124                ' / HTTP/1.1',125                'GET',126                'GET /',127                'GET / '128                ]129        for r in runs:130            with self.assertRaises(HTTPException):131                req = request(mockReader(r))132                run_coro(req.read_request_line())133    def testHeadersSimple(self):134        req = request(mockReader([HDR('Host: google.com'),135                                  HDRE]))136        run_coro(req.read_headers([b'Host']))137        self.assertEqual(req.headers, {b'Host': b'google.com'})138    def testHeadersSpaces(self):139        req = request(mockReader([HDR('Host:    \t    google.com   \t     '),140                                  HDRE]))141        run_coro(req.read_headers([b'Host']))142        self.assertEqual(req.headers, {b'Host': b'google.com'})143    def testHeadersEmptyValue(self):144        req = request(mockReader([HDR('Host:'),145                                  HDRE]))146        run_coro(req.read_headers([b'Host']))147        self.assertEqual(req.headers, {b'Host': b''})148    def testHeadersMultiple(self):149        req = request(mockReader([HDR('Host: google.com'),150                                  
HDR('Junk: you    blah'),151                                  HDR('Content-type:      file'),152                                  HDRE]))153        hdrs = {b'Host': b'google.com',154                b'Junk': b'you    blah',155                b'Content-type': b'file'}156        run_coro(req.read_headers([b'Host', b'Junk', b'Content-type']))157        self.assertEqual(req.headers, hdrs)158    def testUrlFinderExplicit(self):159        urls = [('/', 1),160                ('/%20', 2),161                ('/a/b', 3),162                ('/aac', 5)]163        junk = ['//', '', '/a', '/aa', '/a/fhhfhfhfhfhf']164        # Create server, add routes165        srv = webserver()166        for u in urls:167            srv.add_route(u[0], u[1])168        # Search them all169        for u in urls:170            # Create mock request object with "pre-parsed" url path171            rq = request(mockReader([]))172            rq.path = u[0].encode()173            f, args = srv._find_url_handler(rq)174            self.assertEqual(u[1], f)175        # Some simple negative cases176        for j in junk:177            rq = request(mockReader([]))178            rq.path = j.encode()179            f, args = srv._find_url_handler(rq)180            self.assertIsNone(f)181            self.assertIsNone(args)182    def testUrlFinderParameterized(self):183        srv = webserver()184        # Add few routes185        srv.add_route('/', 0)186        srv.add_route('/<user_name>', 1)187        srv.add_route('/a/<id>', 2)188        # Check first url (non param)189        rq = request(mockReader([]))190        rq.path = b'/'191        f, args = srv._find_url_handler(rq)192        self.assertEqual(f, 0)193        # Check second url194        rq.path = b'/user1'195        f, args = srv._find_url_handler(rq)196        self.assertEqual(f, 1)197        self.assertEqual(args['_param_name'], 'user_name')198        self.assertEqual(rq._param, 'user1')199        # Check third url200        rq.path = 
b'/a/123456'201        f, args = srv._find_url_handler(rq)202        self.assertEqual(f, 2)203        self.assertEqual(args['_param_name'], 'id')204        self.assertEqual(rq._param, '123456')205        # When param is empty and there is no non param endpoint206        rq.path = b'/a/'207        f, args = srv._find_url_handler(rq)208        self.assertEqual(f, 2)209        self.assertEqual(rq._param, '')210    def testUrlFinderNegative(self):211        srv = webserver()212        # empty URL is not allowed213        with self.assertRaises(ValueError):214            srv.add_route('', 1)215        # Query string is not allowed216        with self.assertRaises(ValueError):217            srv.add_route('/?a=a', 1)218        # Duplicate urls219        srv.add_route('/duppp', 1)220        with self.assertRaises(ValueError):221            srv.add_route('/duppp', 1)222# We want to test decorators as well223server_for_decorators = webserver()224@server_for_decorators.route('/uid/<user_id>')225@server_for_decorators.route('/uid2/<user_id>')226async def route_for_decorator(req, resp, user_id):227    await resp.start_html()228    await resp.send('YO, {}'.format(user_id))229@server_for_decorators.resource('/rest1/<user_id>')230def resource_for_decorator1(data, user_id):231    return {'name': user_id}232@server_for_decorators.resource('/rest2/<user_id>')233async def resource_for_decorator2(data, user_id):234    yield '{"name": user_id}'235class ServerFull(unittest.TestCase):236    def setUp(self):237        self.dummy_called = False238        self.data = {}239        # "Register" one connection into map for dedicated decor server240        server_for_decorators.conns[id(1)] = None241        self.hello_world_history = ['HTTP/1.0 200 MSG\r\n' +242                                    'Content-Type: text/html\r\n\r\n',243                                    '<html><h1>Hello world</h1></html>']244        # Create one more server - to simplify bunch of tests245        self.srv = 
webserver()246        self.srv.conns[id(1)] = None247    def testRouteDecorator1(self):248        """Test @.route() decorator"""249        # First decorator250        rdr = mockReader(['GET /uid/man1 HTTP/1.1\r\n',251                          HDRE])252        wrt = mockWriter()253        # "Send" request254        run_coro(server_for_decorators._handler(rdr, wrt))255        # Ensure that proper response "sent"256        expected = ['HTTP/1.0 200 MSG\r\n' +257                    'Content-Type: text/html\r\n\r\n',258                    'YO, man1']259        self.assertEqual(wrt.history, expected)260        self.assertTrue(wrt.closed)261    def testRouteDecorator2(self):262        # Second decorator263        rdr = mockReader(['GET /uid2/man2 HTTP/1.1\r\n',264                          HDRE])265        wrt = mockWriter()266        # Re-register connection267        server_for_decorators.conns[id(1)] = None268        # "Send" request269        run_coro(server_for_decorators._handler(rdr, wrt))270        # Ensure that proper response "sent"271        expected = ['HTTP/1.0 200 MSG\r\n' +272                    'Content-Type: text/html\r\n\r\n',273                    'YO, man2']274        self.assertEqual(wrt.history, expected)275        self.assertTrue(wrt.closed)276    def testResourceDecorator1(self):277        """Test @.resource() decorator"""278        rdr = mockReader(['GET /rest1/man1 HTTP/1.1\r\n',279                          HDRE])280        wrt = mockWriter()281        run_coro(server_for_decorators._handler(rdr, wrt))282        expected = ['HTTP/1.0 200 MSG\r\n'283                    'Access-Control-Allow-Origin: *\r\n' +284                    'Access-Control-Allow-Headers: *\r\n' +285                    'Content-Length: 16\r\n' +286                    'Access-Control-Allow-Methods: GET\r\n' +287                    'Content-Type: application/json\r\n\r\n',288                    '{"name": "man1"}']289        self.assertEqual(wrt.history, expected)290        
self.assertTrue(wrt.closed)291    def testResourceDecorator2(self):292        rdr = mockReader(['GET /rest2/man2 HTTP/1.1\r\n',293                          HDRE])294        wrt = mockWriter()295        run_coro(server_for_decorators._handler(rdr, wrt))296        expected = ['HTTP/1.1 200 MSG\r\n' +297                    'Access-Control-Allow-Methods: GET\r\n' +298                    'Connection: close\r\n' +299                    'Access-Control-Allow-Headers: *\r\n' +300                    'Content-Type: application/json\r\n' +301                    'Transfer-Encoding: chunked\r\n' +302                    'Access-Control-Allow-Origin: *\r\n\r\n',303                    '11\r\n',304                    '{"name": user_id}',305                    '\r\n',306                    '0\r\n\r\n'307                    ]308        self.assertEqual(wrt.history, expected)309        self.assertTrue(wrt.closed)310    def testCatchAllDecorator(self):311        # A fresh server for the catchall handler312        server_for_catchall_decorator = webserver()313        # Catchall decorator and handler314        @server_for_catchall_decorator.catchall()315        async def route_for_catchall_decorator(req, resp):316            await resp.start_html()317            await resp.send('my404')318        rdr = mockReader(['GET /this/is/an/invalid/url HTTP/1.1\r\n',319                          HDRE])320        wrt = mockWriter()321        server_for_catchall_decorator.conns[id(1)] = None322        run_coro(server_for_catchall_decorator._handler(rdr, wrt))323        expected = ['HTTP/1.0 200 MSG\r\n' +324                    'Content-Type: text/html\r\n\r\n',325                    'my404']326        self.assertEqual(wrt.history, expected)327        self.assertTrue(wrt.closed)328    async def dummy_handler(self, req, resp):329        """Dummy URL handler. 
It just records the fact - it has been called"""330        self.dummy_req = req331        self.dummy_resp = resp332        self.dummy_called = True333    async def dummy_post_handler(self, req, resp):334        self.data = await req.read_parse_form_data()335    async def hello_world_handler(self, req, resp):336        await resp.start_html()337        await resp.send('<html><h1>Hello world</h1></html>')338    async def redirect_handler(self, req, resp):339        await resp.redirect('/blahblah', msg='msg:)')340    def testStartHTML(self):341        """Verify that request.start_html() works well"""342        self.srv.add_route('/', self.hello_world_handler)343        rdr = mockReader(['GET / HTTP/1.1\r\n',344                          HDR('Host: blah.com'),345                          HDRE])346        wrt = mockWriter()347        # "Send" request348        run_coro(self.srv._handler(rdr, wrt))349        # Ensure that proper response "sent"350        self.assertEqual(wrt.history, self.hello_world_history)351        self.assertTrue(wrt.closed)352    def testRedirect(self):353        """Verify that request.start_html() works well"""354        self.srv.add_route('/', self.redirect_handler)355        rdr = mockReader(['GET / HTTP/1.1\r\n',356                          HDR('Host: blah.com'),357                          HDRE])358        wrt = mockWriter()359        # "Send" request360        run_coro(self.srv._handler(rdr, wrt))361        # Ensure that proper response "sent"362        exp = ['HTTP/1.0 302 MSG\r\n' +363               'Location: /blahblah\r\nContent-Length: 5\r\n\r\n',364               'msg:)']365        self.assertEqual(wrt.history, exp)366    def testRequestBodyUnknownType(self):367        """Unknow HTTP body test - empty dict expected"""368        self.srv.add_route('/', self.dummy_post_handler, methods=['POST'])369        rdr = mockReader(['POST / HTTP/1.1\r\n',370                          HDR('Host: blah.com'),371                          
HDR('Content-Length: 5'),372                          HDRE,373                          '12345'])374        wrt = mockWriter()375        run_coro(self.srv._handler(rdr, wrt))376        # Check extracted POST body377        self.assertEqual(self.data, {})378    def testRequestBodyJson(self):379        """JSON encoded POST body"""380        self.srv.add_route('/',381                           self.dummy_post_handler,382                           methods=['POST'],383                           save_headers=['Content-Type', 'Content-Length'])384        rdr = mockReader(['POST / HTTP/1.1\r\n',385                          HDR('Content-Type: application/json'),386                          HDR('Content-Length: 10'),387                          HDRE,388                          '{"a": "b"}'])389        wrt = mockWriter()390        run_coro(self.srv._handler(rdr, wrt))391        # Check parsed POST body392        self.assertEqual(self.data, {'a': 'b'})393    def testRequestBodyUrlencoded(self):394        """Regular HTML form"""395        self.srv.add_route('/',396                           self.dummy_post_handler,397                           methods=['POST'],398                           save_headers=['Content-Type', 'Content-Length'])399        rdr = mockReader(['POST / HTTP/1.1\r\n',400                          HDR('Content-Type: application/x-www-form-urlencoded; charset=UTF-8'),401                          HDR('Content-Length: 10'),402                          HDRE,403                          'a=b&c=%20d'])404        wrt = mockWriter()405        run_coro(self.srv._handler(rdr, wrt))406        # Check parsed POST body407        self.assertEqual(self.data, {'a': 'b', 'c': ' d'})408    def testRequestBodyNegative(self):409        """Regular HTML form"""410        self.srv.add_route('/',411                           self.dummy_post_handler,412                           methods=['POST'],413                           save_headers=['Content-Type', 'Content-Length'])414        
rdr = mockReader(['POST / HTTP/1.1\r\n',415                          HDR('Content-Type: application/json'),416                          HDR('Content-Length: 9'),417                          HDRE,418                          'some junk'])419        wrt = mockWriter()420        run_coro(self.srv._handler(rdr, wrt))421        # payload broken - HTTP 400 expected422        self.assertEqual(wrt.history, ['HTTP/1.0 400 MSG\r\n\r\n'])423    def testRequestLargeBody(self):424        """Max Body size check"""425        self.srv.add_route('/',426                           self.dummy_post_handler,427                           methods=['POST'],428                           save_headers=['Content-Type', 'Content-Length'],429                           max_body_size=5)430        rdr = mockReader(['POST / HTTP/1.1\r\n',431                          HDR('Content-Type: application/json'),432                          HDR('Content-Length: 9'),433                          HDRE,434                          'some junk'])435        wrt = mockWriter()436        run_coro(self.srv._handler(rdr, wrt))437        # payload broken - HTTP 400 expected438        self.assertEqual(wrt.history, ['HTTP/1.0 413 MSG\r\n\r\n'])439    async def route_parameterized_handler(self, req, resp, user_name):440        await resp.start_html()441        await resp.send('<html>Hello, {}</html>'.format(user_name))442    def testRouteParameterized(self):443        """Verify that route with params works fine"""444        self.srv.add_route('/db/<user_name>', self.route_parameterized_handler)445        rdr = mockReader(['GET /db/user1 HTTP/1.1\r\n',446                          HDR('Host: junk.com'),447                          HDRE])448        wrt = mockWriter()449        # "Send" request450        run_coro(self.srv._handler(rdr, wrt))451        # Ensure that proper response "sent"452        expected = ['HTTP/1.0 200 MSG\r\n' +453                    'Content-Type: text/html\r\n\r\n',454                    '<html>Hello, 
user1</html>']455        self.assertEqual(wrt.history, expected)456        self.assertTrue(wrt.closed)457    def testParseHeadersOnOff(self):458        """Verify parameter parse_headers works"""459        self.srv.add_route('/', self.dummy_handler, save_headers=['H1', 'H2'])460        rdr = mockReader(['GET / HTTP/1.1\r\n',461                          HDR('H1: blah.com'),462                          HDR('H2: lalalla'),463                          HDR('Junk: fsdfmsdjfgjsdfjunk.com'),464                          HDRE])465        # "Send" request466        wrt = mockWriter()467        run_coro(self.srv._handler(rdr, wrt))468        self.assertTrue(self.dummy_called)469        # Check for headers - only 2 of 3 should be collected, others - ignore470        hdrs = {b'H1': b'blah.com',471                b'H2': b'lalalla'}472        self.assertEqual(self.dummy_req.headers, hdrs)473        self.assertTrue(wrt.closed)474    def testDisallowedMethod(self):475        """Verify that server respects allowed methods"""476        self.srv.add_route('/', self.hello_world_handler)477        self.srv.add_route('/post_only', self.dummy_handler, methods=['POST'])478        rdr = mockReader(['GET / HTTP/1.0\r\n',479                          HDRE])480        # "Send" GET request, by default GET is enabled481        wrt = mockWriter()482        run_coro(self.srv._handler(rdr, wrt))483        self.assertEqual(wrt.history, self.hello_world_history)484        self.assertTrue(wrt.closed)485        # "Send" GET request to POST only location486        self.srv.conns[id(1)] = None487        self.dummy_called = False488        rdr = mockReader(['GET /post_only HTTP/1.1\r\n',489                          HDRE])490        wrt = mockWriter()491        run_coro(self.srv._handler(rdr, wrt))492        # Hanlder should not be called - method not allowed493        self.assertFalse(self.dummy_called)494        exp = ['HTTP/1.0 405 MSG\r\n\r\n']495        self.assertEqual(wrt.history, exp)496        # 
Connection must be closed497        self.assertTrue(wrt.closed)498    def testAutoOptionsMethod(self):499        """Test auto implementation of OPTIONS method"""500        self.srv.add_route('/', self.hello_world_handler, methods=['POST', 'PUT', 'DELETE'])501        self.srv.add_route('/disabled', self.hello_world_handler, auto_method_options=False)502        rdr = mockReader(['OPTIONS / HTTP/1.0\r\n',503                          HDRE])504        wrt = mockWriter()505        run_coro(self.srv._handler(rdr, wrt))506        exp = ['HTTP/1.0 200 MSG\r\n' +507               'Access-Control-Allow-Headers: *\r\n'508               'Content-Length: 0\r\n'509               'Access-Control-Allow-Origin: *\r\n'510               'Access-Control-Allow-Methods: POST, PUT, DELETE\r\n\r\n']511        self.assertEqual(wrt.history, exp)512        self.assertTrue(wrt.closed)513    def testPageNotFound(self):514        """Verify that malformed request generates proper response"""515        rdr = mockReader(['GET /not_existing HTTP/1.1\r\n',516                          HDR('Host: blah.com'),517                          HDRE])518        wrt = mockWriter()519        run_coro(self.srv._handler(rdr, wrt))520        exp = ['HTTP/1.0 404 MSG\r\n\r\n']521        self.assertEqual(wrt.history, exp)522        # Connection must be closed523        self.assertTrue(wrt.closed)524    def testMalformedRequest(self):525        """Verify that malformed request generates proper response"""526        rdr = mockReader(['GET /\r\n',527                          HDR('Host: blah.com'),528                          HDRE])529        wrt = mockWriter()530        run_coro(self.srv._handler(rdr, wrt))531        exp = ['HTTP/1.0 400 MSG\r\n\r\n']532        self.assertEqual(wrt.history, exp)533        # Connection must be closed534        self.assertTrue(wrt.closed)535class ResourceGetPost():536    """Simple REST API resource class with just two methods"""537    def get(self, data):538        return {'data1': 
'junk'}539    def post(self, data):540        return data541class ResourceGetParam():542    """Parameterized REST API resource"""543    def __init__(self):544        self.user_id = 'user_id'545    def get(self, data, user_id):546        return {self.user_id: user_id}547class ResourceGetArgs():548    """REST API resource with additional arguments"""549    def get(self, data, arg1, arg2):550        return {'arg1': arg1, 'arg2': arg2}551class ResourceGenerator():552    """REST API with generator as result"""553    async def get(self, data):554        yield 'longlongchunkchunk1'555        yield 'chunk2'556        # unicode support557        yield '\u265E'558class ResourceNegative():559    """To cover negative test cases"""560    def delete(self, data):561        # Broken pipe emulation562        raise OSError(32, '', '')563    def put(self, data):564        # Simple unhandled expection565        raise Exception('something')566class ServerResource(unittest.TestCase):567    def setUp(self):568        self.srv = webserver()569        self.srv.conns[id(1)] = None570        self.srv.add_resource(ResourceGetPost, '/')571        self.srv.add_resource(ResourceGetParam, '/param/<user_id>')572        self.srv.add_resource(ResourceGetArgs, '/args', arg1=1, arg2=2)573        self.srv.add_resource(ResourceGenerator, '/gen')574        self.srv.add_resource(ResourceNegative, '/negative')575    def testOptions(self):576        # Ensure that only GET/POST methods are allowed:577        rdr = mockReader(['OPTIONS / HTTP/1.0\r\n',578                          HDRE])579        wrt = mockWriter()580        run_coro(self.srv._handler(rdr, wrt))581        exp = ['HTTP/1.0 200 MSG\r\n' +582               'Access-Control-Allow-Headers: *\r\n'583               'Content-Length: 0\r\n'584               'Access-Control-Allow-Origin: *\r\n'585               'Access-Control-Allow-Methods: GET, POST\r\n\r\n']586        self.assertEqual(wrt.history, exp)587    def testGet(self):588        rdr = 
mockReader(['GET / HTTP/1.0\r\n',589                          HDRE])590        wrt = mockWriter()591        run_coro(self.srv._handler(rdr, wrt))592        exp = ['HTTP/1.0 200 MSG\r\n' +593               'Access-Control-Allow-Origin: *\r\n'594               'Access-Control-Allow-Headers: *\r\n'595               'Content-Length: 17\r\n'596               'Access-Control-Allow-Methods: GET, POST\r\n'597               'Content-Type: application/json\r\n\r\n',598               '{"data1": "junk"}']599        self.assertEqual(wrt.history, exp)600    def testGetWithParam(self):601        rdr = mockReader(['GET /param/123 HTTP/1.0\r\n',602                          HDRE])603        wrt = mockWriter()604        run_coro(self.srv._handler(rdr, wrt))605        exp = ['HTTP/1.0 200 MSG\r\n' +606               'Access-Control-Allow-Origin: *\r\n'607               'Access-Control-Allow-Headers: *\r\n'608               'Content-Length: 18\r\n'609               'Access-Control-Allow-Methods: GET\r\n'610               'Content-Type: application/json\r\n\r\n',611               '{"user_id": "123"}']612        self.assertEqual(wrt.history, exp)613    def testGetWithArgs(self):614        rdr = mockReader(['GET /args HTTP/1.0\r\n',615                          HDRE])616        wrt = mockWriter()617        run_coro(self.srv._handler(rdr, wrt))618        exp = ['HTTP/1.0 200 MSG\r\n' +619               'Access-Control-Allow-Origin: *\r\n'620               'Access-Control-Allow-Headers: *\r\n'621               'Content-Length: 22\r\n'622               'Access-Control-Allow-Methods: GET\r\n'623               'Content-Type: application/json\r\n\r\n',624               '{"arg1": 1, "arg2": 2}']625        self.assertEqual(wrt.history, exp)626    def testGenerator(self):627        rdr = mockReader(['GET /gen HTTP/1.0\r\n',628                          HDRE])629        wrt = mockWriter()630        run_coro(self.srv._handler(rdr, wrt))631        exp = ['HTTP/1.1 200 MSG\r\n' +632               
'Access-Control-Allow-Methods: GET\r\n' +633               'Connection: close\r\n' +634               'Access-Control-Allow-Headers: *\r\n' +635               'Content-Type: application/json\r\n' +636               'Transfer-Encoding: chunked\r\n' +637               'Access-Control-Allow-Origin: *\r\n\r\n',638               '13\r\n',639               'longlongchunkchunk1',640               '\r\n',641               '6\r\n',642               'chunk2',643               '\r\n',644               # next chunk is 1 char len UTF-8 string645               '3\r\n',646               '\u265E',647               '\r\n',648               '0\r\n\r\n']649        self.assertEqual(wrt.history, exp)650    def testPost(self):651        # Ensure that parameters from query string / body will be combined as well652        rdr = mockReader(['POST /?qs=qs1 HTTP/1.0\r\n',653                          HDR('Content-Length: 17'),654                          HDR('Content-Type: application/json'),655                          HDRE,656                          '{"body": "body1"}'])657        wrt = mockWriter()658        run_coro(self.srv._handler(rdr, wrt))659        exp = ['HTTP/1.0 200 MSG\r\n' +660               'Access-Control-Allow-Origin: *\r\n'661               'Access-Control-Allow-Headers: *\r\n'662               'Content-Length: 30\r\n'663               'Access-Control-Allow-Methods: GET, POST\r\n'664               'Content-Type: application/json\r\n\r\n',665               '{"qs": "qs1", "body": "body1"}']666        self.assertEqual(wrt.history, exp)667    def testInvalidMethod(self):668        rdr = mockReader(['PUT / HTTP/1.0\r\n',669                          HDRE])670        wrt = mockWriter()671        run_coro(self.srv._handler(rdr, wrt))672        exp = ['HTTP/1.0 405 MSG\r\n\r\n']673        self.assertEqual(wrt.history, exp)674    def testException(self):675        rdr = mockReader(['PUT /negative HTTP/1.0\r\n',676                          HDRE])677        wrt = mockWriter()678      
  run_coro(self.srv._handler(rdr, wrt))679        exp = ['HTTP/1.0 500 MSG\r\n\r\n']680        self.assertEqual(wrt.history, exp)681    def testBrokenPipe(self):682        rdr = mockReader(['DELETE /negative HTTP/1.0\r\n',683                          HDRE])684        wrt = mockWriter()685        run_coro(self.srv._handler(rdr, wrt))686        self.assertEqual(wrt.history, [])687class StaticContent(unittest.TestCase):688    def setUp(self):689        self.srv = webserver()690        self.srv.conns[id(1)] = None691        self.tempfn = '__tmp.html'692        self.ctype = None693        self.etype = None694        self.max_age = 2592000695        with open(self.tempfn, 'wb') as f:696            f.write('someContent blah blah')697    def tearDown(self):698        try:699            delete_file(self.tempfn)700        except OSError:701            pass702    async def send_file_handler(self, req, resp):703        await resp.send_file(self.tempfn,704                             content_type=self.ctype,705                             content_encoding=self.etype,706                             max_age=self.max_age)707    def testSendFileManual(self):708        """Verify send_file works great with manually defined parameters"""709        self.ctype = 'text/plain'710        self.etype = 'gzip'711        self.max_age = 100712        self.srv.add_route('/', self.send_file_handler)713        rdr = mockReader(['GET / HTTP/1.0\r\n',714                          HDRE])715        wrt = mockWriter()716        run_coro(self.srv._handler(rdr, wrt))717        exp = ['HTTP/1.0 200 MSG\r\n' +718               'Cache-Control: max-age=100, public\r\n'719               'Content-Type: text/plain\r\n'720               'Content-Length: 21\r\n'721               'Content-Encoding: gzip\r\n\r\n',722               bytearray(b'someContent blah blah')]723        self.assertEqual(wrt.history, exp)724        self.assertTrue(wrt.closed)725    def testSendFileNotFound(self):726        """Verify 404 error 
for non existing files"""727        self.srv.add_route('/', self.send_file_handler)728        rdr = mockReader(['GET / HTTP/1.0\r\n',729                          HDRE])730        wrt = mockWriter()731        # Intentionally delete file before request732        delete_file(self.tempfn)733        run_coro(self.srv._handler(rdr, wrt))734        exp = ['HTTP/1.0 404 MSG\r\n\r\n']735        self.assertEqual(wrt.history, exp)736        self.assertTrue(wrt.closed)737    def testSendFileConnectionReset(self):738        self.srv.add_route('/', self.send_file_handler)739        rdr = mockReader(['GET / HTTP/1.0\r\n',740                          HDRE])741        # tell mockWrite to raise error during send()742        wrt = mockWriter(generate_expection=OSError(errno.ECONNRESET))743        run_coro(self.srv._handler(rdr, wrt))744        # there should be no payload due to connected reset745        self.assertEqual(wrt.history, [])746        self.assertTrue(wrt.closed)747if __name__ == '__main__':...__init__.py
Source:__init__.py  
...21        self.running = False22    def run(self):23        self.running = True24        self.loop.run_forever()25    def run_coro(self, coro,wait_for_result=True):26        27        if wait_for_result:28            return asyncio.run_coroutine_threadsafe(coro, loop=self.loop).result()29        else:30            return asyncio.run_coroutine_threadsafe(coro, loop=self.loop)31    def stop(self):32        self.loop.call_soon_threadsafe(self.loop.stop)33        self.join()34        self.running = False35class DecentScale(AsyncioEventLoopThread):36    37    def __init__(self, *args, timeout=20, fix_dropped_command=True, **kwargs):38        super().__init__(*args, **kwargs)39        self.client = None40        self.timeout=timeout41        self.connected=False42        self.fix_dropped_command=fix_dropped_command43        self.dropped_command_sleep = 0.05  # API Docs says 50ms44        self.weight = None   45         46        #Constants47        self.CHAR_READ='0000FFF4-0000-1000-8000-00805F9B34FB'48        self.CHAR_WRITE='000036f5-0000-1000-8000-00805f9b34fb'49        50        51        #Tare the scale by sending "030FFD000000F1". 
52        #Each tare needs to increment the 3rd byte pair, 53        #so you can cycle (for instance) though "030FFE000000F2" "030FFF000000F3" "030F000000000C".54        55        tare_commands=[  bytearray.fromhex(c) for c in ['030F000000000C','030F010000000D','030F020000000E']]56        self.tare_commands=cycle(tare_commands)57                58        self.led_on_command=bytearray.fromhex('030A0101000009')59        self.led_off_command=bytearray.fromhex('030A0000000009')60        self.start_time_command=bytearray.fromhex('030B030000000B')61        self.stop_time_command=bytearray.fromhex("030B0000000008")62        self.reset_time_command=bytearray.fromhex("030B020000000A" )63        64        self.daemon=True65        super().start()66        67    def check_connection(func):68        def is_connected(self):69            if self.connected:70                func(self)71            else:72                print("Scale is not connected.")73        return is_connected74    async def _find_address(self):75        76        device = await BleakScanner.find_device_by_filter(77        lambda d, ad: d.name and d.name == 'Decent Scale'78        ,timeout=self.timeout)79        80        if device:81            return device.address82        else:83            print('Error: Scale not found. Trying again...')84    85    async def _connect(self, address):86        87        self.client = BleakClient(address)88        89        if not self.running:90            super().start()91        92        try:93            return await self.client.connect(timeout=self.timeout)94        except Exception as e:95            print('Error:%s\nTrying again...' 
%e)96            return False   97        98    async def _disconnect(self):99        return await self.client.disconnect()   100    async def __send(self, cmd):101        """Send commands with firmware v1.0 bugfix (resending)"""102        await self.client.write_gatt_char(self.CHAR_WRITE, cmd)103        if self.fix_dropped_command:104            await asyncio.sleep(self.dropped_command_sleep)105            await self.client.write_gatt_char(self.CHAR_WRITE, cmd)106        # Wait 200ms for the command to finish107        # Alternative: receive the notifications and check if the command was acknowledged108        await asyncio.sleep(0.2)109    async def _tare(self):110        await self.__send(next(self.tare_commands))111    async def _led_on(self):112        await self.__send(self.led_on_command)113    async def _led_off(self):114        await self.__send(self.led_off_command)115    async def _start_time(self):116        await self.__send(self.start_time_command)117    async def _stop_time(self):118        await self.__send(self.stop_time_command)119    async def _reset_time(self):120        await self.__send(self.reset_time_command)121    def notification_handler(self, sender, data):122        if data[0] != 0x03 or len(data) != 7:123            # Basic sanity check124            logger.info("Invalid notification: not a Decent Scale?")125            return126        # Calculate XOR127        xor_msg = functools.reduce(operator.xor, data[:-1])128        if xor_msg != data[-1]:129            logger.warning("XOR verification failed for notification")130            return131        if sys.version_info >= (3, 8):132            logger.debug(f"Received Notification at {time.time()}: {binascii.hexlify(data, sep=':')}")133        else:134            logger.debug(f"Received Notification at {time.time()}: {binascii.hexlify(data)}")135        136        # Have to decide by type of the package137        type_ = data[1]138        if type_ in [0xCA, 0xCE]:139            # Weight 
information140            self.weight = int.from_bytes(data[2:4], byteorder='big', signed=True) / 10141        elif type_ == 0xAA:142            # Button press143            # NOTE: Despite the API documentation saying the XOR field is 0x00, it actually contains the XOR144            logger.debug(f"Button press: {data[2]}, duration: {data[3]}")145        elif type_ == 0x0F:146            # tare increment147            pass148        elif type_ == 0x0A:149            # LED on/off -> returns units and battery level150            logger.debug(f"Unit of scale: {'g' if data[3] == 0 else 'oz'}, battery level: {data[4]}%")151        elif type_ == 0x0B:152            # Timer153            # NOTE: The API documentation says there is a section on "Receiving Timer Info" but this is missing154            pass155        else:156            logger.warning(f"Unknown Notification Type received: 0x{type_:02x}")157    async def _enable_notification(self):158        await self.client.start_notify(self.CHAR_READ, self.notification_handler)159        await asyncio.sleep(1)160        161             162    async def _disable_notification(self):163        await self.client.stop_notify(self.CHAR_READ) 164    @check_connection    165    def enable_notification(self):   166        return self.run_coro(self._enable_notification())167    168    @check_connection 169    def disable_notification(self):   170        self.weight=None171        return self.run_coro(self._disable_notification())172 173    def find_address(self):   174        return self.run_coro(self._find_address())175    176    def connect(self,address):177        if not self.connected:178            self.connected= self.run_coro(self._connect(address))179            180            if self.connected:181                self.led_off()182                self.led_on()183        else:184            print('Already connected.')185    186        return self.connected187                188    def disconnect(self):189        if 
self.connected:190            self.connected= not self.run_coro(self._disconnect())191        else:192            print('Already disconnected.')193        194        return not self.connected195            196    def auto_connect(self,n_retries=3):    197        address = None198        for i in range(n_retries):199            address=self.find_address()200            if address:201                print('Found Decent Scale: %s' % address)202                break203            else:204                print(i)205        206        if address:        207            for i in range(n_retries):208                if self.connect(address):209                    print('Scale connected!')210                    return True211                212        213        print('Autoconnect failed. Make sure the scale is on.')214        return False215    216    @check_connection 217    def tare(self):   218        self.run_coro(self._tare())219        220    @check_connection 221    def start_time(self):   222        self.run_coro(self._start_time())223    224    @check_connection 225    def stop_time(self):   226        self.run_coro(self._stop_time())227                   228    @check_connection 229    def reset_time(self):   230        self.run_coro(self._reset_time())231    @check_connection 232    def led_off(self):   233        self.run_coro(self._led_off())234                   235    @check_connection 236    def led_on(self):   237        self.run_coro(self._led_on())238 ...sync_client.py
Source:sync_client.py  
...8        self.running = False9    def run(self):10        self.running = True11        self.loop.run_forever()12    def run_coro(self, coro):13        return asyncio.run_coroutine_threadsafe(coro, loop=self.loop).result()14    def stop(self):15        self.loop.call_soon_threadsafe(self.loop.stop)16        self.join()17        self.running = False18class TonlibClientSync:19    def __init__(self,20                 ls_index,21                 config,22                 keystore,23                 cdll_path=None,24                 verbosity_level=0):25        self.thread = AsyncioEventLoopThread()26        self.thread.start()27        self.client_impl = TonlibClient(ls_index, config, keystore, self.thread.loop, cdll_path, verbosity_level)28        self.thread.run_coro(self.client_impl.init())29    def __del__(self):30        del self.client_impl31        self.thread.stop()32    def set_verbosity_level(self, level):33        return self.thread.run_coro(self.client_impl.set_verbosity_level(level))34    def raw_get_transactions(self, account_address: str, from_transaction_lt: str, from_transaction_hash: str):35        return self.thread.run_coro(self.client_impl.raw_get_transactions(account_address, from_transaction_lt, from_transaction_hash))36    def raw_get_account_state(self, address: str):37        return self.thread.run_coro(self.client_impl.raw_get_account_state(address))38    def generic_get_account_state(self, address: str):39        return self.thread.run_coro(self.client_impl.generic_get_account_state(address))40    def _load_contract(self, address):41        return self.thread.run_coro(self.client_impl._load_contract(address))42    def raw_run_method(self, address, method, stack_data, output_layout=None):43        return self.thread.run_coro(self.client_impl.raw_run_method(address, method, stack_data, output_layout))44    def raw_send_message(self, serialized_boc):45        return self.thread.run_coro(self.client_impl.raw_send_message(serialized_boc))46    
def _raw_create_query(self, destination, body, init_code=b'', init_data=b''):47        return self.thread.run_coro(self.client_impl._raw_create_query(destination, body, init_code, init_data))48    def _raw_send_query(self, query_info):49        return self.thread.run_coro(self.client_impl._raw_send_query(query_info))50        51    def raw_create_and_send_query(self, destination, body, init_code=b'', init_data=b''):52        return self.thread.run_coro(self.client_impl.raw_create_and_send_query(destination, body, init_code, init_data))53    def raw_create_and_send_message(self, destination, body, initial_account_state=b''):54        return self.thread.run_coro(self.client_impl.raw_create_and_send_message(destination, body, initial_account_state))55    def raw_estimate_fees(self, destination, body, init_code=b'', init_data=b'', ignore_chksig=True):56        return self.thread.run_coro(self.client_impl.raw_estimate_fees(destination, body, init_code, init_data, ignore_chksig))57    def raw_getBlockTransactions(self, fullblock, count, after_tx):58        return self.thread.run_coro(self.client_impl.raw_getBlockTransactions(fullblock, count, after_tx))59    def raw_getBlockTransactionsExt(self, fullblock, count, after_tx):60        return self.thread.run_coro(self.client_impl.raw_getBlockTransactionsExt(fullblock, count, after_tx))61    def getTransactions(self, account_address, from_transaction_lt=None, from_transaction_hash=None, to_transaction_lt=0, limit=10):62        return self.thread.run_coro(self.client_impl.getTransactions(account_address, from_transaction_lt, from_transaction_hash, to_transaction_lt, limit))63    def getMasterchainInfo(self):64        return self.thread.run_coro(self.client_impl.getMasterchainInfo())65    def lookupBlock(self, workchain, shard, seqno=None, lt=None, unixtime=None):66        return self.thread.run_coro(self.client_impl.lookupBlock(workchain, shard, seqno, lt, unixtime))67    def getShards(self, master_seqno=None, lt=None, 
unixtime=None):68        return self.thread.run_coro(self.client_impl.getShards(master_seqno, lt, unixtime))69    def getBlockTransactions(self, workchain, shard, seqno, count, root_hash=None, file_hash=None, after_lt=None, after_hash=None):70        return self.thread.run_coro(self.client_impl.getBlockTransactions(workchain, shard, seqno, count, root_hash, file_hash, after_lt, after_hash))71    def getBlockTransactionsExt(self, workchain, shard, seqno, count, root_hash=None, file_hash=None, after_lt=None, after_hash=None):72        return self.thread.run_coro(self.client_impl.getBlockTransactionsExt(workchain, shard, seqno, count, root_hash, file_hash, after_lt, after_hash))73    def getBlockHeader(self, workchain, shard, seqno, root_hash=None, file_hash=None):74        return self.thread.run_coro(self.client_impl.getBlockHeader(workchain, shard, seqno, root_hash, file_hash))75    def tryLocateTxByIncomingMessage(self, source, destination, creation_lt):76        return self.thread.run_coro(self.client_impl.tryLocateTxByIncomingMessage(source, destination, creation_lt))77    def tryLocateTxByOutcomingMessage(self, source, destination, creation_lt):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
