How to use wait_for_early_status method in avocado

Best Python code snippet using avocado_python

runner.py

Source:runner.py Github

copy

Full Screen

...118 # Update state before returning the value119 if name in ("status", "interrupt"):120 self._tick()121 return super(TestStatus, self).__getattribute__(name)122 def wait_for_early_status(self, proc, timeout):123 """124 Wait until early_status is obtained125 :param proc: test process126 :param timeout: timeout for early_state127 :raise exceptions.TestError: On timeout/error128 """129 step = 0.01130 end = time.time() + timeout131 while not self.early_status:132 if not proc.is_alive():133 if not self.early_status:134 raise exceptions.TestError("Process died before it pushed "135 "early test_status.")136 if time.time() > end and not self.early_status:137 msg = ("Unable to receive test's early-status in %ss, "138 "something wrong happened probably in the "139 "avocado framework." % timeout)140 os.kill(proc.pid, signal.SIGKILL)141 raise exceptions.TestError(msg)142 time.sleep(step)143 def _tick(self):144 """145 Process the queue and update current status146 """147 while not self.queue.empty():148 msg = self._get_msg_from_queue()149 if msg is None:150 break151 if "func_at_exit" in msg:152 self.job.funcatexit.register(msg["func_at_exit"],153 msg.get("args", tuple()),154 msg.get("kwargs", {}),155 msg.get("once", False))156 elif not msg.get("running", True):157 self.status = msg158 self.interrupt = True159 elif "paused" in msg:160 self.status = msg161 self.job.result_proxy.notify_progress(False)162 self.job._result_events_dispatcher.map_method('test_progress',163 False)164 if msg['paused']:165 reason = msg['paused_msg']166 if reason:167 self.job.log.warning(reason)168 else: # test_status169 self.status = msg170 def _add_status_failures(self, test_state):171 """172 Append TestStatus error to test_state in case there were any.173 """174 if self._failed:175 return add_runner_failure(test_state, "ERROR", "TestStatus failed,"176 " see overall job.log for details.")177 return test_state178 def finish(self, proc, started, timeout, step):179 """180 Wait for the test process to 
finish and report status or error status181 if unable to obtain the status till deadline.182 :param proc: The test's process183 :param started: Time when the test started184 :param timeout: Timeout for waiting on status185 :param first: Delay before first check186 :param step: Step between checks for the status187 """188 # Wait for either process termination or test status189 wait.wait_for(lambda: not proc.is_alive() or self.status, timeout, 0,190 step)191 if self.status: # status exists, wait for process to finish192 if not wait.wait_for(lambda: not proc.is_alive(), timeout, 0,193 step):194 err = "Test reported status but did not finish"195 else: # Test finished and reported status, pass196 return self._add_status_failures(self.status)197 else: # proc finished, wait for late status delivery198 if not wait.wait_for(lambda: self.status, timeout, 0, step):199 err = "Test died without reporting the status."200 else:201 # Status delivered after the test process finished, pass202 return self._add_status_failures(self.status)203 # At this point there were failures, fill the new test status204 TEST_LOG.debug("Original status: %s", str(self.status))205 test_state = self.early_status206 test_state['time_start'] = started207 test_state['time_end'] = time.time()208 test_state['time_elapsed'] = test_state['time_end'] - started209 test_state['fail_reason'] = err210 test_state['status'] = exceptions.TestAbortError.status211 test_state['fail_class'] = (exceptions.TestAbortError.__class__.212 __name__)213 test_state['traceback'] = 'Traceback not available'214 try:215 with open(test_state['logfile'], 'r') as log_file_obj:216 test_state['text_output'] = log_file_obj.read()217 except IOError:218 test_state["text_output"] = "Not available, file not created yet"219 TEST_LOG.error('ERROR %s -> TestAbortedError: '220 'Test process died without reporting the status.',221 test_state['name'])222 if proc.is_alive():223 TEST_LOG.warning("Killing hanged test process %s" % proc.pid)224 for _ in 
xrange(5): # I really want to destroy it225 os.kill(proc.pid, signal.SIGKILL)226 if not proc.is_alive():227 break228 time.sleep(0.1)229 else:230 raise exceptions.TestError("Unable to destroy test's process "231 "(%s)" % proc.pid)232 return self._add_status_failures(test_state)233class TestRunner(object):234 """235 A test runner class that displays tests results.236 """237 DEFAULT_TIMEOUT = 86400238 def __init__(self, job, result):239 """240 Creates an instance of TestRunner class.241 :param job: an instance of :class:`avocado.core.job.Job`.242 :param result: an instance of :class:`avocado.core.result.Result`243 """244 self.job = job245 self.result = result246 self.sigstopped = False247 def _run_test(self, test_factory, queue):248 """249 Run a test instance.250 This code is the first thing that runs inside a new process, known here251 as the test process. It communicates to the test runner by using252 :param:`queue`. It's important that this early state is given to the253 test runner in a reliable way.254 :param test_factory: Test factory (test class and parameters).255 :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.256 :param queue: Multiprocess queue.257 :type queue: :class:`multiprocessing.Queue` instance.258 """259 signal.signal(signal.SIGTSTP, signal.SIG_IGN)260 logger_list_stdout = [logging.getLogger('avocado.test.stdout'),261 TEST_LOG,262 logging.getLogger('paramiko')]263 logger_list_stderr = [logging.getLogger('avocado.test.stderr'),264 TEST_LOG,265 logging.getLogger('paramiko')]266 sys.stdout = output.LoggingFile(logger=logger_list_stdout)267 sys.stderr = output.LoggingFile(logger=logger_list_stderr)268 def sigterm_handler(signum, frame): # pylint: disable=W0613269 """ Produce traceback on SIGTERM """270 raise SystemExit("Test interrupted by SIGTERM")271 signal.signal(signal.SIGTERM, sigterm_handler)272 # Replace STDIN (0) with the /dev/null's fd273 os.dup2(sys.stdin.fileno(), 0)274 instance = loader.load_test(test_factory)275 if 
instance.runner_queue is None:276 instance.runner_queue = queue277 runtime.CURRENT_TEST = instance278 early_state = instance.get_state()279 early_state['early_status'] = True280 try:281 queue.put(early_state)282 except Exception:283 instance.error(stacktrace.str_unpickable_object(early_state))284 self.result.start_test(early_state)285 self.job._result_events_dispatcher.map_method('start_test',286 self.result,287 early_state)288 try:289 instance.run_avocado()290 finally:291 try:292 state = instance.get_state()293 queue.put(state)294 except Exception:295 instance.error(stacktrace.str_unpickable_object(state))296 def run_test(self, test_factory, queue, summary, job_deadline=0):297 """298 Run a test instance inside a subprocess.299 :param test_factory: Test factory (test class and parameters).300 :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.301 :param queue: Multiprocess queue.302 :type queue: :class`multiprocessing.Queue` instance.303 :param summary: Contains types of test failures.304 :type summary: set.305 :param job_deadline: Maximum time to execute.306 :type job_deadline: int.307 """308 proc = None309 sigtstp = multiprocessing.Lock()310 def sigtstp_handler(signum, frame): # pylint: disable=W0613311 """ SIGSTOP all test processes on SIGTSTP """312 if not proc: # Ignore ctrl+z when proc not yet started313 return314 with sigtstp:315 msg = "ctrl+z pressed, %%s test (%s)" % proc.pid316 if self.sigstopped:317 APP_LOG.info("\n" + msg, "resumming")318 TEST_LOG.info(msg, "resumming")319 process.kill_process_tree(proc.pid, signal.SIGCONT, False)320 self.sigstopped = False321 else:322 APP_LOG.info("\n" + msg, "stopping")323 TEST_LOG.info(msg, "stopping")324 process.kill_process_tree(proc.pid, signal.SIGSTOP, False)325 self.sigstopped = True326 signal.signal(signal.SIGTSTP, sigtstp_handler)327 proc = multiprocessing.Process(target=self._run_test,328 args=(test_factory, queue,))329 test_status = TestStatus(self.job, queue)330 cycle_timeout = 1331 
time_started = time.time()332 proc.start()333 test_status.wait_for_early_status(proc, 10)334 # At this point, the test is already initialized and we know335 # for sure if there's a timeout set.336 timeout = test_status.early_status.get('timeout')337 timeout = float(timeout or self.DEFAULT_TIMEOUT)338 test_deadline = time_started + timeout339 if job_deadline > 0:340 deadline = min(test_deadline, job_deadline)341 else:342 deadline = test_deadline343 ctrl_c_count = 0344 ignore_window = 2.0345 ignore_time_started = time.time()346 stage_1_msg_displayed = False347 stage_2_msg_displayed = False...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful