Best Python code snippet using autotest_python
logging_manager.py
Source: logging_manager.py
...194        self._replace_with_logger()195    def stop_logging(self):196        """Restore the stream to its original settings."""197        self._restore_stream()198    def on_push_context(self, context):199        """200        Called when the logging manager is about to push a new context onto the201        stack and has changed logging settings.  The StreamHandler can modify202        the context to be saved before returning.203        """204        pass205    def on_restore_context(self, context):206        """207        Called when the logging manager is restoring a previous context.208        """209        pass210class LoggingManager(object):211    """212    Manages a stack of logging configurations, allowing clients to conveniently213    add and remove logging destinations.  Also keeps a list of StreamManagers214    to easily direct streams into the logging module.215    """216    #管çä¸å loggingé
ç½®ï¼å
许clientsæ¹ä¾¿çå¢å loggingä½ç½®ï¼å¦å¤ä¿æä¸ä¸ªStreamMangerå表217    #ç¨äºstreams redirect218    STREAM_MANAGER_CLASS = _StreamManager219    logging_config_object = None220    def __init__(self):221        """222        This class should not ordinarily be constructed directly (other than in223        tests).  Use the module-global factory method get_logging_manager()224        instead.225        """226        #使ç¨å
¨å±å½æ°get_logging_managerï¼ä¸è¦å¤æ¬¡initè¿ä¸ªç±»227        if self.logging_config_object is None:228            raise RuntimeError('You must call configure_logging() before this')229        # _context_stack holds a stack of context dicts.  Each context dict230        # contains:231        # * old_handlers: list of registered logging Handlers232        # contexts may also be extended by _StreamHandlers233        self._context_stack = []234        self._streams = []235        self._started = False236    def manage_stream(self, stream, level, stream_setter):237        """238        Tells this manager to manage the given stream.  All data written to the239        stream will be directed to the logging module instead.  Must be called240        before start_logging().241        :param stream: stream to manage242        :param level: level to log data written to this stream243        :param stream_setter: function to set the stream to a new object244        """245        if self._started:246            raise RuntimeError('You must call this before start_logging()')247        self._streams.append(self.STREAM_MANAGER_CLASS(stream, level,248                                                       stream_setter))249    def _sys_stream_setter(self, stream_name):250        #!éè¦ï¼å°sys.stdoutåstderr设置为ä¸ä¸ªæä»¶å¯¹è±¡ï¼251        assert stream_name in ('stdout', 'stderr'), stream_name252        def set_stream(file_object):253            setattr(sys, stream_name, file_object)254        return set_stream255    def manage_stdout(self):256        self.manage_stream(sys.stdout, logging.INFO,257                           self._sys_stream_setter('stdout'))258    def manage_stderr(self):259        self.manage_stream(sys.stderr, self.logging_config_object.stderr_level,260                           self._sys_stream_setter('stderr'))261    def start_logging(self):262        """263        Begin capturing output to the logging module.264        """265        for stream_manager in 
self._streams:266            stream_manager.start_logging()267        self._started = True268    def stop_logging(self):269        """270        Restore output to its original state.271        """272        while self._context_stack:273            self._pop_context()274        for stream_manager in self._streams:275            stream_manager.stop_logging()276        self._started = False277    def _clear_all_handlers(self):278        for handler in _current_handlers():279            logger.removeHandler(handler)280    def _get_context(self):281        return {'old_handlers': _current_handlers()}282    def _push_context(self, context):283        for stream_manager in self._streams:284            stream_manager.on_push_context(context)285        self._context_stack.append(context)286    def _flush_all_streams(self):287        for stream_manager in self._streams:288            stream_manager.flush()289    def _add_log_handlers(self, add_handlers_fn):290        """291        Modify the logging module's registered handlers and push a new context292        onto the stack.293        :param add_handlers_fn: function to modify the registered logging294        handlers. 
Accepts a context dictionary which may be modified.295        """296        self._flush_all_streams()297        context = self._get_context()298        add_handlers_fn(context)299        self._push_context(context)300    class _TaggingFormatter(logging.Formatter):301        """302        Delegates to a given formatter, but prefixes each line of output with a303        tag.304        """305        def __init__(self, base_formatter, tag):306            self.base_formatter = base_formatter307            prefix = tag + ' : '308            self._fmt = base_formatter._fmt.replace('%(message)s',309                                                    prefix + '%(message)s')310            self.datefmt = base_formatter.datefmt311    def _add_tagging_formatter(self, tag):312        for handler in _current_handlers():313            tagging_formatter = self._TaggingFormatter(handler.formatter, tag)314            handler.setFormatter(tagging_formatter)315    def _do_redirect(self, stream=None, filename=None, level=None,316                     clear_other_handlers=False):317        """318        :param clear_other_handlers - if true, clear out all other logging319        handlers.320        """321        assert bool(stream) != bool(filename)  # xor322        if not self._started:323            raise RuntimeError('You must call start_logging() before this')324        def add_handler(context):325            if clear_other_handlers:326                self._clear_all_handlers()327            if stream:328                handler = self.logging_config_object.add_stream_handler(stream)329            else:330                handler = self.logging_config_object.add_file_handler(filename)331            if level:332                handler.setLevel(level)333        self._add_log_handlers(add_handler)334    def redirect(self, filename):335        """Redirect output to the specified file"""336        self._do_redirect(filename=filename, clear_other_handlers=True)337    def 
redirect_to_stream(self, stream):338        """Redirect output to the given stream"""339        self._do_redirect(stream=stream, clear_other_handlers=True)340    def tee_redirect(self, filename, level=None):341        """Tee output to the specified file"""342        self._do_redirect(filename=filename, level=level)343    def tee_redirect_to_stream(self, stream):344        """Tee output to the given stream"""345        self._do_redirect(stream=stream)346    def tee_redirect_debug_dir(self, debug_dir, log_name=None, tag=None):347        """348        Tee output to a full new set of debug logs in the given directory.349        """350        def add_handlers(context):351            if tag:352                self._add_tagging_formatter(tag)353                context['tag_added'] = True354            self.logging_config_object.add_debug_file_handlers(355                debug_dir, log_name=log_name)356        self._add_log_handlers(add_handlers)357    def _restore_context(self, context):358        for stream_handler in self._streams:359            stream_handler.on_restore_context(context)360        # restore logging handlers361        old_handlers = context['old_handlers']362        for handler in _current_handlers() - old_handlers:363            handler.close()364        self._clear_all_handlers()365        for handler in old_handlers:366            logger.addHandler(handler)367        if 'tag_added' in context:368            for handler in _current_handlers():369                tagging_formatter = handler.formatter370                handler.setFormatter(tagging_formatter.base_formatter)371    def _pop_context(self):372        self._flush_all_streams()373        context = self._context_stack.pop()374        self._restore_context(context)375    def undo_redirect(self):376        """377        Undo the last redirection (that hasn't yet been undone).378        If any subprocesses have been launched since the redirection was379        performed, they must have ended by the 
time this is called.  Otherwise,380        this will hang waiting for the logging subprocess to end.381        """382        if not self._context_stack:383            raise RuntimeError('No redirects to undo')384        self._pop_context()385    def restore(self):386        """387        Same as undo_redirect().  For backwards compatibility with388        fd_stack.389        """390        self.undo_redirect()391class _FdRedirectionStreamManager(_StreamManager):392    """393    Like StreamManager, but also captures output from subprocesses by modifying394    the underlying file descriptors.395    For the underlying file descriptors, we spawn a subprocess that writes all396    input to the logging module, and we point the FD to that subprocess.  As a397    result, every time we redirect output we need to spawn a new subprocess to398    pick up the new logging settings (without disturbing any existing processes399    using the old logging subprocess).400    If, one day, we could get all code using utils.run() and friends to launch401    subprocesses, we'd no longer need to handle raw FD output, and we could402    get rid of all this business with subprocesses.  Another option would be403    to capture all stray output to a single, separate destination.404    """405    #ç»§æ¿äºStreamManagerï¼åæ¶è·åsubprocessçè¾åºï¼406    #æå¼ä¸ä¸ªå线ç¨ï¼å»ç®¡éä¸è¯»åç¶è¿ç¨çlogï¼ç®çæ¯ä¸ºäºç¨åè¿ç¨å»è·å¾ææ°çloggingé
ç½®407    #妿æä¸å¤©æä»¬é½æ¯éè¿utils.runæ§è¡å½ä»¤ï¼å°±ä¸éè¦è¿ä¸ªå¥æªçæ¹æ³æ¶élog408    def __init__(self, stream, level, stream_setter):409        if not hasattr(stream, 'fileno'):410            # with fake, in-process file objects, subprocess output won't be411            # captured. this should never happen in normal use, since the412            # factory methods will only pass sys.stdout and sys.stderr.413            raise ValueError("FdRedirectionLoggingManager won't work with "414                             "streams that aren't backed by file "415                             "descriptors")416        super(_FdRedirectionStreamManager, self).__init__(stream, level,417                                                          stream_setter)418        #self._fdç´æ¥æåstreamç奿419        self._fd = stream.fileno()420        self._fd_copy_stream = None421    def _point_stream_handlers_to_copy(self):422        """423        point logging StreamHandlers that point to this stream to a safe424        copy of the underlying FD. otherwise, StreamHandler output will go425        to the logging subprocess, effectively getting doubly logged.426        """427        #å®å
¨å¤å¶ä¸ä¸ªæä»¶å¥æ428        fd_copy = os.dup(self._fd)429        #æå¼è¿ä¸ªæä»¶430        self._fd_copy_stream = os.fdopen(fd_copy, 'w')431        self._redirect_logging_stream_handlers(self._stream,432                                               self._fd_copy_stream)433    def _restore_stream_handlers(self):434        """ point logging StreamHandlers back to the original FD """435        #å
³éé£ä¸ªç¨æ¥ç¨æ¥å¤å¶çstreamï¼æå¤å¶çhandleråæåæ¥436        self._redirect_logging_stream_handlers(self._fd_copy_stream,437                                               self._stream)438        self._fd_copy_stream.close()439    def _redirect_logging_stream_handlers(self, old_stream, new_stream):440        """441        Redirect all configured logging StreamHandlers pointing to442        old_stream to point to new_stream instead.443        """444        #å°æææåold_streamçhandleréå®åå°æ°çstreamä¸å»445        #æ¥æ¾ææå½ååå¨çhandlerï¼å¦ææ¯StreamHandlerï¼å¦æstreamæfd446        #妿fdçäºold_streamï¼é£ä¹å é¤æ§çhandlerï¼æ·»å ä¸ä¸ªæ°çStreamHandlerï¼ä½æ¯stream没æå447        for handler in _current_handlers():448            points_to_stream = (isinstance(handler, logging.StreamHandler) and449                                hasattr(handler.stream, 'fileno') and450                                handler.stream.fileno() == old_stream.fileno())451            if points_to_stream:452                logger.removeHandler(handler)453                handler.close()  # doesn't close the stream, just the handler454                new_handler = logging.StreamHandler(new_stream)455                new_handler.setLevel(handler.level)456                new_handler.setFormatter(handler.formatter)457                for log_filter in handler.filters:458                    new_handler.addFilter(log_filter)459                logger.addHandler(new_handler)460    def start_logging(self):461        super(_FdRedirectionStreamManager, self).start_logging()462        self._point_stream_handlers_to_copy()463    def stop_logging(self):464        super(_FdRedirectionStreamManager, self).stop_logging()465        self._restore_stream_handlers()466    def _spawn_logging_subprocess(self):467        """468        Spawn a subprocess to log all input to the logging module with the469        current settings, and direct output to it.470        """471        #Parå°å½åçlogåå
¥ç®¡é write -->  childä»ç®¡éä¸è¯»ålogï¼logåºå»472        read_end, write_end = os.pipe()473        pid = os.fork()474        if pid:  # parent475            #476            os.close(read_end)477            os.dup2(write_end, self._fd)  # point FD to the subprocess478            os.close(write_end)479            return pid480        else:  # child481            try:482                os.close(write_end)483                # ensure this subprocess doesn't hold any pipes to others484                os.close(1)485                os.close(2)486                self._run_logging_subprocess(read_end)  # never returns487            except Exception:488                # don't let exceptions in the child escape489                try:490                    logging.exception('Logging subprocess died:')491                finally:492                    os._exit(1)493    def _run_logging_subprocess(self, read_fd):494        """495        Always run from a subprocess.  Read from read_fd and write to the496        logging module until EOF.497        """498        signal.signal(signal.SIGTERM, signal.SIG_DFL)  # clear handler499        input_file = os.fdopen(read_fd, 'r')500        for line in iter(input_file.readline, ''):501            logging.log(self._level, line.rstrip('\n'))502        os._exit(0)503    def _context_id(self):504        return '%s_context' % id(self)505    def on_push_context(self, context):506        # adds a context dict for this stream, $id_context, with the following:507        # * old_fd: FD holding a copy of the managed FD before launching a new508        #   subprocess.509        # * child_pid: PID of the logging subprocess launched510        fd_copy = os.dup(self._fd)511        child_pid = self._spawn_logging_subprocess()512        my_context = {'old_fd': fd_copy, 'child_pid': child_pid}513        context[self._context_id()] = my_context514    def on_restore_context(self, context):515        my_context = context[self._context_id()]516        # shut down 
subprocess517        child_pid = my_context['child_pid']518        try:519            os.close(self._fd)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
