TEST_LOG = logging.getLogger("avocado.test")
APP_LOG = logging.getLogger("")


def add_runner_failure(test_state, new_status, message):
    """
    Append runner failure to the overall test status.

    The failure message is logged, appended to the state's ``text_output``
    and ``fail_reason``, and (when the state names a ``logfile``) appended
    to that file as well. ``fail_class`` gets a ``RUNNER`` entry and the
    ``status`` is overwritten with ``new_status``.

    :param test_state: Original test state (dict), modified in place
    :param new_status: New test status (PASS/FAIL/ERROR/INTERRUPTED/...)
    :param message: The error message
    :return: The updated ``test_state`` (the very same dict object)
    """
    # Try to propagate the message everywhere
    message = ("Runner error occurred: %s\nOriginal status: %s\n%s"
               % (message, test_state.get("status"), test_state))
    TEST_LOG.error(message)
    test_log = test_state.get("logfile")
    if test_state.get("text_output"):
        test_state["text_output"] = "%s\n%s\n" % (test_state["text_output"],
                                                  message)
    else:
        test_state["text_output"] = message + "\n"
    if test_log:
        # Context manager guarantees the handle is flushed and closed
        with open(test_log, "a") as log_file:
            log_file.write('\n' + message + '\n')
    # Update the results
    if test_state.get("fail_reason"):
        test_state["fail_reason"] = "%s\n%s" % (test_state["fail_reason"],
                                                message)
    else:
        test_state["fail_reason"] = message
    if test_state.get("fail_class"):
        test_state["fail_class"] = "%s\nRUNNER" % test_state["fail_class"]
    else:
        test_state["fail_class"] = "RUNNER"
    test_state["status"] = new_status
    return test_state


class TestStatus(object):

    """
    Test status handler

    Consumes the messages a test process pushes into ``queue`` and keeps
    the latest known ``status``/``interrupt`` values. Reading either of
    these two attributes first drains the queue (see ``__getattribute__``).
    """

    def __init__(self, job, queue):
        """
        :param job: Associated job
        :param queue: test message queue
        """
        self.job = job
        self.queue = queue
        self._early_status = None
        self.status = {}
        self.interrupt = None
        # Set when reading from the queue raised an exception
        self._failed = False

    def _get_msg_from_queue(self):
        """
        Helper method to handle safely getting messages from the queue.

        :return: Message, None if exception happened.
        :rtype: dict
        """
        try:
            return self.queue.get()
        # Let's catch all exceptions, since errors here mean a
        # crash in avocado.
        except Exception as details:
            self._failed = True
            TEST_LOG.error("RUNNER: Failed to read queue: %s", details)
            return None

    @property
    def early_status(self):
        """
        Get early status

        Looks through the queued messages for the one carrying the
        ``early_status`` key and caches it. Every other message dequeued
        on the way is put back, no matter whether the early status was
        found, so no later status message gets lost.

        :return: The early status message, None when not yet available
        """
        if self._early_status:
            return self._early_status
        unprocessed = []
        try:
            while not self.queue.empty():
                msg = self._get_msg_from_queue()
                if msg is None:
                    break
                if "early_status" in msg:
                    self._early_status = msg
                    return msg
                # Not an early_status message
                unprocessed.append(msg)
        finally:
            # Return all unprocessed messages back
            for pending in unprocessed:
                self.queue.put(pending)
        return None

    def __getattribute__(self, name):
        """
        Return the attribute, processing the queue first when ``status``
        or ``interrupt`` is requested so the value is up to date.
        """
        # Update state before returning the value
        if name in ("status", "interrupt"):
            self._tick()
        return super(TestStatus, self).__getattribute__(name)

    def wait_for_early_status(self, proc, timeout):
        """
        Wait until early_status is obtained

        :param proc: test process
        :param timeout: timeout for early_state
        :raise exceptions.TestError: On timeout/error
        """
        step = 0.01
        end = time.time() + timeout
        while not self.early_status:
            if not proc.is_alive():
                # Re-check, the status might have arrived right before
                # the process finished
                if not self.early_status:
                    raise exceptions.TestError("Process died before it pushed "
                                               "early test_status.")
            if time.time() > end and not self.early_status:
                msg = ("Unable to receive test's early-status in %ss, "
                       "something wrong happened probably in the "
                       "avocado framework." % timeout)
                try:
                    os.kill(proc.pid, signal.SIGKILL)
                except OSError:
                    # Process is already gone, still report the timeout
                    pass
                raise exceptions.TestError(msg)
            time.sleep(step)

    def _tick(self):
        """
        Process the queue and update current status
        """
        while not self.queue.empty():
            msg = self._get_msg_from_queue()
            if msg is None:
                break
            if "func_at_exit" in msg:
                self.job.funcatexit.register(msg["func_at_exit"],
                                             msg.get("args", tuple()),
                                             msg.get("kwargs", {}),
                                             msg.get("once", False))
            elif not msg.get("running", True):
                # Test reported it is no longer running
                self.status = msg
                self.interrupt = True
            elif "paused" in msg:
                self.status = msg
                self.job.result_proxy.notify_progress(False)
                self.job._result_events_dispatcher.map_method('test_progress',
                                                              False)
                if msg['paused']:
                    reason = msg['paused_msg']
                    if reason:
                        self.job.log.warning(reason)
            else:       # test_status
                self.status = msg

    def _add_status_failures(self, test_state):
        """
        Append TestStatus error to test_state in case there were any.

        :param test_state: The test state (dict) to be reported
        :return: ``test_state``, marked as ERROR when reading the queue
                 failed at any point
        """
        if self._failed:
            return add_runner_failure(test_state, "ERROR", "TestStatus failed,"
                                      " see overall job.log for details.")
        return test_state

    def finish(self, proc, started, timeout, step):
        """
        Wait for the test process to finish and report status or error status
        if unable to obtain the status till deadline.

        :param proc: The test's process
        :param started: Time when the test started
        :param timeout: Timeout for waiting on status
        :param step: Step between checks for the status
        :return: The final test state (dict)
        :raise exceptions.TestError: When the test process can not be killed
        """
        # Wait for either process termination or test status
        wait.wait_for(lambda: not proc.is_alive() or self.status, timeout, 0,
                      step)
        if self.status:     # status exists, wait for process to finish
            if not wait.wait_for(lambda: not proc.is_alive(), timeout, 0,
                                 step):
                err = "Test reported status but did not finish"
            else:   # Test finished and reported status, pass
                return self._add_status_failures(self.status)
        else:   # proc finished, wait for late status delivery
            if not wait.wait_for(lambda: self.status, timeout, 0, step):
                err = "Test died without reporting the status."
            else:
                # Status delivered after the test process finished, pass
                return self._add_status_failures(self.status)
        # At this point there were failures, fill the new test status
        TEST_LOG.debug("Original status: %s", str(self.status))
        test_state = self.early_status
        test_state['time_start'] = started
        test_state['time_end'] = time.time()
        test_state['time_elapsed'] = test_state['time_end'] - started
        test_state['fail_reason'] = err
        test_state['status'] = exceptions.TestAbortError.status
        # Name of the exception class itself; going through ``__class__``
        # would report the name of its metaclass instead
        test_state['fail_class'] = exceptions.TestAbortError.__name__
        test_state['traceback'] = 'Traceback not available'
        try:
            with open(test_state['logfile'], 'r') as log_file_obj:
                test_state['text_output'] = log_file_obj.read()
        except IOError:
            test_state["text_output"] = "Not available, file not created yet"
        TEST_LOG.error('ERROR %s -> TestAbortedError: '
                       'Test process died without reporting the status.',
                       test_state['name'])
        if proc.is_alive():
            TEST_LOG.warning("Killing hanged test process %s", proc.pid)
            for _ in range(5):      # I really want to destroy it
                os.kill(proc.pid, signal.SIGKILL)
                if not proc.is_alive():
                    break
                time.sleep(0.1)
            else:
                raise exceptions.TestError("Unable to destroy test's process "
                                           "(%s)" % proc.pid)
        return self._add_status_failures(test_state)
