Best Python code snippet using lisa_python
core.py
Source:core.py  
...65    async def set_result(self,result):66        self.result=result67        self.event.set()68        69    async def wait_result(self)->any:70        await self.event.wait()71        return self.result72# AsyncCallBack ç±»åï¼ (弿¥åè°æ¹å¼æ¥åç»æï¼å³wait_result=False æ¶ç cb åæ°çç±»åå®ä¹)73# è¿ä¸ªç±»åæ¯ OmegaSide ä¸ä»¥å¼æ¥åè°æ¹å¼æ¥åç»ææè
æ¨éç彿°çç±»åå®ä¹74# è¿ä¸ªå½æ°å¯ä»¥æ¯ Noneï¼75# ä¹å¯ä»¥æ¯ä¸ä¸ª async def å¼å¤´ç彿°ï¼æè
æ¯æååé76# ä¾å:77# async def on_result(result):78#     print(result)79# await frame.xxxx(cb=on_result,wait_result=False)80RT=TypeVar("RT") # RT= result type è¿åå¼ç±»å81AsyncCallBack=Union[Callable[[RT],Awaitable],None]82# APIResult ç±»åï¼ ï¼åæ¥æ¹å¼æ¥åç»æï¼83# è¿ä¸ªå½æ°æ¯ææ Omega Side Python API 以忥形å¼è¿åç»ææ¶çè¿åå¼çç±»åå®ä¹84# è¿ä¸ªç±»åçæææ¯ï¼ä»¥åæ¥å½¢å¼è·åç»ææ¶ï¼å¿
须以ï¼85# result = await frame.xxx(wait_result=True)86# æ¹å¼è·å¾ç»æï¼ä¸è¬æ¥è¯´ï¼è¿æ ·çæç伿¯å¼æ¥ä½å¾å¤87# 注æï¼å¼æ¥å½¢å¼è½ç¶ç»æç±callbackæ¥åï¼ä½ä»ç¶éè¦ await88APIResult=Awaitable[Union[None,RT]]89class MainFrame(object):90    def __init__(self,client) -> None:91        self.client=client92        self.plugin_tasks=[]93        self.client.on_msg_cb=self._on_msg94        self.on_resp={}95        self.onAnyMCPkt=[]96        self.onTypedMCPkt={}97        self.onMenuTriggered={}98        self.started=False99        100    async def _on_push_msg(self,push_type,sub_type,data):101        if push_type=="mcPkt":102            data["id"]=sub_type103            if sub_type in self.onTypedMCPkt.keys():104                for cb in self.onTypedMCPkt[sub_type]:105                    if cb is not None:106                        await cb(data)107            for cb in self.onAnyMCPkt:108                if cb is not None:109                    await cb(data)110        elif push_type=="menuTriggered":111            await self.onMenuTriggered[sub_type](data)112    113    async def _on_msg(self,msg):114        msg=json.loads(msg)115        msgID=msg["client"]116        data=msg["data"]117        if msgID!=0:118            violate=msg["violate"]119            if violate:120                raise Exception(f"ä»omegaæ¡æ¶æ¶å°äº Violate æ°æ®å
: {msg}")121            cb=self.on_resp[msgID]122            if cb[2] is None:123                return124            else:125                await cb[1](data,cb[2])126        else:127            await self._on_push_msg(msg["type"],msg["sub"],data)128        129    async def _send_request(self,func:str="",args:dict={},cb=None,wait_result=False):130        msgID=await self.client.send_request(func=func,args=args)131        if not wait_result:132            self.on_resp[msgID]=cb133        else:134            waitor=ResultWaitor()135            self.on_resp[msgID]=(cb[0],cb[1],waitor.set_result)136            return await asyncio.wait([waitor.wait_result()])137    138    def _add_plugin(self,plugin):139        plugin_task=None140        try:141            if isinstance(plugin,Awaitable):142                # æä»¶å¯ä»¥ç´æ¥è¢« await plugin æ¶143                self.plugin_tasks.append(plugin)144                return145            elif isinstance(plugin,Callable) and isinstance(getattr(plugin,"__call__"),MethodType):146                # æä»¶æ¯ä¸ä¸ªç±»å¯¹è±¡ï¼ä¸ææå彿° async def __call__(self): æ¶147                plugin_task=plugin()148                assert isinstance(plugin_task,Awaitable),"å½æä»¶ä¸ºä¸ä¸ªç±»çå®ä¾æ¶ï¼å
¶å¿
é¡»ææå彿° async def __call__(self): "149                self.plugin_tasks.append(plugin_task)150                return...test_monotonic_event.py
Source:test_monotonic_event.py  
1from threading import Thread2from time import sleep3from unittest import TestCase, mock4from mycroft.util.monotonic_event import MonotonicEvent5class MonotonicEventTest(TestCase):6    def test_wait_set(self):7        event = MonotonicEvent()8        event.set()9        self.assertTrue(event.wait())10    def test_wait_timeout(self):11        event = MonotonicEvent()12        self.assertFalse(event.wait(0.1))13    def test_wait_set_with_timeout(self):14        wait_result = False15        event = MonotonicEvent()16        def wait_event():17            nonlocal wait_result18            wait_result = event.wait(30)19        wait_thread = Thread(target=wait_event)20        wait_thread.start()21        sleep(0.1)22        event.set()23        wait_thread.join()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
