How to use the run_coro method in Molotov

Best Python code snippets using molotov_python

test_server.py

Source:test_server.py Github

copy

Full Screen

...57 async def aclose(self):58 self.closed = True59async def mock_wait_for(coro, timeout):60 await coro61def run_coro(coro):62 # Mock wait_for() function with simple dummy63 asyncio.wait_for = (lambda c, t: await c)64 """Simple helper to run coroutine"""65 for i in coro:66 pass67# Tests68class Utils(unittest.TestCase):69 def testUrldecode(self):70 runs = [('abc%20def', 'abc def'),71 ('abc%%20def', 'abc% def'),72 ('%%%', '%%%'),73 ('%20%20', ' '),74 ('abc', 'abc'),75 ('a%25%25%25c', 'a%%%c'),76 ('a++b', 'a b'),77 ('+%25+', ' % '),78 ('+%2B+', ' + '),79 ('%20+%2B+%41', ' + A'),80 ]81 for r in runs:82 self.assertEqual(urldecode_plus(r[0]), r[1])83 def testParseQueryString(self):84 runs = [('k1=v2', {'k1': 'v2'}),85 ('k1=v2&k11=v11', {'k1': 'v2',86 'k11': 'v11'}),87 ('k1=v2&k11=', {'k1': 'v2',88 'k11': ''}),89 ('k1=+%20', {'k1': ' '}),90 ('%6b1=+%20', {'k1': ' '}),91 ('k1=%3d1', {'k1': '=1'}),92 ('11=22%26&%3d=%3d', {'11': '22&',93 '=': '='}),94 ]95 for r in runs:96 self.assertEqual(parse_query_string(r[0]), r[1])97class ServerParts(unittest.TestCase):98 def testRequestLine(self):99 runs = [('GETT / HTTP/1.1', 'GETT', '/'),100 ('TTEG\t/blah\tHTTP/1.1', 'TTEG', '/blah'),101 ('POST /qq/?q=q HTTP', 'POST', '/qq/', 'q=q'),102 ('POST /?q=q BSHT', 'POST', '/', 'q=q'),103 ('POST /?q=q&a=a JUNK', 'POST', '/', 'q=q&a=a')]104 for r in runs:105 try:106 req = request(mockReader(r[0]))107 run_coro(req.read_request_line())108 self.assertEqual(r[1].encode(), req.method)109 self.assertEqual(r[2].encode(), req.path)110 if len(r) > 3:111 self.assertEqual(r[3].encode(), req.query_string)112 except Exception:113 self.fail('exception on payload --{}--'.format(r[0]))114 def testRequestLineEmptyLinesBefore(self):115 req = request(mockReader(['\n', '\r\n', 'GET /?a=a HTTP/1.1']))116 run_coro(req.read_request_line())117 self.assertEqual(b'GET', req.method)118 self.assertEqual(b'/', req.path)119 self.assertEqual(b'a=a', req.query_string)120 def testRequestLineNegative(self):121 runs = ['',122 
'\t\t',123 ' ',124 ' / HTTP/1.1',125 'GET',126 'GET /',127 'GET / '128 ]129 for r in runs:130 with self.assertRaises(HTTPException):131 req = request(mockReader(r))132 run_coro(req.read_request_line())133 def testHeadersSimple(self):134 req = request(mockReader([HDR('Host: google.com'),135 HDRE]))136 run_coro(req.read_headers([b'Host']))137 self.assertEqual(req.headers, {b'Host': b'google.com'})138 def testHeadersSpaces(self):139 req = request(mockReader([HDR('Host: \t google.com \t '),140 HDRE]))141 run_coro(req.read_headers([b'Host']))142 self.assertEqual(req.headers, {b'Host': b'google.com'})143 def testHeadersEmptyValue(self):144 req = request(mockReader([HDR('Host:'),145 HDRE]))146 run_coro(req.read_headers([b'Host']))147 self.assertEqual(req.headers, {b'Host': b''})148 def testHeadersMultiple(self):149 req = request(mockReader([HDR('Host: google.com'),150 HDR('Junk: you blah'),151 HDR('Content-type: file'),152 HDRE]))153 hdrs = {b'Host': b'google.com',154 b'Junk': b'you blah',155 b'Content-type': b'file'}156 run_coro(req.read_headers([b'Host', b'Junk', b'Content-type']))157 self.assertEqual(req.headers, hdrs)158 def testUrlFinderExplicit(self):159 urls = [('/', 1),160 ('/%20', 2),161 ('/a/b', 3),162 ('/aac', 5)]163 junk = ['//', '', '/a', '/aa', '/a/fhhfhfhfhfhf']164 # Create server, add routes165 srv = webserver()166 for u in urls:167 srv.add_route(u[0], u[1])168 # Search them all169 for u in urls:170 # Create mock request object with "pre-parsed" url path171 rq = request(mockReader([]))172 rq.path = u[0].encode()173 f, args = srv._find_url_handler(rq)174 self.assertEqual(u[1], f)175 # Some simple negative cases176 for j in junk:177 rq = request(mockReader([]))178 rq.path = j.encode()179 f, args = srv._find_url_handler(rq)180 self.assertIsNone(f)181 self.assertIsNone(args)182 def testUrlFinderParameterized(self):183 srv = webserver()184 # Add few routes185 srv.add_route('/', 0)186 srv.add_route('/<user_name>', 1)187 srv.add_route('/a/<id>', 2)188 # Check 
first url (non param)189 rq = request(mockReader([]))190 rq.path = b'/'191 f, args = srv._find_url_handler(rq)192 self.assertEqual(f, 0)193 # Check second url194 rq.path = b'/user1'195 f, args = srv._find_url_handler(rq)196 self.assertEqual(f, 1)197 self.assertEqual(args['_param_name'], 'user_name')198 self.assertEqual(rq._param, 'user1')199 # Check third url200 rq.path = b'/a/123456'201 f, args = srv._find_url_handler(rq)202 self.assertEqual(f, 2)203 self.assertEqual(args['_param_name'], 'id')204 self.assertEqual(rq._param, '123456')205 # When param is empty and there is no non param endpoint206 rq.path = b'/a/'207 f, args = srv._find_url_handler(rq)208 self.assertEqual(f, 2)209 self.assertEqual(rq._param, '')210 def testUrlFinderNegative(self):211 srv = webserver()212 # empty URL is not allowed213 with self.assertRaises(ValueError):214 srv.add_route('', 1)215 # Query string is not allowed216 with self.assertRaises(ValueError):217 srv.add_route('/?a=a', 1)218 # Duplicate urls219 srv.add_route('/duppp', 1)220 with self.assertRaises(ValueError):221 srv.add_route('/duppp', 1)222# We want to test decorators as well223server_for_decorators = webserver()224@server_for_decorators.route('/uid/<user_id>')225@server_for_decorators.route('/uid2/<user_id>')226async def route_for_decorator(req, resp, user_id):227 await resp.start_html()228 await resp.send('YO, {}'.format(user_id))229@server_for_decorators.resource('/rest1/<user_id>')230def resource_for_decorator1(data, user_id):231 return {'name': user_id}232@server_for_decorators.resource('/rest2/<user_id>')233async def resource_for_decorator2(data, user_id):234 yield '{"name": user_id}'235class ServerFull(unittest.TestCase):236 def setUp(self):237 self.dummy_called = False238 self.data = {}239 # "Register" one connection into map for dedicated decor server240 server_for_decorators.conns[id(1)] = None241 self.hello_world_history = ['HTTP/1.0 200 MSG\r\n' +242 'Content-Type: text/html\r\n\r\n',243 '<html><h1>Hello 
world</h1></html>']244 # Create one more server - to simplify bunch of tests245 self.srv = webserver()246 self.srv.conns[id(1)] = None247 def testRouteDecorator1(self):248 """Test @.route() decorator"""249 # First decorator250 rdr = mockReader(['GET /uid/man1 HTTP/1.1\r\n',251 HDRE])252 wrt = mockWriter()253 # "Send" request254 run_coro(server_for_decorators._handler(rdr, wrt))255 # Ensure that proper response "sent"256 expected = ['HTTP/1.0 200 MSG\r\n' +257 'Content-Type: text/html\r\n\r\n',258 'YO, man1']259 self.assertEqual(wrt.history, expected)260 self.assertTrue(wrt.closed)261 def testRouteDecorator2(self):262 # Second decorator263 rdr = mockReader(['GET /uid2/man2 HTTP/1.1\r\n',264 HDRE])265 wrt = mockWriter()266 # Re-register connection267 server_for_decorators.conns[id(1)] = None268 # "Send" request269 run_coro(server_for_decorators._handler(rdr, wrt))270 # Ensure that proper response "sent"271 expected = ['HTTP/1.0 200 MSG\r\n' +272 'Content-Type: text/html\r\n\r\n',273 'YO, man2']274 self.assertEqual(wrt.history, expected)275 self.assertTrue(wrt.closed)276 def testResourceDecorator1(self):277 """Test @.resource() decorator"""278 rdr = mockReader(['GET /rest1/man1 HTTP/1.1\r\n',279 HDRE])280 wrt = mockWriter()281 run_coro(server_for_decorators._handler(rdr, wrt))282 expected = ['HTTP/1.0 200 MSG\r\n'283 'Access-Control-Allow-Origin: *\r\n' +284 'Access-Control-Allow-Headers: *\r\n' +285 'Content-Length: 16\r\n' +286 'Access-Control-Allow-Methods: GET\r\n' +287 'Content-Type: application/json\r\n\r\n',288 '{"name": "man1"}']289 self.assertEqual(wrt.history, expected)290 self.assertTrue(wrt.closed)291 def testResourceDecorator2(self):292 rdr = mockReader(['GET /rest2/man2 HTTP/1.1\r\n',293 HDRE])294 wrt = mockWriter()295 run_coro(server_for_decorators._handler(rdr, wrt))296 expected = ['HTTP/1.1 200 MSG\r\n' +297 'Access-Control-Allow-Methods: GET\r\n' +298 'Connection: close\r\n' +299 'Access-Control-Allow-Headers: *\r\n' +300 'Content-Type: 
application/json\r\n' +301 'Transfer-Encoding: chunked\r\n' +302 'Access-Control-Allow-Origin: *\r\n\r\n',303 '11\r\n',304 '{"name": user_id}',305 '\r\n',306 '0\r\n\r\n'307 ]308 self.assertEqual(wrt.history, expected)309 self.assertTrue(wrt.closed)310 def testCatchAllDecorator(self):311 # A fresh server for the catchall handler312 server_for_catchall_decorator = webserver()313 # Catchall decorator and handler314 @server_for_catchall_decorator.catchall()315 async def route_for_catchall_decorator(req, resp):316 await resp.start_html()317 await resp.send('my404')318 rdr = mockReader(['GET /this/is/an/invalid/url HTTP/1.1\r\n',319 HDRE])320 wrt = mockWriter()321 server_for_catchall_decorator.conns[id(1)] = None322 run_coro(server_for_catchall_decorator._handler(rdr, wrt))323 expected = ['HTTP/1.0 200 MSG\r\n' +324 'Content-Type: text/html\r\n\r\n',325 'my404']326 self.assertEqual(wrt.history, expected)327 self.assertTrue(wrt.closed)328 async def dummy_handler(self, req, resp):329 """Dummy URL handler. 
It just records the fact - it has been called"""330 self.dummy_req = req331 self.dummy_resp = resp332 self.dummy_called = True333 async def dummy_post_handler(self, req, resp):334 self.data = await req.read_parse_form_data()335 async def hello_world_handler(self, req, resp):336 await resp.start_html()337 await resp.send('<html><h1>Hello world</h1></html>')338 async def redirect_handler(self, req, resp):339 await resp.redirect('/blahblah', msg='msg:)')340 def testStartHTML(self):341 """Verify that request.start_html() works well"""342 self.srv.add_route('/', self.hello_world_handler)343 rdr = mockReader(['GET / HTTP/1.1\r\n',344 HDR('Host: blah.com'),345 HDRE])346 wrt = mockWriter()347 # "Send" request348 run_coro(self.srv._handler(rdr, wrt))349 # Ensure that proper response "sent"350 self.assertEqual(wrt.history, self.hello_world_history)351 self.assertTrue(wrt.closed)352 def testRedirect(self):353 """Verify that request.start_html() works well"""354 self.srv.add_route('/', self.redirect_handler)355 rdr = mockReader(['GET / HTTP/1.1\r\n',356 HDR('Host: blah.com'),357 HDRE])358 wrt = mockWriter()359 # "Send" request360 run_coro(self.srv._handler(rdr, wrt))361 # Ensure that proper response "sent"362 exp = ['HTTP/1.0 302 MSG\r\n' +363 'Location: /blahblah\r\nContent-Length: 5\r\n\r\n',364 'msg:)']365 self.assertEqual(wrt.history, exp)366 def testRequestBodyUnknownType(self):367 """Unknow HTTP body test - empty dict expected"""368 self.srv.add_route('/', self.dummy_post_handler, methods=['POST'])369 rdr = mockReader(['POST / HTTP/1.1\r\n',370 HDR('Host: blah.com'),371 HDR('Content-Length: 5'),372 HDRE,373 '12345'])374 wrt = mockWriter()375 run_coro(self.srv._handler(rdr, wrt))376 # Check extracted POST body377 self.assertEqual(self.data, {})378 def testRequestBodyJson(self):379 """JSON encoded POST body"""380 self.srv.add_route('/',381 self.dummy_post_handler,382 methods=['POST'],383 save_headers=['Content-Type', 'Content-Length'])384 rdr = mockReader(['POST / 
HTTP/1.1\r\n',385 HDR('Content-Type: application/json'),386 HDR('Content-Length: 10'),387 HDRE,388 '{"a": "b"}'])389 wrt = mockWriter()390 run_coro(self.srv._handler(rdr, wrt))391 # Check parsed POST body392 self.assertEqual(self.data, {'a': 'b'})393 def testRequestBodyUrlencoded(self):394 """Regular HTML form"""395 self.srv.add_route('/',396 self.dummy_post_handler,397 methods=['POST'],398 save_headers=['Content-Type', 'Content-Length'])399 rdr = mockReader(['POST / HTTP/1.1\r\n',400 HDR('Content-Type: application/x-www-form-urlencoded; charset=UTF-8'),401 HDR('Content-Length: 10'),402 HDRE,403 'a=b&c=%20d'])404 wrt = mockWriter()405 run_coro(self.srv._handler(rdr, wrt))406 # Check parsed POST body407 self.assertEqual(self.data, {'a': 'b', 'c': ' d'})408 def testRequestBodyNegative(self):409 """Regular HTML form"""410 self.srv.add_route('/',411 self.dummy_post_handler,412 methods=['POST'],413 save_headers=['Content-Type', 'Content-Length'])414 rdr = mockReader(['POST / HTTP/1.1\r\n',415 HDR('Content-Type: application/json'),416 HDR('Content-Length: 9'),417 HDRE,418 'some junk'])419 wrt = mockWriter()420 run_coro(self.srv._handler(rdr, wrt))421 # payload broken - HTTP 400 expected422 self.assertEqual(wrt.history, ['HTTP/1.0 400 MSG\r\n\r\n'])423 def testRequestLargeBody(self):424 """Max Body size check"""425 self.srv.add_route('/',426 self.dummy_post_handler,427 methods=['POST'],428 save_headers=['Content-Type', 'Content-Length'],429 max_body_size=5)430 rdr = mockReader(['POST / HTTP/1.1\r\n',431 HDR('Content-Type: application/json'),432 HDR('Content-Length: 9'),433 HDRE,434 'some junk'])435 wrt = mockWriter()436 run_coro(self.srv._handler(rdr, wrt))437 # payload broken - HTTP 400 expected438 self.assertEqual(wrt.history, ['HTTP/1.0 413 MSG\r\n\r\n'])439 async def route_parameterized_handler(self, req, resp, user_name):440 await resp.start_html()441 await resp.send('<html>Hello, {}</html>'.format(user_name))442 def testRouteParameterized(self):443 """Verify that 
route with params works fine"""444 self.srv.add_route('/db/<user_name>', self.route_parameterized_handler)445 rdr = mockReader(['GET /db/user1 HTTP/1.1\r\n',446 HDR('Host: junk.com'),447 HDRE])448 wrt = mockWriter()449 # "Send" request450 run_coro(self.srv._handler(rdr, wrt))451 # Ensure that proper response "sent"452 expected = ['HTTP/1.0 200 MSG\r\n' +453 'Content-Type: text/html\r\n\r\n',454 '<html>Hello, user1</html>']455 self.assertEqual(wrt.history, expected)456 self.assertTrue(wrt.closed)457 def testParseHeadersOnOff(self):458 """Verify parameter parse_headers works"""459 self.srv.add_route('/', self.dummy_handler, save_headers=['H1', 'H2'])460 rdr = mockReader(['GET / HTTP/1.1\r\n',461 HDR('H1: blah.com'),462 HDR('H2: lalalla'),463 HDR('Junk: fsdfmsdjfgjsdfjunk.com'),464 HDRE])465 # "Send" request466 wrt = mockWriter()467 run_coro(self.srv._handler(rdr, wrt))468 self.assertTrue(self.dummy_called)469 # Check for headers - only 2 of 3 should be collected, others - ignore470 hdrs = {b'H1': b'blah.com',471 b'H2': b'lalalla'}472 self.assertEqual(self.dummy_req.headers, hdrs)473 self.assertTrue(wrt.closed)474 def testDisallowedMethod(self):475 """Verify that server respects allowed methods"""476 self.srv.add_route('/', self.hello_world_handler)477 self.srv.add_route('/post_only', self.dummy_handler, methods=['POST'])478 rdr = mockReader(['GET / HTTP/1.0\r\n',479 HDRE])480 # "Send" GET request, by default GET is enabled481 wrt = mockWriter()482 run_coro(self.srv._handler(rdr, wrt))483 self.assertEqual(wrt.history, self.hello_world_history)484 self.assertTrue(wrt.closed)485 # "Send" GET request to POST only location486 self.srv.conns[id(1)] = None487 self.dummy_called = False488 rdr = mockReader(['GET /post_only HTTP/1.1\r\n',489 HDRE])490 wrt = mockWriter()491 run_coro(self.srv._handler(rdr, wrt))492 # Hanlder should not be called - method not allowed493 self.assertFalse(self.dummy_called)494 exp = ['HTTP/1.0 405 MSG\r\n\r\n']495 self.assertEqual(wrt.history, 
exp)496 # Connection must be closed497 self.assertTrue(wrt.closed)498 def testAutoOptionsMethod(self):499 """Test auto implementation of OPTIONS method"""500 self.srv.add_route('/', self.hello_world_handler, methods=['POST', 'PUT', 'DELETE'])501 self.srv.add_route('/disabled', self.hello_world_handler, auto_method_options=False)502 rdr = mockReader(['OPTIONS / HTTP/1.0\r\n',503 HDRE])504 wrt = mockWriter()505 run_coro(self.srv._handler(rdr, wrt))506 exp = ['HTTP/1.0 200 MSG\r\n' +507 'Access-Control-Allow-Headers: *\r\n'508 'Content-Length: 0\r\n'509 'Access-Control-Allow-Origin: *\r\n'510 'Access-Control-Allow-Methods: POST, PUT, DELETE\r\n\r\n']511 self.assertEqual(wrt.history, exp)512 self.assertTrue(wrt.closed)513 def testPageNotFound(self):514 """Verify that malformed request generates proper response"""515 rdr = mockReader(['GET /not_existing HTTP/1.1\r\n',516 HDR('Host: blah.com'),517 HDRE])518 wrt = mockWriter()519 run_coro(self.srv._handler(rdr, wrt))520 exp = ['HTTP/1.0 404 MSG\r\n\r\n']521 self.assertEqual(wrt.history, exp)522 # Connection must be closed523 self.assertTrue(wrt.closed)524 def testMalformedRequest(self):525 """Verify that malformed request generates proper response"""526 rdr = mockReader(['GET /\r\n',527 HDR('Host: blah.com'),528 HDRE])529 wrt = mockWriter()530 run_coro(self.srv._handler(rdr, wrt))531 exp = ['HTTP/1.0 400 MSG\r\n\r\n']532 self.assertEqual(wrt.history, exp)533 # Connection must be closed534 self.assertTrue(wrt.closed)535class ResourceGetPost():536 """Simple REST API resource class with just two methods"""537 def get(self, data):538 return {'data1': 'junk'}539 def post(self, data):540 return data541class ResourceGetParam():542 """Parameterized REST API resource"""543 def __init__(self):544 self.user_id = 'user_id'545 def get(self, data, user_id):546 return {self.user_id: user_id}547class ResourceGetArgs():548 """REST API resource with additional arguments"""549 def get(self, data, arg1, arg2):550 return {'arg1': arg1, 
'arg2': arg2}551class ResourceGenerator():552 """REST API with generator as result"""553 async def get(self, data):554 yield 'longlongchunkchunk1'555 yield 'chunk2'556 # unicode support557 yield '\u265E'558class ResourceNegative():559 """To cover negative test cases"""560 def delete(self, data):561 # Broken pipe emulation562 raise OSError(32, '', '')563 def put(self, data):564 # Simple unhandled expection565 raise Exception('something')566class ServerResource(unittest.TestCase):567 def setUp(self):568 self.srv = webserver()569 self.srv.conns[id(1)] = None570 self.srv.add_resource(ResourceGetPost, '/')571 self.srv.add_resource(ResourceGetParam, '/param/<user_id>')572 self.srv.add_resource(ResourceGetArgs, '/args', arg1=1, arg2=2)573 self.srv.add_resource(ResourceGenerator, '/gen')574 self.srv.add_resource(ResourceNegative, '/negative')575 def testOptions(self):576 # Ensure that only GET/POST methods are allowed:577 rdr = mockReader(['OPTIONS / HTTP/1.0\r\n',578 HDRE])579 wrt = mockWriter()580 run_coro(self.srv._handler(rdr, wrt))581 exp = ['HTTP/1.0 200 MSG\r\n' +582 'Access-Control-Allow-Headers: *\r\n'583 'Content-Length: 0\r\n'584 'Access-Control-Allow-Origin: *\r\n'585 'Access-Control-Allow-Methods: GET, POST\r\n\r\n']586 self.assertEqual(wrt.history, exp)587 def testGet(self):588 rdr = mockReader(['GET / HTTP/1.0\r\n',589 HDRE])590 wrt = mockWriter()591 run_coro(self.srv._handler(rdr, wrt))592 exp = ['HTTP/1.0 200 MSG\r\n' +593 'Access-Control-Allow-Origin: *\r\n'594 'Access-Control-Allow-Headers: *\r\n'595 'Content-Length: 17\r\n'596 'Access-Control-Allow-Methods: GET, POST\r\n'597 'Content-Type: application/json\r\n\r\n',598 '{"data1": "junk"}']599 self.assertEqual(wrt.history, exp)600 def testGetWithParam(self):601 rdr = mockReader(['GET /param/123 HTTP/1.0\r\n',602 HDRE])603 wrt = mockWriter()604 run_coro(self.srv._handler(rdr, wrt))605 exp = ['HTTP/1.0 200 MSG\r\n' +606 'Access-Control-Allow-Origin: *\r\n'607 'Access-Control-Allow-Headers: *\r\n'608 
'Content-Length: 18\r\n'609 'Access-Control-Allow-Methods: GET\r\n'610 'Content-Type: application/json\r\n\r\n',611 '{"user_id": "123"}']612 self.assertEqual(wrt.history, exp)613 def testGetWithArgs(self):614 rdr = mockReader(['GET /args HTTP/1.0\r\n',615 HDRE])616 wrt = mockWriter()617 run_coro(self.srv._handler(rdr, wrt))618 exp = ['HTTP/1.0 200 MSG\r\n' +619 'Access-Control-Allow-Origin: *\r\n'620 'Access-Control-Allow-Headers: *\r\n'621 'Content-Length: 22\r\n'622 'Access-Control-Allow-Methods: GET\r\n'623 'Content-Type: application/json\r\n\r\n',624 '{"arg1": 1, "arg2": 2}']625 self.assertEqual(wrt.history, exp)626 def testGenerator(self):627 rdr = mockReader(['GET /gen HTTP/1.0\r\n',628 HDRE])629 wrt = mockWriter()630 run_coro(self.srv._handler(rdr, wrt))631 exp = ['HTTP/1.1 200 MSG\r\n' +632 'Access-Control-Allow-Methods: GET\r\n' +633 'Connection: close\r\n' +634 'Access-Control-Allow-Headers: *\r\n' +635 'Content-Type: application/json\r\n' +636 'Transfer-Encoding: chunked\r\n' +637 'Access-Control-Allow-Origin: *\r\n\r\n',638 '13\r\n',639 'longlongchunkchunk1',640 '\r\n',641 '6\r\n',642 'chunk2',643 '\r\n',644 # next chunk is 1 char len UTF-8 string645 '3\r\n',646 '\u265E',647 '\r\n',648 '0\r\n\r\n']649 self.assertEqual(wrt.history, exp)650 def testPost(self):651 # Ensure that parameters from query string / body will be combined as well652 rdr = mockReader(['POST /?qs=qs1 HTTP/1.0\r\n',653 HDR('Content-Length: 17'),654 HDR('Content-Type: application/json'),655 HDRE,656 '{"body": "body1"}'])657 wrt = mockWriter()658 run_coro(self.srv._handler(rdr, wrt))659 exp = ['HTTP/1.0 200 MSG\r\n' +660 'Access-Control-Allow-Origin: *\r\n'661 'Access-Control-Allow-Headers: *\r\n'662 'Content-Length: 30\r\n'663 'Access-Control-Allow-Methods: GET, POST\r\n'664 'Content-Type: application/json\r\n\r\n',665 '{"qs": "qs1", "body": "body1"}']666 self.assertEqual(wrt.history, exp)667 def testInvalidMethod(self):668 rdr = mockReader(['PUT / HTTP/1.0\r\n',669 HDRE])670 wrt = 
mockWriter()671 run_coro(self.srv._handler(rdr, wrt))672 exp = ['HTTP/1.0 405 MSG\r\n\r\n']673 self.assertEqual(wrt.history, exp)674 def testException(self):675 rdr = mockReader(['PUT /negative HTTP/1.0\r\n',676 HDRE])677 wrt = mockWriter()678 run_coro(self.srv._handler(rdr, wrt))679 exp = ['HTTP/1.0 500 MSG\r\n\r\n']680 self.assertEqual(wrt.history, exp)681 def testBrokenPipe(self):682 rdr = mockReader(['DELETE /negative HTTP/1.0\r\n',683 HDRE])684 wrt = mockWriter()685 run_coro(self.srv._handler(rdr, wrt))686 self.assertEqual(wrt.history, [])687class StaticContent(unittest.TestCase):688 def setUp(self):689 self.srv = webserver()690 self.srv.conns[id(1)] = None691 self.tempfn = '__tmp.html'692 self.ctype = None693 self.etype = None694 self.max_age = 2592000695 with open(self.tempfn, 'wb') as f:696 f.write('someContent blah blah')697 def tearDown(self):698 try:699 delete_file(self.tempfn)700 except OSError:701 pass702 async def send_file_handler(self, req, resp):703 await resp.send_file(self.tempfn,704 content_type=self.ctype,705 content_encoding=self.etype,706 max_age=self.max_age)707 def testSendFileManual(self):708 """Verify send_file works great with manually defined parameters"""709 self.ctype = 'text/plain'710 self.etype = 'gzip'711 self.max_age = 100712 self.srv.add_route('/', self.send_file_handler)713 rdr = mockReader(['GET / HTTP/1.0\r\n',714 HDRE])715 wrt = mockWriter()716 run_coro(self.srv._handler(rdr, wrt))717 exp = ['HTTP/1.0 200 MSG\r\n' +718 'Cache-Control: max-age=100, public\r\n'719 'Content-Type: text/plain\r\n'720 'Content-Length: 21\r\n'721 'Content-Encoding: gzip\r\n\r\n',722 bytearray(b'someContent blah blah')]723 self.assertEqual(wrt.history, exp)724 self.assertTrue(wrt.closed)725 def testSendFileNotFound(self):726 """Verify 404 error for non existing files"""727 self.srv.add_route('/', self.send_file_handler)728 rdr = mockReader(['GET / HTTP/1.0\r\n',729 HDRE])730 wrt = mockWriter()731 # Intentionally delete file before request732 
delete_file(self.tempfn)733 run_coro(self.srv._handler(rdr, wrt))734 exp = ['HTTP/1.0 404 MSG\r\n\r\n']735 self.assertEqual(wrt.history, exp)736 self.assertTrue(wrt.closed)737 def testSendFileConnectionReset(self):738 self.srv.add_route('/', self.send_file_handler)739 rdr = mockReader(['GET / HTTP/1.0\r\n',740 HDRE])741 # tell mockWrite to raise error during send()742 wrt = mockWriter(generate_expection=OSError(errno.ECONNRESET))743 run_coro(self.srv._handler(rdr, wrt))744 # there should be no payload due to connected reset745 self.assertEqual(wrt.history, [])746 self.assertTrue(wrt.closed)747if __name__ == '__main__':...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

...21 self.running = False22 def run(self):23 self.running = True24 self.loop.run_forever()25 def run_coro(self, coro,wait_for_result=True):26 27 if wait_for_result:28 return asyncio.run_coroutine_threadsafe(coro, loop=self.loop).result()29 else:30 return asyncio.run_coroutine_threadsafe(coro, loop=self.loop)31 def stop(self):32 self.loop.call_soon_threadsafe(self.loop.stop)33 self.join()34 self.running = False35class DecentScale(AsyncioEventLoopThread):36 37 def __init__(self, *args, timeout=20, fix_dropped_command=True, **kwargs):38 super().__init__(*args, **kwargs)39 self.client = None40 self.timeout=timeout41 self.connected=False42 self.fix_dropped_command=fix_dropped_command43 self.dropped_command_sleep = 0.05 # API Docs says 50ms44 self.weight = None 45 46 #Constants47 self.CHAR_READ='0000FFF4-0000-1000-8000-00805F9B34FB'48 self.CHAR_WRITE='000036f5-0000-1000-8000-00805f9b34fb'49 50 51 #Tare the scale by sending "030FFD000000F1". 52 #Each tare needs to increment the 3rd byte pair, 53 #so you can cycle (for instance) though "030FFE000000F2" "030FFF000000F3" "030F000000000C".54 55 tare_commands=[ bytearray.fromhex(c) for c in ['030F000000000C','030F010000000D','030F020000000E']]56 self.tare_commands=cycle(tare_commands)57 58 self.led_on_command=bytearray.fromhex('030A0101000009')59 self.led_off_command=bytearray.fromhex('030A0000000009')60 self.start_time_command=bytearray.fromhex('030B030000000B')61 self.stop_time_command=bytearray.fromhex("030B0000000008")62 self.reset_time_command=bytearray.fromhex("030B020000000A" )63 64 self.daemon=True65 super().start()66 67 def check_connection(func):68 def is_connected(self):69 if self.connected:70 func(self)71 else:72 print("Scale is not connected.")73 return is_connected74 async def _find_address(self):75 76 device = await BleakScanner.find_device_by_filter(77 lambda d, ad: d.name and d.name == 'Decent Scale'78 ,timeout=self.timeout)79 80 if device:81 return device.address82 else:83 print('Error: Scale not found. 
Trying again...')84 85 async def _connect(self, address):86 87 self.client = BleakClient(address)88 89 if not self.running:90 super().start()91 92 try:93 return await self.client.connect(timeout=self.timeout)94 except Exception as e:95 print('Error:%s\nTrying again...' %e)96 return False 97 98 async def _disconnect(self):99 return await self.client.disconnect() 100 async def __send(self, cmd):101 """Send commands with firmware v1.0 bugfix (resending)"""102 await self.client.write_gatt_char(self.CHAR_WRITE, cmd)103 if self.fix_dropped_command:104 await asyncio.sleep(self.dropped_command_sleep)105 await self.client.write_gatt_char(self.CHAR_WRITE, cmd)106 # Wait 200ms for the command to finish107 # Alternative: receive the notifications and check if the command was acknowledged108 await asyncio.sleep(0.2)109 async def _tare(self):110 await self.__send(next(self.tare_commands))111 async def _led_on(self):112 await self.__send(self.led_on_command)113 async def _led_off(self):114 await self.__send(self.led_off_command)115 async def _start_time(self):116 await self.__send(self.start_time_command)117 async def _stop_time(self):118 await self.__send(self.stop_time_command)119 async def _reset_time(self):120 await self.__send(self.reset_time_command)121 def notification_handler(self, sender, data):122 if data[0] != 0x03 or len(data) != 7:123 # Basic sanity check124 logger.info("Invalid notification: not a Decent Scale?")125 return126 # Calculate XOR127 xor_msg = functools.reduce(operator.xor, data[:-1])128 if xor_msg != data[-1]:129 logger.warning("XOR verification failed for notification")130 return131 if sys.version_info >= (3, 8):132 logger.debug(f"Received Notification at {time.time()}: {binascii.hexlify(data, sep=':')}")133 else:134 logger.debug(f"Received Notification at {time.time()}: {binascii.hexlify(data)}")135 136 # Have to decide by type of the package137 type_ = data[1]138 if type_ in [0xCA, 0xCE]:139 # Weight information140 self.weight = 
int.from_bytes(data[2:4], byteorder='big', signed=True) / 10141 elif type_ == 0xAA:142 # Button press143 # NOTE: Despite the API documentation saying the XOR field is 0x00, it actually contains the XOR144 logger.debug(f"Button press: {data[2]}, duration: {data[3]}")145 elif type_ == 0x0F:146 # tare increment147 pass148 elif type_ == 0x0A:149 # LED on/off -> returns units and battery level150 logger.debug(f"Unit of scale: {'g' if data[3] == 0 else 'oz'}, battery level: {data[4]}%")151 elif type_ == 0x0B:152 # Timer153 # NOTE: The API documentation says there is a section on "Receiving Timer Info" but this is missing154 pass155 else:156 logger.warning(f"Unknown Notification Type received: 0x{type_:02x}")157 async def _enable_notification(self):158 await self.client.start_notify(self.CHAR_READ, self.notification_handler)159 await asyncio.sleep(1)160 161 162 async def _disable_notification(self):163 await self.client.stop_notify(self.CHAR_READ) 164 @check_connection 165 def enable_notification(self): 166 return self.run_coro(self._enable_notification())167 168 @check_connection 169 def disable_notification(self): 170 self.weight=None171 return self.run_coro(self._disable_notification())172 173 def find_address(self): 174 return self.run_coro(self._find_address())175 176 def connect(self,address):177 if not self.connected:178 self.connected= self.run_coro(self._connect(address))179 180 if self.connected:181 self.led_off()182 self.led_on()183 else:184 print('Already connected.')185 186 return self.connected187 188 def disconnect(self):189 if self.connected:190 self.connected= not self.run_coro(self._disconnect())191 else:192 print('Already disconnected.')193 194 return not self.connected195 196 def auto_connect(self,n_retries=3): 197 address = None198 for i in range(n_retries):199 address=self.find_address()200 if address:201 print('Found Decent Scale: %s' % address)202 break203 else:204 print(i)205 206 if address: 207 for i in range(n_retries):208 if 
self.connect(address):209 print('Scale connected!')210 return True211 212 213 print('Autoconnect failed. Make sure the scale is on.')214 return False215 216 @check_connection 217 def tare(self): 218 self.run_coro(self._tare())219 220 @check_connection 221 def start_time(self): 222 self.run_coro(self._start_time())223 224 @check_connection 225 def stop_time(self): 226 self.run_coro(self._stop_time())227 228 @check_connection 229 def reset_time(self): 230 self.run_coro(self._reset_time())231 @check_connection 232 def led_off(self): 233 self.run_coro(self._led_off())234 235 @check_connection 236 def led_on(self): 237 self.run_coro(self._led_on())238 ...

Full Screen

Full Screen

sync_client.py

Source:sync_client.py Github

copy

Full Screen

...8 self.running = False9 def run(self):10 self.running = True11 self.loop.run_forever()12 def run_coro(self, coro):13 return asyncio.run_coroutine_threadsafe(coro, loop=self.loop).result()14 def stop(self):15 self.loop.call_soon_threadsafe(self.loop.stop)16 self.join()17 self.running = False18class TonlibClientSync:19 def __init__(self,20 ls_index,21 config,22 keystore,23 cdll_path=None,24 verbosity_level=0):25 self.thread = AsyncioEventLoopThread()26 self.thread.start()27 self.client_impl = TonlibClient(ls_index, config, keystore, self.thread.loop, cdll_path, verbosity_level)28 self.thread.run_coro(self.client_impl.init())29 def __del__(self):30 del self.client_impl31 self.thread.stop()32 def set_verbosity_level(self, level):33 return self.thread.run_coro(self.client_impl.set_verbosity_level(level))34 def raw_get_transactions(self, account_address: str, from_transaction_lt: str, from_transaction_hash: str):35 return self.thread.run_coro(self.client_impl.raw_get_transactions(account_address, from_transaction_lt, from_transaction_hash))36 def raw_get_account_state(self, address: str):37 return self.thread.run_coro(self.client_impl.raw_get_account_state(address))38 def generic_get_account_state(self, address: str):39 return self.thread.run_coro(self.client_impl.generic_get_account_state(address))40 def _load_contract(self, address):41 return self.thread.run_coro(self.client_impl._load_contract(address))42 def raw_run_method(self, address, method, stack_data, output_layout=None):43 return self.thread.run_coro(self.client_impl.raw_run_method(address, method, stack_data, output_layout))44 def raw_send_message(self, serialized_boc):45 return self.thread.run_coro(self.client_impl.raw_send_message(serialized_boc))46 def _raw_create_query(self, destination, body, init_code=b'', init_data=b''):47 return self.thread.run_coro(self.client_impl._raw_create_query(destination, body, init_code, init_data))48 def _raw_send_query(self, query_info):49 return 
self.thread.run_coro(self.client_impl._raw_send_query(query_info))50 51 def raw_create_and_send_query(self, destination, body, init_code=b'', init_data=b''):52 return self.thread.run_coro(self.client_impl.raw_create_and_send_query(destination, body, init_code, init_data))53 def raw_create_and_send_message(self, destination, body, initial_account_state=b''):54 return self.thread.run_coro(self.client_impl.raw_create_and_send_message(destination, body, initial_account_state))55 def raw_estimate_fees(self, destination, body, init_code=b'', init_data=b'', ignore_chksig=True):56 return self.thread.run_coro(self.client_impl.raw_estimate_fees(destination, body, init_code, init_data, ignore_chksig))57 def raw_getBlockTransactions(self, fullblock, count, after_tx):58 return self.thread.run_coro(self.client_impl.raw_getBlockTransactions(fullblock, count, after_tx))59 def raw_getBlockTransactionsExt(self, fullblock, count, after_tx):60 return self.thread.run_coro(self.client_impl.raw_getBlockTransactionsExt(fullblock, count, after_tx))61 def getTransactions(self, account_address, from_transaction_lt=None, from_transaction_hash=None, to_transaction_lt=0, limit=10):62 return self.thread.run_coro(self.client_impl.getTransactions(account_address, from_transaction_lt, from_transaction_hash, to_transaction_lt, limit))63 def getMasterchainInfo(self):64 return self.thread.run_coro(self.client_impl.getMasterchainInfo())65 def lookupBlock(self, workchain, shard, seqno=None, lt=None, unixtime=None):66 return self.thread.run_coro(self.client_impl.lookupBlock(workchain, shard, seqno, lt, unixtime))67 def getShards(self, master_seqno=None, lt=None, unixtime=None):68 return self.thread.run_coro(self.client_impl.getShards(master_seqno, lt, unixtime))69 def getBlockTransactions(self, workchain, shard, seqno, count, root_hash=None, file_hash=None, after_lt=None, after_hash=None):70 return self.thread.run_coro(self.client_impl.getBlockTransactions(workchain, shard, seqno, count, root_hash, 
file_hash, after_lt, after_hash))71 def getBlockTransactionsExt(self, workchain, shard, seqno, count, root_hash=None, file_hash=None, after_lt=None, after_hash=None):72 return self.thread.run_coro(self.client_impl.getBlockTransactionsExt(workchain, shard, seqno, count, root_hash, file_hash, after_lt, after_hash))73 def getBlockHeader(self, workchain, shard, seqno, root_hash=None, file_hash=None):74 return self.thread.run_coro(self.client_impl.getBlockHeader(workchain, shard, seqno, root_hash, file_hash))75 def tryLocateTxByIncomingMessage(self, source, destination, creation_lt):76 return self.thread.run_coro(self.client_impl.tryLocateTxByIncomingMessage(source, destination, creation_lt))77 def tryLocateTxByOutcomingMessage(self, source, destination, creation_lt):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful