Best Python code snippet using molotov_python
containers.py
Source:containers.py  
...307                # we don't need this any more, and breaking the cycle means308                # this can be reclaimed immediately, rather than waiting for a309                # gc sweep310                del exc_info311                self._worker_teardown(worker_ctx)312    def _inject_dependencies(self, worker_ctx):313        for provider in self.dependencies:314            dependency = provider.get_dependency(worker_ctx)315            setattr(worker_ctx.service, provider.attr_name, dependency)316    def _worker_setup(self, worker_ctx):317        for provider in self.dependencies:318            provider.worker_setup(worker_ctx)319    def _worker_result(self, worker_ctx, result, exc_info):320        _log.debug('signalling result for %s', worker_ctx)321        for provider in self.dependencies:322            provider.worker_result(worker_ctx, result, exc_info)323    def _worker_teardown(self, worker_ctx):324        for provider in self.dependencies:325            provider.worker_teardown(worker_ctx)326    def _kill_worker_threads(self):327        """ Kill any currently executing worker threads.328        See :meth:`ServiceContainer.spawn_worker`329        """330        num_workers = len(self._worker_threads)331        if num_workers:332            _log.warning('killing %s active workers(s)', num_workers)333            for worker_ctx, gt in list(self._worker_threads.items()):334                _log.warning('killing active worker for %s', worker_ctx)335                gt.kill()336    def _kill_managed_threads(self):337        """ Kill any currently executing managed threads....self_play.py
Source:self_play.py  
...217        write_to_hdf(_env_.hdf_file_name, "fresh", df)218    tock = time.time()219    logger.warning("Worker %s played %d games (%d samples) in %.0fs (save=%.3fs)", 220        _env_.name, len(games_idxs), len(df.index), tock-tick, tock-tack)221def _worker_teardown(not_used):222    import time223    loop = asyncio.get_event_loop()224    try:225        for  t in _env_.tasks:226            t.cancel()227        loop.run_until_complete(asyncio.sleep(0.01))228        asyncio.get_event_loop().close()229    except Exception as e:230        logger.exception("An error occured during workers teardown.")231        raise e232    time.sleep(0.1)233def _err_cb(err):234    if isinstance(err, BaseException):235        logger.error("Got an exception from a worker: %s", err)...test_fmwk.py
Source:test_fmwk.py  
...228    @dedicatedloop229    def test_shutdown(self):230        res = []231        @teardown()232        def _worker_teardown(num):233            res.append("BYE WORKER")234        @global_teardown()235        def _teardown():236            res.append("BYE")237        @scenario(weight=100)238        async def test_two(session):239            os.kill(os.getpid(), signal.SIGTERM)240        args = self.get_args()241        results = Runner(args)()242        self.assertEqual(results["OK"], 1)243        self.assertEqual(results["FAILED"], 0)244        self.assertEqual(res, ["BYE WORKER", "BYE"])245    @dedicatedloop246    def test_shutdown_exception(self):247        @teardown()248        def _worker_teardown(num):249            raise Exception("bleh")250        @global_teardown()251        def _teardown():252            raise Exception("bleh")253        @scenario(weight=100)254        async def test_two(session):255            os.kill(os.getpid(), signal.SIGTERM)256        args = self.get_args()257        results = Runner(args)()258        self.assertEqual(results["OK"], 1)259    @async_test260    async def test_session_shutdown_exception(self, loop, console, results):261        @teardown_session()262        async def _teardown_session(wid, session):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
