How to use _worker_teardown method in Molotov

Best Python code snippet using molotov_python

containers.py

Source:containers.py Github

copy

Full Screen

...307 # we don't need this any more, and breaking the cycle means308 # this can be reclaimed immediately, rather than waiting for a309 # gc sweep310 del exc_info311 self._worker_teardown(worker_ctx)312 def _inject_dependencies(self, worker_ctx):313 for provider in self.dependencies:314 dependency = provider.get_dependency(worker_ctx)315 setattr(worker_ctx.service, provider.attr_name, dependency)316 def _worker_setup(self, worker_ctx):317 for provider in self.dependencies:318 provider.worker_setup(worker_ctx)319 def _worker_result(self, worker_ctx, result, exc_info):320 _log.debug('signalling result for %s', worker_ctx)321 for provider in self.dependencies:322 provider.worker_result(worker_ctx, result, exc_info)323 def _worker_teardown(self, worker_ctx):324 for provider in self.dependencies:325 provider.worker_teardown(worker_ctx)326 def _kill_worker_threads(self):327 """ Kill any currently executing worker threads.328 See :meth:`ServiceContainer.spawn_worker`329 """330 num_workers = len(self._worker_threads)331 if num_workers:332 _log.warning('killing %s active workers(s)', num_workers)333 for worker_ctx, gt in list(self._worker_threads.items()):334 _log.warning('killing active worker for %s', worker_ctx)335 gt.kill()336 def _kill_managed_threads(self):337 """ Kill any currently executing managed threads....

Full Screen

Full Screen

self_play.py

Source:self_play.py Github

copy

Full Screen

...217 write_to_hdf(_env_.hdf_file_name, "fresh", df)218 tock = time.time()219 logger.warning("Worker %s played %d games (%d samples) in %.0fs (save=%.3fs)", 220 _env_.name, len(games_idxs), len(df.index), tock-tick, tock-tack)221def _worker_teardown(not_used):222 import time223 loop = asyncio.get_event_loop()224 try:225 for t in _env_.tasks:226 t.cancel()227 loop.run_until_complete(asyncio.sleep(0.01))228 asyncio.get_event_loop().close()229 except Exception as e:230 logger.exception("An error occured during workers teardown.")231 raise e232 time.sleep(0.1)233def _err_cb(err):234 if isinstance(err, BaseException):235 logger.error("Got an exception from a worker: %s", err)...

Full Screen

Full Screen

test_fmwk.py

Source:test_fmwk.py Github

copy

Full Screen

...228 @dedicatedloop229 def test_shutdown(self):230 res = []231 @teardown()232 def _worker_teardown(num):233 res.append("BYE WORKER")234 @global_teardown()235 def _teardown():236 res.append("BYE")237 @scenario(weight=100)238 async def test_two(session):239 os.kill(os.getpid(), signal.SIGTERM)240 args = self.get_args()241 results = Runner(args)()242 self.assertEqual(results["OK"], 1)243 self.assertEqual(results["FAILED"], 0)244 self.assertEqual(res, ["BYE WORKER", "BYE"])245 @dedicatedloop246 def test_shutdown_exception(self):247 @teardown()248 def _worker_teardown(num):249 raise Exception("bleh")250 @global_teardown()251 def _teardown():252 raise Exception("bleh")253 @scenario(weight=100)254 async def test_two(session):255 os.kill(os.getpid(), signal.SIGTERM)256 args = self.get_args()257 results = Runner(args)()258 self.assertEqual(results["OK"], 1)259 @async_test260 async def test_session_shutdown_exception(self, loop, console, results):261 @teardown_session()262 async def _teardown_session(wid, session):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful