How to use async_coro method in pytest-asyncio

Best Python code snippet using pytest-asyncio_python

test_simple.py

Source:test_simple.py Github

copy

Full Screen

...3import os4import pytest5import pytest_asyncio.plugin6@asyncio.coroutine7def async_coro(loop=None):8 """A very simple coroutine."""9 yield from asyncio.sleep(0, loop=loop)10 return 'ok'11def test_event_loop_fixture(event_loop):12 """Test the injection of the event_loop fixture."""13 assert event_loop14 ret = event_loop.run_until_complete(async_coro(event_loop))15 assert ret == 'ok'16def test_event_loop_processpool_fixture(event_loop_process_pool):17 """Test the injection of the event_loop with a process pool fixture."""18 assert event_loop_process_pool19 ret = event_loop_process_pool.run_until_complete(20 async_coro(event_loop_process_pool))21 assert ret == 'ok'22 this_pid = os.getpid()23 future = event_loop_process_pool.run_in_executor(None, os.getpid)24 pool_pid = event_loop_process_pool.run_until_complete(future)25 assert this_pid != pool_pid26@pytest.mark.asyncio27def test_asyncio_marker():28 """Test the asyncio pytest marker."""29 yield # sleep(0)30@pytest.mark.xfail(reason='need a failure', strict=True)31@pytest.mark.asyncio32def test_asyncio_marker_fail():33 assert False34@pytest.mark.asyncio35def test_asyncio_marker_with_default_param(a_param=None):36 """Test the asyncio pytest marker."""37 yield # sleep(0)38@pytest.mark.asyncio_process_pool39def test_asyncio_process_pool_marker(event_loop):40 """Test the asyncio pytest marker."""41 ret = yield from async_coro(event_loop)42 assert ret == 'ok'43@pytest.mark.asyncio44def test_unused_port_fixture(unused_tcp_port, event_loop):45 """Test the unused TCP port fixture."""46 @asyncio.coroutine47 def closer(_, writer):48 writer.close()49 server1 = yield from asyncio.start_server(closer, host='localhost',50 port=unused_tcp_port,51 loop=event_loop)52 with pytest.raises(IOError):53 yield from asyncio.start_server(closer, host='localhost',54 port=unused_tcp_port,55 loop=event_loop)56 server1.close()57 yield from server1.wait_closed()58@pytest.mark.asyncio59def test_unused_port_factory_fixture(unused_tcp_port_factory, event_loop):60 """Test the unused TCP port factory fixture."""61 @asyncio.coroutine62 def closer(_, writer):63 writer.close()64 port1, port2, port3 = (unused_tcp_port_factory(), unused_tcp_port_factory(),65 unused_tcp_port_factory())66 server1 = yield from asyncio.start_server(closer, host='localhost',67 port=port1,68 loop=event_loop)69 server2 = yield from asyncio.start_server(closer, host='localhost',70 port=port2,71 loop=event_loop)72 server3 = yield from asyncio.start_server(closer, host='localhost',73 port=port3,74 loop=event_loop)75 for port in port1, port2, port3:76 with pytest.raises(IOError):77 yield from asyncio.start_server(closer, host='localhost',78 port=port,79 loop=event_loop)80 server1.close()81 yield from server1.wait_closed()82 server2.close()83 yield from server2.wait_closed()84 server3.close()85 yield from server3.wait_closed()86def test_unused_port_factory_duplicate(unused_tcp_port_factory, monkeypatch):87 """Test correct avoidance of duplicate ports."""88 counter = 089 def mock_unused_tcp_port():90 """Force some duplicate ports."""91 nonlocal counter92 counter += 193 if counter < 5:94 return 1000095 else:96 return 10000 + counter97 monkeypatch.setattr(pytest_asyncio.plugin, 'unused_tcp_port',98 mock_unused_tcp_port)99 assert unused_tcp_port_factory() == 10000100 assert unused_tcp_port_factory() > 10000101class Test:102 """Test that asyncio marked functions work in test methods."""103 @pytest.mark.asyncio104 def test_asyncio_marker_method(self, event_loop):105 """Test the asyncio pytest marker in a Test class."""106 ret = yield from async_coro(event_loop)107 assert ret == 'ok'108class TestUnexistingLoop:109 @pytest.fixture110 def remove_loop(self):111 old_loop = asyncio.get_event_loop()112 asyncio.set_event_loop(None)113 yield114 asyncio.set_event_loop(old_loop)115 @pytest.mark.asyncio116 def test_asyncio_marker_without_loop(self, remove_loop):117 """Test the asyncio pytest marker in a Test class."""118 ret = yield from async_coro()...

Full Screen

Full Screen

teste_simple.py

Source:teste_simple.py Github

copy

Full Screen

2import asyncio3import os4import pytest5import pytest_asyncio.plugin6async def async_coro(loop=None):7 """A very simple coroutine."""8 await asyncio.sleep(0, loop=loop)9 return 'ok'10def test_event_loop_fixture(event_loop):11 """Test the injection of the event_loop fixture."""12 assert event_loop13 ret = event_loop.run_until_complete(async_coro(event_loop))14 assert ret == 'ok'15@pytest.mark.asyncio16def test_asyncio_marker():17 """Test the asyncio pytest marker."""18 yield # sleep(0)19@pytest.mark.xfail(reason='need a failure', strict=True)20@pytest.mark.asyncio21def test_asyncio_marker_fail():22 assert False23@pytest.mark.asyncio24def test_asyncio_marker_with_default_param(a_param=None):25 """Test the asyncio pytest marker."""26 yield # sleep(0)27@pytest.mark.asyncio28async def test_unused_port_fixture(unused_tcp_port, event_loop):29 """Test the unused TCP port fixture."""30 async def closer(_, writer):31 writer.close()32 server1 = await asyncio.start_server(closer, host='localhost',33 port=unused_tcp_port,34 loop=event_loop)35 with pytest.raises(IOError):36 await asyncio.start_server(closer, host='localhost',37 port=unused_tcp_port,38 loop=event_loop)39 server1.close()40 await server1.wait_closed()41@pytest.mark.asyncio42async def test_unused_port_factory_fixture(unused_tcp_port_factory, event_loop):43 """Test the unused TCP port factory fixture."""44 async def closer(_, writer):45 writer.close()46 port1, port2, port3 = (unused_tcp_port_factory(), unused_tcp_port_factory(),47 unused_tcp_port_factory())48 server1 = await asyncio.start_server(closer, host='localhost',49 port=port1,50 loop=event_loop)51 server2 = await asyncio.start_server(closer, host='localhost',52 port=port2,53 loop=event_loop)54 server3 = await asyncio.start_server(closer, host='localhost',55 port=port3,56 loop=event_loop)57 for port in port1, port2, port3:58 with pytest.raises(IOError):59 await asyncio.start_server(closer, host='localhost',60 port=port,61 loop=event_loop)62 server1.close()63 await server1.wait_closed()64 server2.close()65 await server2.wait_closed()66 server3.close()67 await server3.wait_closed()68def test_unused_port_factory_duplicate(unused_tcp_port_factory, monkeypatch):69 """Test correct avoidance of duplicate ports."""70 counter = 071 def mock_unused_tcp_port():72 """Force some duplicate ports."""73 nonlocal counter74 counter += 175 if counter < 5:76 return 1000077 else:78 return 10000 + counter79 monkeypatch.setattr(pytest_asyncio.plugin, '_unused_tcp_port',80 mock_unused_tcp_port)81 assert unused_tcp_port_factory() == 1000082 assert unused_tcp_port_factory() > 1000083class Test:84 """Test that asyncio marked functions work in test methods."""85 @pytest.mark.asyncio86 async def test_asyncio_marker_method(self, event_loop):87 """Test the asyncio pytest marker in a Test class."""88 ret = await async_coro(event_loop)89 assert ret == 'ok'90class TestUnexistingLoop:91 @pytest.fixture92 def remove_loop(self):93 old_loop = asyncio.get_event_loop()94 asyncio.set_event_loop(None)95 yield96 asyncio.set_event_loop(old_loop)97 @pytest.mark.asyncio98 async def test_asyncio_marker_without_loop(self, remove_loop):99 """Test the asyncio pytest marker in a Test class."""100 ret = await async_coro()...

Full Screen

Full Screen

urls.py

Source:urls.py Github

copy

Full Screen

1from frontik_patterns.pages import index, example_request2from frontik_patterns.pages.async_coro import (3 tasks as async_coro_tasks,4 serial as async_coro_serial,5 parallel as async_coro_parallel,6 parallel_dict as async_coro_parallel_dict,7 parallel_coro as async_coro_parallel_coro8)9from frontik_patterns.pages.tornado_coro import (10 tasks as tornado_coro_tasks,11 serial as tornado_coro_serial,12 parallel as tornado_coro_parallel,13 parallel_dict as tornado_coro_parallel_dict,14 parallel_coro as tornado_coro_parallel_coro15)16from frontik_patterns.pages.tornado_callbacks import (17 tasks as tornado_callbacks_tasks,18 serial as tornado_callbacks_serial,19 parallel as tornado_callbacks_parallel20)21def url(matcher):22 return r'^{}/?(?:\?.*)?$'.format(matcher)23URLS = [24 # Main25 (url(r'/'), index.Page),26 (url(r'/example_request'), example_request.Page),27 # Native coroutines28 (url(r'/async_coro/serial'), async_coro_serial.Page),29 (url(r'/async_coro/parallel'), async_coro_parallel.Page),30 (url(r'/async_coro/parallel_dict'), async_coro_parallel_dict.Page),31 (url(r'/async_coro/parallel_coro'), async_coro_parallel_coro.Page),32 (url(r'/async_coro/tasks'), async_coro_tasks.Page),33 # Tornado coroutines (deprecated)34 (url(r'/tornado_coro/serial'), tornado_coro_serial.Page),35 (url(r'/tornado_coro/parallel'), tornado_coro_parallel.Page),36 (url(r'/tornado_coro/parallel_dict'), tornado_coro_parallel_dict.Page),37 (url(r'/tornado_coro/parallel_coro'), tornado_coro_parallel_coro.Page),38 (url(r'/tornado_coro/tasks'), tornado_coro_tasks.Page),39 # Tornado callbacks (deprecated)40 (url(r'/tornado_callbacks/serial'), tornado_callbacks_serial.Page),41 (url(r'/tornado_callbacks/parallel'), tornado_callbacks_parallel.Page),42 (url(r'/tornado_callbacks/tasks'), tornado_callbacks_tasks.Page),43 # Default rule44 # (r'.*', error_page.Page)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pytest-asyncio automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful