How to use runner method in pytest-benchmark

Best Python code snippet using pytest-benchmark

perftestsrunner_unittest.py

Source:perftestsrunner_unittest.py Github

copy

Full Screen

...99 def start(self):100 """do nothing"""101 def stop(self):102 """do nothing"""103 def create_runner(self, args=[], driver_class=TestDriver):104 options, parsed_args = PerfTestsRunner._parse_args(args)105 test_port = TestPort(host=MockHost(), options=options)106 test_port.create_driver = lambda worker_number=None, no_timeout=False: driver_class()107 runner = PerfTestsRunner(args=args, port=test_port)108 runner._host.filesystem.maybe_make_directory(runner._base_path, 'inspector')109 runner._host.filesystem.maybe_make_directory(runner._base_path, 'Bindings')110 runner._host.filesystem.maybe_make_directory(runner._base_path, 'Parser')111 return runner112 def run_test(self, test_name):113 runner = self.create_runner()114 driver = MainTest.TestDriver()115 return runner._run_single_test(ChromiumStylePerfTest(test_name, runner._host.filesystem.join('some-dir', test_name)), driver)116 def test_run_passing_test(self):117 self.assertTrue(self.run_test('pass.html'))118 def test_run_silent_test(self):119 self.assertFalse(self.run_test('silent.html'))120 def test_run_failed_test(self):121 self.assertFalse(self.run_test('failed.html'))122 def test_run_tonguey_test(self):123 self.assertFalse(self.run_test('tonguey.html'))124 def test_run_timeout_test(self):125 self.assertFalse(self.run_test('timeout.html'))126 def test_run_crash_test(self):127 self.assertFalse(self.run_test('crash.html'))128 def _tests_for_runner(self, runner, test_names):129 filesystem = runner._host.filesystem130 tests = []131 for test in test_names:132 path = filesystem.join(runner._base_path, test)133 dirname = filesystem.dirname(path)134 if test.startswith('inspector/'):135 tests.append(ChromiumStylePerfTest(test, path))136 else:137 tests.append(PerfTest(test, path))138 return tests139 def test_run_test_set(self):140 runner = self.create_runner()141 tests = self._tests_for_runner(runner, ['inspector/pass.html', 'inspector/silent.html', 'inspector/failed.html',142 'inspector/tonguey.html', 'inspector/timeout.html', 'inspector/crash.html'])143 output = OutputCapture()144 output.capture_output()145 try:146 unexpected_result_count = runner._run_tests_set(tests, runner._port)147 finally:148 stdout, stderr, log = output.restore_output()149 self.assertEqual(unexpected_result_count, len(tests) - 1)150 self.assertTrue('\nRESULT group_name: test_name= 42 ms\n' in log)151 def test_run_test_set_kills_drt_per_run(self):152 class TestDriverWithStopCount(MainTest.TestDriver):153 stop_count = 0154 def stop(self):155 TestDriverWithStopCount.stop_count += 1156 runner = self.create_runner(driver_class=TestDriverWithStopCount)157 tests = self._tests_for_runner(runner, ['inspector/pass.html', 'inspector/silent.html', 'inspector/failed.html',158 'inspector/tonguey.html', 'inspector/timeout.html', 'inspector/crash.html'])159 unexpected_result_count = runner._run_tests_set(tests, runner._port)160 self.assertEqual(TestDriverWithStopCount.stop_count, 6)161 def test_run_test_pause_before_testing(self):162 class TestDriverWithStartCount(MainTest.TestDriver):163 start_count = 0164 def start(self):165 TestDriverWithStartCount.start_count += 1166 runner = self.create_runner(args=["--pause-before-testing"], driver_class=TestDriverWithStartCount)167 tests = self._tests_for_runner(runner, ['inspector/pass.html'])168 output = OutputCapture()169 output.capture_output()170 try:171 unexpected_result_count = runner._run_tests_set(tests, runner._port)172 self.assertEqual(TestDriverWithStartCount.start_count, 1)173 finally:174 stdout, stderr, log = output.restore_output()175 self.assertEqual(stderr, "Ready to run test?\n")176 self.assertEqual(log, "Running inspector/pass.html (1 of 1)\nRESULT group_name: test_name= 42 ms\n\n")177 def test_run_test_set_for_parser_tests(self):178 runner = self.create_runner()179 tests = self._tests_for_runner(runner, ['Bindings/event-target-wrapper.html', 'Parser/some-parser.html'])180 output = OutputCapture()181 output.capture_output()182 try:183 unexpected_result_count = runner._run_tests_set(tests, runner._port)184 finally:185 stdout, stderr, log = output.restore_output()186 self.assertEqual(unexpected_result_count, 0)187 self.assertEqual(log, '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',188 'RESULT Bindings: event-target-wrapper= 1489.05 ms',189 'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',190 '',191 'Running Parser/some-parser.html (2 of 2)',192 'RESULT Parser: some-parser= 1100.0 ms',193 'median= 1101.0 ms, stdev= 11.0 ms, min= 1080.0 ms, max= 1120.0 ms',194 '', '']))195 def test_run_test_set_with_json_output(self):196 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json'])197 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True198 runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True199 runner._timestamp = 123456789200 output_capture = OutputCapture()201 output_capture.capture_output()202 try:203 self.assertEqual(runner.run(), 0)204 finally:205 stdout, stderr, logs = output_capture.restore_output()206 self.assertEqual(logs,207 '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',208 'RESULT Bindings: event-target-wrapper= 1489.05 ms',209 'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',210 '',211 'Running inspector/pass.html (2 of 2)',212 'RESULT group_name: test_name= 42 ms',213 '', '']))214 self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {215 "timestamp": 123456789, "results":216 {"Bindings/event-target-wrapper": {"max": 1510, "avg": 1489.05, "median": 1487, "min": 1471, "stdev": 14.46, "unit": "ms"},217 "inspector/pass.html:group_name:test_name": 42},218 "webkit-revision": 5678})219 def test_run_test_set_with_json_source(self):220 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json', '--source-json-path=/mock-checkout/source.json'])221 runner._host.filesystem.files['/mock-checkout/source.json'] = '{"key": "value"}'222 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True223 runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True224 runner._timestamp = 123456789225 output_capture = OutputCapture()226 output_capture.capture_output()227 try:228 self.assertEqual(runner.run(), 0)229 finally:230 stdout, stderr, logs = output_capture.restore_output()231 self.assertEqual(logs, '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',232 'RESULT Bindings: event-target-wrapper= 1489.05 ms',233 'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',234 '',235 'Running inspector/pass.html (2 of 2)',236 'RESULT group_name: test_name= 42 ms',237 '', '']))238 self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {239 "timestamp": 123456789, "results":240 {"Bindings/event-target-wrapper": {"max": 1510, "avg": 1489.05, "median": 1487, "min": 1471, "stdev": 14.46, "unit": "ms"},241 "inspector/pass.html:group_name:test_name": 42},242 "webkit-revision": 5678,243 "key": "value"})244 def test_run_test_set_with_multiple_repositories(self):245 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json'])246 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True247 runner._timestamp = 123456789248 runner._port.repository_paths = lambda: [('webkit', '/mock-checkout'), ('some', '/mock-checkout/some')]249 self.assertEqual(runner.run(), 0)250 self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {251 "timestamp": 123456789, "results": {"inspector/pass.html:group_name:test_name": 42.0}, "webkit-revision": 5678, "some-revision": 5678})252 def test_run_with_upload_json(self):253 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json',254 '--test-results-server', 'some.host', '--platform', 'platform1', '--builder-name', 'builder1', '--build-number', '123'])255 upload_json_is_called = [False]256 upload_json_returns_true = True257 def mock_upload_json(hostname, json_path):258 self.assertEqual(hostname, 'some.host')259 self.assertEqual(json_path, '/mock-checkout/output.json')260 upload_json_is_called[0] = True261 return upload_json_returns_true262 runner._upload_json = mock_upload_json263 runner._host.filesystem.files['/mock-checkout/source.json'] = '{"key": "value"}'264 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True265 runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True266 runner._timestamp = 123456789267 self.assertEqual(runner.run(), 0)268 self.assertEqual(upload_json_is_called[0], True)269 generated_json = json.loads(runner._host.filesystem.files['/mock-checkout/output.json'])270 self.assertEqual(generated_json['platform'], 'platform1')271 self.assertEqual(generated_json['builder-name'], 'builder1')272 self.assertEqual(generated_json['build-number'], 123)273 upload_json_returns_true = False274 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json',275 '--test-results-server', 'some.host', '--platform', 'platform1', '--builder-name', 'builder1', '--build-number', '123'])276 runner._upload_json = mock_upload_json277 self.assertEqual(runner.run(), -3)278 def test_upload_json(self):279 runner = self.create_runner()280 runner._host.filesystem.files['/mock-checkout/some.json'] = 'some content'281 called = []282 upload_single_text_file_throws = False283 upload_single_text_file_return_value = StringIO.StringIO('OK')284 class MockFileUploader:285 def __init__(mock, url, timeout):286 self.assertEqual(url, 'https://some.host/api/test/report')287 self.assertTrue(isinstance(timeout, int) and timeout)288 called.append('FileUploader')289 def upload_single_text_file(mock, filesystem, content_type, filename):290 self.assertEqual(filesystem, runner._host.filesystem)291 self.assertEqual(content_type, 'application/json')292 self.assertEqual(filename, 'some.json')293 called.append('upload_single_text_file')294 if upload_single_text_file_throws:295 raise "Some exception"296 return upload_single_text_file_return_value297 runner._upload_json('some.host', 'some.json', MockFileUploader)298 self.assertEqual(called, ['FileUploader', 'upload_single_text_file'])299 output = OutputCapture()300 output.capture_output()301 upload_single_text_file_return_value = StringIO.StringIO('Some error')302 runner._upload_json('some.host', 'some.json', MockFileUploader)303 _, _, logs = output.restore_output()304 self.assertEqual(logs, 'Uploaded JSON but got a bad response:\nSome error\n')305 # Throwing an exception upload_single_text_file shouldn't blow up _upload_json306 called = []307 upload_single_text_file_throws = True308 runner._upload_json('some.host', 'some.json', MockFileUploader)309 self.assertEqual(called, ['FileUploader', 'upload_single_text_file'])310 def test_collect_tests(self):311 runner = self.create_runner()312 filename = runner._host.filesystem.join(runner._base_path, 'inspector', 'a_file.html')313 runner._host.filesystem.files[filename] = 'a content'314 tests = runner._collect_tests()315 self.assertEqual(len(tests), 1)316 def _collect_tests_and_sort_test_name(self, runner):317 return sorted([test.test_name() for test in runner._collect_tests()])318 def test_collect_tests(self):319 runner = self.create_runner(args=['PerformanceTests/test1.html', 'test2.html'])320 def add_file(filename):321 runner._host.filesystem.files[runner._host.filesystem.join(runner._base_path, filename)] = 'some content'322 add_file('test1.html')323 add_file('test2.html')324 add_file('test3.html')325 runner._host.filesystem.chdir(runner._port.perf_tests_dir()[:runner._port.perf_tests_dir().rfind(runner._host.filesystem.sep)])326 self.assertEqual(self._collect_tests_and_sort_test_name(runner), ['test1.html', 'test2.html'])327 def test_collect_tests_with_skipped_list(self):328 runner = self.create_runner()329 def add_file(dirname, filename, content=True):330 dirname = runner._host.filesystem.join(runner._base_path, dirname) if dirname else runner._base_path331 runner._host.filesystem.maybe_make_directory(dirname)332 runner._host.filesystem.files[runner._host.filesystem.join(dirname, filename)] = content333 add_file('inspector', 'test1.html')334 add_file('inspector', 'unsupported_test1.html')335 add_file('inspector', 'test2.html')336 add_file('inspector/resources', 'resource_file.html')337 add_file('unsupported', 'unsupported_test2.html')338 runner._port.skipped_perf_tests = lambda: ['inspector/unsupported_test1.html', 'unsupported']339 self.assertEqual(self._collect_tests_and_sort_test_name(runner), ['inspector/test1.html', 'inspector/test2.html'])340 def test_collect_tests_with_page_load_svg(self):341 runner = self.create_runner()342 def add_file(dirname, filename, content=True):343 dirname = runner._host.filesystem.join(runner._base_path, dirname) if dirname else runner._base_path344 runner._host.filesystem.maybe_make_directory(dirname)345 runner._host.filesystem.files[runner._host.filesystem.join(dirname, filename)] = content346 add_file('PageLoad', 'some-svg-test.svg')347 tests = runner._collect_tests()348 self.assertEqual(len(tests), 1)349 self.assertEqual(tests[0].__class__.__name__, 'PageLoadingPerfTest')350 def test_parse_args(self):351 runner = self.create_runner()352 options, args = PerfTestsRunner._parse_args([353 '--build-directory=folder42',354 '--platform=platform42',355 '--builder-name', 'webkit-mac-1',356 '--build-number=56',357 '--time-out-ms=42',358 '--output-json-path=a/output.json',359 '--source-json-path=a/source.json',360 '--test-results-server=somehost',361 '--debug'])362 self.assertEqual(options.build, True)363 self.assertEqual(options.build_directory, 'folder42')364 self.assertEqual(options.platform, 'platform42')365 self.assertEqual(options.builder_name, 'webkit-mac-1')...

Full Screen

Full Screen

test_runner.py

Source:test_runner.py Github

copy

Full Screen

1# Copyright (c) Twisted Matrix Laboratories.2# See LICENSE for details.3"""4Tests for L{twisted.application.runner._runner}.5"""6from signal import SIGTERM7from io import BytesIO8import errno9from twisted.logger import (10 LogLevel, LogPublisher, LogBeginner,11 FileLogObserver, FilteringLogObserver, LogLevelFilterPredicate,12)13from ...runner import _runner14from .._exit import ExitStatus15from .._pidfile import PIDFile, NonePIDFile16from .._runner import Runner17from .test_pidfile import DummyFilePath18from .mockreactor import MockReactor19import twisted.trial.unittest20class RunnerTests(twisted.trial.unittest.TestCase):21 """22 Tests for L{Runner}.23 """24 def setUp(self):25 # Patch exit and kill so we can capture usage and prevent actual exits26 # and kills.27 self.exit = DummyExit()28 self.kill = DummyKill()29 self.patch(_runner, "exit", self.exit)30 self.patch(_runner, "kill", self.kill)31 # Patch getpid so we get a known result32 self.pid = 133733 self.pidFileContent = u"{}\n".format(self.pid).encode("utf-8")34 # Patch globalLogBeginner so that we aren't trying to install multiple35 # global log observers.36 self.stdout = BytesIO()37 self.stderr = BytesIO()38 self.stdio = DummyStandardIO(self.stdout, self.stderr)39 self.warnings = DummyWarningsModule()40 self.globalLogPublisher = LogPublisher()41 self.globalLogBeginner = LogBeginner(42 self.globalLogPublisher,43 self.stdio.stderr, self.stdio,44 self.warnings,45 )46 self.patch(_runner, "stderr", self.stderr)47 self.patch(_runner, "globalLogBeginner", self.globalLogBeginner)48 def test_runInOrder(self):49 """50 L{Runner.run} calls the expected methods in order.51 """52 runner = DummyRunner({})53 runner.run()54 self.assertEqual(55 runner.calledMethods,56 [57 "killIfRequested",58 "startLogging",59 "startReactor",60 "reactorExited",61 ]62 )63 def test_runUsesPIDFile(self):64 """65 L{Runner.run} uses the provided PID file.66 """67 pidFile = DummyPIDFile()68 runner = DummyRunner(pidFile=pidFile)69 self.assertFalse(pidFile.entered)70 self.assertFalse(pidFile.exited)71 runner.run()72 self.assertTrue(pidFile.entered)73 self.assertTrue(pidFile.exited)74 def test_runAlreadyRunning(self):75 """76 L{Runner.run} exits with L{ExitStatus.EX_USAGE} and the expected77 message if a process is already running that corresponds to the given78 PID file.79 """80 pidFile = PIDFile(DummyFilePath(self.pidFileContent))81 pidFile.isRunning = lambda: True82 runner = DummyRunner(pidFile=pidFile)83 runner.run()84 self.assertEqual(self.exit.status, ExitStatus.EX_CONFIG)85 self.assertEqual(self.exit.message, "Already running.")86 def test_killNotRequested(self):87 """88 L{Runner.killIfRequested} when C{kill} is false doesn't exit and89 doesn't indiscriminately murder anyone.90 """91 runner = Runner({})92 runner.killIfRequested()93 self.assertEqual(self.kill.calls, [])94 self.assertFalse(self.exit.exited)95 def test_killRequestedWithoutPIDFile(self):96 """97 L{Runner.killIfRequested} when C{kill} is true but C{pidFile} is98 L{nonePIDFile} exits with L{ExitStatus.EX_USAGE} and the expected99 message; and also doesn't indiscriminately murder anyone.100 """101 runner = Runner(kill=True)102 runner.killIfRequested()103 self.assertEqual(self.kill.calls, [])104 self.assertEqual(self.exit.status, ExitStatus.EX_USAGE)105 self.assertEqual(self.exit.message, "No PID file specified.")106 def test_killRequestedWithPIDFile(self):107 """108 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}109 performs a targeted killing of the appropriate process.110 """111 pidFile = PIDFile(DummyFilePath(self.pidFileContent))112 runner = Runner(kill=True, pidFile=pidFile)113 runner.killIfRequested()114 self.assertEqual(self.kill.calls, [(self.pid, SIGTERM)])115 self.assertEqual(self.exit.status, ExitStatus.EX_OK)116 self.assertIdentical(self.exit.message, None)117 def test_killRequestedWithPIDFileCantRead(self):118 """119 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}120 that it can't read exits with L{ExitStatus.EX_IOERR}.121 """122 pidFile = PIDFile(DummyFilePath(None))123 def read():124 raise OSError(errno.EACCES, "Permission denied")125 pidFile.read = read126 runner = Runner(kill=True, pidFile=pidFile)127 runner.killIfRequested()128 self.assertEqual(self.exit.status, ExitStatus.EX_IOERR)129 self.assertEqual(self.exit.message, "Unable to read PID file.")130 def test_killRequestedWithPIDFileEmpty(self):131 """132 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}133 containing no value exits with L{ExitStatus.EX_DATAERR}.134 """135 pidFile = PIDFile(DummyFilePath(b""))136 runner = Runner(kill=True, pidFile=pidFile)137 runner.killIfRequested()138 self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)139 self.assertEqual(self.exit.message, "Invalid PID file.")140 def test_killRequestedWithPIDFileNotAnInt(self):141 """142 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}143 containing a non-integer value exits with L{ExitStatus.EX_DATAERR}.144 """145 pidFile = PIDFile(DummyFilePath(b"** totally not a number, dude **"))146 runner = Runner(kill=True, pidFile=pidFile)147 runner.killIfRequested()148 self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)149 self.assertEqual(self.exit.message, "Invalid PID file.")150 def test_startLogging(self):151 """152 L{Runner.startLogging} sets up a filtering observer with a log level153 predicate set to the given log level that contains a file observer of154 the given type which writes to the given file.155 """156 logFile = BytesIO()157 # Patch the log beginner so that we don't try to start the already158 # running (started by trial) logging system.159 class LogBeginner(object):160 def beginLoggingTo(self, observers):161 LogBeginner.observers = observers162 self.patch(_runner, "globalLogBeginner", LogBeginner())163 # Patch FilteringLogObserver so we can capture its arguments164 class MockFilteringLogObserver(FilteringLogObserver):165 def __init__(166 self, observer, predicates,167 negativeObserver=lambda event: None168 ):169 MockFilteringLogObserver.observer = observer170 MockFilteringLogObserver.predicates = predicates171 FilteringLogObserver.__init__(172 self, observer, predicates, negativeObserver173 )174 self.patch(_runner, "FilteringLogObserver", MockFilteringLogObserver)175 # Patch FileLogObserver so we can capture its arguments176 class MockFileLogObserver(FileLogObserver):177 def __init__(self, outFile):178 MockFileLogObserver.outFile = outFile179 FileLogObserver.__init__(self, outFile, str)180 # Start logging181 runner = Runner(182 defaultLogLevel=LogLevel.critical,183 logFile=logFile,184 fileLogObserverFactory=MockFileLogObserver,185 )186 runner.startLogging()187 # Check for a filtering observer188 self.assertEqual(len(LogBeginner.observers), 1)189 self.assertIsInstance(LogBeginner.observers[0], FilteringLogObserver)190 # Check log level predicate with the correct default log level191 self.assertEqual(len(MockFilteringLogObserver.predicates), 1)192 self.assertIsInstance(193 MockFilteringLogObserver.predicates[0],194 LogLevelFilterPredicate195 )196 self.assertIdentical(197 MockFilteringLogObserver.predicates[0].defaultLogLevel,198 LogLevel.critical199 )200 # Check for a file observer attached to the filtering observer201 self.assertIsInstance(202 MockFilteringLogObserver.observer, MockFileLogObserver203 )204 # Check for the file we gave it205 self.assertIdentical(206 MockFilteringLogObserver.observer.outFile, logFile207 )208 def test_startReactorWithoutReactor(self):209 """210 L{Runner.startReactor} without the C{reactor} argument runs the default211 reactor.212 """213 reactor = MockReactor(self)214 self.patch(_runner, "defaultReactor", reactor)215 runner = Runner()216 runner.startReactor()217 self.assertTrue(reactor.hasInstalled)218 self.assertTrue(reactor.hasRun)219 def test_startReactorWithReactor(self):220 """221 L{Runner.startReactor} with the C{reactor} argument runs the given222 reactor.223 """224 reactor = MockReactor(self)225 runner = Runner(reactor=reactor)226 runner.startReactor()227 self.assertTrue(reactor.hasRun)228 def test_startReactorWhenRunning(self):229 """230 L{Runner.startReactor} ensures that C{whenRunning} is called with231 C{whenRunningArguments} when the reactor is running.232 """233 self._testHook("whenRunning", "startReactor")234 def test_whenRunningWithArguments(self):235 """236 L{Runner.whenRunning} calls C{whenRunning} with237 C{whenRunningArguments}.238 """239 self._testHook("whenRunning")240 def test_reactorExitedWithArguments(self):241 """242 L{Runner.whenRunning} calls C{reactorExited} with243 C{reactorExitedArguments}.244 """245 self._testHook("reactorExited")246 def _testHook(self, methodName, callerName=None):247 """248 Verify that the named hook is run with the expected arguments as249 specified by the arguments used to create the L{Runner}, when the250 specified caller is invoked.251 @param methodName: The name of the hook to verify.252 @type methodName: L{str}253 @param callerName: The name of the method that is expected to cause the254 hook to be called.255 If C{None}, use the L{Runner} method with the same name as the256 hook.257 @type callerName: L{str}258 """259 if callerName is None:260 callerName = methodName261 arguments = dict(a=object(), b=object(), c=object())262 argumentsSeen = []263 def hook(**arguments):264 argumentsSeen.append(arguments)265 runnerArguments = {266 methodName: hook,267 "{}Arguments".format(methodName): arguments.copy(),268 }269 runner = Runner(reactor=MockReactor(self), **runnerArguments)270 hookCaller = getattr(runner, callerName)271 hookCaller()272 self.assertEqual(len(argumentsSeen), 1)273 self.assertEqual(argumentsSeen[0], arguments)274class DummyRunner(Runner):275 """276 Stub for L{Runner}.277 Keep track of calls to some methods without actually doing anything.278 """279 def __init__(self, *args, **kwargs):280 Runner.__init__(self, *args, **kwargs)281 self.calledMethods = []282 def killIfRequested(self):283 self.calledMethods.append("killIfRequested")284 def startLogging(self):285 self.calledMethods.append("startLogging")286 def startReactor(self):287 self.calledMethods.append("startReactor")288 def reactorExited(self):289 self.calledMethods.append("reactorExited")290class DummyPIDFile(NonePIDFile):291 """292 Stub for L{PIDFile}.293 Tracks context manager entry/exit without doing anything.294 """295 def __init__(self):296 NonePIDFile.__init__(self)297 self.entered = False298 self.exited = False299 def __enter__(self):300 self.entered = True301 return self302 def __exit__(self, excType, excValue, traceback):303 self.exited = True304class DummyExit(object):305 """306 Stub for L{exit} that remembers whether it's been called and, if it has,307 what arguments it was given.308 """309 def __init__(self):310 self.exited = False311 def __call__(self, status, message=None):312 assert not self.exited313 self.status = status314 self.message = message315 self.exited = True316class DummyKill(object):317 """318 Stub for L{os.kill} that remembers whether it's been called and, if it has,319 what arguments it was given.320 """321 def __init__(self):322 self.calls = []323 def __call__(self, pid, sig):324 self.calls.append((pid, sig))325class DummyStandardIO(object):326 """327 Stub for L{sys} which provides L{BytesIO} streams as stdout and stderr.328 """329 def __init__(self, stdout, stderr):330 self.stdout = stdout331 self.stderr = stderr332class DummyWarningsModule(object):333 """334 Stub for L{warnings} which provides a C{showwarning} method that is a no-op.335 """336 def showwarning(*args, **kwargs):337 """338 Do nothing.339 @param args: ignored.340 @param kwargs: ignored....

Full Screen

Full Screen

top_7_stress.py

Source:top_7_stress.py Github

copy

Full Screen

1# Copyright 2014 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4from telemetry.page import page as page_module5from telemetry.page import shared_page_state6from telemetry import story7def _GetCurrentLocation(action_runner):8 return action_runner.EvaluateJavaScript('document.location.href')9def _WaitForLocationChange(action_runner, old_href):10 action_runner.WaitForJavaScriptCondition(11 'document.location.href != "%s"' % old_href)12class Top7StressPage(page_module.Page):13 def __init__(self, url, page_set, name=''):14 super(Top7StressPage, self).__init__(15 url=url, page_set=page_set, name=name,16 shared_page_state_class=shared_page_state.SharedDesktopPageState,17 credentials_path = 'data/credentials.json')18 self.archive_data_file = 'data/top_7_stress.json'19 def RunPageInteractions(self, action_runner):20 raise NotImplementedError()21class GoogleWebSearchPage(Top7StressPage):22 """ Why: top google property; a google tab is often open """23 def __init__(self, page_set):24 super(GoogleWebSearchPage, self).__init__(25 url='https://www.google.com/#hl=en&q=barack+obama',26 page_set=page_set)27 def RunNavigateSteps(self, action_runner):28 super(GoogleWebSearchPage, self).RunNavigateSteps(action_runner)29 action_runner.WaitForElement(text='Next')30 def RunPageInteractions(self, action_runner):31 with action_runner.CreateGestureInteraction('ScrollAction'):32 action_runner.ScrollPage()33 old_href = _GetCurrentLocation(action_runner)34 action_runner.ClickElement(text='Next')35 _WaitForLocationChange(action_runner, old_href)36 action_runner.WaitForElement(text='Next')37 with action_runner.CreateGestureInteraction('ScrollAction'):38 action_runner.ScrollPage()39 old_href = _GetCurrentLocation(action_runner)40 action_runner.ClickElement(text='Next')41 _WaitForLocationChange(action_runner, old_href)42 action_runner.WaitForElement(text='Next')43 with action_runner.CreateGestureInteraction('ScrollAction'):44 action_runner.ScrollPage()45 old_href = _GetCurrentLocation(action_runner)46 action_runner.ClickElement(text='Next')47 _WaitForLocationChange(action_runner, old_href)48 action_runner.WaitForElement(text='Previous')49 with action_runner.CreateGestureInteraction('ScrollAction'):50 action_runner.ScrollPage()51 old_href = _GetCurrentLocation(action_runner)52 action_runner.ClickElement(text='Previous')53 _WaitForLocationChange(action_runner, old_href)54 action_runner.WaitForElement(text='Previous')55 with action_runner.CreateGestureInteraction('ScrollAction'):56 action_runner.ScrollPage()57 old_href = _GetCurrentLocation(action_runner)58 action_runner.ClickElement(text='Previous')59 _WaitForLocationChange(action_runner, old_href)60 action_runner.WaitForElement(text='Previous')61 with action_runner.CreateGestureInteraction('ScrollAction'):62 action_runner.ScrollPage()63 old_href = _GetCurrentLocation(action_runner)64 action_runner.ClickElement(text='Previous')65 _WaitForLocationChange(action_runner, old_href)66 action_runner.WaitForElement(text='Images')67 with action_runner.CreateGestureInteraction('ScrollAction'):68 action_runner.ScrollPage()69 old_href = _GetCurrentLocation(action_runner)70 action_runner.ClickElement(text='Images')71 _WaitForLocationChange(action_runner, old_href)72 action_runner.WaitForElement(text='Images')73class GmailPage(Top7StressPage):74 """ Why: productivity, top google properties """75 def __init__(self, page_set):76 super(GmailPage, self).__init__(77 url='https://mail.google.com/mail/',78 page_set=page_set)79 self.credentials = 'google'80 def RunNavigateSteps(self, action_runner):81 super(GmailPage, self).RunNavigateSteps(action_runner)82 action_runner.WaitForJavaScriptCondition(83 'window.gmonkey !== undefined &&'84 'document.getElementById("gb") !== null')85 def RunPageInteractions(self, action_runner):86 old_href = _GetCurrentLocation(action_runner)87 action_runner.ClickElement(88 'a[href="https://mail.google.com/mail/u/0/?shva=1#starred"]')89 _WaitForLocationChange(action_runner, old_href)90 old_href = _GetCurrentLocation(action_runner)91 action_runner.ClickElement(92 'a[href="https://mail.google.com/mail/u/0/?shva=1#inbox"]')93 _WaitForLocationChange(action_runner, old_href)94class GoogleCalendarPage(Top7StressPage):95 """ Why: productivity, top google properties """96 def __init__(self, page_set):97 super(GoogleCalendarPage, self).__init__(98 url='https://www.google.com/calendar/',99 page_set=page_set)100 self.credentials = 'google'101 def RunNavigateSteps(self, action_runner):102 super(GoogleCalendarPage, self).RunNavigateSteps(action_runner)103 action_runner.Wait(2)104 action_runner.WaitForElement('div[class~="navForward"]')105 action_runner.ExecuteJavaScript('''106 (function() {107 var elem = document.createElement('meta');108 elem.name='viewport';109 elem.content='initial-scale=1';110 document.body.appendChild(elem);111 })();''')112 action_runner.Wait(1)113 def RunPageInteractions(self, action_runner):114 action_runner.ClickElement('div[class~="navForward"]')115 action_runner.Wait(2)116 action_runner.WaitForElement('div[class~="navForward"]')117 action_runner.ClickElement('div[class~="navForward"]')118 action_runner.Wait(2)119 action_runner.WaitForElement('div[class~="navForward"]')120 action_runner.ClickElement('div[class~="navForward"]')121 action_runner.Wait(2)122 action_runner.WaitForElement('div[class~="navForward"]')123 action_runner.ClickElement('div[class~="navForward"]')124 action_runner.Wait(2)125 action_runner.WaitForElement('div[class~="navBack"]')126 action_runner.ClickElement('div[class~="navBack"]')127 action_runner.Wait(2)128 action_runner.WaitForElement('div[class~="navBack"]')129 action_runner.ClickElement('div[class~="navBack"]')130 action_runner.Wait(2)131 action_runner.WaitForElement('div[class~="navBack"]')132 action_runner.ClickElement('div[class~="navBack"]')133 action_runner.Wait(2)134 action_runner.WaitForElement('div[class~="navBack"]')135 action_runner.ClickElement('div[class~="navBack"]')136 action_runner.Wait(2)137 action_runner.WaitForElement('div[class~="navBack"]')138class GooglePlusPage(Top7StressPage):139 """ Why: social; top google property; Public profile; infinite scrolls """140 def __init__(self, page_set):141 super(GooglePlusPage, self).__init__(142 url='https://plus.google.com/110031535020051778989/posts',143 page_set=page_set)144 self.credentials = 'google'145 def RunNavigateSteps(self, action_runner):146 super(GooglePlusPage, self).RunNavigateSteps(action_runner)147 action_runner.WaitForElement(text='Home')148 def RunPageInteractions(self, action_runner):149 action_runner.ClickElement(text='Home')150 action_runner.Wait(2)151 action_runner.WaitForElement(text='Profile')152 action_runner.ClickElement(text='Profile')153 action_runner.Wait(2)154 action_runner.WaitForElement(text='Explore')155 action_runner.ClickElement(text='Explore')156 action_runner.Wait(2)157 action_runner.WaitForElement(text='Events')158 action_runner.ClickElement(text='Events')159 action_runner.Wait(2)160 action_runner.WaitForElement(text='Communities')161 action_runner.ClickElement(text='Communities')162 action_runner.Wait(2)163 action_runner.WaitForElement(text='Home')164class BlogspotPage(Top7StressPage):165 """ Why: #11 (Alexa global), google property; some blogger layouts have166 infinite scroll but more interesting """167 def __init__(self, page_set):168 super(BlogspotPage, self).__init__(169 url='http://googlewebmastercentral.blogspot.com/',170 page_set=page_set,171 name='Blogger')172 def RunNavigateSteps(self, action_runner):173 super(BlogspotPage, self).RunNavigateSteps(action_runner)174 action_runner.WaitForElement(text='accessibility')175 def RunPageInteractions(self, action_runner):176 action_runner.ClickElement(text='accessibility')177 action_runner.WaitForNavigate()178 with action_runner.CreateGestureInteraction('ScrollAction'):179 action_runner.ScrollPage()180 # Insert 300ms wait to simulate user finger movement,181 # and ensure scheduling of idle tasks.182 action_runner.Wait(0.3)183 action_runner.ClickElement(text='advanced')184 action_runner.WaitForNavigate()185 with action_runner.CreateGestureInteraction('ScrollAction'):186 action_runner.ScrollPage()187 action_runner.Wait(0.3)188 action_runner.ClickElement(text='beginner')189 action_runner.WaitForNavigate()190 with action_runner.CreateGestureInteraction('ScrollAction'):191 action_runner.ScrollPage()192 action_runner.Wait(0.3)193 action_runner.ClickElement(text='Home')194 action_runner.WaitForNavigate()195class WordpressPage(Top7StressPage):196 """ Why: #18 (Alexa global), Picked an interesting post """197 def __init__(self, page_set):198 super(WordpressPage, self).__init__(199 # pylint: disable=line-too-long200 url='http://en.blog.wordpress.com/2012/09/04/freshly-pressed-editors-picks-for-august-2012/',201 page_set=page_set,202 name='Wordpress')203 def RunNavigateSteps(self, action_runner):204 super(WordpressPage, self).RunNavigateSteps(action_runner)205 action_runner.WaitForElement(206 # pylint: disable=line-too-long207 'a[href="http://en.blog.wordpress.com/2012/08/30/new-themes-able-and-sight/"]')208 def RunPageInteractions(self, action_runner):209 with action_runner.CreateGestureInteraction('ScrollAction'):210 action_runner.ScrollPage()211 # Insert 300ms wait to simulate user finger movement,212 # and ensure scheduling of idle tasks.213 action_runner.Wait(0.3)214 action_runner.ClickElement(215 # pylint: disable=line-too-long216 'a[href="http://en.blog.wordpress.com/2012/08/30/new-themes-able-and-sight/"]')217 action_runner.WaitForNavigate()218 with action_runner.CreateGestureInteraction('ScrollAction'):219 action_runner.ScrollPage()220 action_runner.Wait(0.3)221 action_runner.ClickElement(text='Features')222 action_runner.WaitForNavigate()223 with action_runner.CreateGestureInteraction('ScrollAction'):224 action_runner.ScrollPage()225 action_runner.Wait(0.3)226 action_runner.ClickElement(text='News')227 action_runner.WaitForNavigate()228 with action_runner.CreateGestureInteraction('ScrollAction'):229 action_runner.ScrollPage()230class FacebookPage(Top7StressPage):231 """ Why: top social,Public profile """232 def __init__(self, page_set):233 super(FacebookPage, self).__init__(234 url='https://www.facebook.com/barackobama',235 page_set=page_set,236 name='Facebook')237 self.credentials = 'facebook2'238 def RunNavigateSteps(self, action_runner):239 super(FacebookPage, self).RunNavigateSteps(action_runner)240 action_runner.WaitForElement(text='About')241 def RunPageInteractions(self, action_runner):242 # Scroll and wait for the next page to be loaded.243 with action_runner.CreateGestureInteraction('ScrollAction'):244 action_runner.ScrollPage()245 action_runner.WaitForJavaScriptCondition(246 'document.documentElement.scrollHeight - window.innerHeight - '247 'window.pageYOffset > 0')248 # Scroll and wait again.249 with action_runner.CreateGestureInteraction('ScrollAction'):250 action_runner.ScrollPage()251 action_runner.WaitForJavaScriptCondition(252 'document.documentElement.scrollHeight - window.innerHeight - '253 'window.pageYOffset > 0')254class Top7StressPageSet(story.StorySet):255 """ Pages hand-picked for stress testing. """256 def __init__(self):257 super(Top7StressPageSet, self).__init__(258 archive_data_file='data/top_7_stress.json',259 cloud_storage_bucket=story.PARTNER_BUCKET)260 self.AddStory(GoogleWebSearchPage(self))261 self.AddStory(GmailPage(self))262 self.AddStory(GoogleCalendarPage(self))263 self.AddStory(GooglePlusPage(self))264 self.AddStory(BlogspotPage(self))265 self.AddStory(WordpressPage(self))...

Full Screen

Full Screen

test_eval_hook.py

Source:test_eval_hook.py Github

copy

Full Screen

1import os.path as osp2import tempfile3import unittest.mock as mock4from collections import OrderedDict5from unittest.mock import MagicMock, patch6import pytest7import torch8import torch.nn as nn9from mmcv.runner import EpochBasedRunner, build_optimizer10from mmcv.utils import get_logger11from torch.utils.data import DataLoader, Dataset12from mmdet.core import DistEvalHook, EvalHook13class ExampleDataset(Dataset):14 def __init__(self):15 self.index = 016 self.eval_result = [0.1, 0.4, 0.3, 0.7, 0.2, 0.05, 0.4, 0.6]17 def __getitem__(self, idx):18 results = dict(imgs=torch.tensor([1]))19 return results20 def __len__(self):21 return 122 @mock.create_autospec23 def evaluate(self, results, logger=None):24 pass25class EvalDataset(ExampleDataset):26 def evaluate(self, results, logger=None):27 mean_ap = self.eval_result[self.index]28 output = OrderedDict(mAP=mean_ap, index=self.index, score=mean_ap)29 self.index += 130 return output31class ExampleModel(nn.Module):32 def __init__(self):33 super().__init__()34 self.conv = nn.Linear(1, 1)35 self.test_cfg = None36 def forward(self, imgs, rescale=False, return_loss=False):37 return imgs38 def train_step(self, data_batch, optimizer, **kwargs):39 outputs = {40 'loss': 0.5,41 'log_vars': {42 'accuracy': 0.9843 },44 'num_samples': 145 }46 return outputs47@pytest.mark.skipif(48 not torch.cuda.is_available(), reason='requires CUDA support')49@patch('mmdet.apis.single_gpu_test', MagicMock)50@patch('mmdet.apis.multi_gpu_test', MagicMock)51@pytest.mark.parametrize('EvalHookCls', (EvalHook, DistEvalHook))52def test_eval_hook(EvalHookCls):53 with pytest.raises(TypeError):54 # dataloader must be a pytorch DataLoader55 test_dataset = ExampleDataset()56 data_loader = [57 DataLoader(58 test_dataset,59 batch_size=1,60 sampler=None,61 num_worker=0,62 shuffle=False)63 ]64 EvalHookCls(data_loader)65 with pytest.raises(KeyError):66 # rule must be in keys of rule_map67 test_dataset = ExampleDataset()68 data_loader = DataLoader(69 test_dataset,70 batch_size=1,71 sampler=None,72 num_workers=0,73 shuffle=False)74 EvalHookCls(data_loader, save_best='auto', rule='unsupport')75 with pytest.raises(ValueError):76 # key_indicator must be valid when rule_map is None77 test_dataset = ExampleDataset()78 data_loader = DataLoader(79 test_dataset,80 batch_size=1,81 sampler=None,82 num_workers=0,83 shuffle=False)84 EvalHookCls(data_loader, save_best='unsupport')85 optimizer_cfg = dict(86 type='SGD', lr=0.01, momentum=0.9, weight_decay=0.0001)87 test_dataset = ExampleDataset()88 loader = DataLoader(test_dataset, batch_size=1)89 model = ExampleModel()90 optimizer = build_optimizer(model, optimizer_cfg)91 data_loader = DataLoader(test_dataset, batch_size=1)92 eval_hook = EvalHookCls(data_loader, save_best=None)93 with tempfile.TemporaryDirectory() as tmpdir:94 logger = get_logger('test_eval')95 runner = EpochBasedRunner(96 model=model,97 batch_processor=None,98 optimizer=optimizer,99 work_dir=tmpdir,100 logger=logger)101 runner.register_hook(eval_hook)102 runner.run([loader], [('train', 1)], 1)103 assert runner.meta is None or 'best_score' not in runner.meta[104 'hook_msgs']105 assert runner.meta is None or 'best_ckpt' not in runner.meta[106 'hook_msgs']107 # when `save_best` is set to 'auto', first metric will be used.108 loader = DataLoader(EvalDataset(), batch_size=1)109 model = ExampleModel()110 data_loader = DataLoader(EvalDataset(), batch_size=1)111 eval_hook = EvalHookCls(data_loader, interval=1, save_best='auto')112 with tempfile.TemporaryDirectory() as tmpdir:113 logger = get_logger('test_eval')114 runner = EpochBasedRunner(115 model=model,116 batch_processor=None,117 optimizer=optimizer,118 work_dir=tmpdir,119 logger=logger)120 runner.register_checkpoint_hook(dict(interval=1))121 runner.register_hook(eval_hook)122 runner.run([loader], [('train', 1)], 8)123 real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')124 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)125 assert runner.meta['hook_msgs']['best_score'] == 0.7126 loader = DataLoader(EvalDataset(), batch_size=1)127 model = ExampleModel()128 data_loader = DataLoader(EvalDataset(), batch_size=1)129 eval_hook = EvalHookCls(data_loader, interval=1, save_best='mAP')130 with tempfile.TemporaryDirectory() as tmpdir:131 logger = get_logger('test_eval')132 runner = EpochBasedRunner(133 model=model,134 batch_processor=None,135 optimizer=optimizer,136 work_dir=tmpdir,137 logger=logger)138 runner.register_checkpoint_hook(dict(interval=1))139 runner.register_hook(eval_hook)140 runner.run([loader], [('train', 1)], 8)141 real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')142 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)143 assert runner.meta['hook_msgs']['best_score'] == 0.7144 data_loader = DataLoader(EvalDataset(), batch_size=1)145 eval_hook = EvalHookCls(146 data_loader, interval=1, save_best='score', rule='greater')147 with tempfile.TemporaryDirectory() as tmpdir:148 logger = get_logger('test_eval')149 runner = EpochBasedRunner(150 model=model,151 batch_processor=None,152 optimizer=optimizer,153 work_dir=tmpdir,154 logger=logger)155 runner.register_checkpoint_hook(dict(interval=1))156 runner.register_hook(eval_hook)157 runner.run([loader], [('train', 1)], 8)158 real_path = osp.join(tmpdir, 'best_score_epoch_4.pth')159 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)160 assert runner.meta['hook_msgs']['best_score'] == 0.7161 data_loader = DataLoader(EvalDataset(), batch_size=1)162 eval_hook = EvalHookCls(data_loader, save_best='mAP', rule='less')163 with tempfile.TemporaryDirectory() as tmpdir:164 logger = get_logger('test_eval')165 runner = EpochBasedRunner(166 model=model,167 batch_processor=None,168 optimizer=optimizer,169 work_dir=tmpdir,170 logger=logger)171 runner.register_checkpoint_hook(dict(interval=1))172 runner.register_hook(eval_hook)173 runner.run([loader], [('train', 1)], 8)174 real_path = osp.join(tmpdir, 'best_mAP_epoch_6.pth')175 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)176 assert runner.meta['hook_msgs']['best_score'] == 0.05177 data_loader = DataLoader(EvalDataset(), batch_size=1)178 eval_hook = EvalHookCls(data_loader, save_best='mAP')179 with tempfile.TemporaryDirectory() as tmpdir:180 logger = get_logger('test_eval')181 runner = EpochBasedRunner(182 model=model,183 batch_processor=None,184 optimizer=optimizer,185 work_dir=tmpdir,186 logger=logger)187 runner.register_checkpoint_hook(dict(interval=1))188 runner.register_hook(eval_hook)189 runner.run([loader], [('train', 1)], 2)190 real_path = osp.join(tmpdir, 'best_mAP_epoch_2.pth')191 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)192 assert runner.meta['hook_msgs']['best_score'] == 0.4193 resume_from = osp.join(tmpdir, 'latest.pth')194 loader = DataLoader(ExampleDataset(), batch_size=1)195 eval_hook = EvalHookCls(data_loader, save_best='mAP')196 runner = EpochBasedRunner(197 model=model,198 batch_processor=None,199 optimizer=optimizer,200 work_dir=tmpdir,201 logger=logger)202 runner.register_checkpoint_hook(dict(interval=1))203 runner.register_hook(eval_hook)204 runner.resume(resume_from)205 runner.run([loader], [('train', 1)], 8)206 real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')207 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pytest-benchmark automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful