Best Python code snippet using avocado_python
runner.py
Source: runner.py
# ... (earlier lines of runner.py are not part of this snippet)
from ..utils import process
from ..utils import stacktrace

TEST_LOG = logging.getLogger("avocado.test")
APP_LOG = logging.getLogger("avocado.app")


def add_runner_failure(test_state, new_status, message):
    """
    Append runner failure to the overall test status.

    The message is logged to ``TEST_LOG``, appended to the state's
    ``text_output`` and ``fail_reason``, appended to the test's logfile (when
    the state names one) and ``fail_class`` gets a ``RUNNER`` marker.

    :param test_state: Original test state (dict), modified in place
    :param new_status: New test status (PASS/FAIL/ERROR/INTERRUPTED/...)
    :param message: The error message
    :return: The updated ``test_state``
    """
    # Try to propagate the message everywhere
    message = ("Runner error occurred: %s\nOriginal status: %s\n%s"
               % (message, test_state.get("status"), test_state))
    TEST_LOG.error(message)
    test_log = test_state.get("logfile")
    if test_state.get("text_output"):
        test_state["text_output"] = "%s\n%s\n" % (test_state["text_output"],
                                                  message)
    else:
        test_state["text_output"] = message + "\n"
    if test_log:
        # Use a context manager so the handle is flushed and closed right away
        with open(test_log, "a") as test_log_file:
            test_log_file.write('\n' + message + '\n')
    # Update the results
    if test_state.get("fail_reason"):
        test_state["fail_reason"] = "%s\n%s" % (test_state["fail_reason"],
                                                message)
    else:
        test_state["fail_reason"] = message
    if test_state.get("fail_class"):
        test_state["fail_class"] = "%s\nRUNNER" % test_state["fail_class"]
    else:
        test_state["fail_class"] = "RUNNER"
    test_state["status"] = new_status
    return test_state


class TestStatus(object):

    """
    Test status handler

    Reads the messages the test process pushes to the queue and exposes the
    latest state through the ``status`` and ``interrupt`` attributes.
    """

    def __init__(self, job, queue):
        """
        :param job: Associated job
        :param queue: test message queue
        """
        self.job = job
        self.queue = queue
        self._early_status = None
        self.status = {}
        self.interrupt = None
        # Set when reading from the queue raised an exception
        self._failed = False

    def _get_msg_from_queue(self):
        """
        Helper method to handle safely getting messages from the queue.

        :return: Message, None if exception happened.
        :rtype: dict
        """
        try:
            return self.queue.get()
        # Let's catch all exceptions, since errors here mean a
        # crash in avocado.
        except Exception as details:
            self._failed = True
            TEST_LOG.error("RUNNER: Failed to read queue: %s", details)
            return None

    @property
    def early_status(self):
        """
        Get early status

        :return: The cached early status message, or the first queued
                 message containing the "early_status" key; None when no
                 such message is available yet.
        """
        if self._early_status:
            return self._early_status
        else:
            # NOTE(review): messages collected here are only put back when an
            # early_status message is found; otherwise they are dropped when
            # this call returns. Confirm this is intended.
            queue = []
            while not self.queue.empty():
                msg = self._get_msg_from_queue()
                if msg is None:
                    break
                if "early_status" in msg:
                    self._early_status = msg
                    for _ in queue:     # Return all unprocessed messages back
                        self.queue.put(_)
                    return msg
                else:   # Not an early_status message
                    queue.append(msg)

    def __getattribute__(self, name):
        """
        Process pending queue messages before "status"/"interrupt" are read.
        """
        # Update state before returning the value
        if name in ("status", "interrupt"):
            self._tick()
        return super(TestStatus, self).__getattribute__(name)

    def wait_for_early_status(self, proc, timeout):
        """
        Wait until early_status is obtained

        :param proc: test process
        :param timeout: timeout for early_state
        :raise exceptions.TestError: On timeout/error
        """
        step = 0.01
        end = time.time() + timeout
        while not self.early_status:
            if not proc.is_alive():
                # Re-check, the status might have arrived just before exit
                if not self.early_status:
                    raise exceptions.TestError("Process died before it pushed "
                                               "early test_status.")
            if time.time() > end and not self.early_status:
                msg = ("Unable to receive test's early-status in %ss, "
                       "something wrong happened probably in the "
                       "avocado framework." % timeout)
                os.kill(proc.pid, signal.SIGKILL)
                raise exceptions.TestError(msg)
            time.sleep(step)

    def _tick(self):
        """
        Process the queue and update current status
        """
        while not self.queue.empty():
            msg = self._get_msg_from_queue()
            if msg is None:
                break
            if "func_at_exit" in msg:
                self.job.funcatexit.register(msg["func_at_exit"],
                                             msg.get("args", tuple()),
                                             msg.get("kwargs", {}),
                                             msg.get("once", False))
            elif not msg.get("running", True):
                # Test reported it is no longer running
                self.status = msg
                self.interrupt = True
            elif "paused" in msg:
                self.status = msg
                self.job.result_proxy.notify_progress(False)
                self.job._result_events_dispatcher.map_method('test_progress',
                                                              False)
                if msg['paused']:
                    reason = msg['paused_msg']
                    if reason:
                        self.job.log.warning(reason)
            else:       # test_status
                self.status = msg

    def _add_status_failures(self, test_state):
        """
        Append TestStatus error to test_state in case there were any.

        :param test_state: Test state (dict)
        :return: ``test_state``, turned into ERROR when reading the queue
                 failed earlier
        """
        if self._failed:
            return add_runner_failure(test_state, "ERROR", "TestStatus failed,"
                                      " see overall job.log for details.")
        return test_state

    def finish(self, proc, started, timeout, step):
        """
        Wait for the test process to finish and report status or error status
        if unable to obtain the status till deadline.

        :param proc: The test's process
        :param started: Time when the test started
        :param timeout: Timeout for waiting on status
        :param step: Step between checks for the status
        :return: The reported test state, or a state built from the early
                 status describing the failure
        :raise exceptions.TestError: When the test process can't be killed
        """
        # Wait for either process termination or test status
        wait.wait_for(lambda: not proc.is_alive() or self.status, timeout, 0,
                      step)
        if self.status:     # status exists, wait for process to finish
            if not wait.wait_for(lambda: not proc.is_alive(), timeout, 0,
                                 step):
                err = "Test reported status but did not finish"
            else:   # Test finished and reported status, pass
                return self._add_status_failures(self.status)
        else:   # proc finished, wait for late status delivery
            if not wait.wait_for(lambda: self.status, timeout, 0, step):
                err = "Test died without reporting the status."
            else:
                # Status delivered after the test process finished, pass
                return self._add_status_failures(self.status)
        # At this point there were failures, fill the new test status
        TEST_LOG.debug("Original status: %s", str(self.status))
        test_state = self.early_status
        test_state['time_start'] = started
        test_state['time_end'] = time.time()
        test_state['time_elapsed'] = test_state['time_end'] - started
        test_state['fail_reason'] = err
        test_state['status'] = exceptions.TestAbortError.status
        # TestAbortError is referenced as a class, so "__name__" gives the
        # exception name ("__class__.__name__" would name its metaclass).
        test_state['fail_class'] = exceptions.TestAbortError.__name__
        test_state['traceback'] = 'Traceback not available'
        try:
            with open(test_state['logfile'], 'r') as log_file_obj:
                test_state['text_output'] = log_file_obj.read()
        except IOError:
            test_state["text_output"] = "Not available, file not created yet"
        TEST_LOG.error('ERROR %s -> TestAbortedError: '
                       'Test process died without reporting the status.',
                       test_state['name'])
        if proc.is_alive():
            TEST_LOG.warning("Killing hanged test process %s", proc.pid)
            for _ in range(5):     # I really want to destroy it
                os.kill(proc.pid, signal.SIGKILL)
                if not proc.is_alive():
                    break
                time.sleep(0.1)
            else:
                raise exceptions.TestError("Unable to destroy test's process "
                                           "(%s)" % proc.pid)
        return self._add_status_failures(test_state)


class TestRunner(object):

    """
    A test runner class that displays tests results.
    """
    DEFAULT_TIMEOUT = 86400

    def __init__(self, job, result):
        """
        Creates an instance of TestRunner class.

        :param job: an instance of :class:`avocado.core.job.Job`.
        :param result: an instance of :class:`avocado.core.result.Result`
        """
        self.job = job
        self.result = result
        # Toggled by the SIGTSTP (ctrl+z) handler installed in run_test
        self.sigstopped = False

    def _run_test(self, test_factory, queue):
        """
        Run a test instance.

        This code is the first thing that runs inside a new process, known here
        as the test process. It communicates to the test runner by using
        :param:`queue`. It's important that this early state is given to the
        test runner in a reliable way.

        :param test_factory: Test factory (test class and parameters).
        :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.
        :param queue: Multiprocess queue.
        :type queue: :class:`multiprocessing.Queue` instance.
        """
        signal.signal(signal.SIGTSTP, signal.SIG_IGN)
        logger_list_stdout = [logging.getLogger('avocado.test.stdout'),
                              TEST_LOG,
                              logging.getLogger('paramiko')]
        logger_list_stderr = [logging.getLogger('avocado.test.stderr'),
                              TEST_LOG,
                              logging.getLogger('paramiko')]
        sys.stdout = output.LoggingFile(logger=logger_list_stdout)
        sys.stderr = output.LoggingFile(logger=logger_list_stderr)

        def sigterm_handler(signum, frame):     # pylint: disable=W0613
            """ Produce traceback on SIGTERM """
            raise SystemExit("Test interrupted by SIGTERM")

        signal.signal(signal.SIGTERM, sigterm_handler)
        # Replace STDIN (0) with the /dev/null's fd
        os.dup2(sys.stdin.fileno(), 0)
        instance = loader.load_test(test_factory)
        if instance.runner_queue is None:
            instance.runner_queue = queue
        runtime.CURRENT_TEST = instance
        early_state = instance.get_state()
        early_state['early_status'] = True
        try:
            queue.put(early_state)
        except Exception:
            instance.error(stacktrace.str_unpickable_object(early_state))
        self.result.start_test(early_state)
        self.job._result_events_dispatcher.map_method('start_test',
                                                      self.result,
                                                      early_state)
        try:
            instance.run_avocado()
        finally:
            # Always try to deliver the final state to the runner
            try:
                state = instance.get_state()
                queue.put(state)
            except Exception:
                instance.error(stacktrace.str_unpickable_object(state))

    def run_test(self, test_factory, queue, summary, job_deadline=0):
        """
        Run a test instance inside a subprocess.

        :param test_factory: Test factory (test class and parameters).
        :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.
        :param queue: Multiprocess queue.
        :type queue: :class`multiprocessing.Queue` instance.
        :param summary: Contains types of test failures.
        :type summary: set.
        :param job_deadline: Maximum time to execute.
        :type job_deadline: int.
        """
        proc = None
        sigtstp = multiprocessing.Lock()

        def sigtstp_handler(signum, frame):     # pylint: disable=W0613
            """ SIGSTOP all test processes on SIGTSTP """
            if not proc:    # Ignore ctrl+z when proc not yet started
                return
            with sigtstp:
                msg = "ctrl+z pressed, %%s test (%s)" % proc.pid
                if self.sigstopped:
                    APP_LOG.info("\n" + msg, "resumming")
                    TEST_LOG.info(msg, "resumming")
                    process.kill_process_tree(proc.pid, signal.SIGCONT, False)
                    self.sigstopped = False
                else:
                    APP_LOG.info("\n" + msg, "stopping")
                    TEST_LOG.info(msg, "stopping")
                    process.kill_process_tree(proc.pid, signal.SIGSTOP, False)
                    self.sigstopped = True

        signal.signal(signal.SIGTSTP, sigtstp_handler)
        proc = multiprocessing.Process(target=self._run_test,
                                       args=(test_factory, queue,))
        test_status = TestStatus(self.job, queue)
        cycle_timeout = 1
        time_started = time.time()
        proc.start()
        test_status.wait_for_early_status(proc, 10)
        # At this point, the test is already initialized and we know
        # for sure if there's a timeout set.
        timeout = test_status.early_status.get('timeout')
        timeout = float(timeout or self.DEFAULT_TIMEOUT)
        test_deadline = time_started + timeout
        if job_deadline > 0:
            deadline = min(test_deadline, job_deadline)
        else:
            deadline = test_deadline
        ctrl_c_count = 0
        ignore_window = 2.0
        ignore_time_started = time.time()
        stage_1_msg_displayed = False
        stage_2_msg_displayed = False
        first = 0.01
        step = 0.01
        abort_reason = None
        result_dispatcher = self.job._result_events_dispatcher
        while True:
            try:
                if time.time() >= deadline:
                    abort_reason = "Timeout reached"
                    try:
                        os.kill(proc.pid, signal.SIGTERM)
                    except OSError:
                        pass
                    break
                wait.wait_for(lambda: not queue.empty() or not proc.is_alive(),
                              cycle_timeout, first, step)
                if test_status.interrupt:
                    break
                if proc.is_alive():
                    if ctrl_c_count == 0:
                        if (test_status.status.get('running') or
                                self.sigstopped):
                            result_dispatcher.map_method('test_progress',
                                                         False)
                        else:
                            result_dispatcher.map_method('test_progress', True)
                else:
                    break
            except KeyboardInterrupt:
                time_elapsed = time.time() - ignore_time_started
                ctrl_c_count += 1
                if ctrl_c_count == 1:
                    if not stage_1_msg_displayed:
                        abort_reason = "Interrupted by ctrl+c"
                        self.job.log.debug("\nInterrupt requested. Waiting %d "
                                           "seconds for test to finish "
                                           "(ignoring new Ctrl+C until then)",
                                           ignore_window)
                        stage_1_msg_displayed = True
                    ignore_time_started = time.time()
                if (ctrl_c_count > 1) and (time_elapsed > ignore_window):
                    if not stage_2_msg_displayed:
                        abort_reason = "Interrupted by ctrl+c (multiple-times)"
                        self.job.log.debug("Killing test subprocess %s",
                                           proc.pid)
                        stage_2_msg_displayed = True
                    os.kill(proc.pid, signal.SIGKILL)
        # Get/update the test status
        test_state = test_status.finish(proc, time_started, cycle_timeout,
                                        step)
        # Try to log the timeout reason to test's results and update test_state
        if abort_reason:
            test_state = add_runner_failure(test_state, "INTERRUPTED",
                                            abort_reason)
        # don't process other tests from the list
        if ctrl_c_count > 0:
            self.job.log.debug('')
        # Make sure the test status is correct
        if test_state.get('status') not in status.user_facing_status:
            test_state = add_runner_failure(test_state, "ERROR", "Test reports"
                                            " unsupported test status.")
        self.result.check_test(test_state)
        result_dispatcher.map_method('end_test', self.result, test_state)
        if test_state['status'] == "INTERRUPTED":
            summary.add("INTERRUPTED")
        elif not mapping[test_state['status']]:
            summary.add("FAIL")
            if getattr(self.job.args, 'failfast', 'off') == 'on':
                summary.add("INTERRUPTED")
                self.job.log.debug("Interrupting job (failfast).")
                return False
        # NOTE(review): "in 'yes'" is a substring test, so values such as ''
        # or 'y' also match -- confirm whether "== 'yes'" was intended.
        if test_factory[1]['ct_params'].get('abort_on_error', 'no') in 'yes' \
           and test_state.get('status') in ('ERROR', 'FAIL'):
            ctrl_c_count += 1
        # ... (the remainder of run_test is not part of this snippet)

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
