Best Python code snippet using pytest-benchmark
perftestsrunner_unittest.py
Source:perftestsrunner_unittest.py  
...99        def start(self):100            """do nothing"""101        def stop(self):102            """do nothing"""103    def create_runner(self, args=[], driver_class=TestDriver):104        options, parsed_args = PerfTestsRunner._parse_args(args)105        test_port = TestPort(host=MockHost(), options=options)106        test_port.create_driver = lambda worker_number=None, no_timeout=False: driver_class()107        runner = PerfTestsRunner(args=args, port=test_port)108        runner._host.filesystem.maybe_make_directory(runner._base_path, 'inspector')109        runner._host.filesystem.maybe_make_directory(runner._base_path, 'Bindings')110        runner._host.filesystem.maybe_make_directory(runner._base_path, 'Parser')111        return runner112    def run_test(self, test_name):113        runner = self.create_runner()114        driver = MainTest.TestDriver()115        return runner._run_single_test(ChromiumStylePerfTest(test_name, runner._host.filesystem.join('some-dir', test_name)), driver)116    def test_run_passing_test(self):117        self.assertTrue(self.run_test('pass.html'))118    def test_run_silent_test(self):119        self.assertFalse(self.run_test('silent.html'))120    def test_run_failed_test(self):121        self.assertFalse(self.run_test('failed.html'))122    def test_run_tonguey_test(self):123        self.assertFalse(self.run_test('tonguey.html'))124    def test_run_timeout_test(self):125        self.assertFalse(self.run_test('timeout.html'))126    def test_run_crash_test(self):127        self.assertFalse(self.run_test('crash.html'))128    def _tests_for_runner(self, runner, test_names):129        filesystem = runner._host.filesystem130        tests = []131        for test in test_names:132            path = filesystem.join(runner._base_path, test)133            dirname = filesystem.dirname(path)134            if test.startswith('inspector/'):135                tests.append(ChromiumStylePerfTest(test, path))136            else:137                
tests.append(PerfTest(test, path))138        return tests139    def test_run_test_set(self):140        runner = self.create_runner()141        tests = self._tests_for_runner(runner, ['inspector/pass.html', 'inspector/silent.html', 'inspector/failed.html',142            'inspector/tonguey.html', 'inspector/timeout.html', 'inspector/crash.html'])143        output = OutputCapture()144        output.capture_output()145        try:146            unexpected_result_count = runner._run_tests_set(tests, runner._port)147        finally:148            stdout, stderr, log = output.restore_output()149        self.assertEqual(unexpected_result_count, len(tests) - 1)150        self.assertTrue('\nRESULT group_name: test_name= 42 ms\n' in log)151    def test_run_test_set_kills_drt_per_run(self):152        class TestDriverWithStopCount(MainTest.TestDriver):153            stop_count = 0154            def stop(self):155                TestDriverWithStopCount.stop_count += 1156        runner = self.create_runner(driver_class=TestDriverWithStopCount)157        tests = self._tests_for_runner(runner, ['inspector/pass.html', 'inspector/silent.html', 'inspector/failed.html',158            'inspector/tonguey.html', 'inspector/timeout.html', 'inspector/crash.html'])159        unexpected_result_count = runner._run_tests_set(tests, runner._port)160        self.assertEqual(TestDriverWithStopCount.stop_count, 6)161    def test_run_test_pause_before_testing(self):162        class TestDriverWithStartCount(MainTest.TestDriver):163            start_count = 0164            def start(self):165                TestDriverWithStartCount.start_count += 1166        runner = self.create_runner(args=["--pause-before-testing"], driver_class=TestDriverWithStartCount)167        tests = self._tests_for_runner(runner, ['inspector/pass.html'])168        output = OutputCapture()169        output.capture_output()170        try:171            unexpected_result_count = runner._run_tests_set(tests, runner._port)172       
     self.assertEqual(TestDriverWithStartCount.start_count, 1)173        finally:174            stdout, stderr, log = output.restore_output()175        self.assertEqual(stderr, "Ready to run test?\n")176        self.assertEqual(log, "Running inspector/pass.html (1 of 1)\nRESULT group_name: test_name= 42 ms\n\n")177    def test_run_test_set_for_parser_tests(self):178        runner = self.create_runner()179        tests = self._tests_for_runner(runner, ['Bindings/event-target-wrapper.html', 'Parser/some-parser.html'])180        output = OutputCapture()181        output.capture_output()182        try:183            unexpected_result_count = runner._run_tests_set(tests, runner._port)184        finally:185            stdout, stderr, log = output.restore_output()186        self.assertEqual(unexpected_result_count, 0)187        self.assertEqual(log, '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',188        'RESULT Bindings: event-target-wrapper= 1489.05 ms',189        'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',190        '',191        'Running Parser/some-parser.html (2 of 2)',192        'RESULT Parser: some-parser= 1100.0 ms',193        'median= 1101.0 ms, stdev= 11.0 ms, min= 1080.0 ms, max= 1120.0 ms',194        '', '']))195    def test_run_test_set_with_json_output(self):196        runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json'])197        runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True198        runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True199        runner._timestamp = 123456789200        output_capture = OutputCapture()201        output_capture.capture_output()202        try:203            self.assertEqual(runner.run(), 0)204        finally:205            stdout, stderr, logs = output_capture.restore_output()206        self.assertEqual(logs,207            '\n'.join(['Running 
Bindings/event-target-wrapper.html (1 of 2)',208                       'RESULT Bindings: event-target-wrapper= 1489.05 ms',209                       'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',210                       '',211                       'Running inspector/pass.html (2 of 2)',212                       'RESULT group_name: test_name= 42 ms',213                       '', '']))214        self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {215            "timestamp": 123456789, "results":216            {"Bindings/event-target-wrapper": {"max": 1510, "avg": 1489.05, "median": 1487, "min": 1471, "stdev": 14.46, "unit": "ms"},217            "inspector/pass.html:group_name:test_name": 42},218            "webkit-revision": 5678})219    def test_run_test_set_with_json_source(self):220        runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json', '--source-json-path=/mock-checkout/source.json'])221        runner._host.filesystem.files['/mock-checkout/source.json'] = '{"key": "value"}'222        runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True223        runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True224        runner._timestamp = 123456789225        output_capture = OutputCapture()226        output_capture.capture_output()227        try:228            self.assertEqual(runner.run(), 0)229        finally:230            stdout, stderr, logs = output_capture.restore_output()231        self.assertEqual(logs, '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',232            'RESULT Bindings: event-target-wrapper= 1489.05 ms',233            'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',234            '',235            'Running inspector/pass.html (2 of 2)',236            'RESULT group_name: test_name= 42 ms',237            '', '']))238        
self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {239            "timestamp": 123456789, "results":240            {"Bindings/event-target-wrapper": {"max": 1510, "avg": 1489.05, "median": 1487, "min": 1471, "stdev": 14.46, "unit": "ms"},241            "inspector/pass.html:group_name:test_name": 42},242            "webkit-revision": 5678,243            "key": "value"})244    def test_run_test_set_with_multiple_repositories(self):245        runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json'])246        runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True247        runner._timestamp = 123456789248        runner._port.repository_paths = lambda: [('webkit', '/mock-checkout'), ('some', '/mock-checkout/some')]249        self.assertEqual(runner.run(), 0)250        self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {251            "timestamp": 123456789, "results": {"inspector/pass.html:group_name:test_name": 42.0}, "webkit-revision": 5678, "some-revision": 5678})252    def test_run_with_upload_json(self):253        runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json',254            '--test-results-server', 'some.host', '--platform', 'platform1', '--builder-name', 'builder1', '--build-number', '123'])255        upload_json_is_called = [False]256        upload_json_returns_true = True257        def mock_upload_json(hostname, json_path):258            self.assertEqual(hostname, 'some.host')259            self.assertEqual(json_path, '/mock-checkout/output.json')260            upload_json_is_called[0] = True261            return upload_json_returns_true262        runner._upload_json = mock_upload_json263        runner._host.filesystem.files['/mock-checkout/source.json'] = '{"key": "value"}'264        runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True265        
runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True266        runner._timestamp = 123456789267        self.assertEqual(runner.run(), 0)268        self.assertEqual(upload_json_is_called[0], True)269        generated_json = json.loads(runner._host.filesystem.files['/mock-checkout/output.json'])270        self.assertEqual(generated_json['platform'], 'platform1')271        self.assertEqual(generated_json['builder-name'], 'builder1')272        self.assertEqual(generated_json['build-number'], 123)273        upload_json_returns_true = False274        runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json',275            '--test-results-server', 'some.host', '--platform', 'platform1', '--builder-name', 'builder1', '--build-number', '123'])276        runner._upload_json = mock_upload_json277        self.assertEqual(runner.run(), -3)278    def test_upload_json(self):279        runner = self.create_runner()280        runner._host.filesystem.files['/mock-checkout/some.json'] = 'some content'281        called = []282        upload_single_text_file_throws = False283        upload_single_text_file_return_value = StringIO.StringIO('OK')284        class MockFileUploader:285            def __init__(mock, url, timeout):286                self.assertEqual(url, 'https://some.host/api/test/report')287                self.assertTrue(isinstance(timeout, int) and timeout)288                called.append('FileUploader')289            def upload_single_text_file(mock, filesystem, content_type, filename):290                self.assertEqual(filesystem, runner._host.filesystem)291                self.assertEqual(content_type, 'application/json')292                self.assertEqual(filename, 'some.json')293                called.append('upload_single_text_file')294                if upload_single_text_file_throws:295                    raise "Some exception"296                return upload_single_text_file_return_value297       
 runner._upload_json('some.host', 'some.json', MockFileUploader)298        self.assertEqual(called, ['FileUploader', 'upload_single_text_file'])299        output = OutputCapture()300        output.capture_output()301        upload_single_text_file_return_value = StringIO.StringIO('Some error')302        runner._upload_json('some.host', 'some.json', MockFileUploader)303        _, _, logs = output.restore_output()304        self.assertEqual(logs, 'Uploaded JSON but got a bad response:\nSome error\n')305        # Throwing an exception upload_single_text_file shouldn't blow up _upload_json306        called = []307        upload_single_text_file_throws = True308        runner._upload_json('some.host', 'some.json', MockFileUploader)309        self.assertEqual(called, ['FileUploader', 'upload_single_text_file'])310    def test_collect_tests(self):311        runner = self.create_runner()312        filename = runner._host.filesystem.join(runner._base_path, 'inspector', 'a_file.html')313        runner._host.filesystem.files[filename] = 'a content'314        tests = runner._collect_tests()315        self.assertEqual(len(tests), 1)316    def _collect_tests_and_sort_test_name(self, runner):317        return sorted([test.test_name() for test in runner._collect_tests()])318    def test_collect_tests(self):319        runner = self.create_runner(args=['PerformanceTests/test1.html', 'test2.html'])320        def add_file(filename):321            runner._host.filesystem.files[runner._host.filesystem.join(runner._base_path, filename)] = 'some content'322        add_file('test1.html')323        add_file('test2.html')324        add_file('test3.html')325        runner._host.filesystem.chdir(runner._port.perf_tests_dir()[:runner._port.perf_tests_dir().rfind(runner._host.filesystem.sep)])326        self.assertEqual(self._collect_tests_and_sort_test_name(runner), ['test1.html', 'test2.html'])327    def test_collect_tests_with_skipped_list(self):328        runner = self.create_runner()329     
   def add_file(dirname, filename, content=True):330            dirname = runner._host.filesystem.join(runner._base_path, dirname) if dirname else runner._base_path331            runner._host.filesystem.maybe_make_directory(dirname)332            runner._host.filesystem.files[runner._host.filesystem.join(dirname, filename)] = content333        add_file('inspector', 'test1.html')334        add_file('inspector', 'unsupported_test1.html')335        add_file('inspector', 'test2.html')336        add_file('inspector/resources', 'resource_file.html')337        add_file('unsupported', 'unsupported_test2.html')338        runner._port.skipped_perf_tests = lambda: ['inspector/unsupported_test1.html', 'unsupported']339        self.assertEqual(self._collect_tests_and_sort_test_name(runner), ['inspector/test1.html', 'inspector/test2.html'])340    def test_collect_tests_with_page_load_svg(self):341        runner = self.create_runner()342        def add_file(dirname, filename, content=True):343            dirname = runner._host.filesystem.join(runner._base_path, dirname) if dirname else runner._base_path344            runner._host.filesystem.maybe_make_directory(dirname)345            runner._host.filesystem.files[runner._host.filesystem.join(dirname, filename)] = content346        add_file('PageLoad', 'some-svg-test.svg')347        tests = runner._collect_tests()348        self.assertEqual(len(tests), 1)349        self.assertEqual(tests[0].__class__.__name__, 'PageLoadingPerfTest')350    def test_parse_args(self):351        runner = self.create_runner()352        options, args = PerfTestsRunner._parse_args([353                '--build-directory=folder42',354                '--platform=platform42',355                '--builder-name', 'webkit-mac-1',356                '--build-number=56',357                '--time-out-ms=42',358                '--output-json-path=a/output.json',359                '--source-json-path=a/source.json',360                
'--test-results-server=somehost',361                '--debug'])362        self.assertEqual(options.build, True)363        self.assertEqual(options.build_directory, 'folder42')364        self.assertEqual(options.platform, 'platform42')365        self.assertEqual(options.builder_name, 'webkit-mac-1')...test_runner.py
Source:test_runner.py  
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.application.runner._runner}.
"""

from signal import SIGTERM
from io import BytesIO
import errno

from twisted.logger import (
    LogLevel, LogPublisher, LogBeginner,
    FileLogObserver, FilteringLogObserver, LogLevelFilterPredicate,
)

from ...runner import _runner
from .._exit import ExitStatus
from .._pidfile import PIDFile, NonePIDFile
from .._runner import Runner
from .test_pidfile import DummyFilePath
from .mockreactor import MockReactor

import twisted.trial.unittest



class RunnerTests(twisted.trial.unittest.TestCase):
    """
    Tests for L{Runner}.
    """

    def setUp(self):
        # Patch exit and kill so we can capture usage and prevent actual exits
        # and kills.
        self.exit = DummyExit()
        self.kill = DummyKill()

        self.patch(_runner, "exit", self.exit)
        self.patch(_runner, "kill", self.kill)

        # Use a fixed PID so the PID file content is a known value.
        self.pid = 1337
        self.pidFileContent = u"{}\n".format(self.pid).encode("utf-8")

        # Patch globalLogBeginner so that we aren't trying to install multiple
        # global log observers.
        self.stdout = BytesIO()
        self.stderr = BytesIO()
        self.stdio = DummyStandardIO(self.stdout, self.stderr)
        self.warnings = DummyWarningsModule()

        self.globalLogPublisher = LogPublisher()
        self.globalLogBeginner = LogBeginner(
            self.globalLogPublisher,
            self.stdio.stderr, self.stdio,
            self.warnings,
        )

        self.patch(_runner, "stderr", self.stderr)
        self.patch(_runner, "globalLogBeginner", self.globalLogBeginner)


    def test_runInOrder(self):
        """
        L{Runner.run} calls the expected methods in order.
        """
        runner = DummyRunner({})
        runner.run()

        self.assertEqual(
            runner.calledMethods,
            [
                "killIfRequested",
                "startLogging",
                "startReactor",
                "reactorExited",
            ]
        )


    def test_runUsesPIDFile(self):
        """
        L{Runner.run} uses the provided PID file.
        """
        pidFile = DummyPIDFile()

        runner = DummyRunner(pidFile=pidFile)

        self.assertFalse(pidFile.entered)
        self.assertFalse(pidFile.exited)

        runner.run()

        self.assertTrue(pidFile.entered)
        self.assertTrue(pidFile.exited)


    def test_runAlreadyRunning(self):
        """
        L{Runner.run} exits with L{ExitStatus.EX_CONFIG} and the expected
        message if a process is already running that corresponds to the given
        PID file.
        """
        pidFile = PIDFile(DummyFilePath(self.pidFileContent))
        # Force the "already running" branch regardless of the real process table.
        pidFile.isRunning = lambda: True

        runner = DummyRunner(pidFile=pidFile)
        runner.run()

        self.assertEqual(self.exit.status, ExitStatus.EX_CONFIG)
        self.assertEqual(self.exit.message, "Already running.")


    def test_killNotRequested(self):
        """
        L{Runner.killIfRequested} when C{kill} is false doesn't exit and
        doesn't indiscriminately murder anyone.
        """
        runner = Runner({})
        runner.killIfRequested()

        self.assertEqual(self.kill.calls, [])
        self.assertFalse(self.exit.exited)


    def test_killRequestedWithoutPIDFile(self):
        """
        L{Runner.killIfRequested} when C{kill} is true but C{pidFile} is
        L{nonePIDFile} exits with L{ExitStatus.EX_USAGE} and the expected
        message; and also doesn't indiscriminately murder anyone.
        """
        runner = Runner(kill=True)
        runner.killIfRequested()

        self.assertEqual(self.kill.calls, [])
        self.assertEqual(self.exit.status, ExitStatus.EX_USAGE)
        self.assertEqual(self.exit.message, "No PID file specified.")


    def test_killRequestedWithPIDFile(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        performs a targeted killing of the appropriate process.
        """
        pidFile = PIDFile(DummyFilePath(self.pidFileContent))
        runner = Runner(kill=True, pidFile=pidFile)
        runner.killIfRequested()

        self.assertEqual(self.kill.calls, [(self.pid, SIGTERM)])
        self.assertEqual(self.exit.status, ExitStatus.EX_OK)
        self.assertIdentical(self.exit.message, None)


    def test_killRequestedWithPIDFileCantRead(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        that it can't read exits with L{ExitStatus.EX_IOERR}.
        """
        pidFile = PIDFile(DummyFilePath(None))

        def read():
            raise OSError(errno.EACCES, "Permission denied")

        pidFile.read = read

        runner = Runner(kill=True, pidFile=pidFile)
        runner.killIfRequested()

        self.assertEqual(self.exit.status, ExitStatus.EX_IOERR)
        self.assertEqual(self.exit.message, "Unable to read PID file.")


    def test_killRequestedWithPIDFileEmpty(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        containing no value exits with L{ExitStatus.EX_DATAERR}.
        """
        pidFile = PIDFile(DummyFilePath(b""))
        runner = Runner(kill=True, pidFile=pidFile)
        runner.killIfRequested()

        self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)
        self.assertEqual(self.exit.message, "Invalid PID file.")


    def test_killRequestedWithPIDFileNotAnInt(self):
        """
        L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}
        containing a non-integer value exits with L{ExitStatus.EX_DATAERR}.
        """
        pidFile = PIDFile(DummyFilePath(b"** totally not a number, dude **"))
        runner = Runner(kill=True, pidFile=pidFile)
        runner.killIfRequested()

        self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)
        self.assertEqual(self.exit.message, "Invalid PID file.")


    def test_startLogging(self):
        """
        L{Runner.startLogging} sets up a filtering observer with a log level
        predicate set to the given log level that contains a file observer of
        the given type which writes to the given file.
        """
        logFile = BytesIO()

        # Patch the log beginner so that we don't try to start the already
        # running (started by trial) logging system.

        # NOTE: this local class intentionally shadows the imported LogBeginner
        # for the rest of this test.
        class LogBeginner(object):
            def beginLoggingTo(self, observers):
                LogBeginner.observers = observers

        self.patch(_runner, "globalLogBeginner", LogBeginner())

        # Patch FilteringLogObserver so we can capture its arguments

        class MockFilteringLogObserver(FilteringLogObserver):
            def __init__(
                self, observer, predicates,
                negativeObserver=lambda event: None
            ):
                MockFilteringLogObserver.observer = observer
                MockFilteringLogObserver.predicates = predicates
                FilteringLogObserver.__init__(
                    self, observer, predicates, negativeObserver
                )

        self.patch(_runner, "FilteringLogObserver", MockFilteringLogObserver)

        # Patch FileLogObserver so we can capture its arguments

        class MockFileLogObserver(FileLogObserver):
            def __init__(self, outFile):
                MockFileLogObserver.outFile = outFile
                FileLogObserver.__init__(self, outFile, str)

        # Start logging
        runner = Runner(
            defaultLogLevel=LogLevel.critical,
            logFile=logFile,
            fileLogObserverFactory=MockFileLogObserver,
        )
        runner.startLogging()

        # Check for a filtering observer
        self.assertEqual(len(LogBeginner.observers), 1)
        self.assertIsInstance(LogBeginner.observers[0], FilteringLogObserver)

        # Check log level predicate with the correct default log level
        self.assertEqual(len(MockFilteringLogObserver.predicates), 1)
        self.assertIsInstance(
            MockFilteringLogObserver.predicates[0],
            LogLevelFilterPredicate
        )
        self.assertIdentical(
            MockFilteringLogObserver.predicates[0].defaultLogLevel,
            LogLevel.critical
        )

        # Check for a file observer attached to the filtering observer
        self.assertIsInstance(
            MockFilteringLogObserver.observer, MockFileLogObserver
        )

        # Check for the file we gave it
        self.assertIdentical(
            MockFilteringLogObserver.observer.outFile, logFile
        )


    def test_startReactorWithoutReactor(self):
        """
        L{Runner.startReactor} without the C{reactor} argument runs the default
        reactor.
        """
        reactor = MockReactor(self)
        self.patch(_runner, "defaultReactor", reactor)

        runner = Runner()
        runner.startReactor()

        self.assertTrue(reactor.hasInstalled)
        self.assertTrue(reactor.hasRun)


    def test_startReactorWithReactor(self):
        """
        L{Runner.startReactor} with the C{reactor} argument runs the given
        reactor.
        """
        reactor = MockReactor(self)
        runner = Runner(reactor=reactor)
        runner.startReactor()

        self.assertTrue(reactor.hasRun)


    def test_startReactorWhenRunning(self):
        """
        L{Runner.startReactor} ensures that C{whenRunning} is called with
        C{whenRunningArguments} when the reactor is running.
        """
        self._testHook("whenRunning", "startReactor")


    def test_whenRunningWithArguments(self):
        """
        L{Runner.whenRunning} calls C{whenRunning} with
        C{whenRunningArguments}.
        """
        self._testHook("whenRunning")


    def test_reactorExitedWithArguments(self):
        """
        L{Runner.reactorExited} calls C{reactorExited} with
        C{reactorExitedArguments}.
        """
        self._testHook("reactorExited")


    def _testHook(self, methodName, callerName=None):
        """
        Verify that the named hook is run with the expected arguments as
        specified by the arguments used to create the L{Runner}, when the
        specified caller is invoked.

        @param methodName: The name of the hook to verify.
        @type methodName: L{str}

        @param callerName: The name of the method that is expected to cause the
            hook to be called.
            If C{None}, use the L{Runner} method with the same name as the
            hook.
        @type callerName: L{str}
        """
        if callerName is None:
            callerName = methodName

        arguments = dict(a=object(), b=object(), c=object())
        argumentsSeen = []

        def hook(**arguments):
            argumentsSeen.append(arguments)

        # The Runner receives a copy, so the comparison below is against the
        # untouched original mapping.
        runnerArguments = {
            methodName: hook,
            "{}Arguments".format(methodName): arguments.copy(),
        }
        runner = Runner(reactor=MockReactor(self), **runnerArguments)

        hookCaller = getattr(runner, callerName)
        hookCaller()

        self.assertEqual(len(argumentsSeen), 1)
        self.assertEqual(argumentsSeen[0], arguments)



class DummyRunner(Runner):
    """
    Stub for L{Runner}.

    Keep track of calls to some methods without actually doing anything.
    """

    def __init__(self, *args, **kwargs):
        Runner.__init__(self, *args, **kwargs)

        self.calledMethods = []


    def killIfRequested(self):
        self.calledMethods.append("killIfRequested")


    def startLogging(self):
        self.calledMethods.append("startLogging")


    def startReactor(self):
        self.calledMethods.append("startReactor")


    def reactorExited(self):
        self.calledMethods.append("reactorExited")



class DummyPIDFile(NonePIDFile):
    """
    Stub for L{PIDFile}.

    Tracks context manager entry/exit without doing anything.
    """

    def __init__(self):
        NonePIDFile.__init__(self)

        self.entered = False
        self.exited = False


    def __enter__(self):
        self.entered = True
        return self


    def __exit__(self, excType, excValue, traceback):
        self.exited = True



class DummyExit(object):
    """
    Stub for L{exit} that remembers whether it's been called and, if it has,
    what arguments it was given.
    """

    def __init__(self):
        self.exited = False


    def __call__(self, status, message=None):
        # A second call within one test is treated as a failure.
        assert not self.exited

        self.status = status
        self.message = message
        self.exited = True



class DummyKill(object):
    """
    Stub for L{os.kill} that remembers whether it's been called and, if it has,
    what arguments it was given.
    """

    def __init__(self):
        self.calls = []


    def __call__(self, pid, sig):
        self.calls.append((pid, sig))



class DummyStandardIO(object):
    """
    Stub for L{sys} which provides L{BytesIO} streams as stdout and stderr.
    """

    def __init__(self, stdout, stderr):
        self.stdout = stdout
        self.stderr = stderr



class DummyWarningsModule(object):
    """
    Stub for L{warnings} which provides a C{showwarning} method that is a no-op.
    """

    def showwarning(*args, **kwargs):
        """
        Do nothing.

        @param args: ignored.
        @param kwargs: ignored.
        """
        # ... (the excerpt is truncated here, at source line 340)

top_7_stress.py
Source:top_7_stress.py  
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from telemetry.page import page as page_module
from telemetry.page import shared_page_state
from telemetry import story


def _GetCurrentLocation(action_runner):
  """Returns document.location.href as evaluated through |action_runner|."""
  return action_runner.EvaluateJavaScript('document.location.href')


def _WaitForLocationChange(action_runner, old_href):
  """Waits until document.location.href differs from |old_href|."""
  condition = 'document.location.href != "%s"' % old_href
  action_runner.WaitForJavaScriptCondition(condition)


def _ScrollWithGesture(action_runner):
  """Scrolls the page once inside a 'ScrollAction' gesture interaction."""
  with action_runner.CreateGestureInteraction('ScrollAction'):
    action_runner.ScrollPage()


class Top7StressPage(page_module.Page):
  """Base class for the pages in this file.

  Uses the desktop shared page state, the credentials file
  data/credentials.json and the archive data/top_7_stress.json. Subclasses
  have to provide RunPageInteractions.
  """

  def __init__(self, url, page_set, name=''):
    super(Top7StressPage, self).__init__(
        url=url,
        page_set=page_set,
        name=name,
        shared_page_state_class=shared_page_state.SharedDesktopPageState,
        credentials_path='data/credentials.json')
    self.archive_data_file = 'data/top_7_stress.json'

  def RunPageInteractions(self, action_runner):
    raise NotImplementedError()


class GoogleWebSearchPage(Top7StressPage):
  """ Why: top google property; a google tab is often open """

  def __init__(self, page_set):
    super(GoogleWebSearchPage, self).__init__(
        url='https://www.google.com/#hl=en&q=barack+obama',
        page_set=page_set)

  def RunNavigateSteps(self, action_runner):
    super(GoogleWebSearchPage, self).RunNavigateSteps(action_runner)
    action_runner.WaitForElement(text='Next')

  def RunPageInteractions(self, action_runner):
    # Each entry is (text of the link to click, text of the element that
    # must be present after the location has changed).
    steps = (
        ('Next', 'Next'),
        ('Next', 'Next'),
        ('Next', 'Previous'),
        ('Previous', 'Previous'),
        ('Previous', 'Previous'),
        ('Previous', 'Images'),
        ('Images', 'Images'),
    )
    for link_text, awaited_text in steps:
      _ScrollWithGesture(action_runner)
      href_before_click = _GetCurrentLocation(action_runner)
      action_runner.ClickElement(text=link_text)
      _WaitForLocationChange(action_runner, href_before_click)
      action_runner.WaitForElement(text=awaited_text)


class GmailPage(Top7StressPage):
  """ Why: productivity, top google properties """

  def __init__(self, page_set):
    super(GmailPage, self).__init__(
        url='https://mail.google.com/mail/',
        page_set=page_set)
    self.credentials = 'google'

  def RunNavigateSteps(self, action_runner):
    super(GmailPage, self).RunNavigateSteps(action_runner)
    action_runner.WaitForJavaScriptCondition(
        'window.gmonkey !== undefined &&'
        'document.getElementById("gb") !== null')

  def RunPageInteractions(self, action_runner):
    # Click the "starred" link, then the "inbox" link, waiting for the
    # location to change after each click.
    link_selectors = (
        'a[href="https://mail.google.com/mail/u/0/?shva=1#starred"]',
        'a[href="https://mail.google.com/mail/u/0/?shva=1#inbox"]',
    )
    for selector in link_selectors:
      href_before_click = _GetCurrentLocation(action_runner)
      action_runner.ClickElement(selector)
      _WaitForLocationChange(action_runner, href_before_click)


class GoogleCalendarPage(Top7StressPage):
  """ Why: productivity, top google properties """

  def __init__(self, page_set):
    super(GoogleCalendarPage, self).__init__(
        url='https://www.google.com/calendar/',
        page_set=page_set)
    self.credentials = 'google'

  def RunNavigateSteps(self, action_runner):
    super(GoogleCalendarPage, self).RunNavigateSteps(action_runner)
    action_runner.Wait(2)
    action_runner.WaitForElement('div[class~="navForward"]')
    # Appends a viewport <meta> element (initial-scale=1) to the body.
    action_runner.ExecuteJavaScript('''
        (function() {
          var elem = document.createElement('meta');
          elem.name='viewport';
          elem.content='initial-scale=1';
          document.body.appendChild(elem);
        })();''')
    action_runner.Wait(1)

  def RunPageInteractions(self, action_runner):
    forward = 'div[class~="navForward"]'
    back = 'div[class~="navBack"]'
    # (selector to click, selector to wait for afterwards): four forward
    # clicks followed by four back clicks.
    steps = (
        [(forward, forward)] * 3 +
        [(forward, back)] +
        [(back, back)] * 4)
    for clicked, awaited in steps:
      action_runner.ClickElement(clicked)
      action_runner.Wait(2)
      action_runner.WaitForElement(awaited)


class GooglePlusPage(Top7StressPage):
  """ Why: social; top google property; Public profile; infinite scrolls """

  def __init__(self, page_set):
    super(GooglePlusPage, self).__init__(
        url='https://plus.google.com/110031535020051778989/posts',
        page_set=page_set)
    self.credentials = 'google'

  def RunNavigateSteps(self, action_runner):
    super(GooglePlusPage, self).RunNavigateSteps(action_runner)
    action_runner.WaitForElement(text='Home')

  def RunPageInteractions(self, action_runner):
    # Click each entry in turn, then wait for the following entry to exist.
    tabs = ('Home', 'Profile', 'Explore', 'Events', 'Communities', 'Home')
    for current_tab, next_tab in zip(tabs, tabs[1:]):
      action_runner.ClickElement(text=current_tab)
      action_runner.Wait(2)
      action_runner.WaitForElement(text=next_tab)


class BlogspotPage(Top7StressPage):
  """ Why: #11 (Alexa global), google property; some blogger layouts have
  infinite scroll but more interesting """

  def __init__(self, page_set):
    super(BlogspotPage, self).__init__(
        url='http://googlewebmastercentral.blogspot.com/',
        page_set=page_set,
        name='Blogger')

  def RunNavigateSteps(self, action_runner):
    super(BlogspotPage, self).RunNavigateSteps(action_runner)
    action_runner.WaitForElement(text='accessibility')

  def RunPageInteractions(self, action_runner):
    for label in ('accessibility', 'advanced', 'beginner'):
      action_runner.ClickElement(text=label)
      action_runner.WaitForNavigate()
      _ScrollWithGesture(action_runner)
      # Insert 300ms wait to simulate user finger movement,
      # and ensure scheduling of idle tasks.
      action_runner.Wait(0.3)
    action_runner.ClickElement(text='Home')
    action_runner.WaitForNavigate()


class WordpressPage(Top7StressPage):
  """ Why: #18 (Alexa global), Picked an interesting post """

  def __init__(self, page_set):
    super(WordpressPage, self).__init__(
        # pylint: disable=line-too-long
        url='http://en.blog.wordpress.com/2012/09/04/freshly-pressed-editors-picks-for-august-2012/',
        page_set=page_set,
        name='Wordpress')

  def RunNavigateSteps(self, action_runner):
    super(WordpressPage, self).RunNavigateSteps(action_runner)
    action_runner.WaitForElement(
        # pylint: disable=line-too-long
        'a[href="http://en.blog.wordpress.com/2012/08/30/new-themes-able-and-sight/"]')

  def RunPageInteractions(self, action_runner):
    # pylint: disable=line-too-long
    themes_link = 'a[href="http://en.blog.wordpress.com/2012/08/30/new-themes-able-and-sight/"]'
    clicks = (
        lambda: action_runner.ClickElement(themes_link),
        lambda: action_runner.ClickElement(text='Features'),
        lambda: action_runner.ClickElement(text='News'),
    )
    for click in clicks:
      _ScrollWithGesture(action_runner)
      # Insert 300ms wait to simulate user finger movement,
      # and ensure scheduling of idle tasks.
      action_runner.Wait(0.3)
      click()
      action_runner.WaitForNavigate()
    _ScrollWithGesture(action_runner)


class FacebookPage(Top7StressPage):
  """ Why: top social,Public profile """

  def __init__(self, page_set):
    super(FacebookPage, self).__init__(
        url='https://www.facebook.com/barackobama',
        page_set=page_set,
        name='Facebook')
    self.credentials = 'facebook2'

  def RunNavigateSteps(self, action_runner):
    super(FacebookPage, self).RunNavigateSteps(action_runner)
    action_runner.WaitForElement(text='About')

  def RunPageInteractions(self, action_runner):
    content_below_viewport = (
        'document.documentElement.scrollHeight - window.innerHeight - '
        'window.pageYOffset > 0')
    # Scroll and wait for the next page to be loaded, twice.
    for _ in range(2):
      _ScrollWithGesture(action_runner)
      action_runner.WaitForJavaScriptCondition(content_below_viewport)


class Top7StressPageSet(story.StorySet):
  """ Pages hand-picked for stress testing. """

  def __init__(self):
    super(Top7StressPageSet, self).__init__(
      archive_data_file='data/top_7_stress.json',
      cloud_storage_bucket=story.PARTNER_BUCKET)
    self.AddStory(GoogleWebSearchPage(self))
    self.AddStory(GmailPage(self))
    self.AddStory(GoogleCalendarPage(self))
    self.AddStory(GooglePlusPage(self))
    self.AddStory(BlogspotPage(self))
    self.AddStory(WordpressPage(self))
...
test_eval_hook.py
Source:test_eval_hook.py  
1import os.path as osp2import tempfile3import unittest.mock as mock4from collections import OrderedDict5from unittest.mock import MagicMock, patch6import pytest7import torch8import torch.nn as nn9from mmcv.runner import EpochBasedRunner, build_optimizer10from mmcv.utils import get_logger11from torch.utils.data import DataLoader, Dataset12from mmdet.core import DistEvalHook, EvalHook13class ExampleDataset(Dataset):14    def __init__(self):15        self.index = 016        self.eval_result = [0.1, 0.4, 0.3, 0.7, 0.2, 0.05, 0.4, 0.6]17    def __getitem__(self, idx):18        results = dict(imgs=torch.tensor([1]))19        return results20    def __len__(self):21        return 122    @mock.create_autospec23    def evaluate(self, results, logger=None):24        pass25class EvalDataset(ExampleDataset):26    def evaluate(self, results, logger=None):27        mean_ap = self.eval_result[self.index]28        output = OrderedDict(mAP=mean_ap, index=self.index, score=mean_ap)29        self.index += 130        return output31class ExampleModel(nn.Module):32    def __init__(self):33        super().__init__()34        self.conv = nn.Linear(1, 1)35        self.test_cfg = None36    def forward(self, imgs, rescale=False, return_loss=False):37        return imgs38    def train_step(self, data_batch, optimizer, **kwargs):39        outputs = {40            'loss': 0.5,41            'log_vars': {42                'accuracy': 0.9843            },44            'num_samples': 145        }46        return outputs47@pytest.mark.skipif(48    not torch.cuda.is_available(), reason='requires CUDA support')49@patch('mmdet.apis.single_gpu_test', MagicMock)50@patch('mmdet.apis.multi_gpu_test', MagicMock)51@pytest.mark.parametrize('EvalHookCls', (EvalHook, DistEvalHook))52def test_eval_hook(EvalHookCls):53    with pytest.raises(TypeError):54        # dataloader must be a pytorch DataLoader55        test_dataset = ExampleDataset()56        data_loader = [57            DataLoader(58                
test_dataset,59                batch_size=1,60                sampler=None,61                num_worker=0,62                shuffle=False)63        ]64        EvalHookCls(data_loader)65    with pytest.raises(KeyError):66        # rule must be in keys of rule_map67        test_dataset = ExampleDataset()68        data_loader = DataLoader(69            test_dataset,70            batch_size=1,71            sampler=None,72            num_workers=0,73            shuffle=False)74        EvalHookCls(data_loader, save_best='auto', rule='unsupport')75    with pytest.raises(ValueError):76        # key_indicator must be valid when rule_map is None77        test_dataset = ExampleDataset()78        data_loader = DataLoader(79            test_dataset,80            batch_size=1,81            sampler=None,82            num_workers=0,83            shuffle=False)84        EvalHookCls(data_loader, save_best='unsupport')85    optimizer_cfg = dict(86        type='SGD', lr=0.01, momentum=0.9, weight_decay=0.0001)87    test_dataset = ExampleDataset()88    loader = DataLoader(test_dataset, batch_size=1)89    model = ExampleModel()90    optimizer = build_optimizer(model, optimizer_cfg)91    data_loader = DataLoader(test_dataset, batch_size=1)92    eval_hook = EvalHookCls(data_loader, save_best=None)93    with tempfile.TemporaryDirectory() as tmpdir:94        logger = get_logger('test_eval')95        runner = EpochBasedRunner(96            model=model,97            batch_processor=None,98            optimizer=optimizer,99            work_dir=tmpdir,100            logger=logger)101        runner.register_hook(eval_hook)102        runner.run([loader], [('train', 1)], 1)103        assert runner.meta is None or 'best_score' not in runner.meta[104            'hook_msgs']105        assert runner.meta is None or 'best_ckpt' not in runner.meta[106            'hook_msgs']107    # when `save_best` is set to 'auto', first metric will be used.108    loader = DataLoader(EvalDataset(), batch_size=1)109    
model = ExampleModel()110    data_loader = DataLoader(EvalDataset(), batch_size=1)111    eval_hook = EvalHookCls(data_loader, interval=1, save_best='auto')112    with tempfile.TemporaryDirectory() as tmpdir:113        logger = get_logger('test_eval')114        runner = EpochBasedRunner(115            model=model,116            batch_processor=None,117            optimizer=optimizer,118            work_dir=tmpdir,119            logger=logger)120        runner.register_checkpoint_hook(dict(interval=1))121        runner.register_hook(eval_hook)122        runner.run([loader], [('train', 1)], 8)123        real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')124        assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)125        assert runner.meta['hook_msgs']['best_score'] == 0.7126    loader = DataLoader(EvalDataset(), batch_size=1)127    model = ExampleModel()128    data_loader = DataLoader(EvalDataset(), batch_size=1)129    eval_hook = EvalHookCls(data_loader, interval=1, save_best='mAP')130    with tempfile.TemporaryDirectory() as tmpdir:131        logger = get_logger('test_eval')132        runner = EpochBasedRunner(133            model=model,134            batch_processor=None,135            optimizer=optimizer,136            work_dir=tmpdir,137            logger=logger)138        runner.register_checkpoint_hook(dict(interval=1))139        runner.register_hook(eval_hook)140        runner.run([loader], [('train', 1)], 8)141        real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')142        assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)143        assert runner.meta['hook_msgs']['best_score'] == 0.7144    data_loader = DataLoader(EvalDataset(), batch_size=1)145    eval_hook = EvalHookCls(146        data_loader, interval=1, save_best='score', rule='greater')147    with tempfile.TemporaryDirectory() as tmpdir:148        logger = get_logger('test_eval')149        runner = EpochBasedRunner(150            model=model,151      
      batch_processor=None,152            optimizer=optimizer,153            work_dir=tmpdir,154            logger=logger)155        runner.register_checkpoint_hook(dict(interval=1))156        runner.register_hook(eval_hook)157        runner.run([loader], [('train', 1)], 8)158        real_path = osp.join(tmpdir, 'best_score_epoch_4.pth')159        assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)160        assert runner.meta['hook_msgs']['best_score'] == 0.7161    data_loader = DataLoader(EvalDataset(), batch_size=1)162    eval_hook = EvalHookCls(data_loader, save_best='mAP', rule='less')163    with tempfile.TemporaryDirectory() as tmpdir:164        logger = get_logger('test_eval')165        runner = EpochBasedRunner(166            model=model,167            batch_processor=None,168            optimizer=optimizer,169            work_dir=tmpdir,170            logger=logger)171        runner.register_checkpoint_hook(dict(interval=1))172        runner.register_hook(eval_hook)173        runner.run([loader], [('train', 1)], 8)174        real_path = osp.join(tmpdir, 'best_mAP_epoch_6.pth')175        assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)176        assert runner.meta['hook_msgs']['best_score'] == 0.05177    data_loader = DataLoader(EvalDataset(), batch_size=1)178    eval_hook = EvalHookCls(data_loader, save_best='mAP')179    with tempfile.TemporaryDirectory() as tmpdir:180        logger = get_logger('test_eval')181        runner = EpochBasedRunner(182            model=model,183            batch_processor=None,184            optimizer=optimizer,185            work_dir=tmpdir,186            logger=logger)187        runner.register_checkpoint_hook(dict(interval=1))188        runner.register_hook(eval_hook)189        runner.run([loader], [('train', 1)], 2)190        real_path = osp.join(tmpdir, 'best_mAP_epoch_2.pth')191        assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)192        assert 
runner.meta['hook_msgs']['best_score'] == 0.4193        resume_from = osp.join(tmpdir, 'latest.pth')194        loader = DataLoader(ExampleDataset(), batch_size=1)195        eval_hook = EvalHookCls(data_loader, save_best='mAP')196        runner = EpochBasedRunner(197            model=model,198            batch_processor=None,199            optimizer=optimizer,200            work_dir=tmpdir,201            logger=logger)202        runner.register_checkpoint_hook(dict(interval=1))203        runner.register_hook(eval_hook)204        runner.resume(resume_from)205        runner.run([loader], [('train', 1)], 8)206        real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')207        assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
