How to use the _log_job_id method in avocado

Best Python code snippet using avocado_python

job.py

Source:job.py Github

copy

Full Screen

...259 return suite260 for i in xrange(len(suite)):261 suite[i] = [test.DryRunTest, suite[i][1]]262 return suite263 def _log_job_id(self):264 job_log = _TEST_LOGGER265 job_log.info('Job ID: %s', self.unique_id)266 if self.replay_sourcejob is not None:267 job_log.info('Replay of Job ID: %s', self.replay_sourcejob)268 job_log.info('')269 @staticmethod270 def _log_cmdline():271 job_log = _TEST_LOGGER272 cmdline = " ".join(sys.argv)273 job_log.info("Command line: %s", cmdline)274 job_log.info('')275 @staticmethod276 def _log_avocado_version():277 job_log = _TEST_LOGGER278 job_log.info('CloudTest Version: %s', version.VERSION)279 if os.path.exists('.git') and os.path.exists('avocado.spec'):280 cmd = "git show --summary --pretty='%H' | head -1"281 status, top_commit = commands.getstatusoutput(cmd)282 cmd2 = "git rev-parse --abbrev-ref HEAD"283 status2, branch = commands.getstatusoutput(cmd2)284 # Let's display information only if git is installed285 # (commands succeed).286 if status == 0 and status2 == 0:287 job_log.info('Avocado git repo info')288 job_log.info("Top commit: %s", top_commit)289 job_log.info("Branch: %s", branch)290 job_log.info('')291 @staticmethod292 def _log_avocado_config():293 job_log = _TEST_LOGGER294 job_log.info('Config files read (in order):')295 for cfg_path in settings.config_paths:296 job_log.info(cfg_path)297 if settings.config_paths_failed:298 job_log.info('Config files failed to read (in order):')299 for cfg_path in settings.config_paths_failed:300 job_log.info(cfg_path)301 job_log.info('')302 job_log.info('Avocado config:')303 header = ('Section.Key', 'Value')304 config_matrix = []305 for section in settings.config.sections():306 for value in settings.config.items(section):307 config_key = ".".join((section, value[0]))308 config_matrix.append([config_key, value[1]])309 for line in astring.iter_tabular_output(config_matrix, header):310 job_log.info(line)311 job_log.info('')312 @staticmethod313 def _log_avocado_datadir():314 job_log = 
_TEST_LOGGER315 job_log.info('Avocado Data Directories:')316 job_log.info('')317 job_log.info("Avocado replaces config dirs that can't be accessed")318 job_log.info('with sensible defaults. Please edit your local config')319 job_log.info('file to customize values')320 job_log.info('')321 job_log.info('base ' + data_dir.get_base_dir())322 job_log.info('tests ' + data_dir.get_test_dir())323 job_log.info('data ' + data_dir.get_data_dir())324 job_log.info('logs ' + data_dir.get_logs_dir())325 job_log.info('')326 def _log_mux_tree(self, mux):327 job_log = _TEST_LOGGER328 tree_repr = tree.tree_view(mux.variants.root, verbose=True,329 use_utf8=False)330 if tree_repr:331 job_log.info('Multiplex tree representation:')332 for line in tree_repr.splitlines():333 job_log.info(line)334 job_log.info('')335 def _log_tmp_dir(self):336 job_log = _TEST_LOGGER337 job_log.info('Temporary dir: %s', data_dir.get_tmp_dir())338 job_log.info('')339 def _log_mux_variants(self, mux):340 job_log = _TEST_LOGGER341 for (index, tpl) in enumerate(mux.variants):342 paths = ', '.join([x.path for x in tpl])343 job_log.info('Variant %s: %s', index + 1, paths)344 if mux.variants:345 job_log.info('')346 def _log_job_debug_info(self, mux):347 """348 Log relevant debug information to the job log.349 """350 self._log_cmdline()351 self._log_avocado_version()352 self._log_avocado_config()353 self._log_avocado_datadir()354 self._log_mux_tree(mux)355 self._log_tmp_dir()356 self._log_mux_variants(mux)357 self._log_job_id()358 def create_test_suite(self):359 """360 Creates the test suite for this Job361 This is a public Job API as part of the documented Job phases362 """363 try:364 self.test_suite = self._make_test_suite(self.references)365 self.result.tests_total = len(self.test_suite)366 except loader.LoaderError as details:367 stacktrace.log_exc_info(sys.exc_info(), 'avocado.app.debug')368 raise exceptions.OptionValidationError(details)369 if not self.test_suite:370 if self.references:371 references = " 
".join(self.references)...

Full Screen

Full Screen

worker.py

Source:worker.py Github

copy

Full Screen

...53 ''.join(random.sample(string.uppercase, 6))54 def _defer(self, fn, url_tag, countdown=0):55 url = '%s/bqworker/%s/%s/%s' % (config.DEFERRED_URL_PREFIX, self.__class__.__name__, self._instance_id, url_tag)56 deferred.defer(fn, _queue=self.queue, _url=url, _countdown=countdown)57 def _log_job_id(self):58 if self._job_id:59 logging.info('BigQuery job_id: %s', self._job_id)60 def start(self):61 if not self.project_id:62 raise ValueError('project_id is required.')63 if not self.queue or not isinstance(self.queue, basestring):64 raise ValueError('queue is required and must be a string.')65 if not isinstance(self.page_size, int) or self.page_size < 1:66 raise ValueError('page_size must be a positive integer.')67 if self.query_mode not in VALID_QUERY_MODES:68 raise ValueError('query_mode must be in %s.' % VALID_QUERY_MODES)69 self._stime = time.time()70 logging.info('Starting worker "%s" (project_id: %s, queue: %s, page_size: %s, query_mode: %s)',71 self._instance_id, self.project_id, self.queue, self.page_size, self.query_mode)72 self._defer(self._issue_query, 'issue_query')73 def _issue_query(self):74 try:75 self._job_id = _issue_query(self.project_id, self.get_query(), self.query_mode)76 self._log_job_id()77 self._defer(self._check_job, 'check_job/%d' % self._check_job_iteration,78 countdown=INITIAL_COUTDOWN[self.query_mode])79 except BigQueryError as ex:80 self.handle_error(ex)81 def _check_job(self):82 self._log_job_id()83 try:84 if not _is_complete(self.project_id, self._job_id):85 self._check_job_iteration += 186 self._defer(self._check_job, 'check_job/%d' % self._check_job_iteration,87 countdown=SUBSEQUENT_COUNTDOWN[self.query_mode])88 return89 self._defer(self._download_page, 'download_page/%d' % self._page_number)90 except BigQueryError as ex:91 self.handle_error(ex)92 def _download_page(self):93 self._log_job_id()94 page = _get_page(self.project_id, self._job_id,95 start_index=self._page_number*self.page_size, page_size=self.page_size)96 if len(page) > 
0:97 self.process_page(page)98 if len(page) < self.page_size:99 logging.info('Elapsed time %.2f' % (time.time() - self._stime))100 self._defer(self.finalize, 'finalize')101 return102 self._page_number += 1103 self._defer(self._download_page, 'download_page/%d' % self._page_number)104 def get_query(self):105 raise NotImplementedError()106 def process_page(self, page):107 return...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful