Best Python code snippet using avocado_python
service.py
Source:service.py  
...85            headers=headers,86            serializer='json',87            retry=True88        )89    def _handle_task_finished(self):90        correlation_id = self.task.message.properties['correlation_id']91        try:92            status = "success"93            result = self.task.future.result()94        except Exception as e:95            logger.exception(96                f"[{correlation_id}]Exception occurred during handling"97            )98            status = "error"99            result = f"Request {self.task.message.decode()} failed\n{repr(e)}"100        self._reply(self.task.message, result, status)101        self.task.message.ack()102        logger.info(f"[{correlation_id}] done")103    def wait_ready(self):104        while not self.ready:105            time.sleep(0.1)106    def stop(self):107        self.executor.shutdown()108        self.should_stop = True109    def get_consumers(self, Consumer, channel):110        self._setup_dlx_exchange(channel)111        self._setup_invalid_queue(channel)112        self._setup_worker_queue(channel)113        return [Consumer(queues=self.worker_queue,114                         callbacks=[self._handle_incoming_message],115                         accept=['json'],116                         prefetch_count=1)]117    def on_iteration(self):118        if self.task is not None and self.task.future.done():119            logger.debug("task done")120            self._handle_task_finished()121            self.task = None122    def on_connection_error(self, exc, interval):123        super().on_connection_error(exc, interval)124        if self.task:125            logger.info("Dropping currently running task")126            # there is no way to kill the running thread, so we wait for it127            # future.exception() does not reraise compared to future.result()128            self.task.future.exception()129            self.task = None130            logger.info("Dropping currently running task...Done")131    def 
on_consume_ready(self, connection, channel, consumers):132        """Callback executed one we are ready to consume"""133        self.ready = True134        logger.info("{} is ready for consuming", self)...repo.py
Source:repo.py  
...
        #: status that have been superseded by newer status.
        self._status_journal_summary = []
        #: Contains the task IDs keyed by the result received
        self._by_result = {}

    def _handle_task_finished(self, message):
        """Handle a task-finished status message.

        Indexes the message by its result, then passes it on to
        ``_set_task_data`` (defined outside this excerpt).
        """
        self._set_by_result(message)
        self._set_task_data(message)

    def _handle_task_started(self, message):
        """Handle a task-started status message.

        :raises StatusMsgMissingDataError: if ``message`` has no
            'output_dir' entry.
        """
        if 'output_dir' not in message:
            raise StatusMsgMissingDataError('output_dir')
        self._set_task_data(message)

    def _set_by_result(self, message):
        """Sets an entry in the aggregate by result.

        For messages that include a "result" key, expected for example,
        from a "finished" status message, this will allow users to query
        for tasks with a given result."""
        # .get() yields None when the message has no "result" key, so in the
        # code visible here such a message gets an entry under the None key.
        result = message.get('result')
        if result not in self._by_result:
            self._by_result[result] = []
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
