How to use the test_watchers method in uiautomator

Best Python code snippet using uiautomator

test_mux.py

Source:test_mux.py Github

copy

Full Screen

# Copyright 2014 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import unittest
from typing import NamedTuple, Optional

from aiok8s.cache.testing.util import async_test
from aiok8s.util import wait
from aiok8s.watch import watch
from aiok8s.watch.mux import Broadcaster, FullChannelBehavior


class TestBrodcaster(unittest.TestCase):
    """Tests for ``Broadcaster`` fan-out, watcher stop and shutdown."""

    @async_test
    async def test(self):
        """Each watcher sees every broadcast event, in the order sent."""
        table = [
            {"type": watch.EventType.ADDED, "object": MyType("foo", "hello world 1")},
            {"type": watch.EventType.ADDED, "object": MyType("bar", "hello world 2")},
            {
                "type": watch.EventType.MODIFIED,
                "object": MyType("foo", "goodbye world 3"),
            },
            {"type": watch.EventType.DELETED, "object": MyType("bar", "hello world 4")},
        ]
        m = Broadcaster(0, FullChannelBehavior.WAIT_IF_CHANNEL_FULL)
        test_watchers = 2

        async def coro(watcher, w):
            # Compare each received event against the table row at the same
            # position; the loop ends when the watcher stops yielding.
            table_line = 0
            async for event in w:
                self.assertEqual(
                    event,
                    table[table_line],
                    msg=f"Watcher {watcher}, line {table_line}",
                )
                table_line += 1

        # Watchers are registered before any event is sent.
        tasks = [
            asyncio.ensure_future(coro(i, await m.watch()))
            for i in range(test_watchers)
        ]
        for item in table:
            await m.action(item["type"], item["object"])
        await m.shutdown()
        # gather() re-raises any assertion failure from the watcher tasks.
        await asyncio.gather(*tasks)

    @async_test
    async def test_watcher_close(self):
        """A stopped watcher, and a watcher of a shut-down broadcaster, yield nothing."""
        m = Broadcaster(0, FullChannelBehavior.WAIT_IF_CHANNEL_FULL)
        w = await m.watch()
        w2 = await m.watch()
        await w.stop()
        await m.shutdown()
        async for _ in w:
            self.fail("Stop didn't work?")
        async for _ in w2:
            self.fail("Shutdown didn't work?")
        # Stopping again after stop/shutdown is exercised here as well; the
        # test fails if either call raises.
        await w.stop()
        await w2.stop()

    @async_test
    async def test_watcher_stop_deadlock(self):
        """Stopping one watcher while the other receives an event must not hang."""
        done = asyncio.Event()
        m = Broadcaster(0, FullChannelBehavior.WAIT_IF_CHANNEL_FULL)

        async def coro(w0, w1):
            async def aw0():
                async for _ in w0:
                    await w1.stop()

            async def aw1():
                async for _ in w1:
                    await w0.stop()

            # Whichever watcher gets the event first stops the other one.
            await asyncio.wait(
                [asyncio.ensure_future(aw0()), asyncio.ensure_future(aw1())],
                return_when=asyncio.FIRST_COMPLETED,
            )
            done.set()

        # Keep a reference to the task: the event loop only holds weak
        # references, and an unreferenced task's exception is never surfaced.
        stopper = asyncio.ensure_future(coro(await m.watch(), await m.watch()))
        await m.action(watch.EventType.ADDED, MyType())
        # Raises asyncio.TimeoutError if `done` is never set (i.e. a deadlock).
        await asyncio.wait_for(done.wait(), wait.FOREVER_TEST_TIMEOUT)
        # `done.set()` is the last statement of `coro`, so the task has
        # already finished here; awaiting it only re-raises a failure.
        await stopper
        await m.shutdown()

    @async_test
    async def test_drop_if_channel_full(self):
        """With DROP_IF_CHANNEL_FULL, each watcher receives only the first event."""
        # NOTE(review): the first argument (1) is presumably the per-watcher
        # queue length -- confirm against Broadcaster's signature.
        m = Broadcaster(1, FullChannelBehavior.DROP_IF_CHANNEL_FULL)
        event1 = {
            "type": watch.EventType.ADDED,
            "object": MyType("foo", "hello world 1"),
        }
        event2 = {
            "type": watch.EventType.ADDED,
            "object": MyType("bar", "hello world 2"),
        }
        watches = [await m.watch() for _ in range(2)]
        # Both events are sent before any watcher reads.
        await m.action(event1["type"], event1["object"])
        await m.action(event2["type"], event2["object"])
        await m.shutdown()

        async def coro(watcher, w):
            e1 = await w.__anext__()
            self.assertEqual(e1, event1, msg=f"Watcher {watcher}")
            async for e2 in w:
                self.fail(
                    f"Watcher {watcher} received second event {e2!r} "
                    f"even though it shouldn't have."
                )

        tasks = [asyncio.ensure_future(coro(i, w)) for i, w in enumerate(watches)]
        await asyncio.gather(*tasks)


class MyType(NamedTuple):
    """Minimal payload object carried by the test events."""

    id: Optional[str] = None
    value: Optional[str] = None


if __name__ == "__main__":
    # The published snippet is truncated here ("..."); presumably the
    # upstream file calls unittest.main() -- confirm against the source.
    ...

Full Screen

Full Screen

run_failure.py

Source:run_failure.py Github

copy

Full Screen

1import os2import sys3def test(arg):4 return os.system("bin/pytest -v %s" % arg)5def main(args):6 if not args:7 print(8 "Run as bin/python run_failure.py <test>, for example: \n"9 "bin/python run_failure.py "10 "kazoo.tests.test_watchers:KazooChildrenWatcherTests"11 )12 return13 arg = args[0]14 i = 015 while 1:16 i += 117 print("Run number: %s" % i)18 ret = test(arg)19 if ret != 0:20 break21if __name__ == "__main__":...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

1from test_utils import *2from test_watchers import *...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run uiautomator automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful