How to use runner method in mountebank

Best JavaScript code snippet using mountebank

perftestsrunner_unittest.py

Source:perftestsrunner_unittest.py Github

copy

Full Screen

...99 def start(self):100 """do nothing"""101 def stop(self):102 """do nothing"""103 def create_runner(self, args=[], driver_class=TestDriver):104 options, parsed_args = PerfTestsRunner._parse_args(args)105 test_port = TestPort(host=MockHost(), options=options)106 test_port.create_driver = lambda worker_number=None, no_timeout=False: driver_class()107 runner = PerfTestsRunner(args=args, port=test_port)108 runner._host.filesystem.maybe_make_directory(runner._base_path, 'inspector')109 runner._host.filesystem.maybe_make_directory(runner._base_path, 'Bindings')110 runner._host.filesystem.maybe_make_directory(runner._base_path, 'Parser')111 return runner112 def run_test(self, test_name):113 runner = self.create_runner()114 driver = MainTest.TestDriver()115 return runner._run_single_test(ChromiumStylePerfTest(test_name, runner._host.filesystem.join('some-dir', test_name)), driver)116 def test_run_passing_test(self):117 self.assertTrue(self.run_test('pass.html'))118 def test_run_silent_test(self):119 self.assertFalse(self.run_test('silent.html'))120 def test_run_failed_test(self):121 self.assertFalse(self.run_test('failed.html'))122 def test_run_tonguey_test(self):123 self.assertFalse(self.run_test('tonguey.html'))124 def test_run_timeout_test(self):125 self.assertFalse(self.run_test('timeout.html'))126 def test_run_crash_test(self):127 self.assertFalse(self.run_test('crash.html'))128 def _tests_for_runner(self, runner, test_names):129 filesystem = runner._host.filesystem130 tests = []131 for test in test_names:132 path = filesystem.join(runner._base_path, test)133 dirname = filesystem.dirname(path)134 if test.startswith('inspector/'):135 tests.append(ChromiumStylePerfTest(test, path))136 else:137 tests.append(PerfTest(test, path))138 return tests139 def test_run_test_set(self):140 runner = self.create_runner()141 tests = self._tests_for_runner(runner, ['inspector/pass.html', 'inspector/silent.html', 'inspector/failed.html',142 'inspector/tonguey.html', 'inspector/timeout.html', 'inspector/crash.html'])143 output = OutputCapture()144 output.capture_output()145 try:146 unexpected_result_count = runner._run_tests_set(tests, runner._port)147 finally:148 stdout, stderr, log = output.restore_output()149 self.assertEqual(unexpected_result_count, len(tests) - 1)150 self.assertTrue('\nRESULT group_name: test_name= 42 ms\n' in log)151 def test_run_test_set_kills_drt_per_run(self):152 class TestDriverWithStopCount(MainTest.TestDriver):153 stop_count = 0154 def stop(self):155 TestDriverWithStopCount.stop_count += 1156 runner = self.create_runner(driver_class=TestDriverWithStopCount)157 tests = self._tests_for_runner(runner, ['inspector/pass.html', 'inspector/silent.html', 'inspector/failed.html',158 'inspector/tonguey.html', 'inspector/timeout.html', 'inspector/crash.html'])159 unexpected_result_count = runner._run_tests_set(tests, runner._port)160 self.assertEqual(TestDriverWithStopCount.stop_count, 6)161 def test_run_test_pause_before_testing(self):162 class TestDriverWithStartCount(MainTest.TestDriver):163 start_count = 0164 def start(self):165 TestDriverWithStartCount.start_count += 1166 runner = self.create_runner(args=["--pause-before-testing"], driver_class=TestDriverWithStartCount)167 tests = self._tests_for_runner(runner, ['inspector/pass.html'])168 output = OutputCapture()169 output.capture_output()170 try:171 unexpected_result_count = runner._run_tests_set(tests, runner._port)172 self.assertEqual(TestDriverWithStartCount.start_count, 1)173 finally:174 stdout, stderr, log = output.restore_output()175 self.assertEqual(stderr, "Ready to run test?\n")176 self.assertEqual(log, "Running inspector/pass.html (1 of 1)\nRESULT group_name: test_name= 42 ms\n\n")177 def test_run_test_set_for_parser_tests(self):178 runner = self.create_runner()179 tests = self._tests_for_runner(runner, ['Bindings/event-target-wrapper.html', 'Parser/some-parser.html'])180 output = OutputCapture()181 output.capture_output()182 try:183 unexpected_result_count = runner._run_tests_set(tests, runner._port)184 finally:185 stdout, stderr, log = output.restore_output()186 self.assertEqual(unexpected_result_count, 0)187 self.assertEqual(log, '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',188 'RESULT Bindings: event-target-wrapper= 1489.05 ms',189 'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',190 '',191 'Running Parser/some-parser.html (2 of 2)',192 'RESULT Parser: some-parser= 1100.0 ms',193 'median= 1101.0 ms, stdev= 11.0 ms, min= 1080.0 ms, max= 1120.0 ms',194 '', '']))195 def test_run_test_set_with_json_output(self):196 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json'])197 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True198 runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True199 runner._timestamp = 123456789200 output_capture = OutputCapture()201 output_capture.capture_output()202 try:203 self.assertEqual(runner.run(), 0)204 finally:205 stdout, stderr, logs = output_capture.restore_output()206 self.assertEqual(logs,207 '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',208 'RESULT Bindings: event-target-wrapper= 1489.05 ms',209 'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',210 '',211 'Running inspector/pass.html (2 of 2)',212 'RESULT group_name: test_name= 42 ms',213 '', '']))214 self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {215 "timestamp": 123456789, "results":216 {"Bindings/event-target-wrapper": {"max": 1510, "avg": 1489.05, "median": 1487, "min": 1471, "stdev": 14.46, "unit": "ms"},217 "inspector/pass.html:group_name:test_name": 42},218 "webkit-revision": 5678})219 def test_run_test_set_with_json_source(self):220 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json', '--source-json-path=/mock-checkout/source.json'])221 runner._host.filesystem.files['/mock-checkout/source.json'] = '{"key": "value"}'222 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True223 runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True224 runner._timestamp = 123456789225 output_capture = OutputCapture()226 output_capture.capture_output()227 try:228 self.assertEqual(runner.run(), 0)229 finally:230 stdout, stderr, logs = output_capture.restore_output()231 self.assertEqual(logs, '\n'.join(['Running Bindings/event-target-wrapper.html (1 of 2)',232 'RESULT Bindings: event-target-wrapper= 1489.05 ms',233 'median= 1487.0 ms, stdev= 14.46 ms, min= 1471.0 ms, max= 1510.0 ms',234 '',235 'Running inspector/pass.html (2 of 2)',236 'RESULT group_name: test_name= 42 ms',237 '', '']))238 self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {239 "timestamp": 123456789, "results":240 {"Bindings/event-target-wrapper": {"max": 1510, "avg": 1489.05, "median": 1487, "min": 1471, "stdev": 14.46, "unit": "ms"},241 "inspector/pass.html:group_name:test_name": 42},242 "webkit-revision": 5678,243 "key": "value"})244 def test_run_test_set_with_multiple_repositories(self):245 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json'])246 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True247 runner._timestamp = 123456789248 runner._port.repository_paths = lambda: [('webkit', '/mock-checkout'), ('some', '/mock-checkout/some')]249 self.assertEqual(runner.run(), 0)250 self.assertEqual(json.loads(runner._host.filesystem.files['/mock-checkout/output.json']), {251 "timestamp": 123456789, "results": {"inspector/pass.html:group_name:test_name": 42.0}, "webkit-revision": 5678, "some-revision": 5678})252 def test_run_with_upload_json(self):253 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json',254 '--test-results-server', 'some.host', '--platform', 'platform1', '--builder-name', 'builder1', '--build-number', '123'])255 upload_json_is_called = [False]256 upload_json_returns_true = True257 def mock_upload_json(hostname, json_path):258 self.assertEqual(hostname, 'some.host')259 self.assertEqual(json_path, '/mock-checkout/output.json')260 upload_json_is_called[0] = True261 return upload_json_returns_true262 runner._upload_json = mock_upload_json263 runner._host.filesystem.files['/mock-checkout/source.json'] = '{"key": "value"}'264 runner._host.filesystem.files[runner._base_path + '/inspector/pass.html'] = True265 runner._host.filesystem.files[runner._base_path + '/Bindings/event-target-wrapper.html'] = True266 runner._timestamp = 123456789267 self.assertEqual(runner.run(), 0)268 self.assertEqual(upload_json_is_called[0], True)269 generated_json = json.loads(runner._host.filesystem.files['/mock-checkout/output.json'])270 self.assertEqual(generated_json['platform'], 'platform1')271 self.assertEqual(generated_json['builder-name'], 'builder1')272 self.assertEqual(generated_json['build-number'], 123)273 upload_json_returns_true = False274 runner = self.create_runner(args=['--output-json-path=/mock-checkout/output.json',275 '--test-results-server', 'some.host', '--platform', 'platform1', '--builder-name', 'builder1', '--build-number', '123'])276 runner._upload_json = mock_upload_json277 self.assertEqual(runner.run(), -3)278 def test_upload_json(self):279 runner = self.create_runner()280 runner._host.filesystem.files['/mock-checkout/some.json'] = 'some content'281 called = []282 upload_single_text_file_throws = False283 upload_single_text_file_return_value = StringIO.StringIO('OK')284 class MockFileUploader:285 def __init__(mock, url, timeout):286 self.assertEqual(url, 'https://some.host/api/test/report')287 self.assertTrue(isinstance(timeout, int) and timeout)288 called.append('FileUploader')289 def upload_single_text_file(mock, filesystem, content_type, filename):290 self.assertEqual(filesystem, runner._host.filesystem)291 self.assertEqual(content_type, 'application/json')292 self.assertEqual(filename, 'some.json')293 called.append('upload_single_text_file')294 if upload_single_text_file_throws:295 raise "Some exception"296 return upload_single_text_file_return_value297 runner._upload_json('some.host', 'some.json', MockFileUploader)298 self.assertEqual(called, ['FileUploader', 'upload_single_text_file'])299 output = OutputCapture()300 output.capture_output()301 upload_single_text_file_return_value = StringIO.StringIO('Some error')302 runner._upload_json('some.host', 'some.json', MockFileUploader)303 _, _, logs = output.restore_output()304 self.assertEqual(logs, 'Uploaded JSON but got a bad response:\nSome error\n')305 # Throwing an exception upload_single_text_file shouldn't blow up _upload_json306 called = []307 upload_single_text_file_throws = True308 runner._upload_json('some.host', 'some.json', MockFileUploader)309 self.assertEqual(called, ['FileUploader', 'upload_single_text_file'])310 def test_collect_tests(self):311 runner = self.create_runner()312 filename = runner._host.filesystem.join(runner._base_path, 'inspector', 'a_file.html')313 runner._host.filesystem.files[filename] = 'a content'314 tests = runner._collect_tests()315 self.assertEqual(len(tests), 1)316 def _collect_tests_and_sort_test_name(self, runner):317 return sorted([test.test_name() for test in runner._collect_tests()])318 def test_collect_tests(self):319 runner = self.create_runner(args=['PerformanceTests/test1.html', 'test2.html'])320 def add_file(filename):321 runner._host.filesystem.files[runner._host.filesystem.join(runner._base_path, filename)] = 'some content'322 add_file('test1.html')323 add_file('test2.html')324 add_file('test3.html')325 runner._host.filesystem.chdir(runner._port.perf_tests_dir()[:runner._port.perf_tests_dir().rfind(runner._host.filesystem.sep)])326 self.assertEqual(self._collect_tests_and_sort_test_name(runner), ['test1.html', 'test2.html'])327 def test_collect_tests_with_skipped_list(self):328 runner = self.create_runner()329 def add_file(dirname, filename, content=True):330 dirname = runner._host.filesystem.join(runner._base_path, dirname) if dirname else runner._base_path331 runner._host.filesystem.maybe_make_directory(dirname)332 runner._host.filesystem.files[runner._host.filesystem.join(dirname, filename)] = content333 add_file('inspector', 'test1.html')334 add_file('inspector', 'unsupported_test1.html')335 add_file('inspector', 'test2.html')336 add_file('inspector/resources', 'resource_file.html')337 add_file('unsupported', 'unsupported_test2.html')338 runner._port.skipped_perf_tests = lambda: ['inspector/unsupported_test1.html', 'unsupported']339 self.assertEqual(self._collect_tests_and_sort_test_name(runner), ['inspector/test1.html', 'inspector/test2.html'])340 def test_collect_tests_with_page_load_svg(self):341 runner = self.create_runner()342 def add_file(dirname, filename, content=True):343 dirname = runner._host.filesystem.join(runner._base_path, dirname) if dirname else runner._base_path344 runner._host.filesystem.maybe_make_directory(dirname)345 runner._host.filesystem.files[runner._host.filesystem.join(dirname, filename)] = content346 add_file('PageLoad', 'some-svg-test.svg')347 tests = runner._collect_tests()348 self.assertEqual(len(tests), 1)349 self.assertEqual(tests[0].__class__.__name__, 'PageLoadingPerfTest')350 def test_parse_args(self):351 runner = self.create_runner()352 options, args = PerfTestsRunner._parse_args([353 '--build-directory=folder42',354 '--platform=platform42',355 '--builder-name', 'webkit-mac-1',356 '--build-number=56',357 '--time-out-ms=42',358 '--output-json-path=a/output.json',359 '--source-json-path=a/source.json',360 '--test-results-server=somehost',361 '--debug'])362 self.assertEqual(options.build, True)363 self.assertEqual(options.build_directory, 'folder42')364 self.assertEqual(options.platform, 'platform42')365 self.assertEqual(options.builder_name, 'webkit-mac-1')...

Full Screen

Full Screen

test_runner.py

Source:test_runner.py Github

copy

Full Screen

1# Copyright (c) Twisted Matrix Laboratories.2# See LICENSE for details.3"""4Tests for L{twisted.application.runner._runner}.5"""6from signal import SIGTERM7from io import BytesIO8import errno9from twisted.logger import (10 LogLevel, LogPublisher, LogBeginner,11 FileLogObserver, FilteringLogObserver, LogLevelFilterPredicate,12)13from ...runner import _runner14from .._exit import ExitStatus15from .._pidfile import PIDFile, NonePIDFile16from .._runner import Runner17from .test_pidfile import DummyFilePath18from .mockreactor import MockReactor19import twisted.trial.unittest20class RunnerTests(twisted.trial.unittest.TestCase):21 """22 Tests for L{Runner}.23 """24 def setUp(self):25 # Patch exit and kill so we can capture usage and prevent actual exits26 # and kills.27 self.exit = DummyExit()28 self.kill = DummyKill()29 self.patch(_runner, "exit", self.exit)30 self.patch(_runner, "kill", self.kill)31 # Patch getpid so we get a known result32 self.pid = 133733 self.pidFileContent = u"{}\n".format(self.pid).encode("utf-8")34 # Patch globalLogBeginner so that we aren't trying to install multiple35 # global log observers.36 self.stdout = BytesIO()37 self.stderr = BytesIO()38 self.stdio = DummyStandardIO(self.stdout, self.stderr)39 self.warnings = DummyWarningsModule()40 self.globalLogPublisher = LogPublisher()41 self.globalLogBeginner = LogBeginner(42 self.globalLogPublisher,43 self.stdio.stderr, self.stdio,44 self.warnings,45 )46 self.patch(_runner, "stderr", self.stderr)47 self.patch(_runner, "globalLogBeginner", self.globalLogBeginner)48 def test_runInOrder(self):49 """50 L{Runner.run} calls the expected methods in order.51 """52 runner = DummyRunner({})53 runner.run()54 self.assertEqual(55 runner.calledMethods,56 [57 "killIfRequested",58 "startLogging",59 "startReactor",60 "reactorExited",61 ]62 )63 def test_runUsesPIDFile(self):64 """65 L{Runner.run} uses the provided PID file.66 """67 pidFile = DummyPIDFile()68 runner = DummyRunner(pidFile=pidFile)69 self.assertFalse(pidFile.entered)70 self.assertFalse(pidFile.exited)71 runner.run()72 self.assertTrue(pidFile.entered)73 self.assertTrue(pidFile.exited)74 def test_runAlreadyRunning(self):75 """76 L{Runner.run} exits with L{ExitStatus.EX_USAGE} and the expected77 message if a process is already running that corresponds to the given78 PID file.79 """80 pidFile = PIDFile(DummyFilePath(self.pidFileContent))81 pidFile.isRunning = lambda: True82 runner = DummyRunner(pidFile=pidFile)83 runner.run()84 self.assertEqual(self.exit.status, ExitStatus.EX_CONFIG)85 self.assertEqual(self.exit.message, "Already running.")86 def test_killNotRequested(self):87 """88 L{Runner.killIfRequested} when C{kill} is false doesn't exit and89 doesn't indiscriminately murder anyone.90 """91 runner = Runner({})92 runner.killIfRequested()93 self.assertEqual(self.kill.calls, [])94 self.assertFalse(self.exit.exited)95 def test_killRequestedWithoutPIDFile(self):96 """97 L{Runner.killIfRequested} when C{kill} is true but C{pidFile} is98 L{nonePIDFile} exits with L{ExitStatus.EX_USAGE} and the expected99 message; and also doesn't indiscriminately murder anyone.100 """101 runner = Runner(kill=True)102 runner.killIfRequested()103 self.assertEqual(self.kill.calls, [])104 self.assertEqual(self.exit.status, ExitStatus.EX_USAGE)105 self.assertEqual(self.exit.message, "No PID file specified.")106 def test_killRequestedWithPIDFile(self):107 """108 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}109 performs a targeted killing of the appropriate process.110 """111 pidFile = PIDFile(DummyFilePath(self.pidFileContent))112 runner = Runner(kill=True, pidFile=pidFile)113 runner.killIfRequested()114 self.assertEqual(self.kill.calls, [(self.pid, SIGTERM)])115 self.assertEqual(self.exit.status, ExitStatus.EX_OK)116 self.assertIdentical(self.exit.message, None)117 def test_killRequestedWithPIDFileCantRead(self):118 """119 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}120 that it can't read exits with L{ExitStatus.EX_IOERR}.121 """122 pidFile = PIDFile(DummyFilePath(None))123 def read():124 raise OSError(errno.EACCES, "Permission denied")125 pidFile.read = read126 runner = Runner(kill=True, pidFile=pidFile)127 runner.killIfRequested()128 self.assertEqual(self.exit.status, ExitStatus.EX_IOERR)129 self.assertEqual(self.exit.message, "Unable to read PID file.")130 def test_killRequestedWithPIDFileEmpty(self):131 """132 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}133 containing no value exits with L{ExitStatus.EX_DATAERR}.134 """135 pidFile = PIDFile(DummyFilePath(b""))136 runner = Runner(kill=True, pidFile=pidFile)137 runner.killIfRequested()138 self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)139 self.assertEqual(self.exit.message, "Invalid PID file.")140 def test_killRequestedWithPIDFileNotAnInt(self):141 """142 L{Runner.killIfRequested} when C{kill} is true and given a C{pidFile}143 containing a non-integer value exits with L{ExitStatus.EX_DATAERR}.144 """145 pidFile = PIDFile(DummyFilePath(b"** totally not a number, dude **"))146 runner = Runner(kill=True, pidFile=pidFile)147 runner.killIfRequested()148 self.assertEqual(self.exit.status, ExitStatus.EX_DATAERR)149 self.assertEqual(self.exit.message, "Invalid PID file.")150 def test_startLogging(self):151 """152 L{Runner.startLogging} sets up a filtering observer with a log level153 predicate set to the given log level that contains a file observer of154 the given type which writes to the given file.155 """156 logFile = BytesIO()157 # Patch the log beginner so that we don't try to start the already158 # running (started by trial) logging system.159 class LogBeginner(object):160 def beginLoggingTo(self, observers):161 LogBeginner.observers = observers162 self.patch(_runner, "globalLogBeginner", LogBeginner())163 # Patch FilteringLogObserver so we can capture its arguments164 class MockFilteringLogObserver(FilteringLogObserver):165 def __init__(166 self, observer, predicates,167 negativeObserver=lambda event: None168 ):169 MockFilteringLogObserver.observer = observer170 MockFilteringLogObserver.predicates = predicates171 FilteringLogObserver.__init__(172 self, observer, predicates, negativeObserver173 )174 self.patch(_runner, "FilteringLogObserver", MockFilteringLogObserver)175 # Patch FileLogObserver so we can capture its arguments176 class MockFileLogObserver(FileLogObserver):177 def __init__(self, outFile):178 MockFileLogObserver.outFile = outFile179 FileLogObserver.__init__(self, outFile, str)180 # Start logging181 runner = Runner(182 defaultLogLevel=LogLevel.critical,183 logFile=logFile,184 fileLogObserverFactory=MockFileLogObserver,185 )186 runner.startLogging()187 # Check for a filtering observer188 self.assertEqual(len(LogBeginner.observers), 1)189 self.assertIsInstance(LogBeginner.observers[0], FilteringLogObserver)190 # Check log level predicate with the correct default log level191 self.assertEqual(len(MockFilteringLogObserver.predicates), 1)192 self.assertIsInstance(193 MockFilteringLogObserver.predicates[0],194 LogLevelFilterPredicate195 )196 self.assertIdentical(197 MockFilteringLogObserver.predicates[0].defaultLogLevel,198 LogLevel.critical199 )200 # Check for a file observer attached to the filtering observer201 self.assertIsInstance(202 MockFilteringLogObserver.observer, MockFileLogObserver203 )204 # Check for the file we gave it205 self.assertIdentical(206 MockFilteringLogObserver.observer.outFile, logFile207 )208 def test_startReactorWithoutReactor(self):209 """210 L{Runner.startReactor} without the C{reactor} argument runs the default211 reactor.212 """213 reactor = MockReactor(self)214 self.patch(_runner, "defaultReactor", reactor)215 runner = Runner()216 runner.startReactor()217 self.assertTrue(reactor.hasInstalled)218 self.assertTrue(reactor.hasRun)219 def test_startReactorWithReactor(self):220 """221 L{Runner.startReactor} with the C{reactor} argument runs the given222 reactor.223 """224 reactor = MockReactor(self)225 runner = Runner(reactor=reactor)226 runner.startReactor()227 self.assertTrue(reactor.hasRun)228 def test_startReactorWhenRunning(self):229 """230 L{Runner.startReactor} ensures that C{whenRunning} is called with231 C{whenRunningArguments} when the reactor is running.232 """233 self._testHook("whenRunning", "startReactor")234 def test_whenRunningWithArguments(self):235 """236 L{Runner.whenRunning} calls C{whenRunning} with237 C{whenRunningArguments}.238 """239 self._testHook("whenRunning")240 def test_reactorExitedWithArguments(self):241 """242 L{Runner.whenRunning} calls C{reactorExited} with243 C{reactorExitedArguments}.244 """245 self._testHook("reactorExited")246 def _testHook(self, methodName, callerName=None):247 """248 Verify that the named hook is run with the expected arguments as249 specified by the arguments used to create the L{Runner}, when the250 specified caller is invoked.251 @param methodName: The name of the hook to verify.252 @type methodName: L{str}253 @param callerName: The name of the method that is expected to cause the254 hook to be called.255 If C{None}, use the L{Runner} method with the same name as the256 hook.257 @type callerName: L{str}258 """259 if callerName is None:260 callerName = methodName261 arguments = dict(a=object(), b=object(), c=object())262 argumentsSeen = []263 def hook(**arguments):264 argumentsSeen.append(arguments)265 runnerArguments = {266 methodName: hook,267 "{}Arguments".format(methodName): arguments.copy(),268 }269 runner = Runner(reactor=MockReactor(self), **runnerArguments)270 hookCaller = getattr(runner, callerName)271 hookCaller()272 self.assertEqual(len(argumentsSeen), 1)273 self.assertEqual(argumentsSeen[0], arguments)274class DummyRunner(Runner):275 """276 Stub for L{Runner}.277 Keep track of calls to some methods without actually doing anything.278 """279 def __init__(self, *args, **kwargs):280 Runner.__init__(self, *args, **kwargs)281 self.calledMethods = []282 def killIfRequested(self):283 self.calledMethods.append("killIfRequested")284 def startLogging(self):285 self.calledMethods.append("startLogging")286 def startReactor(self):287 self.calledMethods.append("startReactor")288 def reactorExited(self):289 self.calledMethods.append("reactorExited")290class DummyPIDFile(NonePIDFile):291 """292 Stub for L{PIDFile}.293 Tracks context manager entry/exit without doing anything.294 """295 def __init__(self):296 NonePIDFile.__init__(self)297 self.entered = False298 self.exited = False299 def __enter__(self):300 self.entered = True301 return self302 def __exit__(self, excType, excValue, traceback):303 self.exited = True304class DummyExit(object):305 """306 Stub for L{exit} that remembers whether it's been called and, if it has,307 what arguments it was given.308 """309 def __init__(self):310 self.exited = False311 def __call__(self, status, message=None):312 assert not self.exited313 self.status = status314 self.message = message315 self.exited = True316class DummyKill(object):317 """318 Stub for L{os.kill} that remembers whether it's been called and, if it has,319 what arguments it was given.320 """321 def __init__(self):322 self.calls = []323 def __call__(self, pid, sig):324 self.calls.append((pid, sig))325class DummyStandardIO(object):326 """327 Stub for L{sys} which provides L{BytesIO} streams as stdout and stderr.328 """329 def __init__(self, stdout, stderr):330 self.stdout = stdout331 self.stderr = stderr332class DummyWarningsModule(object):333 """334 Stub for L{warnings} which provides a C{showwarning} method that is a no-op.335 """336 def showwarning(*args, **kwargs):337 """338 Do nothing.339 @param args: ignored.340 @param kwargs: ignored....

