How to use add_runner_failure method in avocado

Best Python code snippet using avocado_python

runner.py

Source:runner.py Github

copy

Full Screen

...33from ..utils import process34from ..utils import stacktrace35TEST_LOG = logging.getLogger("avocado.test")36APP_LOG = logging.getLogger("avocado.app")37def add_runner_failure(test_state, new_status, message):38 """39 Append runner failure to the overall test status.40 :param test_state: Original test state (dict)41 :param new_status: New test status (PASS/FAIL/ERROR/INTERRUPTED/...)42 :param message: The error message43 """44 # Try to propagate the message everywhere45 message = ("Runner error occurred: %s\nOriginal status: %s\n%s"46 % (message, test_state.get("status"), test_state))47 TEST_LOG.error(message)48 test_log = test_state.get("logfile")49 if test_state.get("text_output"):50 test_state["text_output"] = "%s\n%s\n" % (test_state["text_output"],51 message)52 else:53 test_state["text_output"] = message + "\n"54 if test_log:55 open(test_log, "a").write('\n' + message + '\n')56 # Update the results57 if test_state.get("fail_reason"):58 test_state["fail_reason"] = "%s\n%s" % (test_state["fail_reason"],59 message)60 else:61 test_state["fail_reason"] = message62 if test_state.get("fail_class"):63 test_state["fail_class"] = "%s\nRUNNER" % test_state["fail_class"]64 else:65 test_state["fail_class"] = "RUNNER"66 test_state["status"] = new_status67 return test_state68class TestStatus(object):69 """70 Test status handler71 """72 def __init__(self, job, queue):73 """74 :param job: Associated job75 :param queue: test message queue76 """77 self.job = job78 self.queue = queue79 self._early_status = None80 self.status = {}81 self.interrupt = None82 self._failed = False83 def _get_msg_from_queue(self):84 """85 Helper method to handle safely getting messages from the queue.86 :return: Message, None if exception happened.87 :rtype: dict88 """89 try:90 return self.queue.get()91 # Let's catch all exceptions, since errors here mean a92 # crash in avocado.93 except Exception as details:94 self._failed = True95 TEST_LOG.error("RUNNER: Failed to read queue: %s", details)96 return None97 @property98 def early_status(self):99 """100 Get early status101 """102 if self._early_status:103 return self._early_status104 else:105 queue = []106 while not self.queue.empty():107 msg = self._get_msg_from_queue()108 if msg is None:109 break110 if "early_status" in msg:111 self._early_status = msg112 for _ in queue: # Return all unprocessed messages back113 self.queue.put(_)114 return msg115 else: # Not an early_status message116 queue.append(msg)117 def __getattribute__(self, name):118 # Update state before returning the value119 if name in ("status", "interrupt"):120 self._tick()121 return super(TestStatus, self).__getattribute__(name)122 def wait_for_early_status(self, proc, timeout):123 """124 Wait until early_status is obtained125 :param proc: test process126 :param timeout: timeout for early_state127 :raise exceptions.TestError: On timeout/error128 """129 step = 0.01130 end = time.time() + timeout131 while not self.early_status:132 if not proc.is_alive():133 if not self.early_status:134 raise exceptions.TestError("Process died before it pushed "135 "early test_status.")136 if time.time() > end and not self.early_status:137 msg = ("Unable to receive test's early-status in %ss, "138 "something wrong happened probably in the "139 "avocado framework." % timeout)140 os.kill(proc.pid, signal.SIGKILL)141 raise exceptions.TestError(msg)142 time.sleep(step)143 def _tick(self):144 """145 Process the queue and update current status146 """147 while not self.queue.empty():148 msg = self._get_msg_from_queue()149 if msg is None:150 break151 if "func_at_exit" in msg:152 self.job.funcatexit.register(msg["func_at_exit"],153 msg.get("args", tuple()),154 msg.get("kwargs", {}),155 msg.get("once", False))156 elif not msg.get("running", True):157 self.status = msg158 self.interrupt = True159 elif "paused" in msg:160 self.status = msg161 self.job.result_proxy.notify_progress(False)162 self.job._result_events_dispatcher.map_method('test_progress',163 False)164 if msg['paused']:165 reason = msg['paused_msg']166 if reason:167 self.job.log.warning(reason)168 else: # test_status169 self.status = msg170 def _add_status_failures(self, test_state):171 """172 Append TestStatus error to test_state in case there were any.173 """174 if self._failed:175 return add_runner_failure(test_state, "ERROR", "TestStatus failed,"176 " see overall job.log for details.")177 return test_state178 def finish(self, proc, started, timeout, step):179 """180 Wait for the test process to finish and report status or error status181 if unable to obtain the status till deadline.182 :param proc: The test's process183 :param started: Time when the test started184 :param timeout: Timeout for waiting on status185 :param first: Delay before first check186 :param step: Step between checks for the status187 """188 # Wait for either process termination or test status189 wait.wait_for(lambda: not proc.is_alive() or self.status, timeout, 0,190 step)191 if self.status: # status exists, wait for process to finish192 if not wait.wait_for(lambda: not proc.is_alive(), timeout, 0,193 step):194 err = "Test reported status but did not finish"195 else: # Test finished and reported status, pass196 return self._add_status_failures(self.status)197 else: # proc finished, wait for late status delivery198 if not wait.wait_for(lambda: self.status, timeout, 0, step):199 err = "Test died without reporting the status."200 else:201 # Status delivered after the test process finished, pass202 return self._add_status_failures(self.status)203 # At this point there were failures, fill the new test status204 TEST_LOG.debug("Original status: %s", str(self.status))205 test_state = self.early_status206 test_state['time_start'] = started207 test_state['time_end'] = time.time()208 test_state['time_elapsed'] = test_state['time_end'] - started209 test_state['fail_reason'] = err210 test_state['status'] = exceptions.TestAbortError.status211 test_state['fail_class'] = (exceptions.TestAbortError.__class__.212 __name__)213 test_state['traceback'] = 'Traceback not available'214 try:215 with open(test_state['logfile'], 'r') as log_file_obj:216 test_state['text_output'] = log_file_obj.read()217 except IOError:218 test_state["text_output"] = "Not available, file not created yet"219 TEST_LOG.error('ERROR %s -> TestAbortedError: '220 'Test process died without reporting the status.',221 test_state['name'])222 if proc.is_alive():223 TEST_LOG.warning("Killing hanged test process %s" % proc.pid)224 for _ in xrange(5): # I really want to destroy it225 os.kill(proc.pid, signal.SIGKILL)226 if not proc.is_alive():227 break228 time.sleep(0.1)229 else:230 raise exceptions.TestError("Unable to destroy test's process "231 "(%s)" % proc.pid)232 return self._add_status_failures(test_state)233class TestRunner(object):234 """235 A test runner class that displays tests results.236 """237 DEFAULT_TIMEOUT = 86400238 def __init__(self, job, result):239 """240 Creates an instance of TestRunner class.241 :param job: an instance of :class:`avocado.core.job.Job`.242 :param result: an instance of :class:`avocado.core.result.Result`243 """244 self.job = job245 self.result = result246 self.sigstopped = False247 def _run_test(self, test_factory, queue):248 """249 Run a test instance.250 This code is the first thing that runs inside a new process, known here251 as the test process. It communicates to the test runner by using252 :param:`queue`. It's important that this early state is given to the253 test runner in a reliable way.254 :param test_factory: Test factory (test class and parameters).255 :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.256 :param queue: Multiprocess queue.257 :type queue: :class:`multiprocessing.Queue` instance.258 """259 signal.signal(signal.SIGTSTP, signal.SIG_IGN)260 logger_list_stdout = [logging.getLogger('avocado.test.stdout'),261 TEST_LOG,262 logging.getLogger('paramiko')]263 logger_list_stderr = [logging.getLogger('avocado.test.stderr'),264 TEST_LOG,265 logging.getLogger('paramiko')]266 sys.stdout = output.LoggingFile(logger=logger_list_stdout)267 sys.stderr = output.LoggingFile(logger=logger_list_stderr)268 def sigterm_handler(signum, frame): # pylint: disable=W0613269 """ Produce traceback on SIGTERM """270 raise SystemExit("Test interrupted by SIGTERM")271 signal.signal(signal.SIGTERM, sigterm_handler)272 # Replace STDIN (0) with the /dev/null's fd273 os.dup2(sys.stdin.fileno(), 0)274 instance = loader.load_test(test_factory)275 if instance.runner_queue is None:276 instance.runner_queue = queue277 runtime.CURRENT_TEST = instance278 early_state = instance.get_state()279 early_state['early_status'] = True280 try:281 queue.put(early_state)282 except Exception:283 instance.error(stacktrace.str_unpickable_object(early_state))284 self.result.start_test(early_state)285 self.job._result_events_dispatcher.map_method('start_test',286 self.result,287 early_state)288 try:289 instance.run_avocado()290 finally:291 try:292 state = instance.get_state()293 queue.put(state)294 except Exception:295 instance.error(stacktrace.str_unpickable_object(state))296 def run_test(self, test_factory, queue, summary, job_deadline=0):297 """298 Run a test instance inside a subprocess.299 :param test_factory: Test factory (test class and parameters).300 :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.301 :param queue: Multiprocess queue.302 :type queue: :class`multiprocessing.Queue` instance.303 :param summary: Contains types of test failures.304 :type summary: set.305 :param job_deadline: Maximum time to execute.306 :type job_deadline: int.307 """308 proc = None309 sigtstp = multiprocessing.Lock()310 def sigtstp_handler(signum, frame): # pylint: disable=W0613311 """ SIGSTOP all test processes on SIGTSTP """312 if not proc: # Ignore ctrl+z when proc not yet started313 return314 with sigtstp:315 msg = "ctrl+z pressed, %%s test (%s)" % proc.pid316 if self.sigstopped:317 APP_LOG.info("\n" + msg, "resumming")318 TEST_LOG.info(msg, "resumming")319 process.kill_process_tree(proc.pid, signal.SIGCONT, False)320 self.sigstopped = False321 else:322 APP_LOG.info("\n" + msg, "stopping")323 TEST_LOG.info(msg, "stopping")324 process.kill_process_tree(proc.pid, signal.SIGSTOP, False)325 self.sigstopped = True326 signal.signal(signal.SIGTSTP, sigtstp_handler)327 proc = multiprocessing.Process(target=self._run_test,328 args=(test_factory, queue,))329 test_status = TestStatus(self.job, queue)330 cycle_timeout = 1331 time_started = time.time()332 proc.start()333 test_status.wait_for_early_status(proc, 10)334 # At this point, the test is already initialized and we know335 # for sure if there's a timeout set.336 timeout = test_status.early_status.get('timeout')337 timeout = float(timeout or self.DEFAULT_TIMEOUT)338 test_deadline = time_started + timeout339 if job_deadline > 0:340 deadline = min(test_deadline, job_deadline)341 else:342 deadline = test_deadline343 ctrl_c_count = 0344 ignore_window = 2.0345 ignore_time_started = time.time()346 stage_1_msg_displayed = False347 stage_2_msg_displayed = False348 first = 0.01349 step = 0.01350 abort_reason = None351 result_dispatcher = self.job._result_events_dispatcher352 while True:353 try:354 if time.time() >= deadline:355 abort_reason = "Timeout reached"356 try:357 os.kill(proc.pid, signal.SIGTERM)358 except OSError:359 pass360 break361 wait.wait_for(lambda: not queue.empty() or not proc.is_alive(),362 cycle_timeout, first, step)363 if test_status.interrupt:364 break365 if proc.is_alive():366 if ctrl_c_count == 0:367 if (test_status.status.get('running') or368 self.sigstopped):369 result_dispatcher.map_method('test_progress',370 False)371 else:372 result_dispatcher.map_method('test_progress', True)373 else:374 break375 except KeyboardInterrupt:376 time_elapsed = time.time() - ignore_time_started377 ctrl_c_count += 1378 if ctrl_c_count == 1:379 if not stage_1_msg_displayed:380 abort_reason = "Interrupted by ctrl+c"381 self.job.log.debug("\nInterrupt requested. Waiting %d "382 "seconds for test to finish "383 "(ignoring new Ctrl+C until then)",384 ignore_window)385 stage_1_msg_displayed = True386 ignore_time_started = time.time()387 if (ctrl_c_count > 1) and (time_elapsed > ignore_window):388 if not stage_2_msg_displayed:389 abort_reason = "Interrupted by ctrl+c (multiple-times)"390 self.job.log.debug("Killing test subprocess %s",391 proc.pid)392 stage_2_msg_displayed = True393 os.kill(proc.pid, signal.SIGKILL)394 # Get/update the test status395 test_state = test_status.finish(proc, time_started, cycle_timeout,396 step)397 # Try to log the timeout reason to test's results and update test_state398 if abort_reason:399 test_state = add_runner_failure(test_state, "INTERRUPTED",400 abort_reason)401 # don't process other tests from the list402 if ctrl_c_count > 0:403 self.job.log.debug('')404 # Make sure the test status is correct405 if test_state.get('status') not in status.user_facing_status:406 test_state = add_runner_failure(test_state, "ERROR", "Test reports"407 " unsupported test status.")408 self.result.check_test(test_state)409 result_dispatcher.map_method('end_test', self.result, test_state)410 if test_state['status'] == "INTERRUPTED":411 summary.add("INTERRUPTED")412 elif not mapping[test_state['status']]:413 summary.add("FAIL")414 if getattr(self.job.args, 'failfast', 'off') == 'on':415 summary.add("INTERRUPTED")416 self.job.log.debug("Interrupting job (failfast).")417 return False418 if test_factory[1]['ct_params'].get('abort_on_error', 'no') in 'yes' \419 and test_state.get('status') in ('ERROR', 'FAIL'):420 ctrl_c_count += 1...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful