Best Python code snippet using avocado_python
ai_state_machine.py
Source:ai_state_machine.py  
...50            return self.current_task51        return TaskThreadClass(self.task_com, *largs)52    53    54    def terminate_task(self):55        #signal termination sequence56        self.current_task.active = False57        #wait until task has finished58        print "Terminating Task"59        while self.current_task.isAlive():60            time.sleep(0.2)61        #now complete, join and return62        self.current_task.join()63        return self.current_task.result64        65        66    def begin_task(self, task_code, param_val, previous_task_result ):67        # simple 'switch' for the task code68        # quick check for alive task69        if self.current_task.isAlive():70            print "Cannot start", self.last_message.name, "when task:", self.current_task_code, "is running!"71            return72        elif task_code == TaskCode.FOO_SQUARE:73            self.current_task = self._define_task(FooSquareTask,  param_val, previous_task_result)74        elif task_code == TaskCode.STANDBY_AT_DEPTH:75            self.current_task = self._define_task(StandbyTask, param_val, previous_task_result)76        elif task_code == TaskCode.DIVE:77            self.current_task = self._define_task(DiveTask, param_val, previous_task_result)78        elif task_code == TaskCode.LINE_FOLLOW:79            self.current_task = self._define_task(LineFollowTask, param_val, previous_task_result)80        else:  # default to STANDBY for safety!!81            self.current_task = self._define_task(StandbyTask, param_val, previous_task_result)82        83        # save the code!84        self.current_task_code = task_code85        86        # start the task87        self.current_task.start()88        return89        90        91    def run(self):92        while True: #infinite loop!!!!!!!93            incoming_packet = self.com.get_last_message("decision/master_ai")94            if incoming_packet:95                message = incoming_packet["desired_state"]96            if incoming_packet and incoming_packet['timestamp'] > self.last_packet_time:97                self.last_packet_time = incoming_packet['timestamp']98                self.last_message = message99                print "GOT PACKET", message100                101                # restart Task102                if self.current_task_code == message["task_code"] and message["restart"]:103                    termination_result = self.terminate_task()104                    self.begin_task( message["task_code"], message["parameter"], termination_result )105                elif self.current_task_code != message["task_code"]:106                    termination_result = self.terminate_task()107                    self.begin_task( message["task_code"], message["parameter"], termination_result )108                    109            elif not self.current_task.isAlive():110                completion_result = self.current_task.result111                print TaskCode.get_task_name(self.current_task_code).upper(), "Task Finished with:", completion_result112                self.select_next_task(completion_result)113                114            print "."115            time.sleep(1)116            117    def select_next_task(self, completion_result):118        self.begin_task( TaskCode.STANDBY, 0, completion_result )119        120if __name__ == "__main__":...ledstrip.py
Source:ledstrip.py  
...29        except kasa.SmartDeviceException:30            print("SmartDeviceException: Unable to establish connection with device.")31    def set_request(self, request: LightingRequest) -> None:32        self.request = request33    async def terminate_task(self) -> None:34        if self.sequence_task is not None:35            self.sequence_cancel_event.clear()36            await self.sequence_task37            self.sequence_task = None38    async def on(self):39        if self.sequence_task is None:40            await self.strip.turn_on()41        else:42            await self.operation_callback_by_name[self.sequence_task.get_name()]()43    async def off(self):44        if self.sequence_task is not None:45            self.sequence_cancel_event.clear()46            await self.sequence_task47        await self.strip.turn_off()48    async def hsv(self):49        await self.terminate_task()50        await self.strip.set_hsv(51            int(self.request.h), int(self.request.s), int(self.request.v)52        )53    async def brightness(self):54        await self.strip.set_brightness(self.request.brightness)55    async def temperature(self):56        await self.terminate_task()57        (r, g, b) = self.convert_K_to_RGB(self.request.temperature)58        (r, g, b) = (r / 255, g / 255, b / 255)59        (h, s, v) = colorsys.rgb_to_hsv(r, g, b)60        await self.strip.set_hsv(61            int(h*360), int(s *62                            100), int(v*100)63        )64    async def rainbow(self):65        await self.terminate_task()66        self.sequence_cancel_event.set()67        self.sequence_task = asyncio.create_task(self.rainbow_loop())68        self.sequence_task.set_name("rainbow")69    async def rainbow_loop(self):70        running = True71        while running:72            for i in range(359):73                if not self.sequence_cancel_event.is_set():74                    running = False75                    break76                await self.strip.set_hsv(i, 100, 100)77                time.sleep(0.05)78    def convert_K_to_RGB(self, colour_temperature) -> tuple[int, int, int]:79        # range check...bulb.py
Source:bulb.py  
...27        except kasa.SmartDeviceException:28            print("SmartDeviceException: Unable to establish connection with device.")29    def set_request(self, request: LightingRequest) -> None:30        self.request = request31    async def terminate_task(self) -> None:32        if self.sequence_task is not None:33            self.sequence_cancel_event.clear()34            await self.sequence_task35            self.sequence_task = None36    async def on(self):37        if self.sequence_task is None:38            await self.bulb.turn_on()39        else:40            await self.operation_callback_by_name[self.sequence_task.get_name()]()41    async def off(self):42        if self.sequence_task is not None:43            self.sequence_cancel_event.clear()44            await self.sequence_task45        await self.bulb.turn_off()46    async def hsv(self):47        await self.terminate_task()48        await self.bulb.set_hsv(49            int(self.request.h), int(self.request.s), int(self.request.v)50        )51    async def brightness(self):52        await self.bulb.set_brightness(self.request.brightness)53    async def temperature(self):54        await self.terminate_task()55        await self.bulb.set_color_temp(int(self.request.temperature))56    async def rainbow(self):57        await self.terminate_task()58        self.sequence_cancel_event.set()59        self.sequence_task = asyncio.create_task(self.rainbow_loop())60        self.sequence_task.set_name("rainbow")61    async def rainbow_loop(self):62        running = True63        while running:64            for i in range(359):65                if not self.sequence_cancel_event.is_set():66                    running = False67                    break68                await self.bulb.set_hsv(i, 100, 100)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