the test process. It communicates to the test runner by using252 :param:`queue`. It's important that this early state is given to the253 test runner in a reliable way.254 :param test_factory: Test factory (test class and parameters).255 :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.256 :param queue: Multiprocess queue.257 :type queue: :class:`multiprocessing.Queue` instance.258 """259 signal.signal(signal.SIGTSTP, signal.SIG_IGN)260 logger_list_stdout = [logging.getLogger('avocado.test.stdout'),261 TEST_LOG,262 logging.getLogger('paramiko')]263 logger_list_stderr = [logging.getLogger('avocado.test.stderr'),264 TEST_LOG,265 logging.getLogger('paramiko')]266 sys.stdout = output.LoggingFile(logger=logger_list_stdout)267 sys.stderr = output.LoggingFile(logger=logger_list_stderr)268 def sigterm_handler(signum, frame): # pylint: disable=W0613269 """ Produce traceback on SIGTERM """270 raise SystemExit("Test interrupted by SIGTERM")271 signal.signal(signal.SIGTERM, sigterm_handler)272 # Replace STDIN (0) with the /dev/null's fd273 os.dup2(sys.stdin.fileno(), 0)274 instance = loader.load_test(test_factory)275 if instance.runner_queue is None:276 instance.runner_queue = queue277 runtime.CURRENT_TEST = instance278 early_state = instance.get_state()279 early_state['early_status'] = True280 try:281 queue.put(early_state)282 except Exception:283 instance.error(stacktrace.str_unpickable_object(early_state))284 self.result.start_test(early_state)285 self.job._result_events_dispatcher.map_method('start_test',286 self.result,287 early_state)288 try:289 instance.run_avocado()290 finally:291 try:292 state = instance.get_state()293 queue.put(state)294 except Exception:295 instance.error(stacktrace.str_unpickable_object(state))296 def run_test(self, test_factory, queue, summary, job_deadline=0):297 """298 Run a test instance inside a subprocess.299 :param test_factory: Test factory (test class and parameters).300 :type test_factory: tuple of 
:class:`avocado.core.test.Test` and dict.301 :param queue: Multiprocess queue.302 :type queue: :class`multiprocessing.Queue` instance.303 :param summary: Contains types of test failures.304 :type summary: set.305 :param job_deadline: Maximum time to execute.306 :type job_deadline: int.307 """308 proc = None309 sigtstp = multiprocessing.Lock()310 def sigtstp_handler(signum, frame): # pylint: disable=W0613311 """ SIGSTOP all test processes on SIGTSTP """312 if not proc: # Ignore ctrl+z when proc not yet started313 return314 with sigtstp:315 msg = "ctrl+z pressed, %%s test (%s)" % proc.pid316 if self.sigstopped:317"\n" + msg, "resumming")318, "resumming")319 process.kill_process_tree(, signal.SIGCONT, False)320 self.sigstopped = False321 else:322"\n" + msg, "stopping")323, "stopping")324 process.kill_process_tree(, signal.SIGSTOP, False)325 self.sigstopped = True326 signal.signal(signal.SIGTSTP, sigtstp_handler)327 proc = multiprocessing.Process(target=self._run_test,328 args=(test_factory, queue,))329 test_status = TestStatus(self.job, queue)330 cycle_timeout = 1331 time_started = time.time()332 proc.start()333 test_status.wait_for_early_status(proc, 10)334 # At this point, the test is already initialized and we know335 # for sure if there's a timeout set.336 timeout = test_status.early_status.get('timeout')337 timeout = float(timeout or self.DEFAULT_TIMEOUT)338 test_deadline = time_started + timeout339 if job_deadline > 0:340 deadline = min(test_deadline, job_deadline)341 else:342 deadline = test_deadline343 ctrl_c_count = 0344 ignore_window = 2.0345 ignore_time_started = time.time()346 stage_1_msg_displayed = False347 stage_2_msg_displayed = False348 first = 0.01349 step = 0.01350 abort_reason = None351 result_dispatcher = self.job._result_events_dispatcher352 while True:353 try:354 if time.time() >= deadline:355 abort_reason = "Timeout reached"356 try:357 os.kill(, signal.SIGTERM)358 except OSError:359 pass360 break361 wait.wait_for(lambda: not queue.empty() or 
not proc.is_alive(),362 cycle_timeout, first, step)363 if test_status.interrupt:364 break365 if proc.is_alive():366 if ctrl_c_count == 0:367 if (test_status.status.get('running') or368 self.sigstopped):369 result_dispatcher.map_method('test_progress',370 False)371 else:372 result_dispatcher.map_method('test_progress', True)373 else:374 break375 except KeyboardInterrupt:376 time_elapsed = time.time() - ignore_time_started377 ctrl_c_count += 1378 if ctrl_c_count == 1:379 if not stage_1_msg_displayed:380 abort_reason = "Interrupted by ctrl+c"381 self.job.log.debug("\nInterrupt requested. Waiting %d "382 "seconds for test to finish "383 "(ignoring new Ctrl+C until then)",384 ignore_window)385 stage_1_msg_displayed = True386 ignore_time_started = time.time()387 if (ctrl_c_count > 1) and (time_elapsed > ignore_window):388 if not stage_2_msg_displayed:389 abort_reason = "Interrupted by ctrl+c (multiple-times)"390 self.job.log.debug("Killing test subprocess %s",391 stage_2_msg_displayed = True393 os.kill(, signal.SIGKILL)394 # Get/update the test status395 test_state = test_status.finish(proc, time_started, cycle_timeout,396 step)397 # Try to log the timeout reason to test's results and update test_state398 if abort_reason:399 test_state = add_runner_failure(test_state, "INTERRUPTED",400 abort_reason)401 # don't process other tests from the list402 if ctrl_c_count > 0:403 self.job.log.debug('')404 # Make sure the test status is correct405 if test_state.get('status') not in status.user_facing_status:406 test_state = add_runner_failure(test_state, "ERROR", "Test reports"407 " unsupported test status.")408 self.result.check_test(test_state)409 result_dispatcher.map_method('end_test', self.result, test_state)410 if test_state['status'] == "INTERRUPTED":411 summary.add("INTERRUPTED")412 elif not mapping[test_state['status']]:413 summary.add("FAIL")414 if getattr(self.job.args, 'failfast', 'off') == 'on':415 summary.add("INTERRUPTED")416 self.job.log.debug("Interrupting job 
(failfast).")417 return False418 if test_factory[1]['ct_params'].get('abort_on_error', 'no') in 'yes' \419 and test_state.get('status') in ('ERROR', 'FAIL'):420 ctrl_c_count += 1...

