How to use the on_push_context method in autotest

Best Python code snippets using autotest_python, from GitHub.


Full Screen

...194 self._replace_with_logger()195 def stop_logging(self):196 """Restore the stream to its original settings."""197 self._restore_stream()198 def on_push_context(self, context):199 """200 Called when the logging manager is about to push a new context onto the201 stack and has changed logging settings. The StreamHandler can modify202 the context to be saved before returning.203 """204 pass205 def on_restore_context(self, context):206 """207 Called when the logging manager is restoring a previous context.208 """209 pass210class LoggingManager(object):211 """212 Manages a stack of logging configurations, allowing clients to conveniently213 add and remove logging destinations. Also keeps a list of StreamManagers214 to easily direct streams into the logging module.215 """216 #管理一堆logging配置,允许clients方便的增删logging位置,另外保持一个StreamManger列表217 #用于streams redirect218 STREAM_MANAGER_CLASS = _StreamManager219 logging_config_object = None220 def __init__(self):221 """222 This class should not ordinarily be constructed directly (other than in223 tests). Use the module-global factory method get_logging_manager()224 instead.225 """226 #使用全局函数get_logging_manager,不要多次init这个类227 if self.logging_config_object is None:228 raise RuntimeError('You must call configure_logging() before this')229 # _context_stack holds a stack of context dicts. Each context dict230 # contains:231 # * old_handlers: list of registered logging Handlers232 # contexts may also be extended by _StreamHandlers233 self._context_stack = []234 self._streams = []235 self._started = False236 def manage_stream(self, stream, level, stream_setter):237 """238 Tells this manager to manage the given stream. All data written to the239 stream will be directed to the logging module instead. 
Must be called240 before start_logging().241 :param stream: stream to manage242 :param level: level to log data written to this stream243 :param stream_setter: function to set the stream to a new object244 """245 if self._started:246 raise RuntimeError('You must call this before start_logging()')247 self._streams.append(self.STREAM_MANAGER_CLASS(stream, level,248 stream_setter))249 def _sys_stream_setter(self, stream_name):250 #!重要,将sys.stdout和stderr设置为一个文件对象,251 assert stream_name in ('stdout', 'stderr'), stream_name252 def set_stream(file_object):253 setattr(sys, stream_name, file_object)254 return set_stream255 def manage_stdout(self):256 self.manage_stream(sys.stdout, logging.INFO,257 self._sys_stream_setter('stdout'))258 def manage_stderr(self):259 self.manage_stream(sys.stderr, self.logging_config_object.stderr_level,260 self._sys_stream_setter('stderr'))261 def start_logging(self):262 """263 Begin capturing output to the logging module.264 """265 for stream_manager in self._streams:266 stream_manager.start_logging()267 self._started = True268 def stop_logging(self):269 """270 Restore output to its original state.271 """272 while self._context_stack:273 self._pop_context()274 for stream_manager in self._streams:275 stream_manager.stop_logging()276 self._started = False277 def _clear_all_handlers(self):278 for handler in _current_handlers():279 logger.removeHandler(handler)280 def _get_context(self):281 return {'old_handlers': _current_handlers()}282 def _push_context(self, context):283 for stream_manager in self._streams:284 stream_manager.on_push_context(context)285 self._context_stack.append(context)286 def _flush_all_streams(self):287 for stream_manager in self._streams:288 stream_manager.flush()289 def _add_log_handlers(self, add_handlers_fn):290 """291 Modify the logging module's registered handlers and push a new context292 onto the stack.293 :param add_handlers_fn: function to modify the registered logging294 handlers. 
Accepts a context dictionary which may be modified.295 """296 self._flush_all_streams()297 context = self._get_context()298 add_handlers_fn(context)299 self._push_context(context)300 class _TaggingFormatter(logging.Formatter):301 """302 Delegates to a given formatter, but prefixes each line of output with a303 tag.304 """305 def __init__(self, base_formatter, tag):306 self.base_formatter = base_formatter307 prefix = tag + ' : '308 self._fmt = base_formatter._fmt.replace('%(message)s',309 prefix + '%(message)s')310 self.datefmt = base_formatter.datefmt311 def _add_tagging_formatter(self, tag):312 for handler in _current_handlers():313 tagging_formatter = self._TaggingFormatter(handler.formatter, tag)314 handler.setFormatter(tagging_formatter)315 def _do_redirect(self, stream=None, filename=None, level=None,316 clear_other_handlers=False):317 """318 :param clear_other_handlers - if true, clear out all other logging319 handlers.320 """321 assert bool(stream) != bool(filename) # xor322 if not self._started:323 raise RuntimeError('You must call start_logging() before this')324 def add_handler(context):325 if clear_other_handlers:326 self._clear_all_handlers()327 if stream:328 handler = self.logging_config_object.add_stream_handler(stream)329 else:330 handler = self.logging_config_object.add_file_handler(filename)331 if level:332 handler.setLevel(level)333 self._add_log_handlers(add_handler)334 def redirect(self, filename):335 """Redirect output to the specified file"""336 self._do_redirect(filename=filename, clear_other_handlers=True)337 def redirect_to_stream(self, stream):338 """Redirect output to the given stream"""339 self._do_redirect(stream=stream, clear_other_handlers=True)340 def tee_redirect(self, filename, level=None):341 """Tee output to the specified file"""342 self._do_redirect(filename=filename, level=level)343 def tee_redirect_to_stream(self, stream):344 """Tee output to the given stream"""345 self._do_redirect(stream=stream)346 def 
tee_redirect_debug_dir(self, debug_dir, log_name=None, tag=None):347 """348 Tee output to a full new set of debug logs in the given directory.349 """350 def add_handlers(context):351 if tag:352 self._add_tagging_formatter(tag)353 context['tag_added'] = True354 self.logging_config_object.add_debug_file_handlers(355 debug_dir, log_name=log_name)356 self._add_log_handlers(add_handlers)357 def _restore_context(self, context):358 for stream_handler in self._streams:359 stream_handler.on_restore_context(context)360 # restore logging handlers361 old_handlers = context['old_handlers']362 for handler in _current_handlers() - old_handlers:363 handler.close()364 self._clear_all_handlers()365 for handler in old_handlers:366 logger.addHandler(handler)367 if 'tag_added' in context:368 for handler in _current_handlers():369 tagging_formatter = handler.formatter370 handler.setFormatter(tagging_formatter.base_formatter)371 def _pop_context(self):372 self._flush_all_streams()373 context = self._context_stack.pop()374 self._restore_context(context)375 def undo_redirect(self):376 """377 Undo the last redirection (that hasn't yet been undone).378 If any subprocesses have been launched since the redirection was379 performed, they must have ended by the time this is called. Otherwise,380 this will hang waiting for the logging subprocess to end.381 """382 if not self._context_stack:383 raise RuntimeError('No redirects to undo')384 self._pop_context()385 def restore(self):386 """387 Same as undo_redirect(). For backwards compatibility with388 fd_stack.389 """390 self.undo_redirect()391class _FdRedirectionStreamManager(_StreamManager):392 """393 Like StreamManager, but also captures output from subprocesses by modifying394 the underlying file descriptors.395 For the underlying file descriptors, we spawn a subprocess that writes all396 input to the logging module, and we point the FD to that subprocess. 
As a397 result, every time we redirect output we need to spawn a new subprocess to398 pick up the new logging settings (without disturbing any existing processes399 using the old logging subprocess).400 If, one day, we could get all code using and friends to launch401 subprocesses, we'd no longer need to handle raw FD output, and we could402 get rid of all this business with subprocesses. Another option would be403 to capture all stray output to a single, separate destination.404 """405 #继承了StreamManager,同时获取subprocess的输出,406 #打开一个子线程,去管道中读取父进程的log,目的是为了用子进程去获得最新的logging配置407 #如果有一天我们都是通过utils.run执行命令,就不需要这个奇怪的方法收集log408 def __init__(self, stream, level, stream_setter):409 if not hasattr(stream, 'fileno'):410 # with fake, in-process file objects, subprocess output won't be411 # captured. this should never happen in normal use, since the412 # factory methods will only pass sys.stdout and sys.stderr.413 raise ValueError("FdRedirectionLoggingManager won't work with "414 "streams that aren't backed by file "415 "descriptors")416 super(_FdRedirectionStreamManager, self).__init__(stream, level,417 stream_setter)418 #self._fd直接指向stream的句柄419 self._fd = stream.fileno()420 self._fd_copy_stream = None421 def _point_stream_handlers_to_copy(self):422 """423 point logging StreamHandlers that point to this stream to a safe424 copy of the underlying FD. 
otherwise, StreamHandler output will go425 to the logging subprocess, effectively getting doubly logged.426 """427 #安全复制一个文件句柄428 fd_copy = os.dup(self._fd)429 #打开这个文件430 self._fd_copy_stream = os.fdopen(fd_copy, 'w')431 self._redirect_logging_stream_handlers(self._stream,432 self._fd_copy_stream)433 def _restore_stream_handlers(self):434 """ point logging StreamHandlers back to the original FD """435 #关闭那个用来用来复制的stream,把复制的handler又指回来436 self._redirect_logging_stream_handlers(self._fd_copy_stream,437 self._stream)438 self._fd_copy_stream.close()439 def _redirect_logging_stream_handlers(self, old_stream, new_stream):440 """441 Redirect all configured logging StreamHandlers pointing to442 old_stream to point to new_stream instead.443 """444 #将所有指向old_stream的handler重定向到新的stream中去445 #查找所有当前存在的handler,如果是StreamHandler,如果stream有fd446 #如果fd等于old_stream,那么删除旧的handler,添加一个新的StreamHandler,但是stream没有停447 for handler in _current_handlers():448 points_to_stream = (isinstance(handler, logging.StreamHandler) and449 hasattr(, 'fileno') and450 == old_stream.fileno())451 if points_to_stream:452 logger.removeHandler(handler)453 handler.close() # doesn't close the stream, just the handler454 new_handler = logging.StreamHandler(new_stream)455 new_handler.setLevel(handler.level)456 new_handler.setFormatter(handler.formatter)457 for log_filter in handler.filters:458 new_handler.addFilter(log_filter)459 logger.addHandler(new_handler)460 def start_logging(self):461 super(_FdRedirectionStreamManager, self).start_logging()462 self._point_stream_handlers_to_copy()463 def stop_logging(self):464 super(_FdRedirectionStreamManager, self).stop_logging()465 self._restore_stream_handlers()466 def _spawn_logging_subprocess(self):467 """468 Spawn a subprocess to log all input to the logging module with the469 current settings, and direct output to it.470 """471 #Par将当前的log写入管道 write --> child从管道中读取log,log出去472 read_end, write_end = os.pipe()473 pid = os.fork()474 if pid: # parent475 #476 
os.close(read_end)477 os.dup2(write_end, self._fd) # point FD to the subprocess478 os.close(write_end)479 return pid480 else: # child481 try:482 os.close(write_end)483 # ensure this subprocess doesn't hold any pipes to others484 os.close(1)485 os.close(2)486 self._run_logging_subprocess(read_end) # never returns487 except Exception:488 # don't let exceptions in the child escape489 try:490 logging.exception('Logging subprocess died:')491 finally:492 os._exit(1)493 def _run_logging_subprocess(self, read_fd):494 """495 Always run from a subprocess. Read from read_fd and write to the496 logging module until EOF.497 """498 signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler499 input_file = os.fdopen(read_fd, 'r')500 for line in iter(input_file.readline, ''):501 logging.log(self._level, line.rstrip('\n'))502 os._exit(0)503 def _context_id(self):504 return '%s_context' % id(self)505 def on_push_context(self, context):506 # adds a context dict for this stream, $id_context, with the following:507 # * old_fd: FD holding a copy of the managed FD before launching a new508 # subprocess.509 # * child_pid: PID of the logging subprocess launched510 fd_copy = os.dup(self._fd)511 child_pid = self._spawn_logging_subprocess()512 my_context = {'old_fd': fd_copy, 'child_pid': child_pid}513 context[self._context_id()] = my_context514 def on_restore_context(self, context):515 my_context = context[self._context_id()]516 # shut down subprocess517 child_pid = my_context['child_pid']518 try:519 os.close(self._fd)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, i.e., Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation testing minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?