How to use run_coroutine method in localstack

Best Python code snippet using localstack_python

tests.py

Source:tests.py Github

copy

Full Screen

...104 cls.loop = asyncio.new_event_loop()105 def setUp(self):106 super().setUp()107 set_up_config(self)108 def run_coroutine(self, coroutine):109 return self.loop.run_until_complete(coroutine)110 class HTTPAioTestCase(BaseAioTestCase):111 def setUp(self):112 super().setUp()113 self.client = pushka._http.aio.AioHTTPClient(self.loop)114 def test_get_method(self):115 @asyncio.coroutine116 def test():117 result = yield from self.client.get('http://httpbin.org/get')118 self.assertEqual(result['code'], 200)119 self.run_coroutine(test())120 def test_get_404(self):121 @asyncio.coroutine122 def test():123 result = yield from self.client.get('http://httpbin.org/status/404')124 self.assertEqual(result['code'], 404)125 self.run_coroutine(test())126 def test_post_method(self):127 @asyncio.coroutine128 def test():129 result = yield from self.client.post('http://httpbin.org/post', data={})130 self.assertEqual(result['code'], 200)131 self.run_coroutine(test())132 class ParseAioTestCase(BaseAioTestCase):133 """Test Parse on Tornado.134 """135 def setUp(self):136 super().setUp()137 self.push_service = create_parse(self.loop)138 if not self.push_service:139 self.skipTest("Parse not configured")140 elif not self.push_token:141 self.skipTest("No push token")142 def test_register_and_send_push(self):143 self.run_coroutine(register_and_send_push(self))144 class TwilioAioTestCase(BaseAioTestCase):145 """Test Twilio on aiohttp.146 """147 def setUp(self):148 super().setUp()149 self.sms = create_twilio(self.loop)150 if not self.sms:151 self.skipTest("Twilio not configured")152 def test_register_and_send_push(self):153 self.run_coroutine(send_sms(self))154 class SesAioTestCase(BaseAioTestCase):155 """Test SES on aiohttp.156 """157 def setUp(self):158 super().setUp()159 self.ses = create_ses(self.loop)160 if not self.ses:161 self.skipTest("Amazon SES not configured")162 def test_send_mail(self):163 self.run_coroutine(send_mail(self))164#########################165# Tornado powered tests 
#166#########################167if tornado:168 import pushka._http.tornado169 170 IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')171 class BaseTornadoTestCase(tornado.testing.AsyncTestCase):172 def setUp(self):173 super().setUp()174 set_up_config(self)175 self.aio_loop = self.io_loop.asyncio_loop176 def run_coroutine(self, coroutine):177 """ Run coroutine on specified loop and set result178 to Tornado's Future object.179 """180 future = tornado.concurrent.Future()181 @asyncio.coroutine182 def run():183 try:184 result = yield from coroutine185 future.set_result(result)186 except Exception as e:187 future.set_exception(e)188 self.aio_loop.create_task(run())189 return future190 191 class HTTPTornadoTestCase(BaseTornadoTestCase):192 def setUp(self):193 super().setUp()194 self.client = pushka._http.tornado.TornadoHTTPClient(self.io_loop)195 @tornado.testing.gen_test196 def test_get_method(self):197 @asyncio.coroutine198 def test():199 result = yield from self.client.get('http://httpbin.org/get')200 self.assertEqual(result['code'], 200)201 yield self.run_coroutine(test())202 @tornado.testing.gen_test203 def test_get_404(self):204 @asyncio.coroutine205 def test():206 result = yield from self.client.get('http://httpbin.org/status/404')207 self.assertEqual(result['code'], 404)208 yield self.run_coroutine(test())209 @tornado.testing.gen_test210 def test_post_method(self):211 @asyncio.coroutine212 def test():213 result = yield from self.client.post('http://httpbin.org/post', data={})214 self.assertEqual(result['code'], 200)215 yield self.run_coroutine(test())216 class ParseTornadoTestCase(BaseTornadoTestCase):217 """218 Test Parse service client on Tornado.219 """220 def setUp(self):221 super().setUp()222 self.push_service = create_parse(self.io_loop)223 if not self.push_service:224 self.skipTest("Parse not configured")225 elif not self.push_token:226 self.skipTest("No push token")227 @tornado.testing.gen_test228 def test_register_and_send_push(self):229 yield 
self.run_coroutine(register_and_send_push(self))230 class TwilioTornadoTestCase(BaseTornadoTestCase):231 """Test Twilio on Tornado.232 """233 def setUp(self):234 super().setUp()235 self.sms = create_twilio(self.io_loop)236 if not self.sms:237 self.skipTest("Twilio not configured")238 @tornado.testing.gen_test239 def test_register_and_send_push(self):240 yield self.run_coroutine(send_sms(self))241 class SesTornadoTestCase(BaseTornadoTestCase):242 """Test SES on Tornado.243 """244 def setUp(self):245 super().setUp()246 self.ses = create_ses(self.io_loop)247 if not self.ses:248 self.skipTest("Amazon SES not configured")249 @tornado.testing.gen_test250 def test_send_mail(self):251 yield self.run_coroutine(send_mail(self))252if __name__ == '__main__':...

Full Screen

Full Screen

test_http.py

Source:test_http.py Github

copy

Full Screen

...25 transport = MockTransport()26 protocol = HttpProtocol(mock_consumer, loop)27 protocol.connection_made(transport)28 return protocol29def run_coroutine(coroutine):30 loop = asyncio.get_event_loop()31 return loop.run_until_complete(coroutine)32def read_body(message, channels):33 body = message.get('body', b'')34 if 'body' in channels:35 while True:36 message_chunk = run_coroutine(channels['body'].receive())37 body += message_chunk['content']38 if not message_chunk.get('more_content', False):39 break40 return body41# Test cases...42def test_get_request():43 protocol = get_protocol()44 protocol.data_received(b'GET / HTTP/1.1\r\nHost: example.org\r\n\r\n')45 assert protocol.active_request is not None46 message, channels = protocol.active_request47 assert message['method'] == 'GET'48 assert message['path'] == '/'49 assert message['headers'] == [[b'host', b'example.org']]50 run_coroutine(channels['reply'].send({'status': 200, 'headers': [[b'content-length', b'3']], 'content': b'abc'}))51 output = protocol.transport.content52 assert b'content-length: 3' in output53 assert output.endswith(b'abc')54def test_post_request():55 protocol = get_protocol()56 protocol.data_received(b'POST / HTTP/1.1\r\nContent-Length: 12\r\n\r\nHello, world')57 assert protocol.active_request is not None58 message, channels = protocol.active_request59 assert message['method'] == 'POST'60 assert message['path'] == '/'61 body = read_body(message, channels)62 assert body == b'Hello, world'63def test_pipelined_request():64 protocol = get_protocol()65 protocol.data_received(66 b'GET /1 HTTP/1.1\r\nHost: example.org\r\n\r\n'67 b'GET /2 HTTP/1.1\r\nHost: example.org\r\n\r\n'68 b'GET /3 HTTP/1.1\r\nHost: example.org\r\n\r\n'69 )70 assert protocol.active_request is not None71 assert len(protocol.pipeline_queue) == 272 message, channels = protocol.active_request73 assert message['method'] == 'GET'74 assert message['path'] == '/1'75 assert message['headers'] == [[b'host', b'example.org']]76 
run_coroutine(channels['reply'].send({'status': 204}))77 assert protocol.active_request is not None78 assert len(protocol.pipeline_queue) == 179 message, channels = protocol.active_request80 assert message['method'] == 'GET'81 assert message['path'] == '/2'82 assert message['headers'] == [[b'host', b'example.org']]83 run_coroutine(channels['reply'].send({'status': 204}))84 assert protocol.active_request is not None85 assert len(protocol.pipeline_queue) == 086 message, channels = protocol.active_request87 assert message['method'] == 'GET'88 assert message['path'] == '/3'89 assert message['headers'] == [[b'host', b'example.org']]90 run_coroutine(channels['reply'].send({'status': 204}))91 assert protocol.active_request is None92 assert len(protocol.pipeline_queue) == 093def test_release_request_body():94 protocol = get_protocol()95 protocol.data_received(b'POST / HTTP/1.1\r\nContent-Length: 100\r\n\r\n')96 # Send half of the request body.97 for idx in range(5):98 protocol.data_received(b'0123456789')99 assert protocol.buffer_size == 50100 # Sending the response should release the buffer.101 message, channels = protocol.active_request102 run_coroutine(channels['reply'].send({'status': 204}))103 assert protocol.buffer_size == 0104 # Sending the remaining request body shouldn't buffer any more data.105 for idx in range(5):106 protocol.data_received(b'0123456789')107 assert protocol.buffer_size == 0108def test_chunked_response():109 protocol = get_protocol()110 protocol.data_received(b'GET /1 HTTP/1.1\r\nHost: example.org\r\n\r\n')111 assert protocol.active_request is not None112 message, channels = protocol.active_request113 run_coroutine(channels['reply'].send({'status': 200, 'content': b'123', 'more_content': True}))114 run_coroutine(channels['reply'].send({'content': b'456', 'more_content': True}))115 run_coroutine(channels['reply'].send({'content': b'789', 'more_content': False}))116 output = protocol.transport.content117 assert b'transfer-encoding: chunked\r\n' in 
output118 assert output.endswith(b'3\r\n123\r\n3\r\n456\r\n3\r\n789\r\n0\r\n\r\n')119def test_server_connection_close():120 protocol = get_protocol()121 protocol.data_received(b'GET /1 HTTP/1.1\r\nHost: example.org\r\n\r\n')122 transport = protocol.transport123 assert protocol.active_request is not None124 message, channels = protocol.active_request125 run_coroutine(channels['reply'].send({'status': 204,'headers': [[b'connection', b'close']]}))126 assert transport.closed127 assert protocol.transport is None128def test_client_connection_close():129 protocol = get_protocol()130 protocol.data_received(b'GET /1 HTTP/1.1\r\nConnection: close\r\n\r\n')131 transport = protocol.transport132 assert protocol.active_request is not None133 message, channels = protocol.active_request134 run_coroutine(channels['reply'].send({'status': 204}))135 assert transport.closed...

Full Screen

Full Screen

agent_backchannel_client.py

Source:agent_backchannel_client.py Github

copy

Full Screen

...4from runners.agent_container import AgentContainer, create_agent_with_args_list5######################################################################6# coroutine utilities7######################################################################8def run_coroutine(coroutine, *args, **kwargs):9 loop = asyncio.get_event_loop()10 if not loop:11 loop = asyncio.new_event_loop()12 asyncio.set_event_loop(loop)13 try:14 return loop.run_until_complete(coroutine(*args, **kwargs))15 finally:16 pass17 # loop.close()18def async_sleep(delay):19 run_coroutine(asyncio.sleep, delay)20######################################################################21# high level aries agent interface22######################################################################23def create_agent_container_with_args(in_args: list):24 return run_coroutine(create_agent_with_args_list, in_args)25def aries_container_initialize(26 the_container: AgentContainer,27 schema_name: str = None,28 schema_attrs: list = None,29):30 run_coroutine(31 the_container.initialize,32 schema_name=schema_name,33 schema_attrs=schema_attrs,34 )35def agent_container_register_did(36 the_container: AgentContainer,37 did: str,38 verkey: str,39 role: str,40):41 run_coroutine(42 the_container.register_did,43 did,44 verkey,45 role,46 )47def aries_container_terminate(48 the_container: AgentContainer,49):50 return run_coroutine(the_container.terminate)51def aries_container_generate_invitation(52 the_container: AgentContainer,53):54 return run_coroutine(55 the_container.generate_invitation,56 )57def aries_container_receive_invitation(58 the_container: AgentContainer,59 invite_details: dict,60):61 return run_coroutine(62 the_container.input_invitation,63 invite_details,64 )65def aries_container_detect_connection(66 the_container: AgentContainer,67):68 run_coroutine(the_container.detect_connection)69def aries_container_create_schema_cred_def(70 the_container: AgentContainer,71 schema_name: str,72 schema_attrs: list,73 version: str = 
None,74):75 return run_coroutine(76 the_container.create_schema_and_cred_def,77 schema_name,78 schema_attrs,79 version=version,80 )81def aries_container_issue_credential(82 the_container: AgentContainer,83 cred_def_id: str,84 cred_attrs: list,85):86 return run_coroutine(87 the_container.issue_credential,88 cred_def_id,89 cred_attrs,90 )91def aries_container_receive_credential(92 the_container: AgentContainer,93 cred_def_id: str,94 cred_attrs: list,95):96 return run_coroutine(97 the_container.receive_credential,98 cred_def_id,99 cred_attrs,100 )101def aries_container_request_proof(102 the_container: AgentContainer,103 proof_request: dict,104):105 return run_coroutine(106 the_container.request_proof,107 proof_request,108 )109def aries_container_verify_proof(110 the_container: AgentContainer,111 proof_request: dict,112):113 return run_coroutine(114 the_container.verify_proof,115 proof_request,116 )117######################################################################118# aries agent admin api interface119######################################################################120######################################################################121# general utilities122######################################################################123def read_json_data(file_name: str):124 with open("features/data/" + file_name) as data_file:125 return json.load(data_file)126def read_schema_data(schema_name: str):127 return read_json_data("schema_" + schema_name + ".json")128def read_credential_data(schema_name: str, cred_scenario_name: str):129 schema_cred_data = read_json_data("cred_data_schema_" + schema_name + ".json")130 cred_data = schema_cred_data[cred_scenario_name]131 for attr in cred_data["attributes"]:132 if attr["value"] == "@uuid":133 attr["value"] = str(uuid.uuid4())134 return cred_data["attributes"]135def read_proof_req_data(proof_req_name: str):136 proof_request_info = read_json_data("proof_request_" + proof_req_name + ".json")137 return 
proof_request_info["presentation_proposal"]138def read_presentation_data(presentation_name: str):139 return read_json_data("presentation_" + presentation_name + ".json")140######################################################################141# probably obsolete ...142######################################################################143def agent_container_GET(144 the_container: AgentContainer,145 path: str,146 text: bool = False,147 params: dict = None,148) -> dict:149 return run_coroutine(150 the_container.admin_GET,151 path,152 text=text,153 params=params,154 )155def agent_container_POST(156 the_container: AgentContainer,157 path: str,158 data: dict = None,159 text: bool = False,160 params: dict = None,161) -> dict:162 return run_coroutine(163 the_container.admin_POST,164 path,165 data=data,166 text=text,167 params=params,168 )169def agent_container_PATCH(170 the_container: AgentContainer,171 path: str,172 data: dict = None,173 text: bool = False,174 params: dict = None,175) -> dict:176 return run_coroutine(177 the_container.admin_PATCH,178 path,179 data=data,180 text=text,181 params=params,182 )183def agent_container_PUT(184 the_container: AgentContainer,185 path: str,186 data: dict = None,187 text: bool = False,188 params: dict = None,189) -> dict:190 return run_coroutine(191 the_container.admin_PUT,192 path,193 data=data,194 text=text,195 params=params,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful