How to use the wait_result method in lisa

Best Python code snippet using lisa_python

core.py

Source:core.py Github

copy

Full Screen

...65 async def set_result(self,result):66 self.result=result67 self.event.set()68 69 async def wait_result(self)->any:70 await self.event.wait()71 return self.result72# AsyncCallBack 类型: (异步回调方式接受结果,即wait_result=False 时的 cb 参数的类型定义)73# 这个类型是 OmegaSide 中以异步回调方式接受结果或者推送的函数的类型定义74# 这个函数可以是 None,75# 也可以是一个 async def 开头的函数,或者是成员变量76# 例子:77# async def on_result(result):78# print(result)79# await frame.xxxx(cb=on_result,wait_result=False)80RT=TypeVar("RT") # RT= result type 返回值类型81AsyncCallBack=Union[Callable[[RT],Awaitable],None]82# APIResult 类型: (同步方式接受结果)83# 这个函数是所有 Omega Side Python API 以同步形式返回结果时的返回值的类型定义84# 这个类型的意思是,以同步形式获取结果时,必须以:85# result = await frame.xxx(wait_result=True)86# 方式获得结果,一般来说,这样的效率会比异步低很多87# 注意,异步形式虽然结果由callback接受,但仍然需要 await88APIResult=Awaitable[Union[None,RT]]89class MainFrame(object):90 def __init__(self,client) -> None:91 self.client=client92 self.plugin_tasks=[]93 self.client.on_msg_cb=self._on_msg94 self.on_resp={}95 self.onAnyMCPkt=[]96 self.onTypedMCPkt={}97 self.onMenuTriggered={}98 self.started=False99 100 async def _on_push_msg(self,push_type,sub_type,data):101 if push_type=="mcPkt":102 data["id"]=sub_type103 if sub_type in self.onTypedMCPkt.keys():104 for cb in self.onTypedMCPkt[sub_type]:105 if cb is not None:106 await cb(data)107 for cb in self.onAnyMCPkt:108 if cb is not None:109 await cb(data)110 elif push_type=="menuTriggered":111 await self.onMenuTriggered[sub_type](data)112 113 async def _on_msg(self,msg):114 msg=json.loads(msg)115 msgID=msg["client"]116 data=msg["data"]117 if msgID!=0:118 violate=msg["violate"]119 if violate:120 raise Exception(f"从omega框架收到了 Violate 数据包: {msg}")121 cb=self.on_resp[msgID]122 if cb[2] is None:123 return124 else:125 await cb[1](data,cb[2])126 else:127 await self._on_push_msg(msg["type"],msg["sub"],data)128 129 async def _send_request(self,func:str="",args:dict={},cb=None,wait_result=False):130 msgID=await self.client.send_request(func=func,args=args)131 if not wait_result:132 
self.on_resp[msgID]=cb133 else:134 waitor=ResultWaitor()135 self.on_resp[msgID]=(cb[0],cb[1],waitor.set_result)136 return await asyncio.wait([waitor.wait_result()])137 138 def _add_plugin(self,plugin):139 plugin_task=None140 try:141 if isinstance(plugin,Awaitable):142 # 插件可以直接被 await plugin 时143 self.plugin_tasks.append(plugin)144 return145 elif isinstance(plugin,Callable) and isinstance(getattr(plugin,"__call__"),MethodType):146 # 插件是一个类对象,且有成员函数 async def __call__(self): 时147 plugin_task=plugin()148 assert isinstance(plugin_task,Awaitable),"当插件为一个类的实例时,其必须有成员函数 async def __call__(self): "149 self.plugin_tasks.append(plugin_task)150 return...

Full Screen

Full Screen

test_monotonic_event.py

Source:test_monotonic_event.py Github

copy

Full Screen

1from threading import Thread2from time import sleep3from unittest import TestCase, mock4from mycroft.util.monotonic_event import MonotonicEvent5class MonotonicEventTest(TestCase):6 def test_wait_set(self):7 event = MonotonicEvent()8 event.set()9 self.assertTrue(event.wait())10 def test_wait_timeout(self):11 event = MonotonicEvent()12 self.assertFalse(event.wait(0.1))13 def test_wait_set_with_timeout(self):14 wait_result = False15 event = MonotonicEvent()16 def wait_event():17 nonlocal wait_result18 wait_result = event.wait(30)19 wait_thread = Thread(target=wait_event)20 wait_thread.start()21 sleep(0.1)22 event.set()23 wait_thread.join()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful