How to use event_loop method in pytest-asyncio

Best Python code snippet using pytest-asyncio_python

PipelineProcess.py

Source:PipelineProcess.py Github

copy

Full Screen

...20class TestConstruct(TmpDirMixin):21 def runTest(self):22 cwd = pathlib.Path(self.tmpdir.name).resolve()23 args = [which('touch'), "tmp.txt"]24 event_loop = asyncio.new_event_loop()25 with NullStream() as null_stream:26 process = PipelineProcess(27 event_loop,28 cwd = cwd,29 env = {},30 args = args,31 stdin_stream = null_stream,32 stdout_stream = null_stream,33 stderr_stream = null_stream34 )35 self.assertEqual(process.cwd, cwd)36 self.assertEqual(process.args, args)37 str(process)38 repr(process)39@export40@register()41class TestCreateAndWaitAsync(TmpDirMixin):42 def runTest(self):43 cwd = pathlib.Path(self.tmpdir.name)44 test_file = cwd / 'tmp.txt'45 args = [which('touch'), test_file]46 event_loop = asyncio.new_event_loop()47 async def run_and_wait():48 with NullStream() as null_stream:49 process = await PipelineProcess.create(50 event_loop,51 cwd = cwd.resolve(),52 env = {},53 args = args,54 stdin_stream = null_stream,55 stdout_stream = null_stream,56 stderr_stream = null_stream57 )58 await process.wait_async()59 return process60 process = event_loop.run_until_complete(run_and_wait())61 self.assertEqual(process.proc.returncode, 0)62 self.assertTrue(test_file.exists())63@export64@register()65class TestCreatePoll(TmpDirMixin):66 def runTest(self):67 cwd = pathlib.Path(self.tmpdir.name)68 test_file = cwd / 'tmp.txt'69 args = [which('touch'), test_file]70 event_loop = asyncio.new_event_loop()71 async def run_and_wait(event_loop):72 with NullStream() as null_stream:73 process = await PipelineProcess.create(74 event_loop,75 cwd = cwd.resolve(),76 env = {},77 args = args,78 stdin_stream = null_stream,79 stdout_stream = null_stream,80 stderr_stream = null_stream81 )82 time.sleep(0.1)83 return process84 process = event_loop.run_until_complete(run_and_wait(event_loop))85 86 poll_rc = process.poll()87 poll2_rc = process.poll()88 self.assertEqual(process.proc.returncode, 0)89 self.assertEqual(poll_rc, [process.proc.returncode])90 self.assertEqual(poll2_rc, [process.proc.returncode])91 self.assertTrue(test_file.exists())92@export93@register()94class TestCreateTerminatePoll(TmpDirMixin):95 def runTest(self):96 cwd = pathlib.Path(self.tmpdir.name)97 args = [which('sleep'), "1"]98 event_loop = asyncio.new_event_loop()99 async def run_and_wait(event_loop):100 with NullStream() as null_stream:101 process = await PipelineProcess.create(102 event_loop,103 cwd = cwd.resolve(),104 env = {},105 args = args,106 stdin_stream = null_stream,107 stdout_stream = null_stream,108 stderr_stream = null_stream109 )110 return process111 process = event_loop.run_until_complete(run_and_wait(event_loop))112 113 process.terminate()114 poll_rc = process.poll(0.1)115 self.assertEqual(process.proc.returncode, -signal.SIGTERM)116 self.assertEqual(poll_rc, [-signal.SIGTERM])117 self.assertEqual(process.wait(), [-signal.SIGTERM])118@export119@register()120class TestCreateKillPoll(TmpDirMixin):121 def runTest(self):122 cwd = pathlib.Path(self.tmpdir.name)123 args = [which('sleep'), "1"]124 event_loop = asyncio.new_event_loop()125 async def run_and_wait(event_loop):126 with NullStream() as null_stream:127 process = await PipelineProcess.create(128 event_loop,129 cwd = cwd.resolve(),130 env = {},131 args = args,132 stdin_stream = null_stream,133 stdout_stream = null_stream,134 stderr_stream = null_stream135 )136 return process137 process = event_loop.run_until_complete(run_and_wait(event_loop))138 139 process.kill()140 poll_rc = process.poll(0.1)141 self.assertEqual(process.proc.returncode, -signal.SIGKILL)142 self.assertEqual(poll_rc, [-signal.SIGKILL])143 self.assertEqual(process.wait(), [-signal.SIGKILL])144@export145@register()146class TestCreatePollFail(TmpDirMixin):147 def runTest(self):148 cwd = pathlib.Path(self.tmpdir.name)149 args = [which('sleep'), "1"]150 event_loop = asyncio.new_event_loop()151 async def run_and_wait(event_loop):152 with NullStream() as null_stream:153 process = await PipelineProcess.create(154 event_loop,155 cwd = cwd.resolve(),156 env = {},157 args = args,158 stdin_stream = null_stream,159 stdout_stream = null_stream,160 stderr_stream = null_stream161 )162 return process163 process = event_loop.run_until_complete(run_and_wait(event_loop))164 165 poll_rc = process.poll()166 self.assertEqual(process.proc.returncode, None)167 self.assertEqual(poll_rc, [None])168 self.assertEqual(process.wait(), [0])169@export170@register()171class TestCreateWithDifferentUser(TmpDirMixin):172 def setUp(self):173 super().setUp()174 if ((sys.version_info.major, sys.version_info.minor) < (3, 9)):175 raise unittest.SkipTest("Python version is less than 3.9")176 if os.getuid() != 0:177 raise unittest.SkipTest("Not running as root")178 def unless_key_error(fun):179 try:180 return fun()181 except KeyError:182 return None183 self.uid = unless_key_error(lambda: pwd.getpwnam('nobody').pw_uid)184 if self.uid is None:185 raise unittest.SkipTest("No user exists with name 'nobody'")186 def runTest(self):187 cwd = pathlib.Path(self.tmpdir.name)188 args = [which('id'), '-u']189 event_loop = asyncio.new_event_loop()190 async def run_and_wait(event_loop):191 with NullStream() as null_stream, PipeStream(None) as stdout_stream:192 process = await PipelineProcess.create(193 event_loop,194 cwd = cwd.resolve(),195 env = {},196 args = args,197 stdin_stream = null_stream,198 stdout_stream = stdout_stream,199 stderr_stream = null_stream,200 user = 'nobody'201 )202 stdout_stream.close_writer()203 await process.wait_async()204 return stdout_stream.reader().read()205 observed_uid = event_loop.run_until_complete(run_and_wait(event_loop))206 self.assertEqual(observed_uid.strip(), str(self.uid))207@export208@register()209class TestCreateWithDifferentGroup(TmpDirMixin):210 def setUp(self):211 super().setUp()212 if ((sys.version_info.major, sys.version_info.minor) < (3, 9)):213 raise unittest.SkipTest("Python version is less than 3.9")214 if os.getuid() != 0:215 raise unittest.SkipTest("Not running as root")216 def unless_key_error(fun):217 try:218 return fun()219 except KeyError:220 return None221 self.gid = unless_key_error(lambda: pwd.getpwnam('nobody').pw_gid)222 if self.gid is None:223 raise unittest.SkipTest("No group exists with name 'nobody'")224 def runTest(self):225 cwd = pathlib.Path(self.tmpdir.name)226 args = [which('id'), '-g']227 event_loop = asyncio.new_event_loop()228 async def run_and_wait(event_loop):229 with NullStream() as null_stream, PipeStream(None) as stdout_stream:230 process = await PipelineProcess.create(231 event_loop,232 cwd = cwd.resolve(),233 env = {},234 args = args,235 stdin_stream = null_stream,236 stdout_stream = stdout_stream,237 stderr_stream = null_stream,238 group = self.gid239 )240 stdout_stream.close_writer()241 await process.wait_async()242 return stdout_stream.reader().read()243 observed_gid = event_loop.run_until_complete(run_and_wait(event_loop))244 self.assertEqual(observed_gid.strip(), str(self.gid))245 246@export247@register()248class TestCreateWait(TmpDirMixin):249 def runTest(self):250 cwd = pathlib.Path(self.tmpdir.name)251 test_file = cwd / 'tmp.txt'252 args = [which('touch'), test_file]253 event_loop = asyncio.new_event_loop()254 async def run_and_wait(event_loop):255 with NullStream() as null_stream:256 process = await PipelineProcess.create(257 event_loop,258 cwd = cwd.resolve(),259 env = {},260 args = args,261 stdin_stream = null_stream,262 stdout_stream = null_stream,263 stderr_stream = null_stream264 )265 return process266 process = event_loop.run_until_complete(run_and_wait(event_loop))267 268 wait_rc = process.wait()269 self.assertEqual(process.proc.returncode, 0)270 self.assertEqual(wait_rc, [process.proc.returncode])271 self.assertTrue(test_file.exists())272@export273@register()274class TestEnvironmentVariableExists(TmpDirMixin):275 def runTest(self):276 cwd = pathlib.Path(self.tmpdir.name)277 from .. import test_util278 with importlib.resources.path(test_util.__package__, 'echo_env.py') as echo_env:279 args = [which('python3'), echo_env]280 message = 'Hello World!'281 event_loop = asyncio.new_event_loop()282 async def run_and_wait():283 with Pipe() as pipe:284 with NullStream() as null_stream, ManualStream(fileobj_w=pipe.writer) as stdout_stream:285 process = await PipelineProcess.create(286 event_loop,287 cwd = cwd.resolve(),288 env = {289 'A': 'wrong output',290 'MESSAGE': message,291 'Z': 'wrong output'292 },293 args = [which('python3'), echo_env, "MESSAGE"],294 stdin_stream = null_stream,295 stdout_stream = stdout_stream,...

Full Screen

Full Screen

test_mdns_functional.py

Source:test_mdns_functional.py Github

copy

Full Screen

1"""Functional tests pyatv.support.mdns."""2import asyncio3import logging4import typing5from ipaddress import IPv4Address6from unittest.mock import MagicMock, patch7import pytest8from pyatv.support import mdns, net9from tests import fake_udns, utils10SERVICE_NAME = "Kitchen"11MEDIAREMOTE_SERVICE = "_mediaremotetv._tcp.local"12DEVICE_INFO_SERVICE = "_device-info._tcp._local"13TEST_SERVICES = dict(14 [15 fake_udns.mrp_service(16 SERVICE_NAME, SERVICE_NAME, "mrp_id", address="127.0.0.1", port=123417 ),18 ]19)20@pytest.fixture(autouse=True)21def mdns_debug():22 logger = logging.getLogger("pyatv.support.mdns")23 logger.setLevel(mdns.TRAFFIC_LEVEL)24 yield25@pytest.fixture26async def udns_server(event_loop):27 server = fake_udns.FakeUdns(event_loop, TEST_SERVICES)28 await server.start()29 yield server30 server.close()31@pytest.fixture(autouse=True)32def stub_local_addresses():33 with patch("pyatv.net.get_private_addresses") as mock:34 mock.return_value = [IPv4Address("127.0.0.1")]35 yield36# Requests are normally not sent to localhost, so we need to fake that localhost37# is not s loopback address38@pytest.fixture(autouse=True)39def stub_ip_address():40 with patch("pyatv.support.mdns.ip_address") as mock:41 mock.return_value = mock42 mock.is_loopback = False43 yield44# Hack-ish fixture to make sure multicast does not listen on any global port,45# i.e. 5353 since data from other places can leak into the test46@pytest.fixture(autouse=True)47def redirect_mcast(udns_server):48 real_mcast_socket = net.mcast_socket49 with patch("pyatv.net.mcast_socket") as mock:50 mock.side_effect = lambda addr, port=0: real_mcast_socket(51 addr, port if port == udns_server.port else 052 )53 yield54# This is a very complex fixture that will hook into the receiver of multicast55# responses and abort the "waiting" period whenever all responses or a certain amount56# of requests have been received. Mainly this is for not slowing down tests.57@pytest.fixture58async def multicast_fastexit(event_loop, monkeypatch, udns_server):59 clients: typing.List[asyncio.Future] = []60 # Interface used to set number of expected responses (1 by default)61 conditions: typing.Dict[str, object] = {"responses": 1, "requests": 0}62 # Checks if either response or request count has been fulfilled63 def _check_cond(protocol: mdns.MulticastDnsSdClientProtocol) -> bool:64 if len(protocol.responses) == conditions["responses"]:65 return False66 if conditions["requests"] == 0:67 return True68 return udns_server.request_count < conditions["responses"]69 def _cast(typ, val):70 if isinstance(val, mdns.MulticastDnsSdClientProtocol):71 async def _poll_responses():72 await utils.until(lambda: not _check_cond(val))73 val.semaphore.release()74 clients.append(asyncio.ensure_future(_poll_responses()))75 return val76 monkeypatch.setattr(typing, "cast", _cast)77 yield lambda **kwargs: conditions.update(**kwargs)78 await asyncio.gather(*clients)79async def unicast(event_loop, udns_server, service_name, timeout=1):80 return (81 await mdns.unicast(82 event_loop,83 "127.0.0.1",84 [service_name],85 port=udns_server.port,86 timeout=timeout,87 ),88 TEST_SERVICES.get(service_name),89 )90@pytest.mark.asyncio91async def test_unicast_has_valid_service(event_loop, udns_server):92 resp, service = await unicast(event_loop, udns_server, MEDIAREMOTE_SERVICE)93 assert len(resp.services) == 194 assert resp.services[0].type == MEDIAREMOTE_SERVICE95 assert resp.services[0].name == service.name96 assert resp.services[0].port == service.port97@pytest.mark.asyncio98async def test_unicast_resend_if_no_response(event_loop, udns_server):99 udns_server.skip_count = 2100 resp, service = await unicast(event_loop, udns_server, MEDIAREMOTE_SERVICE, 3)101 assert len(resp.services) == 1102 assert resp.services[0].type == MEDIAREMOTE_SERVICE103 assert resp.services[0].name == service.name104 assert resp.services[0].port == service.port105@pytest.mark.asyncio106async def test_unicast_specific_service(event_loop, udns_server):107 resp, _ = await unicast(108 event_loop, udns_server, SERVICE_NAME + "." + MEDIAREMOTE_SERVICE109 )110 assert len(resp.services) == 1111 service = TEST_SERVICES.get(MEDIAREMOTE_SERVICE)112 assert resp.services[0].type == MEDIAREMOTE_SERVICE113 assert resp.services[0].name == service.name114 assert resp.services[0].port == service.port115@pytest.mark.asyncio116async def test_multicast_no_response(event_loop, udns_server, multicast_fastexit):117 multicast_fastexit(responses=0, requests=0)118 await mdns.multicast(event_loop, [], "127.0.0.1", udns_server.port)119@pytest.mark.asyncio120async def test_multicast_has_valid_service(event_loop, udns_server, multicast_fastexit):121 multicast_fastexit(responses=1, requests=0)122 resp = await mdns.multicast(123 event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port124 )125 assert len(resp) == 1126 first = resp[IPv4Address("127.0.0.1")].services[0]127 assert first.type == MEDIAREMOTE_SERVICE128 assert first.name == SERVICE_NAME129 assert first.port == 1234130@pytest.mark.asyncio131async def test_multicast_end_condition_met(132 event_loop, udns_server, multicast_fastexit, stub_ip_address133):134 multicast_fastexit(responses=4, requests=10)135 actor = MagicMock()136 def _end_cond(response):137 actor(response)138 return True139 resp = await mdns.multicast(140 event_loop,141 [MEDIAREMOTE_SERVICE],142 "127.0.0.1",143 udns_server.port,144 end_condition=_end_cond,145 )146 assert len(resp) == 1147 actor.assert_called_once_with(resp[IPv4Address("127.0.0.1")])148@pytest.mark.asyncio149async def test_multicast_sleeping_device(event_loop, udns_server, multicast_fastexit):150 multicast_fastexit(responses=0, requests=3)151 udns_server.sleep_proxy = True152 udns_server.services = {153 MEDIAREMOTE_SERVICE: fake_udns.FakeDnsService(154 name=SERVICE_NAME, address=None, port=0, properties={}, model=None155 ),156 }157 resp = await mdns.multicast(158 event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port159 )160 assert len(resp) == 0161 multicast_fastexit(responses=1, requests=0)162 udns_server.services = TEST_SERVICES163 resp = await mdns.multicast(164 event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port165 )166 assert len(resp) == 1167@pytest.mark.asyncio168async def test_multicast_deep_sleep(event_loop, udns_server, multicast_fastexit):169 multicast_fastexit(responses=1, requests=0)170 resp = await mdns.multicast(171 event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port172 )173 assert not resp[IPv4Address("127.0.0.1")].deep_sleep174 udns_server.sleep_proxy = True175 multicast_fastexit(responses=1, requests=0)176 resp = await mdns.multicast(177 event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port178 )179 assert resp[IPv4Address("127.0.0.1")].deep_sleep180@pytest.mark.asyncio181async def test_multicast_device_model(event_loop, udns_server, multicast_fastexit):182 multicast_fastexit(responses=1, requests=0)183 resp = await mdns.multicast(184 event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port185 )186 assert not resp[IPv4Address("127.0.0.1")].model187 udns_server.services = {188 MEDIAREMOTE_SERVICE: fake_udns.FakeDnsService(189 name=SERVICE_NAME,190 address="127.0.0.1",191 port=1234,192 properties={},193 model="dummy",194 ),195 }196 multicast_fastexit(responses=1, requests=0)197 resp = await mdns.multicast(198 event_loop, [MEDIAREMOTE_SERVICE], "127.0.0.1", udns_server.port199 )...

Full Screen

Full Screen

test_aiocontext.py

Source:test_aiocontext.py Github

copy

Full Screen

...108 def test_len(self, context, context_loop):109 context['key1'] = 'value1'110 context['key2'] = 'value2'111 assert len(context) == 2112 def test_missing_event_loop(self, context):113 with pytest.raises(EventLoopError):114 context.get_data()115 with pytest.raises(EventLoopError):116 context['key']117 with pytest.raises(EventLoopError):118 context['key'] = 'value'119 with pytest.raises(EventLoopError):120 del context['key']121 with pytest.raises(EventLoopError):122 list(context)123 with pytest.raises(EventLoopError):124 len(context)125 @pytest.mark.asyncio126 @asyncio.coroutine...

Full Screen

Full Screen

test_player.py

Source:test_player.py Github

copy

Full Screen

...16 player.play()17 sound = test_data.get_file('media', 'alert.wav')18 source = pyglet.media.load(sound, streaming=False)19 player.queue(source)20 event_loop.run_event_loop()21 event_loop.ask_question('Did you hear the alert sound playing?', screenshot=False)22 sound2 = test_data.get_file('media', 'receive.wav')23 source2 = pyglet.media.load(sound2, streaming=False)24 player.queue(source2)25 player.play()26 event_loop.run_event_loop()27 event_loop.ask_question('Did you hear the receive sound playing?', screenshot=False)28@pytest.mark.requires_user_validation29def test_playback_fire_and_forget(event_loop, test_data):30 """Test playing back sound files using fire and forget."""31 sound = test_data.get_file('media', 'alert.wav')32 source = pyglet.media.load(sound, streaming=False)33 source.play()34 event_loop.ask_question('Did you hear the alert sound playing?', screenshot=False)35@pytest.mark.requires_user_validation36def test_play_queue(event_loop):37 """Test playing a single sound on the queue."""38 source = procedural.WhiteNoise(1.0)39 player = Player()40 player.on_player_eos = event_loop.interrupt_event_loop41 player.play()42 player.queue(source)43 event_loop.run_event_loop()44 event_loop.ask_question('Did you hear white noise for 1 second?', screenshot=False)45@pytest.mark.requires_user_validation46def test_queue_play(event_loop):47 """Test putting a single sound on the queue and then starting the player."""48 source = procedural.WhiteNoise(1.0)49 player = Player()50 player.on_player_eos = event_loop.interrupt_event_loop51 player.queue(source)52 player.play()53 event_loop.run_event_loop()54 event_loop.ask_question('Did you hear white noise for 1 second?', screenshot=False)55@pytest.mark.requires_user_validation56def test_pause_queue(event_loop):57 """Test the queue is not played when player is paused."""58 source = procedural.WhiteNoise(1.0)59 player = Player()60 player.pause()61 player.queue(source)62 # Run for the duration of the sound63 event_loop.run_event_loop(1.0)64 event_loop.ask_question('Did you not hear any sound?', screenshot=False)65@pytest.mark.requires_user_validation66def test_pause_sound(event_loop):67 """Test that a playing sound can be paused."""68 source = procedural.WhiteNoise(60.0)69 player = Player()70 player.queue(source)71 player.play()72 event_loop.run_event_loop(1.0)73 player.pause()74 event_loop.ask_question('Did you hear white noise for 1 second and is it now silent?',75 screenshot=False)76 player.play()77 event_loop.ask_question('Do you hear white noise again?', screenshot=False)78 player.delete()79 event_loop.ask_question('Is it silent again?', screenshot=False)80@pytest.mark.requires_user_validation81def test_next_on_end_of_stream(event_loop):82 """Test that multiple items on the queue are played after each other."""83 source1 = procedural.WhiteNoise(1.0)84 source2 = procedural.Sine(1.0)85 player = Player()86 player.on_player_eos = event_loop.interrupt_event_loop87 player.queue(source1)88 player.queue(source2)89 player.play()90 event_loop.run_event_loop()91 event_loop.ask_question('Did you hear white noise for 1 second and then a tone at 440 Hz'92 '(A above middle C)?', screenshot=False)93@pytest.mark.requires_user_validation94def test_static_source_wrapping(event_loop):95 """Test that a sound can be recursively wrappend inside a static source."""96 source = procedural.WhiteNoise(1.0)97 source = StaticSource(source)98 source = StaticSource(source)99 player = Player()100 player.on_player_eos = event_loop.interrupt_event_loop101 player.queue(source)102 player.play()103 event_loop.run_event_loop()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pytest-asyncio automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful