How to use _handler_loop method in Lemoncheesecake

Best Python code snippet using lemoncheesecake

viewerclient.py

Source:viewerclient.py Github

copy

Full Screen

...184 def _request_channel(self):185 return "DIRECTOR_TREE_VIEWER_REQUEST_<{:s}>".format(self.client_id)186 def _response_channel(self):187 return "DIRECTOR_TREE_VIEWER_RESPONSE_<{:s}>".format(self.client_id)188 def _handler_loop(self):189 while True:190 self.lcm.handle()191 def start_handler(self):192 if self.handler_thread is not None:193 return194 self.handler_thread = threading.Thread(195 target=self._handler_loop)196 self.handler_thread.daemon = True197 self.handler_thread.start()198 def _handle_response(self, channel, msgdata):199 msg = viewer2_comms_t.decode(msgdata)200 data = json.loads(msg.data)201 if data["status"] == 0:202 pass...

Full Screen

Full Screen

events.py

Source:events.py Github

copy

Full Screen

...85 print("Fire event %s" % event)86 self._queue.put(event)87 def get_pending_failure(self):88 return self._pending_failure89 def _handler_loop(self):90 while True:91 event = self._queue.get()92 if event is None:93 break94 try:95 self.handle_event(event)96 except Exception as excp:97 self._pending_failure = excp, serialize_current_exception()98 break99 finally:100 self._queue.task_done()101 @contextmanager102 def handle_events(self):103 self._queue = Queue()...

Full Screen

Full Screen

sanitarize.py

Source:sanitarize.py Github

copy

Full Screen

1""" Trying to control flow """2import asyncio3from apd_types import ApHass4from create_helpers import CreateHelpers5from entity_oper import EntityObject67import globals as g8from ws_comm import WsHA9from entity_register import EntityRegister10from app_system import AppSystem11from bootstart import boot_logger, boot_logger_off, boot_module121314class Sanitarize(ApHass):15 @boot_logger_off16 @boot_module17 def initialize(self):18 self.debug("Initialize")1920 def init(self, end_callback):21 """[summary]2223 Args:24 end_callback (function): Will be called when all processes are done25 """26 self.info("Starting init")27 self.end_callback = end_callback # for the boot system2829 # Going throw all global definitions and adding them30 # result g.helper_register3132 ########################################33 # reducing for only one entity for debug34 """35 new_register: RegisterType = []36 for p in g.helper_register:37 entity = p[INDEX_KEY]38 if entity == "input_text.mower_map":39 new_register.append(p)40 g.helper_register.clear()41 g.helper_register = new_register 42 self.logger.debug(g.helper_register)43 """44 ########################################45 self.debug(46 "Entity register execute and waiting for finishing with entity_register_done"47 )48 self.ws: WsHA = self.sync_get_app("ws_comm")49 self.appSystem: AppSystem = self.sync_get_app("app_system") # type: ignore50 self.entityRegister: EntityRegister = self.sync_get_app("entity_register") # type: ignore51 self.createHelpers: CreateHelpers = self.sync_get_app("create_helpers") # type: ignore5253 self._handler_loop = self.sync_create_task(self.main_loop())5455 async def main_loop(self):56 self.info("Starting sanitarize")57 control_buffer = asyncio.Queue()58 tasks = {59 "appSystem": self.appSystem.execute(control_buffer),60 "entityRegister": self.entityRegister.execute(control_buffer),61 "createHelpers": self.createHelpers.execute(control_buffer),62 "addToAD": self._add_to_ad(control_buffer),63 "done": self._task_done(control_buffer),64 }65 # without creating helpers66 """67 tasks = {68 "appSystem": self.appSystem.execute(control_buffer),69 "entityRegister": self.entityRegister.execute(control_buffer),70 "done": self._task_done(control_buffer),71 }72 """7374 for p in tasks.keys():75 await control_buffer.put(p)76 try:77 while True:78 process = await control_buffer.get()79 self.debug(f"Process: {process}")80 to_do = tasks.get(process)81 if to_do is None:82 self.error("Wrong definition in tasks")83 raise ValueError("Wrong definition in tasks")84 await to_do85 except Exception as ex:86 template = "An exception of type {0} occurred. Arguments:\n{1!r}"87 message = template.format(type(ex).__name__, ex.args)88 self.error(message)89 raise ValueError("Fatal error in core")9091 async def _task_done(self, control_buffer: asyncio.Queue):92 # info bootstart that it is done93 self._handler_loop.cancel() # type: ignore94 self.end_callback(self)9596 async def _add_to_ad(self, control_buffer: asyncio.Queue):97 self.info("Waiting for system")98 eo: EntityObject99 for eo in g.created_helpers_obj:100 if eo.cmd_finished:101 if not await self.entity_exists(eo.entity_id, replace=False):102 await self.set_state(eo.entity_id, state=eo.initial)103 control_buffer.task_done()104105 def _done(self, *kwargs):106 self.debug("Fired end created entity") ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Lemoncheesecake automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful