Best Python code snippet using avocado_python
runner.py
Source:runner.py  
...118        # Update state before returning the value119        if name in ("status", "interrupt"):120            self._tick()121        return super(TestStatus, self).__getattribute__(name)122    def wait_for_early_status(self, proc, timeout):123        """124        Wait until early_status is obtained125        :param proc: test process126        :param timeout: timeout for early_state127        :raise exceptions.TestError: On timeout/error128        """129        step = 0.01130        end = time.time() + timeout131        while not self.early_status:132            if not proc.is_alive():133                if not self.early_status:134                    raise exceptions.TestError("Process died before it pushed "135                                               "early test_status.")136            if time.time() > end and not self.early_status:137                msg = ("Unable to receive test's early-status in %ss, "138                       "something wrong happened probably in the "139                       "avocado framework." 
% timeout)140                os.kill(proc.pid, signal.SIGKILL)141                raise exceptions.TestError(msg)142            time.sleep(step)143    def _tick(self):144        """145        Process the queue and update current status146        """147        while not self.queue.empty():148            msg = self._get_msg_from_queue()149            if msg is None:150                break151            if "func_at_exit" in msg:152                self.job.funcatexit.register(msg["func_at_exit"],153                                             msg.get("args", tuple()),154                                             msg.get("kwargs", {}),155                                             msg.get("once", False))156            elif not msg.get("running", True):157                self.status = msg158                self.interrupt = True159            elif "paused" in msg:160                self.status = msg161                self.job.result_proxy.notify_progress(False)162                self.job._result_events_dispatcher.map_method('test_progress',163                                                              False)164                if msg['paused']:165                    reason = msg['paused_msg']166                    if reason:167                        self.job.log.warning(reason)168            else:       # test_status169                self.status = msg170    def _add_status_failures(self, test_state):171        """172        Append TestStatus error to test_state in case there were any.173        """174        if self._failed:175            return add_runner_failure(test_state, "ERROR", "TestStatus failed,"176                                      " see overall job.log for details.")177        return test_state178    def finish(self, proc, started, timeout, step):179        """180        Wait for the test process to finish and report status or error status181        if unable to obtain the status till deadline.182        :param proc: The test's process183        :param 
started: Time when the test started184        :param timeout: Timeout for waiting on status185        :param first: Delay before first check186        :param step: Step between checks for the status187        """188        # Wait for either process termination or test status189        wait.wait_for(lambda: not proc.is_alive() or self.status, timeout, 0,190                      step)191        if self.status:     # status exists, wait for process to finish192            if not wait.wait_for(lambda: not proc.is_alive(), timeout, 0,193                                 step):194                err = "Test reported status but did not finish"195            else:   # Test finished and reported status, pass196                return self._add_status_failures(self.status)197        else:   # proc finished, wait for late status delivery198            if not wait.wait_for(lambda: self.status, timeout, 0, step):199                err = "Test died without reporting the status."200            else:201                # Status delivered after the test process finished, pass202                return self._add_status_failures(self.status)203        # At this point there were failures, fill the new test status204        TEST_LOG.debug("Original status: %s", str(self.status))205        test_state = self.early_status206        test_state['time_start'] = started207        test_state['time_end'] = time.time()208        test_state['time_elapsed'] = test_state['time_end'] - started209        test_state['fail_reason'] = err210        test_state['status'] = exceptions.TestAbortError.status211        test_state['fail_class'] = (exceptions.TestAbortError.__class__.212                                    __name__)213        test_state['traceback'] = 'Traceback not available'214        try:215            with open(test_state['logfile'], 'r') as log_file_obj:216                test_state['text_output'] = log_file_obj.read()217        except IOError:218            test_state["text_output"] = "Not 
available, file not created yet"219        TEST_LOG.error('ERROR %s -> TestAbortedError: '220                       'Test process died without reporting the status.',221                       test_state['name'])222        if proc.is_alive():223            TEST_LOG.warning("Killing hanged test process %s" % proc.pid)224            for _ in xrange(5):     # I really want to destroy it225                os.kill(proc.pid, signal.SIGKILL)226                if not proc.is_alive():227                    break228                time.sleep(0.1)229            else:230                raise exceptions.TestError("Unable to destroy test's process "231                                           "(%s)" % proc.pid)232        return self._add_status_failures(test_state)233class TestRunner(object):234    """235    A test runner class that displays tests results.236    """237    DEFAULT_TIMEOUT = 86400238    def __init__(self, job, result):239        """240        Creates an instance of TestRunner class.241        :param job: an instance of :class:`avocado.core.job.Job`.242        :param result: an instance of :class:`avocado.core.result.Result`243        """244        self.job = job245        self.result = result246        self.sigstopped = False247    def _run_test(self, test_factory, queue):248        """249        Run a test instance.250        This code is the first thing that runs inside a new process, known here251        as the test process. It communicates to the test runner by using252        :param:`queue`. 
It's important that this early state is given to the253        test runner in a reliable way.254        :param test_factory: Test factory (test class and parameters).255        :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.256        :param queue: Multiprocess queue.257        :type queue: :class:`multiprocessing.Queue` instance.258        """259        signal.signal(signal.SIGTSTP, signal.SIG_IGN)260        logger_list_stdout = [logging.getLogger('avocado.test.stdout'),261                              TEST_LOG,262                              logging.getLogger('paramiko')]263        logger_list_stderr = [logging.getLogger('avocado.test.stderr'),264                              TEST_LOG,265                              logging.getLogger('paramiko')]266        sys.stdout = output.LoggingFile(logger=logger_list_stdout)267        sys.stderr = output.LoggingFile(logger=logger_list_stderr)268        def sigterm_handler(signum, frame):     # pylint: disable=W0613269            """ Produce traceback on SIGTERM """270            raise SystemExit("Test interrupted by SIGTERM")271        signal.signal(signal.SIGTERM, sigterm_handler)272        # Replace STDIN (0) with the /dev/null's fd273        os.dup2(sys.stdin.fileno(), 0)274        instance = loader.load_test(test_factory)275        if instance.runner_queue is None:276            instance.runner_queue = queue277        runtime.CURRENT_TEST = instance278        early_state = instance.get_state()279        early_state['early_status'] = True280        try:281            queue.put(early_state)282        except Exception:283            instance.error(stacktrace.str_unpickable_object(early_state))284        self.result.start_test(early_state)285        self.job._result_events_dispatcher.map_method('start_test',286                                                      self.result,287                                                      early_state)288        try:289            instance.run_avocado()290   
     finally:291            try:292                state = instance.get_state()293                queue.put(state)294            except Exception:295                instance.error(stacktrace.str_unpickable_object(state))296    def run_test(self, test_factory, queue, summary, job_deadline=0):297        """298        Run a test instance inside a subprocess.299        :param test_factory: Test factory (test class and parameters).300        :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.301        :param queue: Multiprocess queue.302        :type queue: :class`multiprocessing.Queue` instance.303        :param summary: Contains types of test failures.304        :type summary: set.305        :param job_deadline: Maximum time to execute.306        :type job_deadline: int.307        """308        proc = None309        sigtstp = multiprocessing.Lock()310        def sigtstp_handler(signum, frame):     # pylint: disable=W0613311            """ SIGSTOP all test processes on SIGTSTP """312            if not proc:    # Ignore ctrl+z when proc not yet started313                return314            with sigtstp:315                msg = "ctrl+z pressed, %%s test (%s)" % proc.pid316                if self.sigstopped:317                    APP_LOG.info("\n" + msg, "resumming")318                    TEST_LOG.info(msg, "resumming")319                    process.kill_process_tree(proc.pid, signal.SIGCONT, False)320                    self.sigstopped = False321                else:322                    APP_LOG.info("\n" + msg, "stopping")323                    TEST_LOG.info(msg, "stopping")324                    process.kill_process_tree(proc.pid, signal.SIGSTOP, False)325                    self.sigstopped = True326        signal.signal(signal.SIGTSTP, sigtstp_handler)327        proc = multiprocessing.Process(target=self._run_test,328                                       args=(test_factory, queue,))329        test_status = TestStatus(self.job, queue)330        
cycle_timeout = 1331        time_started = time.time()332        proc.start()333        test_status.wait_for_early_status(proc, 10)334        # At this point, the test is already initialized and we know335        # for sure if there's a timeout set.336        timeout = test_status.early_status.get('timeout')337        timeout = float(timeout or self.DEFAULT_TIMEOUT)338        test_deadline = time_started + timeout339        if job_deadline > 0:340            deadline = min(test_deadline, job_deadline)341        else:342            deadline = test_deadline343        ctrl_c_count = 0344        ignore_window = 2.0345        ignore_time_started = time.time()346        stage_1_msg_displayed = False347        stage_2_msg_displayed = False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
