How to use the result_events_dispatcher property in avocado

Best Python code snippet using avocado_python

job.py

Source:job.py Github

copy

Full Screen

# --- Excerpt from avocado/core/job.py, reconstructed from a scraped listing.
# The snippet is truncated at both ends: the enclosing "def __init__" header
# and the body of "__exit__" are not part of this excerpt. ---

        # Tail of Job.__init__: record the replay source job (if any), start
        # with the all-OK exit code, and defer dispatcher creation (see the
        # property below).
        self.replay_sourcejob = self.config.get('replay_sourcejob')
        self.exitcode = exit_codes.AVOCADO_ALL_OK
        self._result_events_dispatcher = None

    @property
    def result_events_dispatcher(self):
        """Lazily-created dispatcher for result events.

        Built on first access from ``self.config``; subsequent accesses
        return the cached instance.
        """
        # The result events dispatcher is shared with the test runner.
        # Because of our goal to support using the phases of a job
        # freely, let's get the result events dispatcher on first usage.
        # A future optimization may load it on demand.
        if self._result_events_dispatcher is None:
            self._result_events_dispatcher = dispatcher.ResultEventsDispatcher(
                self.config)
            # Surface any result plugins that failed to load.
            output.log_plugin_failures(self._result_events_dispatcher
                                       .load_failures)
        return self._result_events_dispatcher

    def __enter__(self):
        # Context-manager entry: run the job's setup phase, then hand the
        # job itself to the "with" block.
        self.setup()
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        ...  # body truncated in the scraped excerpt

Full Screen

Full Screen

runner.py

Source:runner.py Github

copy

Full Screen

# --- Excerpt from avocado/plugins/runner.py, reconstructed from a scraped
# listing (embedded line numbers removed, indentation restored). The excerpt
# is truncated at the end, inside run_suite(). ---
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014-2019
# Authors: Lucas Meneghel Rodrigues <lmr@redhat.com>
#          Ruda Moura <rmoura@redhat.com>
#          Cleber Rosa <crosa@redhat.com>

"""
Conventional Test Runner Plugin
"""

import multiprocessing
import os
import signal
import sys
import time
from queue import Full as queueFullException

from avocado.core import output, tree, varianter
from avocado.core.loader import loader
from avocado.core.output import LOG_JOB as TEST_LOG
from avocado.core.output import LOG_UI as APP_LOG
from avocado.core.plugin_interfaces import Runner
from avocado.core.runner import TestStatus, add_runner_failure
from avocado.core.test import TimeOutSkipTest
from avocado.core.test_id import TestID
from avocado.core.teststatus import mapping, user_facing_status
from avocado.utils import process, stacktrace, wait


class TestRunner(Runner):

    """
    A test runner class that displays tests results.
    """

    name = 'runner'
    description = 'The conventional test runner'
    # Default per-test timeout (seconds) when the test declares none.
    DEFAULT_TIMEOUT = 86400

    def __init__(self):
        """
        Creates an instance of TestRunner class.
        """
        # Tracks whether the test process tree is currently SIGSTOPped
        # (toggled by the ctrl+z handler in run_test).
        self.sigstopped = False

    @staticmethod
    def _run_test(job, test_factory, queue):
        """
        Run a test instance.

        This code is the first thing that runs inside a new process, known here
        as the test process. It communicates to the test runner by using
        :param:`queue`. It's important that this early state is given to the
        test runner in a reliable way.

        :param test_factory: Test factory (test class and parameters).
        :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.
        :param queue: Multiprocess queue.
        :type queue: :class:`multiprocessing.Queue` instance.
        """
        # Redirect the test's stdout/stderr into the job log.
        sys.stdout = output.LoggingFile(["[stdout] "], loggers=[TEST_LOG])
        sys.stderr = output.LoggingFile(["[stderr] "], loggers=[TEST_LOG])

        def sigterm_handler(signum, frame):  # pylint: disable=W0613
            """ Produce traceback on SIGTERM """
            raise RuntimeError("Test interrupted by SIGTERM")

        signal.signal(signal.SIGTERM, sigterm_handler)

        # At this point, the original `sys.stdin` has already been
        # closed and replaced with `os.devnull` by
        # `multiprocessing.Process()` (not directly from Avocado
        # code). Still, tests trying to use file descriptor 0 would
        # be able to read from the tty, and would hang. Let's replace
        # STDIN fd (0), with the same fd previously set by
        # `multiprocessing.Process()`
        os.dup2(sys.stdin.fileno(), 0)

        instance = loader.load_test(test_factory)
        if instance.runner_queue is None:
            instance.set_runner_queue(queue)
        # Send the "early status" back to the parent so it can learn the
        # test's timeout and identity before the test actually runs.
        early_state = instance.get_state()
        early_state['early_status'] = True
        try:
            queue.put(early_state)
        except queueFullException:
            instance.error(stacktrace.str_unpickable_object(early_state))

        job.result.start_test(early_state)
        job.result_events_dispatcher.map_method('start_test',
                                                job.result,
                                                early_state)
        if job.config.get('run.log_test_data_directories'):
            data_sources = getattr(instance, "DATA_SOURCES", [])
            if data_sources:
                locations = []
                for source in data_sources:
                    locations.append(instance.get_data("", source=source,
                                                       must_exist=False))
                TEST_LOG.info('Test data directories: ')
                for source, location in zip(data_sources, locations):
                    if location is not None:
                        TEST_LOG.info(' %s: %s', source, location)
                TEST_LOG.info('')
        try:
            instance.run_avocado()
        finally:
            # Always report the final state back to the parent, even if the
            # test raised.
            try:
                state = instance.get_state()
                queue.put(state)
            except queueFullException:
                instance.error(stacktrace.str_unpickable_object(state))

    def run_test(self, job, test_factory, queue, summary, job_deadline=0):
        """
        Run a test instance inside a subprocess.

        :param test_factory: Test factory (test class and parameters).
        :type test_factory: tuple of :class:`avocado.core.test.Test` and dict.
        :param queue: Multiprocess queue.
        :type queue: :class`multiprocessing.Queue` instance.
        :param summary: Contains types of test failures.
        :type summary: set.
        :param job_deadline: Maximum time to execute.
        :type job_deadline: int.
        """
        proc = None
        sigtstp = multiprocessing.Lock()

        def sigtstp_handler(signum, frame):  # pylint: disable=W0613
            """ SIGSTOP all test processes on SIGTSTP """
            if not proc:  # Ignore ctrl+z when proc not yet started
                return
            with sigtstp:
                msg = "ctrl+z pressed, %%s test (%s)" % proc.pid
                app_log_msg = '\n%s' % msg
                # NOTE(review): "resumming" below is a typo in the runtime
                # log message; left as-is because it is program output.
                if self.sigstopped:
                    APP_LOG.info(app_log_msg, "resumming")
                    TEST_LOG.info(msg, "resumming")
                    process.kill_process_tree(proc.pid, signal.SIGCONT, False)
                    self.sigstopped = False
                else:
                    APP_LOG.info(app_log_msg, "stopping")
                    TEST_LOG.info(msg, "stopping")
                    process.kill_process_tree(proc.pid, signal.SIGSTOP, False)
                    self.sigstopped = True

        proc = multiprocessing.Process(target=self._run_test,
                                       args=(job, test_factory, queue,))
        test_status = TestStatus(job, queue)

        cycle_timeout = 1
        time_started = time.time()
        # Ignore SIGTSTP while starting the child so ctrl+z cannot hit a
        # half-initialized process; restore the handler right after.
        signal.signal(signal.SIGTSTP, signal.SIG_IGN)
        proc.start()
        signal.signal(signal.SIGTSTP, sigtstp_handler)
        test_status.wait_for_early_status(proc, 60)

        # At this point, the test is already initialized and we know
        # for sure if there's a timeout set.
        timeout = test_status.early_status.get('timeout')
        timeout = float(timeout or self.DEFAULT_TIMEOUT)

        test_deadline = time_started + timeout
        if job_deadline is not None and job_deadline > 0:
            deadline = min(test_deadline, job_deadline)
        else:
            deadline = test_deadline

        ctrl_c_count = 0
        ignore_window = 2.0
        ignore_time_started = time.time()
        stage_1_msg_displayed = False
        stage_2_msg_displayed = False
        first = 0.01
        step = 0.01
        abort_reason = None
        result_dispatcher = job.result_events_dispatcher

        # Supervision loop: wait for status messages / process exit, enforce
        # the deadline, and escalate ctrl+c presses (SIGINT, then SIGKILL).
        while True:
            try:
                if time.time() >= deadline:
                    abort_reason = "Timeout reached"
                    try:
                        os.kill(proc.pid, signal.SIGTERM)
                    except OSError:
                        pass
                    break
                wait.wait_for(lambda: not queue.empty() or not proc.is_alive(),
                              cycle_timeout, first, step)
                if test_status.interrupt:
                    break
                if proc.is_alive():
                    if ctrl_c_count == 0:
                        if (test_status.status.get('running') or
                                self.sigstopped):
                            result_dispatcher.map_method('test_progress',
                                                         False)
                        else:
                            result_dispatcher.map_method('test_progress', True)
                else:
                    break
            except KeyboardInterrupt:
                time_elapsed = time.time() - ignore_time_started
                ctrl_c_count += 1
                if ctrl_c_count == 1:
                    if not stage_1_msg_displayed:
                        abort_reason = "Interrupted by ctrl+c"
                        job.log.debug("\nInterrupt requested. Waiting %d "
                                      "seconds for test to finish "
                                      "(ignoring new Ctrl+C until then)",
                                      ignore_window)
                        stage_1_msg_displayed = True
                    ignore_time_started = time.time()
                    process.kill_process_tree(proc.pid, signal.SIGINT)
                if (ctrl_c_count > 1) and (time_elapsed > ignore_window):
                    if not stage_2_msg_displayed:
                        abort_reason = "Interrupted by ctrl+c (multiple-times)"
                        job.log.debug("Killing test subprocess %s",
                                      proc.pid)
                        stage_2_msg_displayed = True
                    process.kill_process_tree(proc.pid, signal.SIGKILL)

        # Get/update the test status (decrease timeout on abort)
        if abort_reason:
            after_interrupted = job.config.get('runner.timeout.after_interrupted')
            finish_deadline = time.time() + after_interrupted
        else:
            finish_deadline = deadline
        test_state = test_status.finish(proc, time_started, step,
                                        finish_deadline,
                                        result_dispatcher)

        # Try to log the timeout reason to test's results and update test_state
        if abort_reason:
            test_state = add_runner_failure(test_state, "INTERRUPTED",
                                            abort_reason)

        # don't process other tests from the list
        if ctrl_c_count > 0:
            job.log.debug('')

        # Make sure the test status is correct
        if test_state.get('status') not in user_facing_status:
            test_state = add_runner_failure(test_state, "ERROR", "Test reports"
                                            " unsupported test status.")

        job.result.check_test(test_state)
        result_dispatcher.map_method('end_test', job.result, test_state)
        if test_state['status'] == "INTERRUPTED":
            summary.add("INTERRUPTED")
        elif not mapping[test_state['status']]:
            summary.add("FAIL")
            if job.config.get('run.failfast'):
                summary.add("INTERRUPTED")
                job.interrupted_reason = "Interrupting job (failfast)."
                return False
        if ctrl_c_count > 0:
            return False
        return True

    @staticmethod
    def _template_to_factory(test_parameters, template, variant):
        """
        Applies test params from variant to the test template

        :param test_parameters: a simpler set of parameters (currently
                                given to the run command via "-p" parameters)
        :param template: a test template, containing the class name,
                         followed by parameters to the class
        :type template: tuple
        :param variant: variant to be applied, usually containing
                        the keys: paths, variant and variant_id
        :type variant: dict
        :return: tuple(new_test_factory, applied_variant)
        """
        var = variant.get("variant")
        paths = variant.get("paths")
        empty_variants = varianter.is_empty_variant(var)
        if "params" not in template[1]:
            factory = [template[0], template[1].copy()]
            if test_parameters and empty_variants:
                # No varianter-provided params: inject "-p" parameters as a
                # synthetic root tree node.
                var[0] = tree.TreeNode().get_node("/", True)
                var[0].value = test_parameters
                paths = ["/"]
            factory[1]["params"] = (var, paths)
            return factory, variant
        if not empty_variants:
            raise NotImplementedError("Specifying test params from test loader "
                                      "and from varianter at the same time is "
                                      "not yet supported. Please remove either "
                                      "variants defined by the varianter (%s) "
                                      "or make the test loader of test %s to "
                                      "not to fill variants." % (variant,
                                                                 template))
        return template, {"variant": var,
                          "variant_id": varianter.generate_variant_id(var),
                          "paths": paths}

    def _iter_suite(self, test_suite, execution_order):
        """
        Iterates through test_suite and variants in defined order

        :param test_suite: a TestSuite object to run
        :param execution_order: way of iterating through tests/variants
        :return: generator yielding tuple(test_factory, variant)
        """
        if execution_order == "variants-per-test":
            return (self._template_to_factory(test_suite.test_parameters,
                                              template, variant)
                    for template in test_suite.tests
                    for variant in test_suite.variants.itertests())
        elif execution_order == "tests-per-variant":
            return (self._template_to_factory(test_suite.test_parameters,
                                              template, variant)
                    for variant in test_suite.variants.itertests()
                    for template in test_suite.tests)
        else:
            raise NotImplementedError("Suite_order %s is not supported"
                                      % execution_order)

    def run_suite(self, job, test_suite):
        """
        Run one or more tests and report with test result.

        :param job: an instance of :class:`avocado.core.job.Job`.
        :param test_suite: a list of tests to run.
        :return: a set with types of test failures.
        """
        summary = set()
        replay_map = job.config.get('replay_map')
        execution_order = job.config.get('run.execution_order')
        queue = multiprocessing.SimpleQueue()
        if job.timeout > 0:
            deadline = time.time() + job.timeout
        else:
            deadline = None

        test_result_total = test_suite.variants.get_number_of_tests(test_suite.tests)
        no_digits = len(str(test_result_total))
        job.result.tests_total = test_result_total
        index = 1
        try:
            for test_factory in test_suite.tests:
                test_factory[1]["base_logdir"] = job.logdir
                test_factory[1]["job"] = job
            for test_factory, variant in self._iter_suite(test_suite,
                                                          execution_order):
                test_parameters = test_factory[1]
                name = test_parameters.get("name")
                if test_suite.name:
                    prefix = "{}-{}".format(test_suite.name, index)
                else:
                    prefix = index
                test_parameters["name"] = TestID(prefix,
                                                 name,
                                                 variant,
                                                 no_digits)
                if deadline is not None and time.time() > deadline:
                    # Job deadline already passed: substitute the test with a
                    # TimeOutSkipTest so it is reported, not executed.
                    summary.add('INTERRUPTED')
                    if 'methodName' in test_parameters:
                        del test_parameters['methodName']
                    test_factory = (TimeOutSkipTest, test_parameters)
                    if not self.run_test(job, test_factory, queue, summary):
                        break
                else:
                    if (replay_map is not None and
                            replay_map[index - 1] is not None):
                        test_factory = (replay_map[index - 1], test_parameters)
                    if not self.run_test(job, test_factory, queue, summary,
                                         deadline):
                        break
                index += 1
        except KeyboardInterrupt:
            TEST_LOG.error('Job interrupted by ctrl+c.')
            summary.add('INTERRUPTED')

        job.result.end_tests()
        job.funcatexit.run()
        signal.signal(signal.SIGTSTP, signal.SIG_IGN)
        # (remainder of run_suite truncated in the scraped excerpt)

Full Screen

Full Screen

runner_nrunner.py

Source:runner_nrunner.py Github

copy

Full Screen

# --- Excerpt from avocado/plugins/runner_nrunner.py, reconstructed from a
# scraped listing (embedded line numbers removed, indentation restored).
# The excerpt is truncated at the very end of run_suite(). ---
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2019-2020
# Authors: Cleber Rosa <crosa@redhat.com>

"""
NRunner based implementation of job compliant runner
"""

import asyncio
import json
import multiprocessing
import os
import random
from copy import copy

from avocado.core import nrunner
from avocado.core.dispatcher import SpawnerDispatcher
from avocado.core.plugin_interfaces import CLI, Init
from avocado.core.plugin_interfaces import Runner as RunnerInterface
from avocado.core.settings import settings
from avocado.core.status.repo import StatusRepo
from avocado.core.status.server import StatusServer
from avocado.core.task.runtime import RuntimeTask
from avocado.core.task.statemachine import TaskStateMachine, Worker
from avocado.core.test_id import TestID
from avocado.core.teststatus import mapping


class RunnerInit(Init):

    name = 'nrunner'
    description = 'nrunner initialization'

    def initialize(self):
        # Register all "nrunner.*" configuration options with their
        # defaults and help texts.
        section = 'nrunner'
        help_msg = 'Shuffle the tasks to be executed'
        settings.register_option(section=section,
                                 key='shuffle',
                                 default=False,
                                 help_msg=help_msg,
                                 key_type=bool)
        help_msg = ('URI for listing the status server. Usually '
                    'a "HOST:PORT" string')
        settings.register_option(section=section,
                                 key='status_server_listen',
                                 default='127.0.0.1:8888',
                                 metavar="HOST:PORT",
                                 help_msg=help_msg)
        help_msg = ('URI for connecting to the status server, usually '
                    'a "HOST:PORT" string. Use this if your status server '
                    'is in another host, or different port')
        settings.register_option(section=section,
                                 key='status_server_uri',
                                 default='127.0.0.1:8888',
                                 metavar="HOST:PORT",
                                 help_msg=help_msg)
        help_msg = ('Number of maximum number tasks running in parallel. You '
                    'can disable parallel execution by setting this to 1. '
                    'Defaults to the amount of CPUs on this machine.')
        settings.register_option(section=section,
                                 key='max_parallel_tasks',
                                 default=multiprocessing.cpu_count(),
                                 key_type=int,
                                 help_msg=help_msg)
        help_msg = ("Spawn tasks in a specific spawner. Available spawners: "
                    "'process' and 'podman'")
        settings.register_option(section=section,
                                 key="spawner",
                                 default='process',
                                 help_msg=help_msg)


class RunnerCLI(CLI):

    name = 'nrunner'
    description = 'nrunner command line options for "run"'

    def configure(self, parser):
        # Attach the nrunner options to the "run" subcommand, if present.
        super(RunnerCLI, self).configure(parser)
        parser = parser.subcommands.choices.get('run', None)
        if parser is None:
            return
        parser = parser.add_argument_group('nrunner specific options')
        settings.add_argparser_to_option(namespace='nrunner.shuffle',
                                         parser=parser,
                                         long_arg='--nrunner-shuffle',
                                         action='store_true')
        # namespace mapping
        ns = {'nrunner.status_server_listen': '--nrunner-status-server-listen',
              'nrunner.status_server_uri': '--nrunner-status-server-uri',
              'nrunner.max_parallel_tasks': '--nrunner-max-parallel-tasks',
              'nrunner.spawner': '--nrunner-spawner'}
        for k, v in ns.items():
            settings.add_argparser_to_option(namespace=k,
                                             parser=parser,
                                             long_arg=v)

    def run(self, config):
        # All work happens at configure() time; nothing to do here.
        pass


class Runner(RunnerInterface):

    name = 'nrunner'
    description = 'nrunner based implementation of job compliant runner'

    def _save_to_file(self, filename, buff, mode='wb'):
        # Small helper: write a buffer to a file (binary by default).
        with open(filename, mode) as fp:
            fp.write(buff)

    def _populate_task_logdir(self, base_path, task, statuses, debug=False):
        """Write a task's stdout/stderr/debug/data files under base_path."""
        # We are copying here to avoid printing duplicated information
        local_statuses = copy(statuses)
        last = local_statuses[-1]
        try:
            stdout = last.pop('stdout')
        except KeyError:
            stdout = None
        try:
            stderr = last.pop('stderr')
        except KeyError:
            stderr = None

        # Create task dir
        task_path = os.path.join(base_path, task.identifier.str_filesystem)
        os.makedirs(task_path, exist_ok=True)

        # Save stdout and stderr
        if stdout is not None:
            stdout_file = os.path.join(task_path, 'stdout')
            self._save_to_file(stdout_file, stdout)
        if stderr is not None:
            stderr_file = os.path.join(task_path, 'stderr')
            self._save_to_file(stderr_file, stderr)

        # Save debug
        if debug:
            debug = os.path.join(task_path, 'debug')
            with open(debug, 'w') as fp:
                json.dump(local_statuses, fp)

        data_file = os.path.join(task_path, 'data')
        with open(data_file, 'w') as fp:
            fp.write("{}\n".format(task.output_dir))

    def _get_all_runtime_tasks(self, test_suite):
        """Wrap every test in the suite as a RuntimeTask with a TestID."""
        result = []
        no_digits = len(str(len(test_suite)))
        for index, task in enumerate(test_suite.tests, start=1):
            task.known_runners = nrunner.RUNNERS_REGISTRY_PYTHON_CLASS
            # this is all rubbish data
            if test_suite.name:
                prefix = "{}-{}".format(test_suite.name, index)
            else:
                prefix = index
            test_id = TestID(prefix,
                             task.runnable.uri,
                             None,
                             no_digits)
            task.identifier = test_id
            result.append(RuntimeTask(task))
        return result

    def _start_status_server(self, status_server_listen):
        # Start the asyncio status server that collects task status messages
        # into self.status_repo.
        # pylint: disable=W0201
        self.status_repo = StatusRepo()
        # pylint: disable=W0201
        self.status_server = StatusServer(status_server_listen,
                                          self.status_repo)
        asyncio.ensure_future(self.status_server.serve_forever())

    async def _update_status(self, job):
        """Drain the status journal and feed job.result / result plugins."""
        tasks_by_id = {str(runtime_task.task.identifier): runtime_task.task
                       for runtime_task in self.tasks}
        while True:
            try:
                (task_id, status, _) = self.status_repo.status_journal_summary.pop(0)
            except IndexError:
                # No pending status messages; yield to the event loop.
                await asyncio.sleep(0.05)
                continue
            task = tasks_by_id.get(task_id)
            early_state = {'name': task.identifier,
                           'job_logdir': job.logdir,
                           'job_unique_id': job.unique_id}
            if status == 'started':
                job.result.start_test(early_state)
                job.result_events_dispatcher.map_method('start_test',
                                                        job.result,
                                                        early_state)
            elif status == 'finished':
                this_task_data = self.status_repo.get_task_data(task_id)
                last_task_status = this_task_data[-1]
                test_state = {'status': last_task_status.get('result').upper()}
                test_state.update(early_state)
                time_start = this_task_data[0]['time']
                time_end = last_task_status['time']
                time_elapsed = time_end - time_start
                test_state['time_start'] = time_start
                test_state['time_end'] = time_end
                test_state['time_elapsed'] = time_elapsed
                # fake log dir, needed by some result plugins such as HTML
                test_state['logdir'] = ''
                base_path = os.path.join(job.logdir, 'test-results')
                self._populate_task_logdir(base_path,
                                           task,
                                           this_task_data,
                                           job.config.get('core.debug'))
                job.result.check_test(test_state)
                job.result_events_dispatcher.map_method('end_test',
                                                        job.result,
                                                        test_state)
                if not mapping[test_state['status']]:
                    self.summary.add("FAIL")

    def run_suite(self, job, test_suite):
        """Run the suite via the task state machine and spawner workers."""
        # pylint: disable=W0201
        self.summary = set()
        test_suite.tests, _ = nrunner.check_tasks_requirements(test_suite.tests)
        job.result.tests_total = test_suite.size  # no support for variants yet
        listen = test_suite.config.get('nrunner.status_server_listen')
        self._start_status_server(listen)
        # pylint: disable=W0201
        self.tasks = self._get_all_runtime_tasks(test_suite)
        if test_suite.config.get('nrunner.shuffle'):
            random.shuffle(self.tasks)
        tsm = TaskStateMachine(self.tasks)
        spawner_name = test_suite.config.get('nrunner.spawner')
        spawner = SpawnerDispatcher(test_suite.config)[spawner_name].obj
        max_running = test_suite.config.get('nrunner.max_parallel_tasks')
        workers = [Worker(tsm, spawner, max_running=max_running).run()
                   for _ in range(max_running)]
        asyncio.ensure_future(self._update_status(job))
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(asyncio.wait_for(asyncio.gather(*workers),
                                                     job.timeout or None))
        except (KeyboardInterrupt, asyncio.TimeoutError):
            self.summary.add("INTERRUPTED")
        # Wait until all messages may have been processed by the
        # status_updater. This should be replaced by a mechanism
        # that only waits if there are missing status messages to
        # be processed, and, only for a given amount of time.
        # Tests with non received status will always show as SKIP
        # because of result reconciliation.
        loop.run_until_complete(asyncio.sleep(0.05))
        job.result.end_tests()
        # (excerpt truncated here in the scraped listing)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, i.e., Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful