Best Python code snippet using stestr_python
testsuite.py
Source:testsuite.py  
...60        shouldStop attribute on the result object they are run with, which will61        be set if an exception is raised in the thread which62        ConcurrentTestSuite.run is called in.63        """64        tests = self.make_tests(self)65        try:66            threads = {}67            queue = Queue()68            semaphore = threading.Semaphore(1)69            for i, test in enumerate(tests):70                process_result = self._wrap_result(71                    testtools.ThreadsafeForwardingResult(result, semaphore), i)72                reader_thread = threading.Thread(73                    target=self._run_test, args=(test, process_result, queue))74                threads[test] = reader_thread, process_result75                reader_thread.start()76            while threads:77                finished_test = queue.get()78                threads[finished_test][0].join()79                del threads[finished_test]80        except:81            for thread, process_result in threads.values():82                process_result.stop()83            raise84    def _run_test(self, test, process_result, queue):85        try:86            try:87                test.run(process_result)88            except Exception as e:89                # The run logic itself failed.90                case = testtools.ErrorHolder(91                    "broken-runner",92                    error=sys.exc_info())93                case.run(process_result)94        finally:95            queue.put(test)96class ConcurrentStreamTestSuite(object):97    """A TestSuite whose run() parallelises."""98    def __init__(self, make_tests):99        """Create a ConcurrentTestSuite to execute tests returned by make_tests.100        :param make_tests: A helper function that should return some number101            of concurrently executable test suite / test case objects.102            make_tests must take no parameters and return an iterable of103            tuples. 
Each tuple must be of the form (case, route_code), where104            case is a TestCase-like object with a run(result) method, and105            route_code is either None or a unicode string.106        """107        super(ConcurrentStreamTestSuite, self).__init__()108        self.make_tests = make_tests109    def run(self, result):110        """Run the tests concurrently.111        This calls out to the provided make_tests helper to determine the112        concurrency to use and to assign routing codes to each worker.113        ConcurrentTestSuite provides no special mechanism to stop the tests114        returned by make_tests, it is up to the made tests to honour the115        shouldStop attribute on the result object they are run with, which will116        be set if the test run is to be aborted.117        The tests are run with an ExtendedToStreamDecorator wrapped around a118        StreamToQueue instance. ConcurrentStreamTestSuite dequeues events from119        the queue and forwards them to result. Tests can therefore be either120        original unittest tests (or compatible tests), or new tests that emit121        StreamResult events directly.122        :param result: A StreamResult instance. 
The caller is responsible for123            calling startTestRun on this instance prior to invoking suite.run,124            and stopTestRun subsequent to the run method returning.125        """126        tests = self.make_tests()127        try:128            threads = {}129            queue = Queue()130            for test, route_code in tests:131                to_queue = testtools.StreamToQueue(queue, route_code)132                process_result = testtools.ExtendedToStreamDecorator(133                    testtools.TimestampingStreamResult(to_queue))134                runner_thread = threading.Thread(135                    target=self._run_test,136                    args=(test, process_result, route_code))137                threads[to_queue] = runner_thread, process_result138                runner_thread.start()139            while threads:140                event_dict = queue.get()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
