Best Python code snippet using autotest_python
logging_manager_unittest.py
Source:logging_manager_unittest.py  
#!/usr/bin/python
import logging
import os
import StringIO
import subprocess
import unittest
import select
try:
    import autotest.common as common
except ImportError:
    import common
from autotest.client.shared import logging_manager, logging_config


class PipedStringIO(object):

    """
    Like StringIO, but all I/O passes through a pipe.  This means a
    PipedStringIO is backed by a file descriptor and thus can do things like
    pass down to a subprocess.  However, this means the creating process must
    call read_pipe() (or the classmethod read_all_pipes()) periodically to read
    the pipe, and must call close() (or the classmethod
    cleanup_all_instances()) to close the pipe.
    """
    # every instance that has been created and not yet close()d
    _instances = set()

    def __init__(self):
        """Create the backing StringIO and pipe, and register the instance."""
        self._string_io = StringIO.StringIO()
        self._read_end, self._write_end = os.pipe()
        PipedStringIO._instances.add(self)

    def close(self):
        """Close the buffer and both pipe ends, then unregister the instance."""
        self._string_io.close()
        os.close(self._read_end)
        os.close(self._write_end)
        PipedStringIO._instances.remove(self)

    def write(self, data):
        """Write data to the pipe; it reaches the buffer on read_pipe()."""
        os.write(self._write_end, data)

    def flush(self):
        """No-op, present so the object can be used where a stream is expected."""
        pass

    def fileno(self):
        """Return the file descriptor of the write end of the pipe."""
        return self._write_end

    def getvalue(self):
        """Drain the pipe, then return everything accumulated in the buffer."""
        self.read_pipe()
        return self._string_io.getvalue()

    def read_pipe(self):
        """Move all currently readable pipe data into the buffer."""
        while True:
            # timeout of 0 makes this a non-blocking poll of the read end
            read_list, _, _ = select.select([self._read_end], [], [], 0)
            if not read_list:
                return
            self._string_io.write(os.read(self._read_end, 1024))

    @classmethod
    def read_all_pipes(cls):
        """Call read_pipe() on every registered instance."""
        for instance in cls._instances:
            instance.read_pipe()

    @classmethod
    def cleanup_all_instances(cls):
        """Call close() on every registered instance."""
        # iterate over a copy because close() removes from cls._instances
        for instance in list(cls._instances):
            instance.close()


LOGGING_FORMAT = '%(levelname)s: %(message)s'

# Expected contents of each stream after LoggingManagerTest._run_test().  The
# trailing number of each message is the argument passed to _say() at that
# step, so these record which stream each step's output is expected to reach
# and whether it carries the "INFO: " prefix.
_EXPECTED_STDOUT = """\
print 1
system 1
INFO: logging 1
INFO: print 2
INFO: system 2
INFO: logging 2
INFO: print 6
INFO: system 6
INFO: logging 6
print 7
system 7
INFO: logging 7
"""

_EXPECTED_LOG1 = """\
INFO: print 3
INFO: system 3
INFO: logging 3
INFO: print 4
INFO: system 4
INFO: logging 4
INFO: print 5
INFO: system 5
INFO: logging 5
"""

_EXPECTED_LOG2 = """\
INFO: print 4
INFO: system 4
INFO: logging 4
"""


class DummyLoggingConfig(logging_config.LoggingConfig):
    """LoggingConfig whose debug "file" handler writes to a PipedStringIO."""
    console_formatter = logging.Formatter(LOGGING_FORMAT)

    def __init__(self):
        super(DummyLoggingConfig, self).__init__()
        # captures whatever the handler added below emits
        self.log = PipedStringIO()

    def add_debug_file_handlers(self, log_dir, log_name=None):
        """Attach a stream handler on self.log; log_dir/log_name are unused."""
        self.logger.addHandler(logging.StreamHandler(self.log))


# this isn't really a unit test since it creates subprocesses and pipes and all
# that. i just use the unittest framework because it's convenient.
class LoggingManagerTest(unittest.TestCase):
    """Drive a logging manager through redirects and check captured output."""

    def setUp(self):
        self.stdout = PipedStringIO()
        self._log1 = PipedStringIO()
        self._log2 = PipedStringIO()
        self._real_system_calls = False
        # the LoggingManager will change self.stdout (by design), so keep a
        # copy around
        self._original_stdout = self.stdout
        # clear out import-time logging config and reconfigure
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
        # use INFO to ignore debug output from logging_manager itself
        logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT,
                            stream=self.stdout)
        self._config_object = DummyLoggingConfig()
        logging_manager.LoggingManager.logging_config_object = (
            self._config_object)

    def tearDown(self):
        PipedStringIO.cleanup_all_instances()

    def _say(self, suffix):
        """
        Emit "print", "system" and "logging" messages tagged with suffix.

        The "system" message comes from a real shell `echo` aimed at the
        original stdout's file descriptor when self._real_system_calls is set,
        and from a plain print otherwise.  All pipes are drained afterwards.
        """
        print >>self.stdout, 'print %s' % suffix
        if self._real_system_calls:
            os.system('echo system %s >&%s' % (suffix,
                                               self._original_stdout.fileno()))
        else:
            print >>self.stdout, 'system %s' % suffix
        logging.info('logging %s', suffix)
        PipedStringIO.read_all_pipes()

    def _setup_manager(self, manager_class=logging_manager.LoggingManager):
        """Instantiate manager_class and have it manage self.stdout."""
        def set_stdout(file_object):
            # callback handed to the manager so it can swap out self.stdout
            self.stdout = file_object
        manager = manager_class()
        manager.manage_stream(self.stdout, logging.INFO, set_stdout)
        return manager

    def _run_test(self, manager_class):
        """Call _say(1) .. _say(7) around each manager state change."""
        manager = self._setup_manager(manager_class)
        self._say(1)
        manager.start_logging()
        self._say(2)
        manager.redirect_to_stream(self._log1)
        self._say(3)
        manager.tee_redirect_to_stream(self._log2)
        self._say(4)
        manager.undo_redirect()
        self._say(5)
        manager.undo_redirect()
        self._say(6)
        manager.stop_logging()
        self._say(7)

    def _grab_fd_info(self):
        """Return the output of the `ls` command built for this process."""
        command = 'ls -l /proc/%s/fd' % os.getpid()
        # NOTE(review): with shell=True and a list argument, POSIX Popen passes
        # only the first item ('ls') to the shell as the command; the remaining
        # items become arguments of the shell itself.  This therefore appears
        # to list the current directory rather than /proc/<pid>/fd.  Left
        # unchanged because a real fd listing would also include the pipe
        # created for this very call, so the equality check in
        # _check_results() would need rework first -- confirm intent.
        proc = subprocess.Popen(command.split(), shell=True,
                                stdout=subprocess.PIPE)
        return proc.communicate()[0]

    def _compare_logs(self, log_buffer, expected_text):
        """Assert that log_buffer's lines match the lines of expected_text."""
        actual_lines = log_buffer.getvalue().splitlines()
        expected_lines = expected_text.splitlines()
        if self._real_system_calls:
            # because of the many interacting processes, we can't ensure perfect
            # interleaving.  so compare sets of lines rather than ordered lines.
            actual_lines = set(actual_lines)
            expected_lines = set(expected_lines)
        self.assertEqual(actual_lines, expected_lines)

    def _check_results(self):
        """Verify stdout restoration, fd state and all three captured logs."""
        # ensure our stdout was restored
        self.assertEqual(self.stdout, self._original_stdout)
        if self._real_system_calls:
            # ensure FDs were left in their original state
            self.assertEqual(self._grab_fd_info(), self._fd_info)
        self._compare_logs(self.stdout, _EXPECTED_STDOUT)
        self._compare_logs(self._log1, _EXPECTED_LOG1)
        self._compare_logs(self._log2, _EXPECTED_LOG2)

    @unittest.skip("logging manager does not behave well under nosetests")
    def test_logging_manager(self):
        self._run_test(logging_manager.LoggingManager)
        self._check_results()

    @unittest.skip("logging manager does not behave well under nosetests")
    def test_fd_redirection_logging_manager(self):
        self._real_system_calls = True
        # snapshot taken before the run, compared again in _check_results()
        self._fd_info = self._grab_fd_info()
        self._run_test(logging_manager.FdRedirectionLoggingManager)
        self._check_results()

    @unittest.skip("logging manager does not behave well under nosetests")
    def test_tee_redirect_debug_dir(self):
        manager = self._setup_manager()
        manager.start_logging()
        manager.tee_redirect_debug_dir('/fake/dir', tag='mytag')
        print >>self.stdout, 'hello'
        manager.undo_redirect()
        print >>self.stdout, 'goodbye'
        manager.stop_logging()
        self._compare_logs(self.stdout,
                           'INFO: mytag : hello\nINFO: goodbye')
        self._compare_logs(self._config_object.log, 'hello\n')


class MonkeyPatchTestCase(unittest.TestCase):
    """Check the caller lookup installed by logging_manager."""

    def setUp(self):
        # base name of this file, mapping a compiled '.pyc' back to '.py'
        filename = os.path.split(__file__)[1]
        if filename.endswith('.pyc'):
            filename = filename[:-1]
        self.expected_filename = filename

    def check_filename(self, filename, expected=None):
        """
        Assert that the base name of filename is one of the expected names.

        :param filename: path whose last component is checked.
        :param expected: list of acceptable base names; defaults to this
                test file's own name.
        """
        if expected is None:
            expected = [self.expected_filename]
        self.assertIn(os.path.split(filename)[1], expected)

    def _0_test_find_caller(self):
        finder = logging_manager._logging_manager_aware_logger__find_caller
        filename, lineno, caller_name = finder(logging_manager.logger)
        self.check_filename(filename)
        # reached through _1_test_find_caller() from test_find_caller(); the
        # finder is expected to name the test method, not these helpers
        self.assertEqual('test_find_caller', caller_name)

    def _1_test_find_caller(self):
        self._0_test_find_caller()

    def test_find_caller(self):
        self._1_test_find_caller()

    def _0_test_non_reported_find_caller(self):
        finder = logging_manager._logging_manager_aware_logger__find_caller
        filename, lineno, caller_name = finder(logging_manager.logger)
        # Python 2.4 unittest implementation will call the unittest method in
        # file 'unittest.py', and Python >= 2.6 does the same in 'case.py'
        self.check_filename(filename, expected=['unittest.py', 'case.py'])

    def _1_test_non_reported_find_caller(self):
        self._0_test_non_reported_find_caller()

    @logging_manager.do_not_report_as_logging_caller
    def test_non_reported_find_caller(self):
        self._1_test_non_reported_find_caller()


# NOTE(review): the body of this guard is elided ("...") in the captured
# snippet; presumably it starts the test runner -- confirm against the full
# file.
if __name__ == '__main__':...
logging_manager_test.py
Source:logging_manager_test.py  
1#!/usr/bin/python2import logging, os, select, StringIO, subprocess, sys, unittest3import common4from autotest_lib.client.common_lib import logging_manager, logging_config5class PipedStringIO(object):6    """7    Like StringIO, but all I/O passes through a pipe.  This means a8    PipedStringIO is backed by a file descriptor is thus can do things like9    pass down to a subprocess.  However, this means the creating process must10    call read_pipe() (or the classmethod read_all_pipes()) periodically to read11    the pipe, and must call close() (or the classmethod cleanup()) to close the12    pipe.13    """14    _instances = set()15    def __init__(self):16        self._string_io = StringIO.StringIO()17        self._read_end, self._write_end = os.pipe()18        PipedStringIO._instances.add(self)19    def close(self):20        self._string_io.close()21        os.close(self._read_end)22        os.close(self._write_end)23        PipedStringIO._instances.remove(self)24    def write(self, data):25        os.write(self._write_end, data)26    def flush(self):27        pass28    def fileno(self):29        return self._write_end30    def getvalue(self):31        self.read_pipe()32        return self._string_io.getvalue()33    def read_pipe(self):34        while True:35            read_list, _, _ = select.select([self._read_end], [], [], 0)36            if not read_list:37                return38            self._string_io.write(os.read(self._read_end, 1024))39    @classmethod40    def read_all_pipes(cls):41        for instance in cls._instances:42            instance.read_pipe()43    @classmethod44    def cleanup_all_instances(cls):45        for instance in list(cls._instances):46            instance.close()47LOGGING_FORMAT = '%(levelname)s: %(message)s'48_EXPECTED_STDOUT = """\49print 150system 151INFO: logging 152INFO: print 253INFO: system 254INFO: logging 255INFO: print 656INFO: system 657INFO: logging 658print 759system 760INFO: logging 761"""62_EXPECTED_LOG1 = 
"""\63INFO: print 364INFO: system 365INFO: logging 366INFO: print 467INFO: system 468INFO: logging 469INFO: print 570INFO: system 571INFO: logging 572"""73_EXPECTED_LOG2 = """\74INFO: print 475INFO: system 476INFO: logging 477"""78class DummyLoggingConfig(logging_config.LoggingConfig):79    console_formatter = logging.Formatter(LOGGING_FORMAT)80    def __init__(self):81        super(DummyLoggingConfig, self).__init__()82        self.log = PipedStringIO()83    def add_debug_file_handlers(self, log_dir, log_name=None):84        self.logger.addHandler(logging.StreamHandler(self.log))85# this isn't really a unit test since it creates subprocesses and pipes and all86# that. i just use the unittest framework because it's convenient.87class LoggingManagerTest(unittest.TestCase):88    def setUp(self):89        self.stdout = PipedStringIO()90        self._log1 = PipedStringIO()91        self._log2 = PipedStringIO()92        self._real_system_calls = False93        # the LoggingManager will change self.stdout (by design), so keep a94        # copy around95        self._original_stdout = self.stdout96        # clear out import-time logging config and reconfigure97        root_logger = logging.getLogger()98        for handler in list(root_logger.handlers):99            root_logger.removeHandler(handler)100        # use INFO to ignore debug output from logging_manager itself101        logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT,102                            stream=self.stdout)103        self._config_object = DummyLoggingConfig()104        logging_manager.LoggingManager.logging_config_object = (105                self._config_object)106    def tearDown(self):107        PipedStringIO.cleanup_all_instances()108    def _say(self, suffix):109        print >>self.stdout, 'print %s' % suffix110        if self._real_system_calls:111            os.system('echo system %s >&%s' % (suffix,112                                               
self._original_stdout.fileno()))113        else:114            print >>self.stdout, 'system %s' % suffix115        logging.info('logging %s', suffix)116        PipedStringIO.read_all_pipes()117    def _setup_manager(self, manager_class=logging_manager.LoggingManager):118        def set_stdout(file_object):119            self.stdout = file_object120        manager = manager_class()121        manager.manage_stream(self.stdout, logging.INFO, set_stdout)122        return manager123    def _run_test(self, manager_class):124        manager = self._setup_manager(manager_class)125        self._say(1)126        manager.start_logging()127        self._say(2)128        manager.redirect_to_stream(self._log1)129        self._say(3)130        manager.tee_redirect_to_stream(self._log2)131        self._say(4)132        manager.undo_redirect()133        self._say(5)134        manager.undo_redirect()135        self._say(6)136        manager.stop_logging()137        self._say(7)138    def _grab_fd_info(self):139        command = 'ls -l /proc/%s/fd' % os.getpid()140        proc = subprocess.Popen(command.split(), shell=True,141                                stdout=subprocess.PIPE)142        return proc.communicate()[0]143    def _compare_logs(self, log_buffer, expected_text):144        actual_lines = log_buffer.getvalue().splitlines()145        expected_lines = expected_text.splitlines()146        if self._real_system_calls:147            # because of the many interacting processes, we can't ensure perfect148            # interleaving.  
so compare sets of lines rather than ordered lines.149            actual_lines = set(actual_lines)150            expected_lines = set(expected_lines)151        self.assertEquals(actual_lines, expected_lines)152    def _check_results(self):153        # ensure our stdout was restored154        self.assertEquals(self.stdout, self._original_stdout)155        if self._real_system_calls:156            # ensure FDs were left in their original state157            self.assertEquals(self._grab_fd_info(), self._fd_info)158        self._compare_logs(self.stdout, _EXPECTED_STDOUT)159        self._compare_logs(self._log1, _EXPECTED_LOG1)160        self._compare_logs(self._log2, _EXPECTED_LOG2)161    def test_logging_manager(self):162        self._run_test(logging_manager.LoggingManager)163        self._check_results()164    def test_fd_redirection_logging_manager(self):165        self._real_system_calls = True166        self._fd_info = self._grab_fd_info()167        self._run_test(logging_manager.FdRedirectionLoggingManager)168        self._check_results()169    def test_tee_redirect_debug_dir(self):170        manager = self._setup_manager()171        manager.start_logging()172        manager.tee_redirect_debug_dir('/fake/dir', tag='mytag')173        print >>self.stdout, 'hello'174        manager.undo_redirect()175        print >>self.stdout, 'goodbye'176        manager.stop_logging()177        self._compare_logs(self.stdout,178                           'INFO: mytag : hello\nINFO: goodbye')179        self._compare_logs(self._config_object.log, 'hello\n')180class MonkeyPatchTestCase(unittest.TestCase):181    def setUp(self):182        filename = os.path.split(__file__)[1]183        if filename.endswith('.pyc'):184            filename = filename[:-1]185        self.expected_filename = filename186    def check_filename(self, filename, expected=None):187        if expected is None:188            expected = [self.expected_filename]189        self.assertIn(os.path.split(filename)[1], 
expected)190    def _0_test_find_caller(self):191        finder = logging_manager._logging_manager_aware_logger__find_caller192        filename, lineno, caller_name = finder(logging_manager.logger)193        self.check_filename(filename)194        self.assertEquals('test_find_caller', caller_name)195    def _1_test_find_caller(self):196        self._0_test_find_caller()197    def test_find_caller(self):198        self._1_test_find_caller()199    def _0_test_non_reported_find_caller(self):200        finder = logging_manager._logging_manager_aware_logger__find_caller201        filename, lineno, caller_name = finder(logging_manager.logger)202        # Python 2.4 unittest implementation will call the unittest method in203        # file 'unittest.py', and Python >= 2.6 does the same in 'case.py'204        self.check_filename(filename, expected=['unittest.py', 'case.py'])205    def _1_test_non_reported_find_caller(self):206        self._0_test_non_reported_find_caller()207    @logging_manager.do_not_report_as_logging_caller208    def test_non_reported_find_caller(self):209        self._1_test_non_reported_find_caller()210if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
