How to use the disable_external_logging method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...602 """603 return self.partition(device, loop_size, mountpoint)604 def enable_external_logging(self):605 pass606 def disable_external_logging(self):607 pass608 def reboot_setup(self):609 # save the partition list and mount points, as well as the cpu count610 partition_list = partition_lib.get_partition_list(self,611 exclude_swap=False)612 mount_info = partition_lib.get_mount_info(partition_list)613 self._state.set('client', 'mount_info', mount_info)614 self._state.set('client', 'cpu_count', utils.count_cpus())615 def reboot(self):616 self.reboot_setup()617 self.harness.run_reboot()618 # HACK: using this as a module sometimes hangs shutdown, so if it's619 # installed unload it first620 utils.system("modprobe -r netconsole", ignore_status=True)621 # sync first, so that a sync during shutdown doesn't time out622 utils.system("sync; sync", ignore_status=True)623 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")624 self.quit()625 def noop(self, text):626"job: noop: " + text)627 @_run_test_complete_on_exit628 def parallel(self, *tasklist):629 """Run tasks in parallel"""630 pids = []631 old_log_filename = self._logger.global_filename632 for i, task in enumerate(tasklist):633 assert isinstance(task, (tuple, list))634 self._logger.global_filename = old_log_filename + (".%d" % i)635 def task_func():636 # stub out _record_indent with a process-local one637 base_record_indent = self._record_indent638 proc_local = self._job_state.property_factory(639 '_state', '_record_indent.%d' % os.getpid(),640 base_record_indent, namespace='client')641 self.__class__._record_indent = proc_local642 task[0](*task[1:])643 pids.append(parallel.fork_start(self.resultdir, task_func))644 old_log_path = os.path.join(self.resultdir, old_log_filename)645 old_log = open(old_log_path, "a")646 exceptions = []647 for i, pid in enumerate(pids):648 # wait for the task to finish649 try:650 parallel.fork_waitfor(self.resultdir, pid)651 except Exception, e:652 exceptions.append(e)653 # copy 
the logs from the subtask into the main log654 new_log_path = old_log_path + (".%d" % i)655 if os.path.exists(new_log_path):656 new_log = open(new_log_path)657 old_log.write( new_log.close()659 old_log.flush()660 os.remove(new_log_path)661 old_log.close()662 self._logger.global_filename = old_log_filename663 # handle any exceptions raised by the parallel tasks664 if exceptions:665 msg = "%d task(s) failed in job.parallel" % len(exceptions)666 raise error.JobError(msg)667 def quit(self):668 # XXX: should have a better name.669 self.harness.run_pause()670 raise error.JobContinue("more to come")671 def complete(self, status):672 """Write pending reports, clean up, and exit"""673 # write out a job HTML report674 try:675 html_report.create_report(self.resultdir)676 except Exception, e:677 logging.error("Error writing job HTML report: %s", e)678 # We are about to exit 'complete' so clean up the control file.679 dest = os.path.join(self.resultdir, os.path.basename(self._state_file))680 shutil.move(self._state_file, dest)681 self.harness.run_complete()682 self.disable_external_logging()683 sys.exit(status)684 def _load_state(self):685 # grab any initial state and set up $CONTROL.state as the backing file686 init_state_file = self.control + '.init.state'687 self._state_file = self.control + '.state'688 if os.path.exists(init_state_file):689 shutil.move(init_state_file, self._state_file)690 self._state.set_backing_file(self._state_file)691 # initialize the state engine, if necessary692 has_steps = self._state.has('client', 'steps')693 if not self._is_continuation and has_steps:694 raise RuntimeError('Loaded state can only contain client.steps if '695 'this is a continuation')696 if not has_steps:...

Full Screen

Full Screen Github


Full Screen

...317 """318 Start or restart external logging mechanism.319 """320 pass321 def disable_external_logging(self):322 """323 Pause or stop external logging mechanism.324 """325 pass326 def use_external_logging(self):327 """328 Return True if external logging should be used.329 """330 return False331 def _make_parallel_wrapper(self, function, machines, log):332 """Wrap function as appropriate for calling by parallel_simple."""333 is_forking = not (len(machines) == 1 and self.machines == machines)334 if self._parse_job and is_forking and log:335 def wrapper(machine):336 self._parse_job += "/" + machine337 self._using_parser = True338 self.machines = [machine]339 self.push_execution_context(machine)340 os.chdir(self.resultdir)341 utils.write_keyval(self.resultdir, {"hostname": machine})342 self.init_parser()343 result = function(machine)344 self.cleanup_parser()345 return result346 elif len(machines) > 1 and log:347 def wrapper(machine):348 self.push_execution_context(machine)349 os.chdir(self.resultdir)350 machine_data = {'hostname': machine,351 'status_version': str(self._STATUS_VERSION)}352 utils.write_keyval(self.resultdir, machine_data)353 result = function(machine)354 return result355 else:356 wrapper = function357 return wrapper358 def parallel_simple(self, function, machines, log=True, timeout=None,359 return_results=False):360 """361 Run 'function' using parallel_simple, with an extra wrapper to handle362 the necessary setup for continuous parsing, if possible. 
If continuous363 parsing is already properly initialized then this should just work.364 :param function: A callable to run in parallel given each machine.365 :param machines: A list of machine names to be passed one per subcommand366 invocation of function.367 :param log: If True, output will be written to output in a subdirectory368 named after each machine.369 :param timeout: Seconds after which the function call should timeout.370 :param return_results: If True instead of an AutoServError being raised371 on any error a list of the results|exceptions from the function372 called on each arg is returned. [default: False]373 :raise error.AutotestError: If any of the functions failed.374 """375 wrapper = self._make_parallel_wrapper(function, machines, log)376 return subcommand.parallel_simple(wrapper, machines,377 log=log, timeout=timeout,378 return_results=return_results)379 def parallel_on_machines(self, function, machines, timeout=None):380 """381 :param function: Called in parallel with one machine as its argument.382 :param machines: A list of machines to call function(machine) on.383 :param timeout: Seconds after which the function call should timeout.384 :return: A list of machines on which function(machine) returned385 without raising an exception.386 """387 results = self.parallel_simple(function, machines, timeout=timeout,388 return_results=True)389 success_machines = []390 for result, machine in itertools.izip(results, machines):391 if not isinstance(result, Exception):392 success_machines.append(machine)393 return success_machines394 _USE_TEMP_DIR = object()395 def run(self, cleanup=False, install_before=False, install_after=False,396 collect_crashdumps=True, namespace={}, control=None,397 control_file_dir=None, only_collect_crashinfo=False):398 # for a normal job, make sure the uncollected logs file exists399 # for a crashinfo-only run it should already exist, bail out otherwise400 created_uncollected_logs = False401 if self.resultdir and not 
os.path.exists(self._uncollected_log_file):402 if only_collect_crashinfo:403 # if this is a crashinfo-only run, and there were no existing404 # uncollected logs, just bail out early405"No existing uncollected logs, "406 "skipping crashinfo collection")407 return408 else:409 log_file = open(self._uncollected_log_file, "w")410 pickle.dump([], log_file)411 log_file.close()412 created_uncollected_logs = True413 # use a copy so changes don't affect the original dictionary414 namespace = namespace.copy()415 machines = self.machines416 if control is None:417 if self.control is None:418 control = ''419 else:420 control = self._load_control_file(self.control)421 if control_file_dir is None:422 control_file_dir = self.resultdir423 self.aborted = False424 namespace['machines'] = machines425 namespace['args'] = self.args426 namespace['job'] = self427 namespace['ssh_user'] = self._ssh_user428 namespace['ssh_port'] = self._ssh_port429 namespace['ssh_pass'] = self._ssh_pass430 test_start_time = int(time.time())431 if self.resultdir:432 os.chdir(self.resultdir)433 # touch status.log so that the parser knows a job is running here434 open(self.get_status_log_path(), 'a').close()435 self.enable_external_logging()436 collect_crashinfo = True437 temp_control_file_dir = None438 try:439 try:440 if install_before and machines:441 self._execute_code(INSTALL_CONTROL_FILE, namespace)442 if only_collect_crashinfo:443 return444 # determine the dir to write the control files to445 cfd_specified = (control_file_dir and control_file_dir is not446 self._USE_TEMP_DIR)447 if cfd_specified:448 temp_control_file_dir = None449 else:450 temp_control_file_dir = tempfile.mkdtemp(451 suffix='temp_control_file_dir')452 control_file_dir = temp_control_file_dir453 server_control_file = os.path.join(control_file_dir,454 self._control_filename)455 client_control_file = os.path.join(control_file_dir,456 CLIENT_CONTROL_FILENAME)457 if self._client:458 namespace['control'] = control459 
utils.open_write_close(client_control_file, control)460 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,461 server_control_file)462 else:463 utils.open_write_close(server_control_file, control)464"Processing control file")465 self._execute_code(server_control_file, namespace)466"Finished processing control file")467 # no error occurred, so we don't need to collect crashinfo468 collect_crashinfo = False469 except Exception, e:470 try:471 logging.exception(472 'Exception escaped control file, job aborting:')473 self.record('INFO', None, None, str(e),474 {'job_abort_reason': str(e)})475 except Exception:476 pass # don't let logging exceptions here interfere477 raise478 finally:479 if temp_control_file_dir:480 # Clean up temp directory used for copies of the control files481 try:482 shutil.rmtree(temp_control_file_dir)483 except Exception, e:484 logging.warn('Could not remove temp directory %s: %s',485 temp_control_file_dir, e)486 if machines and (collect_crashdumps or collect_crashinfo):487 namespace['test_start_time'] = test_start_time488 if collect_crashinfo:489 # includes crashdumps490 self._execute_code(CRASHINFO_CONTROL_FILE, namespace)491 else:492 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)493 if self._uncollected_log_file and created_uncollected_logs:494 os.remove(self._uncollected_log_file)495 self.disable_external_logging()496 if cleanup and machines:497 self._execute_code(CLEANUP_CONTROL_FILE, namespace)498 if install_after and machines:499 self._execute_code(INSTALL_CONTROL_FILE, namespace)500 def run_test(self, url, *args, **dargs):501 """502 Summon a test object and run it.503 tag504 tag to add to testname505 url506 url of the test to run507 """508 group, testname = self.pkgmgr.get_package_name(url, 'test')509 testname, subdir, tag = self._build_tagged_test_name(testname, dargs)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?