Full Screen

Full Screen

top_7_stress.py

Source:top_7_stress.py Github

copy

Full Screen

1# Copyright 2014 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4from telemetry.page import page as page_module5from telemetry.page import shared_page_state6from telemetry import story7def _GetCurrentLocation(action_runner):8 return action_runner.EvaluateJavaScript('document.location.href')9def _WaitForLocationChange(action_runner, old_href):10 action_runner.WaitForJavaScriptCondition(11 'document.location.href != "%s"' % old_href)12class Top7StressPage(page_module.Page):13 def __init__(self, url, page_set, name=''):14 super(Top7StressPage, self).__init__(15 url=url, page_set=page_set, name=name,16 shared_page_state_class=shared_page_state.SharedDesktopPageState,17 credentials_path = 'data/credentials.json')18 self.archive_data_file = 'data/top_7_stress.json'19 def RunPageInteractions(self, action_runner):20 raise NotImplementedError()21class GoogleWebSearchPage(Top7StressPage):22 """ Why: top google property; a google tab is often open """23 def __init__(self, page_set):24 super(GoogleWebSearchPage, self).__init__(25 url='https://www.google.com/#hl=en&q=barack+obama',26 page_set=page_set)27 def RunNavigateSteps(self, action_runner):28 super(GoogleWebSearchPage, self).RunNavigateSteps(action_runner)29 action_runner.WaitForElement(text='Next')30 def RunPageInteractions(self, action_runner):31 with action_runner.CreateGestureInteraction('ScrollAction'):32 action_runner.ScrollPage()33 old_href = _GetCurrentLocation(action_runner)34 action_runner.ClickElement(text='Next')35 _WaitForLocationChange(action_runner, old_href)36 action_runner.WaitForElement(text='Next')37 with action_runner.CreateGestureInteraction('ScrollAction'):38 action_runner.ScrollPage()39 old_href = _GetCurrentLocation(action_runner)40 action_runner.ClickElement(text='Next')41 _WaitForLocationChange(action_runner, old_href)42 action_runner.WaitForElement(text='Next')43 with action_runner.CreateGestureInteraction('ScrollAction'):44 action_runner.ScrollPage()45 old_href = _GetCurrentLocation(action_runner)46 action_runner.ClickElement(text='Next')47 _WaitForLocationChange(action_runner, old_href)48 action_runner.WaitForElement(text='Previous')49 with action_runner.CreateGestureInteraction('ScrollAction'):50 action_runner.ScrollPage()51 old_href = _GetCurrentLocation(action_runner)52 action_runner.ClickElement(text='Previous')53 _WaitForLocationChange(action_runner, old_href)54 action_runner.WaitForElement(text='Previous')55 with action_runner.CreateGestureInteraction('ScrollAction'):56 action_runner.ScrollPage()57 old_href = _GetCurrentLocation(action_runner)58 action_runner.ClickElement(text='Previous')59 _WaitForLocationChange(action_runner, old_href)60 action_runner.WaitForElement(text='Previous')61 with action_runner.CreateGestureInteraction('ScrollAction'):62 action_runner.ScrollPage()63 old_href = _GetCurrentLocation(action_runner)64 action_runner.ClickElement(text='Previous')65 _WaitForLocationChange(action_runner, old_href)66 action_runner.WaitForElement(text='Images')67 with action_runner.CreateGestureInteraction('ScrollAction'):68 action_runner.ScrollPage()69 old_href = _GetCurrentLocation(action_runner)70 action_runner.ClickElement(text='Images')71 _WaitForLocationChange(action_runner, old_href)72 action_runner.WaitForElement(text='Images')73class GmailPage(Top7StressPage):74 """ Why: productivity, top google properties """75 def __init__(self, page_set):76 super(GmailPage, self).__init__(77 url='https://mail.google.com/mail/',78 page_set=page_set)79 self.credentials = 'google'80 def RunNavigateSteps(self, action_runner):81 super(GmailPage, self).RunNavigateSteps(action_runner)82 action_runner.WaitForJavaScriptCondition(83 'window.gmonkey !== undefined &&'84 'document.getElementById("gb") !== null')85 def RunPageInteractions(self, action_runner):86 old_href = _GetCurrentLocation(action_runner)87 action_runner.ClickElement(88 'a[href="https://mail.google.com/mail/u/0/?shva=1#starred"]')89 _WaitForLocationChange(action_runner, old_href)90 old_href = _GetCurrentLocation(action_runner)91 action_runner.ClickElement(92 'a[href="https://mail.google.com/mail/u/0/?shva=1#inbox"]')93 _WaitForLocationChange(action_runner, old_href)94class GoogleCalendarPage(Top7StressPage):95 """ Why: productivity, top google properties """96 def __init__(self, page_set):97 super(GoogleCalendarPage, self).__init__(98 url='https://www.google.com/calendar/',99 page_set=page_set)100 self.credentials = 'google'101 def RunNavigateSteps(self, action_runner):102 super(GoogleCalendarPage, self).RunNavigateSteps(action_runner)103 action_runner.Wait(2)104 action_runner.WaitForElement('div[class~="navForward"]')105 action_runner.ExecuteJavaScript('''106 (function() {107 var elem = document.createElement('meta');108 elem.name='viewport';109 elem.content='initial-scale=1';110 document.body.appendChild(elem);111 })();''')112 action_runner.Wait(1)113 def RunPageInteractions(self, action_runner):114 action_runner.ClickElement('div[class~="navForward"]')115 action_runner.Wait(2)116 action_runner.WaitForElement('div[class~="navForward"]')117 action_runner.ClickElement('div[class~="navForward"]')118 action_runner.Wait(2)119 action_runner.WaitForElement('div[class~="navForward"]')120 action_runner.ClickElement('div[class~="navForward"]')121 action_runner.Wait(2)122 action_runner.WaitForElement('div[class~="navForward"]')123 action_runner.ClickElement('div[class~="navForward"]')124 action_runner.Wait(2)125 action_runner.WaitForElement('div[class~="navBack"]')126 action_runner.ClickElement('div[class~="navBack"]')127 action_runner.Wait(2)128 action_runner.WaitForElement('div[class~="navBack"]')129 action_runner.ClickElement('div[class~="navBack"]')130 action_runner.Wait(2)131 action_runner.WaitForElement('div[class~="navBack"]')132 action_runner.ClickElement('div[class~="navBack"]')133 action_runner.Wait(2)134 action_runner.WaitForElement('div[class~="navBack"]')135 action_runner.ClickElement('div[class~="navBack"]')136 action_runner.Wait(2)137 action_runner.WaitForElement('div[class~="navBack"]')138class GooglePlusPage(Top7StressPage):139 """ Why: social; top google property; Public profile; infinite scrolls """140 def __init__(self, page_set):141 super(GooglePlusPage, self).__init__(142 url='https://plus.google.com/110031535020051778989/posts',143 page_set=page_set)144 self.credentials = 'google'145 def RunNavigateSteps(self, action_runner):146 super(GooglePlusPage, self).RunNavigateSteps(action_runner)147 action_runner.WaitForElement(text='Home')148 def RunPageInteractions(self, action_runner):149 action_runner.ClickElement(text='Home')150 action_runner.Wait(2)151 action_runner.WaitForElement(text='Profile')152 action_runner.ClickElement(text='Profile')153 action_runner.Wait(2)154 action_runner.WaitForElement(text='Explore')155 action_runner.ClickElement(text='Explore')156 action_runner.Wait(2)157 action_runner.WaitForElement(text='Events')158 action_runner.ClickElement(text='Events')159 action_runner.Wait(2)160 action_runner.WaitForElement(text='Communities')161 action_runner.ClickElement(text='Communities')162 action_runner.Wait(2)163 action_runner.WaitForElement(text='Home')164class BlogspotPage(Top7StressPage):165 """ Why: #11 (Alexa global), google property; some blogger layouts have166 infinite scroll but more interesting """167 def __init__(self, page_set):168 super(BlogspotPage, self).__init__(169 url='http://googlewebmastercentral.blogspot.com/',170 page_set=page_set,171 name='Blogger')172 def RunNavigateSteps(self, action_runner):173 super(BlogspotPage, self).RunNavigateSteps(action_runner)174 action_runner.WaitForElement(text='accessibility')175 def RunPageInteractions(self, action_runner):176 action_runner.ClickElement(text='accessibility')177 action_runner.WaitForNavigate()178 with action_runner.CreateGestureInteraction('ScrollAction'):179 action_runner.ScrollPage()180 # Insert 300ms wait to simulate user finger movement,181 # and ensure scheduling of idle tasks.182 action_runner.Wait(0.3)183 action_runner.ClickElement(text='advanced')184 action_runner.WaitForNavigate()185 with action_runner.CreateGestureInteraction('ScrollAction'):186 action_runner.ScrollPage()187 action_runner.Wait(0.3)188 action_runner.ClickElement(text='beginner')189 action_runner.WaitForNavigate()190 with action_runner.CreateGestureInteraction('ScrollAction'):191 action_runner.ScrollPage()192 action_runner.Wait(0.3)193 action_runner.ClickElement(text='Home')194 action_runner.WaitForNavigate()195class WordpressPage(Top7StressPage):196 """ Why: #18 (Alexa global), Picked an interesting post """197 def __init__(self, page_set):198 super(WordpressPage, self).__init__(199 # pylint: disable=line-too-long200 url='http://en.blog.wordpress.com/2012/09/04/freshly-pressed-editors-picks-for-august-2012/',201 page_set=page_set,202 name='Wordpress')203 def RunNavigateSteps(self, action_runner):204 super(WordpressPage, self).RunNavigateSteps(action_runner)205 action_runner.WaitForElement(206 # pylint: disable=line-too-long207 'a[href="http://en.blog.wordpress.com/2012/08/30/new-themes-able-and-sight/"]')208 def RunPageInteractions(self, action_runner):209 with action_runner.CreateGestureInteraction('ScrollAction'):210 action_runner.ScrollPage()211 # Insert 300ms wait to simulate user finger movement,212 # and ensure scheduling of idle tasks.213 action_runner.Wait(0.3)214 action_runner.ClickElement(215 # pylint: disable=line-too-long216 'a[href="http://en.blog.wordpress.com/2012/08/30/new-themes-able-and-sight/"]')217 action_runner.WaitForNavigate()218 with action_runner.CreateGestureInteraction('ScrollAction'):219 action_runner.ScrollPage()220 action_runner.Wait(0.3)221 action_runner.ClickElement(text='Features')222 action_runner.WaitForNavigate()223 with action_runner.CreateGestureInteraction('ScrollAction'):224 action_runner.ScrollPage()225 action_runner.Wait(0.3)226 action_runner.ClickElement(text='News')227 action_runner.WaitForNavigate()228 with action_runner.CreateGestureInteraction('ScrollAction'):229 action_runner.ScrollPage()230class FacebookPage(Top7StressPage):231 """ Why: top social,Public profile """232 def __init__(self, page_set):233 super(FacebookPage, self).__init__(234 url='https://www.facebook.com/barackobama',235 page_set=page_set,236 name='Facebook')237 self.credentials = 'facebook2'238 def RunNavigateSteps(self, action_runner):239 super(FacebookPage, self).RunNavigateSteps(action_runner)240 action_runner.WaitForElement(text='About')241 def RunPageInteractions(self, action_runner):242 # Scroll and wait for the next page to be loaded.243 with action_runner.CreateGestureInteraction('ScrollAction'):244 action_runner.ScrollPage()245 action_runner.WaitForJavaScriptCondition(246 'document.documentElement.scrollHeight - window.innerHeight - '247 'window.pageYOffset > 0')248 # Scroll and wait again.249 with action_runner.CreateGestureInteraction('ScrollAction'):250 action_runner.ScrollPage()251 action_runner.WaitForJavaScriptCondition(252 'document.documentElement.scrollHeight - window.innerHeight - '253 'window.pageYOffset > 0')254class Top7StressPageSet(story.StorySet):255 """ Pages hand-picked for stress testing. """256 def __init__(self):257 super(Top7StressPageSet, self).__init__(258 archive_data_file='data/top_7_stress.json',259 cloud_storage_bucket=story.PARTNER_BUCKET)260 self.AddStory(GoogleWebSearchPage(self))261 self.AddStory(GmailPage(self))262 self.AddStory(GoogleCalendarPage(self))263 self.AddStory(GooglePlusPage(self))264 self.AddStory(BlogspotPage(self))265 self.AddStory(WordpressPage(self))...

Full Screen

Full Screen

test_eval_hook.py

Source:test_eval_hook.py Github

copy

Full Screen

1import os.path as osp2import tempfile3import unittest.mock as mock4from collections import OrderedDict5from unittest.mock import MagicMock, patch6import pytest7import torch8import torch.nn as nn9from mmcv.runner import EpochBasedRunner, build_optimizer10from mmcv.utils import get_logger11from torch.utils.data import DataLoader, Dataset12from mmdet.core import DistEvalHook, EvalHook13class ExampleDataset(Dataset):14 def __init__(self):15 self.index = 016 self.eval_result = [0.1, 0.4, 0.3, 0.7, 0.2, 0.05, 0.4, 0.6]17 def __getitem__(self, idx):18 results = dict(imgs=torch.tensor([1]))19 return results20 def __len__(self):21 return 122 @mock.create_autospec23 def evaluate(self, results, logger=None):24 pass25class EvalDataset(ExampleDataset):26 def evaluate(self, results, logger=None):27 mean_ap = self.eval_result[self.index]28 output = OrderedDict(mAP=mean_ap, index=self.index, score=mean_ap)29 self.index += 130 return output31class ExampleModel(nn.Module):32 def __init__(self):33 super().__init__()34 self.conv = nn.Linear(1, 1)35 self.test_cfg = None36 def forward(self, imgs, rescale=False, return_loss=False):37 return imgs38 def train_step(self, data_batch, optimizer, **kwargs):39 outputs = {40 'loss': 0.5,41 'log_vars': {42 'accuracy': 0.9843 },44 'num_samples': 145 }46 return outputs47@pytest.mark.skipif(48 not torch.cuda.is_available(), reason='requires CUDA support')49@patch('mmdet.apis.single_gpu_test', MagicMock)50@patch('mmdet.apis.multi_gpu_test', MagicMock)51@pytest.mark.parametrize('EvalHookCls', (EvalHook, DistEvalHook))52def test_eval_hook(EvalHookCls):53 with pytest.raises(TypeError):54 # dataloader must be a pytorch DataLoader55 test_dataset = ExampleDataset()56 data_loader = [57 DataLoader(58 test_dataset,59 batch_size=1,60 sampler=None,61 num_worker=0,62 shuffle=False)63 ]64 EvalHookCls(data_loader)65 with pytest.raises(KeyError):66 # rule must be in keys of rule_map67 test_dataset = ExampleDataset()68 data_loader = DataLoader(69 test_dataset,70 batch_size=1,71 sampler=None,72 num_workers=0,73 shuffle=False)74 EvalHookCls(data_loader, save_best='auto', rule='unsupport')75 with pytest.raises(ValueError):76 # key_indicator must be valid when rule_map is None77 test_dataset = ExampleDataset()78 data_loader = DataLoader(79 test_dataset,80 batch_size=1,81 sampler=None,82 num_workers=0,83 shuffle=False)84 EvalHookCls(data_loader, save_best='unsupport')85 optimizer_cfg = dict(86 type='SGD', lr=0.01, momentum=0.9, weight_decay=0.0001)87 test_dataset = ExampleDataset()88 loader = DataLoader(test_dataset, batch_size=1)89 model = ExampleModel()90 optimizer = build_optimizer(model, optimizer_cfg)91 data_loader = DataLoader(test_dataset, batch_size=1)92 eval_hook = EvalHookCls(data_loader, save_best=None)93 with tempfile.TemporaryDirectory() as tmpdir:94 logger = get_logger('test_eval')95 runner = EpochBasedRunner(96 model=model,97 batch_processor=None,98 optimizer=optimizer,99 work_dir=tmpdir,100 logger=logger)101 runner.register_hook(eval_hook)102 runner.run([loader], [('train', 1)], 1)103 assert runner.meta is None or 'best_score' not in runner.meta[104 'hook_msgs']105 assert runner.meta is None or 'best_ckpt' not in runner.meta[106 'hook_msgs']107 # when `save_best` is set to 'auto', first metric will be used.108 loader = DataLoader(EvalDataset(), batch_size=1)109 model = ExampleModel()110 data_loader = DataLoader(EvalDataset(), batch_size=1)111 eval_hook = EvalHookCls(data_loader, interval=1, save_best='auto')112 with tempfile.TemporaryDirectory() as tmpdir:113 logger = get_logger('test_eval')114 runner = EpochBasedRunner(115 model=model,116 batch_processor=None,117 optimizer=optimizer,118 work_dir=tmpdir,119 logger=logger)120 runner.register_checkpoint_hook(dict(interval=1))121 runner.register_hook(eval_hook)122 runner.run([loader], [('train', 1)], 8)123 real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')124 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)125 assert runner.meta['hook_msgs']['best_score'] == 0.7126 loader = DataLoader(EvalDataset(), batch_size=1)127 model = ExampleModel()128 data_loader = DataLoader(EvalDataset(), batch_size=1)129 eval_hook = EvalHookCls(data_loader, interval=1, save_best='mAP')130 with tempfile.TemporaryDirectory() as tmpdir:131 logger = get_logger('test_eval')132 runner = EpochBasedRunner(133 model=model,134 batch_processor=None,135 optimizer=optimizer,136 work_dir=tmpdir,137 logger=logger)138 runner.register_checkpoint_hook(dict(interval=1))139 runner.register_hook(eval_hook)140 runner.run([loader], [('train', 1)], 8)141 real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')142 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)143 assert runner.meta['hook_msgs']['best_score'] == 0.7144 data_loader = DataLoader(EvalDataset(), batch_size=1)145 eval_hook = EvalHookCls(146 data_loader, interval=1, save_best='score', rule='greater')147 with tempfile.TemporaryDirectory() as tmpdir:148 logger = get_logger('test_eval')149 runner = EpochBasedRunner(150 model=model,151 batch_processor=None,152 optimizer=optimizer,153 work_dir=tmpdir,154 logger=logger)155 runner.register_checkpoint_hook(dict(interval=1))156 runner.register_hook(eval_hook)157 runner.run([loader], [('train', 1)], 8)158 real_path = osp.join(tmpdir, 'best_score_epoch_4.pth')159 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)160 assert runner.meta['hook_msgs']['best_score'] == 0.7161 data_loader = DataLoader(EvalDataset(), batch_size=1)162 eval_hook = EvalHookCls(data_loader, save_best='mAP', rule='less')163 with tempfile.TemporaryDirectory() as tmpdir:164 logger = get_logger('test_eval')165 runner = EpochBasedRunner(166 model=model,167 batch_processor=None,168 optimizer=optimizer,169 work_dir=tmpdir,170 logger=logger)171 runner.register_checkpoint_hook(dict(interval=1))172 runner.register_hook(eval_hook)173 runner.run([loader], [('train', 1)], 8)174 real_path = osp.join(tmpdir, 'best_mAP_epoch_6.pth')175 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)176 assert runner.meta['hook_msgs']['best_score'] == 0.05177 data_loader = DataLoader(EvalDataset(), batch_size=1)178 eval_hook = EvalHookCls(data_loader, save_best='mAP')179 with tempfile.TemporaryDirectory() as tmpdir:180 logger = get_logger('test_eval')181 runner = EpochBasedRunner(182 model=model,183 batch_processor=None,184 optimizer=optimizer,185 work_dir=tmpdir,186 logger=logger)187 runner.register_checkpoint_hook(dict(interval=1))188 runner.register_hook(eval_hook)189 runner.run([loader], [('train', 1)], 2)190 real_path = osp.join(tmpdir, 'best_mAP_epoch_2.pth')191 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)192 assert runner.meta['hook_msgs']['best_score'] == 0.4193 resume_from = osp.join(tmpdir, 'latest.pth')194 loader = DataLoader(ExampleDataset(), batch_size=1)195 eval_hook = EvalHookCls(data_loader, save_best='mAP')196 runner = EpochBasedRunner(197 model=model,198 batch_processor=None,199 optimizer=optimizer,200 work_dir=tmpdir,201 logger=logger)202 runner.register_checkpoint_hook(dict(interval=1))203 runner.register_hook(eval_hook)204 runner.resume(resume_from)205 runner.run([loader], [('train', 1)], 8)206 real_path = osp.join(tmpdir, 'best_mAP_epoch_4.pth')207 assert runner.meta['hook_msgs']['best_ckpt'] == osp.realpath(real_path)...

Full Screen

Full Screen

tether_task_runner.py

Source:tether_task_runner.py Github

copy

Full Screen

1#!/usr/bin/env python2##3# Licensed to the Apache Software Foundation (ASF) under one4# or more contributor license agreements. See the NOTICE file5# distributed with this work for additional information6# regarding copyright ownership. The ASF licenses this file7# to you under the Apache License, Version 2.0 (the8# "License"); you may not use this file except in compliance9# with the License. You may obtain a copy of the License at10#11# https://www.apache.org/licenses/LICENSE-2.012#13# Unless required by applicable law or agreed to in writing, software14# distributed under the License is distributed on an "AS IS" BASIS,15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.16# See the License for the specific language governing permissions and17# limitations under the License.18from __future__ import absolute_import, division, print_function19import logging20import sys21import threading22import traceback23import weakref24import avro.tether.tether_task25import avro.tether.util26from avro import ipc27try:28 import BaseHTTPServer as http_server # type: ignore29except ImportError:30 import http.server as http_server # type: ignore31__all__ = ["TaskRunner"]32class TaskRunnerResponder(ipc.Responder):33 """34 The responder for the tethered process35 """36 def __init__(self,runner):37 """38 Param39 ----------------------------------------------------------40 runner - Instance of TaskRunner41 """42 ipc.Responder.__init__(self, avro.tether.tether_task.inputProtocol)43 self.log=logging.getLogger("TaskRunnerResponder")44 # should we use weak references to avoid circular references?45 # We use weak references b\c self.runner owns this instance of TaskRunnerResponder46 if isinstance(runner,weakref.ProxyType):47 self.runner=runner48 else:49 self.runner=weakref.proxy(runner)50 self.task=weakref.proxy(runner.task)51 def invoke(self, message, request):52 try:53 if message.name=='configure':54 self.log.info("TetherTaskRunner: Recieved configure")55 self.task.configure(request["taskType"],request["inSchema"],request["outSchema"])56 elif message.name=='partitions':57 self.log.info("TetherTaskRunner: Recieved partitions")58 try:59 self.task.set_partitions(request["partitions"])60 except Exception as e:61 self.log.error("Exception occured while processing the partitions message: Message:\n"+traceback.format_exc())62 raise63 elif message.name=='input':64 self.log.info("TetherTaskRunner: Recieved input")65 self.task.input(request["data"],request["count"])66 elif message.name=='abort':67 self.log.info("TetherTaskRunner: Recieved abort")68 self.runner.close()69 elif message.name=='complete':70 self.log.info("TetherTaskRunner: Recieved complete")71 self.task.complete()72 self.task.close()73 self.runner.close()74 else:75 self.log.warning("TetherTaskRunner: recieved unknown message {0}".format(message.name))76 except Exception as e:77 self.log.error("Error occured while processing message: {0}".format(message.name))78 emsg=traceback.format_exc()79 self.task.fail(emsg)80 return None81def HTTPHandlerGen(runner):82 """83 This is a class factory for the HTTPHandler. We need84 a factory because we need a reference to the runner85 Parameters86 -----------------------------------------------------------------87 runner - instance of the task runner88 """89 if not(isinstance(runner,weakref.ProxyType)):90 runnerref=weakref.proxy(runner)91 else:92 runnerref=runner93 class TaskRunnerHTTPHandler(http_server.BaseHTTPRequestHandler):94 """Create a handler for the parent.95 """96 runner=runnerref97 def __init__(self,*args,**param):98 """99 """100 http_server.BaseHTTPRequestHandler.__init__(self,*args,**param)101 def do_POST(self):102 self.responder =TaskRunnerResponder(self.runner)103 call_request_reader = ipc.FramedReader(self.rfile)104 call_request = call_request_reader.read_framed_message()105 resp_body = self.responder.respond(call_request)106 self.send_response(200)107 self.send_header('Content-Type', 'avro/binary')108 self.end_headers()109 resp_writer = ipc.FramedWriter(self.wfile)110 resp_writer.write_framed_message(resp_body)111 return TaskRunnerHTTPHandler112class TaskRunner(object):113 """This class ties together the server handling the requests from114 the parent process and the instance of TetherTask which actually115 implements the logic for the mapper and reducer phases116 """117 def __init__(self,task):118 """119 Construct the runner120 Parameters121 ---------------------------------------------------------------122 task - An instance of tether task123 """124 self.log=logging.getLogger("TaskRunner:")125 if not(isinstance(task, avro.tether.tether_task.TetherTask)):126 raise ValueError("task must be an instance of tether task")127 self.task=task128 self.server=None129 self.sthread=None130 def start(self,outputport=None,join=True):131 """132 Start the server133 Parameters134 -------------------------------------------------------------------135 outputport - (optional) The port on which the parent process is listening136 for requests from the task.137 - This will typically be supplied by an environment variable138 we allow it to be supplied as an argument mainly for debugging139 join - (optional) If set to fault then we don't issue a join to block140 until the thread excecuting the server terminates.141 This is mainly for debugging. By setting it to false,142 we can resume execution in this thread so that we can do additional143 testing144 """145 port = avro.tether.util.find_port()146 address=("localhost",port)147 def thread_run(task_runner=None):148 task_runner.server = http_server.HTTPServer(address, HTTPHandlerGen(task_runner))149 task_runner.server.allow_reuse_address = True150 task_runner.server.serve_forever()151 # create a separate thread for the http server152 sthread=threading.Thread(target=thread_run,kwargs={"task_runner":self})153 sthread.start()154 self.sthread=sthread155 # This needs to run in a separat thread b\c serve_forever() blocks156 self.task.open(port,clientPort=outputport)157 # wait for the other thread to finish158 if (join):159 self.task.ready_for_shutdown.wait()160 self.server.shutdown()161 # should we do some kind of check to make sure it exits162 self.log.info("Shutdown the logger")163 # shutdown the logging164 logging.shutdown()165 def close(self):166 """167 Handler for the close message168 """169 self.task.close()170if __name__ == '__main__':171 # TODO::Make the logging level a parameter we can set172 # logging.basicConfig(level=logging.INFO,filename='/tmp/log',filemode='w')173 logging.basicConfig(level=logging.INFO)174 if (len(sys.argv)<=1):175 print("Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass")176 raise ValueError("Usage: tether_task_runner task_package.task_module.TaskClass")177 fullcls=sys.argv[1]178 mod,cname=fullcls.rsplit(".",1)179 logging.info("tether_task_runner.__main__: Task: {0}".format(fullcls))180 modobj=__import__(mod,fromlist=cname)181 taskcls=getattr(modobj,cname)182 task=taskcls()183 runner=TaskRunner(task=task)...

Full Screen

Full Screen

join.py

Source:join.py Github

copy

Full Screen

1from django.core.management.base import BaseCommand2from libs.dump import dump_sql3from apps.runner.models import Book, Publisher, Author4class Command(BaseCommand):5 # manage.pyで動作させた時に引数を受け取れるようにする6 args = '引数'7 def handle(self, *args, **options):8 if 'inner' in args:9 self.inner_join()10 if 'left' in args:11 self.left_join()12 if 'backward' in args:13 self.backward_join()14 if 'multi' in args:15 self.multi_join()16 def inner_join(self):17 # 微妙に違うSQLが出される18 # INNER JOINするけど、SELECT句にはpublisherの列は無い19 b1 = Book.objects.filter(publisher__num_awards=5).values()20 #=> SELECT "runner_book"."id", "runner_book"."name", "runner_book"."pages", "runner_book"."price",21 # "runner_book"."rating", "runner_book"."publisher_id", "runner_book"."pubdate"22 # FROM "runner_book"23 # INNER JOIN "runner_publisher" ON ( "runner_book"."publisher_id" = "runner_publisher"."id" )24 # WHERE "runner_publisher"."num_awards" = %s25 # LIMIT 21 - PARAMS = (5,)26 print(b1)27 dump_sql()28 # INNER JOINし、SELECT句にもpublisherの列がある29 b2 = Book.objects.select_related().all().values()30 #=> SELECT "runner_book"."id", "runner_book"."name", "runner_book"."pages", "runner_book"."price",31 # "runner_book"."rating", "runner_book"."publisher_id", "runner_book"."pubdate",32 # "runner_publisher"."id", "runner_publisher"."name", "runner_publisher"."num_awards"33 # FROM "runner_book"34 # INNER JOIN "runner_publisher" ON ( "runner_book"."publisher_id" = "runner_publisher"."id" )35 # LIMIT 21 - PARAMS = ()36 print(b2)37 dump_sql()38 # INNER JOINとWHEREを使って、全列出す39 b3 = Book.objects.filter(publisher__num_awards=5).select_related().all().values()40 #=> SELECT "runner_book"."id", "runner_book"."name", "runner_book"."pages", "runner_book"."price",41 # "runner_book"."rating", "runner_book"."publisher_id", "runner_book"."pubdate",42 # "runner_publisher"."id", "runner_publisher"."name", "runner_publisher"."num_awards"43 # FROM "runner_book"44 # INNER JOIN "runner_publisher" ON ( "runner_book"."publisher_id" = "runner_publisher"."id" )45 # WHERE "runner_publisher"."num_awards" = %s46 # LIMIT 21 - PARAMS = (5,)47 print(b3)48 dump_sql()49 def left_join(self):50 # id IS NULL51 # p1 = Publisher.objects.filter(book__isnull=False) # これだと、INNER JOIN になる52 p1 = Publisher.objects.filter(book__isnull=True).values()53 #=> SELECT *54 # FROM "runner_publisher"55 # LEFT OUTER JOIN "runner_book" ON ( "runner_publisher"."id" = "runner_book"."publisher_id" )56 # WHERE "runner_book"."id" IS NULL57 print(p1)58 dump_sql()59 # publisher_id IS NULL60 p2 = Publisher.objects.filter(book__publisher__isnull=True).values()61 #=> SELECT *62 # FROM "runner_publisher"63 # LEFT OUTER JOIN "runner_book" ON ( "runner_publisher"."id" = "runner_book"."publisher_id" )64 # WHERE "runner_book"."publisher_id" IS NULL65 print(p2)66 dump_sql()67 def backward_join(self):68 # p = Author.objects.filter(age__gt=5)や69 # p = Author.objects.all() だと逆引きできない70 a = Author.objects.get(pk=1).book_set.all().values()71 #=> SELECT "runner_book"."id", "runner_book"."name", "runner_book"."pages", "runner_book"."price",72 # "runner_book"."rating", "runner_book"."publisher_id", "runner_book"."pubdate"73 # FROM "runner_book"74 # INNER JOIN "runner_book_authors" ON ( "runner_book"."id" = "runner_book_authors"."book_id" )75 # WHERE "runner_book_authors"."author_id" = %s76 # LIMIT 21 - PARAMS = (1,)77 print(a)78 dump_sql()79 def multi_join(self):80 a = Author.objects.filter(book__publisher__name='Pub2').values()81 #=> SELECT "runner_author"."id", "runner_author"."name", "runner_author"."age"82 # FROM "runner_author"83 # INNER JOIN "runner_book_authors" ON ( "runner_author"."id" = "runner_book_authors"."author_id" )84 # INNER JOIN "runner_book" ON ( "runner_book_authors"."book_id" = "runner_book"."id" )85 # INNER JOIN "runner_publisher" ON ( "runner_book"."publisher_id" = "runner_publisher"."id" )86 # WHERE "runner_publisher"."name" = %s87 # LIMIT 21 - PARAMS = ('Pub2',)88 print(a)...

Full Screen

Full Screen

test_add.py

Source:test_add.py Github

copy

Full Screen

1import unittest2import os3from os.path import exists, getsize4from pathlib import Path5import click6from click.testing import CliRunner7from mpl_pymanage.add import add_package, add_license8from mpl_pymanage.utils import shell, file_content9from mpl_pymanage.mpl_pymanage import cli10class TestAdd(unittest.TestCase):11 def test_add_package(self):12 runner = CliRunner()13 with runner.isolated_filesystem():14 add_package('level1.level2.level3')15 self.assertTrue(exists('level1/level2/level3/__init__.py'))16 def test_add_license(self):17 runner = CliRunner()18 with runner.isolated_filesystem():19 shell('touch setup.py')20 cwd = Path.cwd()21 add_license(cwd, 'MIT')22 self.assertTrue(exists(cwd / 'LICENSE'))23 def test_add_license_with_string(self):24 runner = CliRunner()25 with runner.isolated_filesystem():26 shell('touch setup.py')27 add_license('.', 'MIT')28 self.assertTrue(exists(Path.cwd() / 'LICENSE'))29 def test_add_license_no_setup_py(self):30 runner = CliRunner()31 with runner.isolated_filesystem():32 add_license('.', 'MIT')33 self.assertTrue(exists(Path.cwd() / 'LICENSE'))34 def test_add_license_existing(self):35 runner = CliRunner()36 with runner.isolated_filesystem():37 with self.assertRaises(click.BadParameter):38 shell('touch LICENSE')39 add_license('.', 'MIT')40 self.assertTrue(exists(Path.cwd() / 'LICENSE'))41 def test_add_gitignore(self):42 runner = CliRunner()43 with runner.isolated_filesystem():44 runner.invoke(cli, ['add', 'gitignore'])45 cwd = Path.cwd()46 self.assertTrue(exists(cwd / '.gitignore'))47 def _test_add_setup_py(self):48 runner = CliRunner()49 with runner.isolated_filesystem():50 runner.invoke(cli, ['add', 'setup.py', '--name', 'test_project'], input='\n\n\n\n\n')51 cwd = Path.cwd()52 self.assertTrue(exists(cwd / 'setup.py'))53 def _test_add_setup_py_overwrite(self):54 runner = CliRunner()55 with runner.isolated_filesystem():56 shell('touch setup.py')57 cwd = Path.cwd()58 assert getsize(cwd / 'setup.py') == 059 runner.invoke(cli, ['add', 'setup.py', '--name', 'test_project'], input='\n\n\n\n\ny\n')60 self.assertTrue(exists(cwd / 'setup.py'))61 assert getsize(cwd / 'setup.py') > 062 def test_add_docs(self):63 runner = CliRunner()64 with runner.isolated_filesystem():65 shell('touch setup.py')66 cwd = Path.cwd()67 runner.invoke(cli, ['add', 'docs'])68 self.assertTrue(exists(cwd / 'docs' / 'conf.py'))69 def test_add_github_action(self):70 runner = CliRunner()71 with runner.isolated_filesystem():72 cwd = Path.cwd()73 target = cwd / '.github' / 'workflows' / 'check.yml'74 assert not exists(target)75 result = runner.invoke(cli, ['add', 'github-action'], input='abc\nghi\n')76 assert exists(target)77 check_yml = file_content(target)78 assert 'abc' in check_yml79 assert 'ghi' in check_yml80 assert ':PACKAGE:' not in check_yml81 assert ':TEST:' not in check_yml82 def test_add_github_action_overwrite(self):83 runner = CliRunner()84 with runner.isolated_filesystem():85 cwd = Path.cwd()86 os.mkdir(cwd / '.github')87 os.mkdir(cwd / '.github' / 'workflows')88 target = cwd / '.github' / 'workflows' / 'check.yml'89 shell('touch ' + str(target))90 assert exists(target)91 runner.invoke(cli, ['add', 'github-action'], input='abc\nghi\ny\n')92 assert exists(target)93 check_yml = file_content(target)94 assert 'abc' in check_yml95 assert 'ghi' in check_yml96 assert ':PACKAGE:' not in check_yml97 assert ':TEST:' not in check_yml98 def test_add_github_action_no_overwrite(self):99 runner = CliRunner()100 with runner.isolated_filesystem():101 cwd = Path.cwd()102 os.mkdir(cwd / '.github')103 os.mkdir(cwd / '.github' / 'workflows')104 target = cwd / '.github' / 'workflows' / 'check.yml'105 shell('touch ' + str(target))106 assert exists(target)107 runner.invoke(cli, ['add', 'github-action'], input='abc\nghi\nn\n')108 assert exists(target)109 check_yml = file_content(target)110 assert 'abc' not in check_yml111if __name__ == '__main__':...

Full Screen

Full Screen

create_test_runner_script.py

Source:create_test_runner_script.py Github

copy

Full Screen

1#!/usr/bin/env python2#3# Copyright 2015 The Chromium Authors. All rights reserved.4# Use of this source code is governed by a BSD-style license that can be5# found in the LICENSE file.6"""Creates a script to run an android test using build/android/test_runner.py.7"""8import argparse9import os10import sys11from util import build_utils12SCRIPT_TEMPLATE = """\13#!/usr/bin/env python14#15# This file was generated by build/android/gyp/create_test_runner_script.py16import logging17import os18import sys19def main():20 script_directory = os.path.dirname(__file__)21 def ResolvePath(path):22 \"\"\"Returns an absolute filepath given a path relative to this script.23 \"\"\"24 return os.path.abspath(os.path.join(script_directory, path))25 test_runner_path = ResolvePath('{test_runner_path}')26 test_runner_args = {test_runner_args}27 test_runner_path_args = {test_runner_path_args}28 for arg, path in test_runner_path_args.iteritems():29 test_runner_args.extend([arg, ResolvePath(path)])30 test_runner_cmd = ' '.join(31 [test_runner_path] + test_runner_args + sys.argv[1:])32 logging.critical(test_runner_cmd)33 os.system(test_runner_cmd)34if __name__ == '__main__':35 sys.exit(main())36"""37def main():38 parser = argparse.ArgumentParser()39 parser.add_argument('--script-output-path',40 help='Output path for executable script.')41 parser.add_argument('--depfile',42 help='Path to the depfile. This must be specified as '43 "the action's first output.")44 # We need to intercept any test runner path arguments and make all45 # of the paths relative to the output script directory.46 group = parser.add_argument_group('Test runner path arguments.')47 group.add_argument('--output-directory')48 group.add_argument('--isolate-file-path')49 args, test_runner_args = parser.parse_known_args()50 def RelativizePathToScript(path):51 """Returns the path relative to the output script directory."""52 return os.path.relpath(path, os.path.dirname(args.script_output_path))53 test_runner_path = os.path.join(54 os.path.dirname(__file__), os.path.pardir, 'test_runner.py')55 test_runner_path = RelativizePathToScript(test_runner_path)56 test_runner_path_args = {}57 if args.output_directory:58 test_runner_path_args['--output-directory'] = RelativizePathToScript(59 args.output_directory)60 if args.isolate_file_path:61 test_runner_path_args['--isolate-file-path'] = RelativizePathToScript(62 args.isolate_file_path)63 with open(args.script_output_path, 'w') as script:64 script.write(SCRIPT_TEMPLATE.format(65 test_runner_path=str(test_runner_path),66 test_runner_args=str(test_runner_args),67 test_runner_path_args=str(test_runner_path_args)))68 os.chmod(args.script_output_path, 0750)69 if args.depfile:70 build_utils.WriteDepfile(71 args.depfile,72 build_utils.GetPythonDependencies())73if __name__ == '__main__':...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2mb.start({3}).then(function () {4 mb.create({5 stubs: [{6 predicates: [{7 equals: {8 }9 }],10 responses: [{11 is: {12 headers: {13 },14 body: {15 }16 }17 }]18 }]19 }).then(function () {20 console.log('mountebank running on port 2525');21 });22});23var request = require('supertest');24var expect = require('chai').expect;25describe('Test', function () {26 it('should return 200', function (done) {27 api.get('/test')28 .set('Accept', 'application/json')29 .expect(200)30 .end(function (err, res) {31 expect(res.body.message).to.equal('Hello world');32 done();33 });34 });35});

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {3 if (error) {4 console.error('Error creating mb', error);5 }6});7var mb = require('mountebank');8mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {9 if (error) {10 console.error('Error creating mb', error);11 }12});13var mb = require('mountebank');14mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {15 if (error) {16 console.error('Error creating mb', error);17 }18});19var mb = require('mountebank');20mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {21 if (error) {22 console.error('Error creating mb', error);23 }24});25var mb = require('mountebank');26mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {27 if (error) {28 console.error('Error creating mb', error);29 }30});31var mb = require('mountebank');32mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {33 if (error) {34 console.error('Error creating mb', error);35 }36});37var mb = require('mountebank');38mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {39 if (error) {40 console.error('Error creating mb', error);41 }42});43var mb = require('mountebank');44mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2var Q = require('q');3var fs = require('fs');4var options = {5};6mb.start(options).then(function () {7 return mb.create({8 stubs: [{9 responses: [{10 is: {11 headers: {12 },13 body: JSON.stringify({14 })15 }16 }]17 }]18 });19}).then(function () {20 console.log('mountebank is ready');21}).done();22var assert = require('assert');23var request = require('request');24describe('API Test', function () {25 it('should return 200', function (done) {26 assert.equal(response.statusCode, 200);27 done();28 });29 });30});31var assert = require('assert');32var request = require('request');33describe('API Test', function () {34 it('should return 200', function (done) {35 assert.equal(response.statusCode, 200);36 done();37 });38 });39});40var assert = require('assert');41var request = require('request');42describe('API Test', function () {43 it('should return 200', function (done) {44 assert.equal(response.statusCode, 200);45 done();46 });47 });48});49var assert = require('assert');50var request = require('request');51describe('API Test', function () {52 it('should return 200', function (done) {53 assert.equal(response.statusCode, 200);54 done();55 });56 });57});58var assert = require('assert');59var request = require('request');60describe('API Test', function () {

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2var port = 2525;3var imposter = {4 {5 {6 equals: {7 }8 }9 {10 is: {11 }12 }13 }14};15mb.create({16}, function () {17 mb.startImposter(imposter, function (error, imposter) {18 console.log('Imposter created at port ' + imposter.port);19 });20});21var mb = require('mountebank');22var port = 2525;23var imposter = {24 {25 {26 equals: {27 }28 }29 {30 is: {31 }32 }33 }34};35mb.create({36}, function () {37 mb.startImposter(imposter, function (error, imposter) {38 console.log('Imposter created at port ' + imposter.port);39 });40});

Full Screen

Using AI Code Generation

copy

Full Screen

1const mb = require('mountebank');2mb.start({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log', ipWhitelist: ['*'] }).then(() => {3 return mb.post('/imposters', {4 stubs: [{5 responses: [{6 is: {7 headers: { 'Content-Type': 'application/json' },8 body: JSON.stringify({ success: true })9 }10 }]11 }]12 });13}).then(() => {14 return mb.get('/imposters');15}).then(response => {16 console.log(JSON.stringify(response.body));17 return mb.del('/imposters/3000');18}).then(() => {19 return mb.stop();20}).catch(error => {21 console.error(error);22 return mb.stop();23});24const mb = require('mountebank');25mb.start({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log', ipWhitelist: ['*'] }).then(() => {26 return mb.post('/imposters', {27 stubs: [{28 responses: [{29 is: {30 headers: { 'Content-Type': 'application/json' },31 body: JSON.stringify({ success: true })32 }33 }]34 }]35 });36}).then(() => {37 return mb.get('/imposters');38}).then(response => {39 console.log(JSON.stringify(response.body));40 return mb.del('/imposters/3000');41}).then(() => {42 return mb.stop();43}).catch(error => {44 console.error(error);45 return mb.stop();46});47const mb = require('mountebank');48mb.start({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log', ipWhitelist: ['*'] }).then(() => {49 return mb.post('/imposters', {50 stubs: [{51 responses: [{52 is: {53 headers: { 'Content-Type': 'application/json' },

Full Screen

Using AI Code Generation

copy

Full Screen

1const mb = require('mountebank');2const fs = require('fs');3const path = require('path');4const imposters = JSON.parse(fs.readFileSync(path.resolve(__dirname, 'imposters.json'), 'utf8'));5const options = {6 pidfile: path.resolve(__dirname, 'mb.pid'),7 logfile: path.resolve(__dirname, 'mb.log'),8 protofile: path.resolve(__dirname, 'mb.proto'),9};10mb.start(options, error => {11 if (error) {12 console.error('Failed to start mb', error);13 process.exit(1);14 }15 mb.createImposter(2525, imposters, (error, result) => {16 if (error) {17 console.error('Failed to create imposter', error);18 process.exit(1);19 }20 console.log('Imposter created', result);21 });22});23{24 {25 {26 "equals": {27 }28 }29 {30 "is": {31 "headers": {32 },33 {34 },35 {36 },37 {38 }39 }40 }41 }42}43const request = require('request');44const chai = require('chai');45const expect = chai.expect;46const chaiHttp = require('chai-http');47chai.use(chaiHttp);48describe('Employee API Test', () => {49 it('should return all employees

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2mb.create({3}, function (error, server) {4 console.log('Mountebank server started on port 2525');5});6var stub = {7 predicates: [{ equals: { method: 'GET', path: '/api/test' } }],8 responses: [{ is: { statusCode: 200, body: 'Hello World' } }]9};10var imposters = [{ port: 4000, protocol: 'http', stubs: [stub] }];11mb.post('/imposters', imposters, function (error, response) {12 console.log('Imposter created');13});14mb.get('/imposters/4000', function (error, response) {15 console.log('Imposter retrieved');16});17mb.del('/imposters', function (error, response) {18 console.log('Imposter deleted');19});20mb.stop(function (error) {21 console.log('Mountebank server stopped');22});

Full Screen

Using AI Code Generation

copy

Full Screen

1const mb = require('mountebank');2const port = 2525;3const protocol = 'http';4const host = 'localhost';5const imposterPort = 2526;6const imposter = {7 {8 {9 is: {10 headers: {11 },12 body: {13 }14 }15 }16 }17};18mb.create(url, imposter)19 .then(() => {20 return mb.get(url, imposterPort);21 })22 .then(imposter => {23 console.log(imposter);24 })25 .then(() => {26 return mb.put(url, imposterPort, imposter);27 })28 .then(imposter => {29 console.log(imposter);30 })31 .then(() => {32 return mb.del(url, imposterPort);33 })34 .then(() => {35 return mb.del(url, imposterPort);36 })37 .then(() => {38 return mb.get(url, imposterPort);39 })40 .catch(error => {41 console.log(error);42 });43const mb = require('mountebank');44const port = 2525;45const protocol = 'http';46const host = 'localhost';47const imposterPort = 2526;48const imposter = {49 {50 {51 is: {52 headers: {53 },54 body: {55 }56 }57 }58 }59};60mb.create(url, imposter)61 .then(() => {62 return mb.get(url, imposterPort);63 })64 .then(imposter => {65 console.log(imposter);66 })67 .then(() => {68 return mb.put(url, im

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2mb.create(function (error, mb) {3 if (error) {4 console.log("Error creating mb instance: " + error.message);5 }6 mb.start({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' }, function (error) {7 if (error) {8 console.log("Error starting mb: " + error.message);9 }10 console.log("mb started");11 mb.stop(function (error) {12 if (error) {13 console.log("Error stopping mb: " + error.message);14 }15 console.log("mb stopped");16 });17 });18});

Full Screen

Using AI Code Generation

copy

Full Screen

1const mb = require('mountebank');2const { promisify } = require('util');3const { get, post } = require('request-promise-native');4const { assert } = require('chai');5const { stub } = require('sinon');6const getStub = stub();7const postStub = stub();8 { name: 'John', age: 30 },9 { name: 'Jane', age: 27 },10 { name: 'Jack', age: 33 }11]);12 { name: 'John', age: 30 }13]);14 { name: 'Jane', age: 27 }15]);16 { name: 'Jack', age: 33 }17]);18 { name: 'John', age: 30 }19]);20 { name: 'Jane', age: 27 }21]);22 { name: 'Jack', age: 33 }23]);24const requestPromiseNativeStub = stub();25requestPromiseNativeStub.withArgs('request-promise-native').returns({

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run mountebank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful