Best Python code snippet using playwright-python
test_futures.py
Source:test_futures.py  
# ... (excerpt begins inside an earlier definition whose header is not shown;
# the visible tail of that definition is kept verbatim below as a comment)
#     try:
#         yield logging_stream
#     finally:
#         logging.root.removeHandler(handler)


def create_future(state=PENDING, exception=None, result=None):
    """Return a Future whose state, exception and result are preset.

    The private ``_state``, ``_exception`` and ``_result`` attributes are
    assigned directly, so no executor is involved.
    """
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


# One shared future per state, used as fixtures by the tests below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)


def mul(x, y):
    """Return the product of x and y."""
    return x * y


def sleep_and_raise(t):
    """Sleep for t seconds, then raise an Exception."""
    time.sleep(t)
    raise Exception('this is an exception')


def sleep_and_print(t, msg):
    """Sleep for t seconds, then print msg and flush stdout."""
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


class ExecutorMixin:
    """Fixture mixin that creates ``self.executor`` from ``executor_type``.

    Subclasses provide ``executor_type``; ``worker_count`` is passed to it as
    ``max_workers``.
    """

    worker_count = 5

    def setUp(self):
        # Remember the start time so tearDown can check the test's duration.
        self.t1 = time.time()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError:
            e = sys.exc_info()[1]
            self.skipTest(str(e))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        dt = time.time() - self.t1
        if test_support.verbose:
            print("%.2fs" % dt)
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        # (The local is not called "futures" so it does not shadow the module.)
        primed = [self.executor.submit(time.sleep, 0.1)
                  for _ in range(self.worker_count)]
        for f in primed:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin backed by futures.ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor


class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin backed by futures.ProcessPoolExecutor."""

    executor_type = futures.ProcessPoolExecutor


class ExecutorShutdownTest(unittest.TestCase):
    """Shutdown tests shared by both pools; combined with a pool mixin below."""

    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import %s
            from time import sleep
            from test_futures import sleep_and_print
            t = %s(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """ % (self.executor_type.__name__, self.executor_type.__name__))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), "apple".encode())

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
    def _prime_executor(self):
        # No priming here: test_threads_terminate counts the threads that the
        # test's own submissions create.
        pass

    def test_threads_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        for t in threads:
            t.join()


class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
    def _prime_executor(self):
        # Priming is skipped for the shutdown tests.
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()
        for p in processes:
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
        for p in processes:
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor
        queue_management_thread.join()
        for p in processes:
            p.join()


class WaitTests(unittest.TestCase):
    """Tests for futures.wait(); combined with a pool mixin below."""

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait(
                [CANCELLED_FUTURE, future1, future2],
                return_when=futures.FIRST_COMPLETED)
        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)
        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
                return_when=futures.FIRST_COMPLETED)
        self.assertEqual(
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
                finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)
        finished, pending = futures.wait(
                [future1, future2, future3],
                return_when=futures.FIRST_EXCEPTION)
        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)
        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 future1, future2],
                return_when=futures.FIRST_EXCEPTION)
        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)
        finished, pending = futures.wait(
                [EXCEPTION_FUTURE, future1],
                return_when=futures.FIRST_EXCEPTION)
        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)
        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 future1,
                 future2],
                return_when=futures.ALL_COMPLETED)
        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 3)
        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2],
                timeout=1.5,
                return_when=futures.ALL_COMPLETED)
        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()

        def future_func():
            event.wait()

        oldswitchinterval = sys.getcheckinterval()
        sys.setcheckinterval(1)
        try:
            fs = set(self.executor.submit(future_func) for i in range(100))
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter-wide check interval.
            sys.setcheckinterval(oldswitchinterval)


class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
    pass


class AsCompletedTests(unittest.TestCase):
    """Tests for futures.as_completed(); combined with a pool mixin below."""

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)
        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            # Expected: future1 is still sleeping when the timeout expires.
            pass
        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        future1 = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed([future1, future1]))
        self.assertEqual(len(completed), 1)


class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
    pass


class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
    pass


class ExecutorTest(unittest.TestCase):
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())

    def test_map(self):
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10))),
                list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(i), (0, 1))
        self.assertEqual(next(i), (0, 1))
        self.assertRaises(ZeroDivisionError, next, i)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 3],
                                       timeout=1.5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')
        self.assertEqual([None, None], results)


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []

        def record_finished(n):
            finished.append(n)

        # The iterator returned by map() is deliberately never consumed.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertEqual(len(finished), 10)


class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
    pass


class FutureTests(unittest.TestCase):
    """Tests of Future itself; no executor is needed."""

    def test_done_callback_with_result(self):
        callback_result = [None]

        def fn(callback_future):
            callback_result[0] = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result[0])

    def test_done_callback_with_exception(self):
        callback_exception = [None]

        def fn(callback_future):
            callback_exception[0] = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception[0].args)

    def test_done_callback_with_cancel(self):
        was_cancelled = [None]

        def fn(callback_future):
            was_cancelled[0] = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled[0])

    def test_done_callback_raises(self):
        with captured_stderr() as stderr:
            raising_was_called = [False]
            fn_was_called = [False]

            def raising_fn(callback_future):
                raising_was_called[0] = True
                raise Exception('doh!')

            def fn(callback_future):
                fn_was_called[0] = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            # Check the flag stored in each one-element list: the list object
            # itself is always truthy, so asserting on it could never fail.
            self.assertTrue(raising_was_called[0])
            self.assertTrue(fn_was_called[0])
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = [None]

        def fn(callback_future):
            callback_result[0] = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result[0])

    def test_done_callback_already_failed(self):
        callback_exception = [None]

        def fn(callback_future):
            callback_exception[0] = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception[0].args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = [None]

        def fn(callback_future):
            was_cancelled[0] = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled[0])

    def test_repr(self):
        self.assertRegexpMatches(repr(PENDING_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegexpMatches(repr(RUNNING_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegexpMatches(repr(CANCELLED_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                                 '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegexpMatches(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
        self.assertRegexpMatches(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=IOError())
        f6 = create_future(state=FINISHED, result=5)
        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)
        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)
        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)
        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)
        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependant.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertEqual(f1.result(timeout=5), 42)
        finally:
            # Do not leave the helper thread running after the test.
            t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependant.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertRaises(futures.CancelledError, f1.result, timeout=5)
        finally:
            # Do not leave the helper thread running after the test.
            t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), IOError)
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = IOError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertIsInstance(f1.exception(timeout=5), IOError)
        finally:
            # Do not leave the helper thread running after the test.
            t.join()


# ... (the listing is truncated inside test_main(); the visible part is kept
# verbatim below as a comment)
# @reap_threads
# def test_main():
#     try:
#         test_support.run_unittest(ProcessPoolExecutorTest,
#                                   ThreadPoolExecutorTest,
#                                   ProcessPoolWaitTests,
#                                   ThreadPoolWaitTests,
#                                   ProcessPoolAsCompletedTests,
#                                   ThreadPoolAsCompletedTests,
#                                   FutureTests,
#                                   ProcessPoolShutdownTest,...
# (next excerpt on the page: test_concurrent_futures.py)
Source:test_concurrent_futures.py  
# ... (excerpt begins at the import lines below; earlier lines are not shown)
from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
import concurrent.futures.process


def create_future(state=PENDING, exception=None, result=None):
    """Build a Future and preset its private state, exception and result."""
    future = Future()
    future._state = state
    future._exception = exception
    future._result = result
    return future


# Shared fixtures: one pre-built future for each state.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)


def mul(x, y):
    """Multiply x by y and return the product."""
    product = x * y
    return product


def sleep_and_raise(t):
    """Raise an Exception after sleeping for t seconds."""
    time.sleep(t)
    raise Exception('this is an exception')


def sleep_and_print(t, msg):
    """Print msg after sleeping for t seconds, then flush stdout."""
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


class ExecutorMixin:
    """Creates ``self.executor`` from the subclass's ``executor_type``."""

    worker_count = 5

    def setUp(self):
        # The start time is compared against the clock again in tearDown.
        self.t1 = time.time()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError as exc:
            self.skipTest(str(exc))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        elapsed = time.time() - self.t1
        if test.support.verbose:
            print("%.2fs" % elapsed, end=' ')
        self.assertLess(elapsed, 60,
                        "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        warmups = []
        for _ in range(self.worker_count):
            warmups.append(self.executor.submit(time.sleep, 0.1))
        for warmup in warmups:
            warmup.result()


class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin using futures.ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor


class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin using futures.ProcessPoolExecutor."""

    executor_type = futures.ProcessPoolExecutor


class ExecutorShutdownTest(unittest.TestCase):
    """Shutdown tests shared by both pools; combined with a pool mixin below."""

    def test_run_after_shutdown(self):
        self.executor.shutdown()
        with self.assertRaises(RuntimeError):
            self.executor.submit(pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        pool_name = self.executor_type.__name__
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            t = {executor_type}(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=pool_name))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
    def _prime_executor(self):
        # Priming is skipped so test_threads_terminate sees only the threads
        # created by its own submissions.
        pass

    def test_threads_terminate(self):
        for args in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *args)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for thread in self.executor._threads:
            thread.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as pool:
            executor = pool
            self.assertEqual(list(pool.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
        for thread in executor._threads:
            thread.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        workers = executor._threads
        del executor
        for thread in workers:
            thread.join()


class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
    def _prime_executor(self):
        # Priming is skipped for the shutdown tests.
        pass

    def test_processes_terminate(self):
        for args in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *args)
        self.assertEqual(len(self.executor._processes), 5)
        workers = self.executor._processes
        self.executor.shutdown()
        for proc in workers:
            proc.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as pool:
            workers = pool._processes
            self.assertEqual(list(pool.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
        for proc in workers:
            proc.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        manager_thread = executor._queue_management_thread
        workers = executor._processes
        del executor
        manager_thread.join()
        for proc in workers:
            proc.join()


class WaitTests(unittest.TestCase):
    """Tests for futures.wait(); combined with a pool mixin below."""

    def test_first_completed(self):
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait(
            [CANCELLED_FUTURE, quick, slow],
            return_when=futures.FIRST_COMPLETED)
        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
            return_when=futures.FIRST_COMPLETED)
        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            done)
        self.assertEqual({slow}, not_done)

    def test_first_exception(self):
        quick = self.executor.submit(mul, 2, 21)
        failing = self.executor.submit(sleep_and_raise, 1.5)
        slow = self.executor.submit(time.sleep, 3)
        done, not_done = futures.wait(
            [quick, failing, slow],
            return_when=futures.FIRST_EXCEPTION)
        self.assertEqual({quick, failing}, done)
        self.assertEqual({slow}, not_done)

    def test_first_exception_some_already_complete(self):
        failing = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)
        waited_on = [SUCCESSFUL_FUTURE,
                     CANCELLED_FUTURE,
                     CANCELLED_AND_NOTIFIED_FUTURE,
                     failing, slow]
        done, not_done = futures.wait(
            waited_on, return_when=futures.FIRST_EXCEPTION)
        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          failing}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_exception_one_already_failed(self):
        slow = self.executor.submit(time.sleep, 2)
        done, not_done = futures.wait(
            [EXCEPTION_FUTURE, slow],
            return_when=futures.FIRST_EXCEPTION)
        self.assertEqual({EXCEPTION_FUTURE}, done)
        self.assertEqual({slow}, not_done)

    def test_all_completed(self):
        failing = self.executor.submit(divmod, 2, 0)
        quick = self.executor.submit(mul, 2, 21)
        waited_on = [SUCCESSFUL_FUTURE,
                     CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     failing,
                     quick]
        done, not_done = futures.wait(
            waited_on, return_when=futures.ALL_COMPLETED)
        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          failing,
                          quick}, done)
        self.assertEqual(set(), not_done)

    def test_timeout(self):
        quick = self.executor.submit(mul, 6, 7)
        slow = self.executor.submit(time.sleep, 3)
        waited_on = [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     quick, slow]
        done, not_done = futures.wait(
            waited_on,
            timeout=1.5,
            return_when=futures.ALL_COMPLETED)
        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          quick}, done)
        self.assertEqual({slow}, not_done)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
    pass


class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
    pass


# ... (the listing is truncated inside AsCompletedTests.test_no_timeout; the
# visible part is kept verbatim below as a comment)
# class AsCompletedTests(unittest.TestCase):
#     # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
#     def test_no_timeout(self):
#         future1 = self.executor.submit(mul, 2, 21)
#         future2 = self.executor.submit(mul, 7, 6)
#         completed = set(futures.as_completed(
#                 [CANCELLED_AND_NOTIFIED_FUTURE,
#                  EXCEPTION_FUTURE,
#                  SUCCESSFUL_FUTURE,
       future1, future2]))230        self.assertEqual(set(231                [CANCELLED_AND_NOTIFIED_FUTURE,232                 EXCEPTION_FUTURE,233                 SUCCESSFUL_FUTURE,234                 future1, future2]),235                completed)236    def test_zero_timeout(self):237        future1 = self.executor.submit(time.sleep, 2)238        completed_futures = set()239        try:240            for future in futures.as_completed(241                    [CANCELLED_AND_NOTIFIED_FUTURE,242                     EXCEPTION_FUTURE,243                     SUCCESSFUL_FUTURE,244                     future1],245                    timeout=0):246                completed_futures.add(future)247        except futures.TimeoutError:248            pass249        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,250                              EXCEPTION_FUTURE,251                              SUCCESSFUL_FUTURE]),252                         completed_futures)253class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):254    pass255class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):256    pass257class ExecutorTest(unittest.TestCase):258    # Executor.shutdown() and context manager usage is tested by259    # ExecutorShutdownTest.260    def test_submit(self):261        future = self.executor.submit(pow, 2, 8)262        self.assertEqual(256, future.result())263    def test_submit_keyword(self):264        future = self.executor.submit(mul, 2, y=8)265        self.assertEqual(16, future.result())266    def test_map(self):267        self.assertEqual(268                list(self.executor.map(pow, range(10), range(10))),269                list(map(pow, range(10), range(10))))270    def test_map_exception(self):271        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])272        self.assertEqual(i.__next__(), (0, 1))273        self.assertEqual(i.__next__(), (0, 1))274        self.assertRaises(ZeroDivisionError, i.__next__)275    def 
test_map_timeout(self):276        results = []277        try:278            for i in self.executor.map(time.sleep,279                                       [0, 0, 3],280                                       timeout=1.5):281                results.append(i)282        except futures.TimeoutError:283            pass284        else:285            self.fail('expected TimeoutError')286        self.assertEqual([None, None], results)287class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):288    pass289class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):290    pass291class FutureTests(unittest.TestCase):292    def test_done_callback_with_result(self):293        callback_result = None294        def fn(callback_future):295            nonlocal callback_result296            callback_result = callback_future.result()297        f = Future()298        f.add_done_callback(fn)299        f.set_result(5)300        self.assertEqual(5, callback_result)301    def test_done_callback_with_exception(self):302        callback_exception = None303        def fn(callback_future):304            nonlocal callback_exception305            callback_exception = callback_future.exception()306        f = Future()307        f.add_done_callback(fn)308        f.set_exception(Exception('test'))309        self.assertEqual(('test',), callback_exception.args)310    def test_done_callback_with_cancel(self):311        was_cancelled = None312        def fn(callback_future):313            nonlocal was_cancelled314            was_cancelled = callback_future.cancelled()315        f = Future()316        f.add_done_callback(fn)317        self.assertTrue(f.cancel())318        self.assertTrue(was_cancelled)319    def test_done_callback_raises(self):320        with test.support.captured_stderr() as stderr:321            raising_was_called = False322            fn_was_called = False323            def raising_fn(callback_future):324                nonlocal raising_was_called325                
raising_was_called = True326                raise Exception('doh!')327            def fn(callback_future):328                nonlocal fn_was_called329                fn_was_called = True330            f = Future()331            f.add_done_callback(raising_fn)332            f.add_done_callback(fn)333            f.set_result(5)334            self.assertTrue(raising_was_called)335            self.assertTrue(fn_was_called)336            self.assertIn('Exception: doh!', stderr.getvalue())337    def test_done_callback_already_successful(self):338        callback_result = None339        def fn(callback_future):340            nonlocal callback_result341            callback_result = callback_future.result()342        f = Future()343        f.set_result(5)344        f.add_done_callback(fn)345        self.assertEqual(5, callback_result)346    def test_done_callback_already_failed(self):347        callback_exception = None348        def fn(callback_future):349            nonlocal callback_exception350            callback_exception = callback_future.exception()351        f = Future()352        f.set_exception(Exception('test'))353        f.add_done_callback(fn)354        self.assertEqual(('test',), callback_exception.args)355    def test_done_callback_already_cancelled(self):356        was_cancelled = None357        def fn(callback_future):358            nonlocal was_cancelled359            was_cancelled = callback_future.cancelled()360        f = Future()361        self.assertTrue(f.cancel())362        f.add_done_callback(fn)363        self.assertTrue(was_cancelled)364    def test_repr(self):365        self.assertRegex(repr(PENDING_FUTURE),366                         '<Future at 0x[0-9a-f]+ state=pending>')367        self.assertRegex(repr(RUNNING_FUTURE),368                         '<Future at 0x[0-9a-f]+ state=running>')369        self.assertRegex(repr(CANCELLED_FUTURE),370                         '<Future at 0x[0-9a-f]+ state=cancelled>')371        
self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),372                         '<Future at 0x[0-9a-f]+ state=cancelled>')373        self.assertRegex(374                repr(EXCEPTION_FUTURE),375                '<Future at 0x[0-9a-f]+ state=finished raised IOError>')376        self.assertRegex(377                repr(SUCCESSFUL_FUTURE),378                '<Future at 0x[0-9a-f]+ state=finished returned int>')379    def test_cancel(self):380        f1 = create_future(state=PENDING)381        f2 = create_future(state=RUNNING)382        f3 = create_future(state=CANCELLED)383        f4 = create_future(state=CANCELLED_AND_NOTIFIED)384        f5 = create_future(state=FINISHED, exception=IOError())385        f6 = create_future(state=FINISHED, result=5)386        self.assertTrue(f1.cancel())387        self.assertEqual(f1._state, CANCELLED)388        self.assertFalse(f2.cancel())389        self.assertEqual(f2._state, RUNNING)390        self.assertTrue(f3.cancel())391        self.assertEqual(f3._state, CANCELLED)392        self.assertTrue(f4.cancel())393        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)394        self.assertFalse(f5.cancel())395        self.assertEqual(f5._state, FINISHED)396        self.assertFalse(f6.cancel())397        self.assertEqual(f6._state, FINISHED)398    def test_cancelled(self):399        self.assertFalse(PENDING_FUTURE.cancelled())400        self.assertFalse(RUNNING_FUTURE.cancelled())401        self.assertTrue(CANCELLED_FUTURE.cancelled())402        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())403        self.assertFalse(EXCEPTION_FUTURE.cancelled())404        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())405    def test_done(self):406        self.assertFalse(PENDING_FUTURE.done())407        self.assertFalse(RUNNING_FUTURE.done())408        self.assertTrue(CANCELLED_FUTURE.done())409        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())410        self.assertTrue(EXCEPTION_FUTURE.done())411        
self.assertTrue(SUCCESSFUL_FUTURE.done())412    def test_running(self):413        self.assertFalse(PENDING_FUTURE.running())414        self.assertTrue(RUNNING_FUTURE.running())415        self.assertFalse(CANCELLED_FUTURE.running())416        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())417        self.assertFalse(EXCEPTION_FUTURE.running())418        self.assertFalse(SUCCESSFUL_FUTURE.running())419    def test_result_with_timeout(self):420        self.assertRaises(futures.TimeoutError,421                          PENDING_FUTURE.result, timeout=0)422        self.assertRaises(futures.TimeoutError,423                          RUNNING_FUTURE.result, timeout=0)424        self.assertRaises(futures.CancelledError,425                          CANCELLED_FUTURE.result, timeout=0)426        self.assertRaises(futures.CancelledError,427                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)428        self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)429        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)430    def test_result_with_success(self):431        # TODO(brian@sweetapp.com): This test is timing dependant.432        def notification():433            # Wait until the main thread is waiting for the result.434            time.sleep(1)435            f1.set_result(42)436        f1 = create_future(state=PENDING)437        t = threading.Thread(target=notification)438        t.start()439        self.assertEqual(f1.result(timeout=5), 42)440    def test_result_with_cancel(self):441        # TODO(brian@sweetapp.com): This test is timing dependant.442        def notification():443            # Wait until the main thread is waiting for the result.444            time.sleep(1)445            f1.cancel()446        f1 = create_future(state=PENDING)447        t = threading.Thread(target=notification)448        t.start()449        self.assertRaises(futures.CancelledError, f1.result, timeout=5)450    def 
test_exception_with_timeout(self):451        self.assertRaises(futures.TimeoutError,452                          PENDING_FUTURE.exception, timeout=0)453        self.assertRaises(futures.TimeoutError,454                          RUNNING_FUTURE.exception, timeout=0)455        self.assertRaises(futures.CancelledError,456                          CANCELLED_FUTURE.exception, timeout=0)457        self.assertRaises(futures.CancelledError,458                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)459        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),460                                   IOError))461        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)462    def test_exception_with_success(self):463        def notification():464            # Wait until the main thread is waiting for the exception.465            time.sleep(1)466            with f1._condition:467                f1._state = FINISHED468                f1._exception = IOError()469                f1._condition.notify_all()470        f1 = create_future(state=PENDING)471        t = threading.Thread(target=notification)472        t.start()473        self.assertTrue(isinstance(f1.exception(timeout=5), IOError))474@test.support.reap_threads475def test_main():476    try:477        test.support.run_unittest(ProcessPoolExecutorTest,478                                  ThreadPoolExecutorTest,479                                  ProcessPoolWaitTests,480                                  ThreadPoolWaitTests,481                                  ProcessPoolAsCompletedTests,482                                  ThreadPoolAsCompletedTests,483                                  FutureTests,484                                  ProcessPoolShutdownTest,..._base.py
Source:_base.py  
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

from __future__ import with_statement
import functools
import logging
import threading
import time

try:
    from collections import namedtuple
except ImportError:
    from concurrent.futures._compat import namedtuple

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
STDERR_HANDLER = logging.StreamHandler()
LOGGER.addHandler(STDERR_HANDLER)


class Error(Exception):
    """Base class for all future-related exceptions."""
    pass


class CancelledError(Error):
    """The Future was cancelled."""
    pass


class TimeoutError(Error):
    """The operation exceeded the given deadline."""
    pass


class _Waiter(object):
    """Provides the event that wait() and as_completed() block on."""

    def __init__(self):
        self.event = threading.Event()
        # Futures that completed while this waiter was installed on them.
        self.finished_futures = []

    def add_result(self, future):
        self.finished_futures.append(future)

    def add_exception(self, future):
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        self.finished_futures.append(future)


class _AsCompletedWaiter(_Waiter):
    """Used by as_completed()."""

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        # Guards finished_futures/event against the consumer in
        # as_completed(), which swaps the list out and clears the event.
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_cancelled(future)
            self.event.set()


class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED)."""

    def add_result(self, future):
        super(_FirstCompletedWaiter, self).add_result(future)
        self.event.set()

    def add_exception(self, future):
        super(_FirstCompletedWaiter, self).add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super(_FirstCompletedWaiter, self).add_cancelled(future)
        self.event.set()


class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        super(_AllCompletedWaiter, self).__init__()

    def _decrement_pending_calls(self):
        # The event fires once as many futures have reported in as were
        # outstanding when the waiter was created.
        if self.num_pending_calls == len(self.finished_futures):
            self.event.set()

    def add_result(self, future):
        super(_AllCompletedWaiter, self).add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super(_AllCompletedWaiter, self).add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        super(_AllCompletedWaiter, self).add_cancelled(future)
        self._decrement_pending_calls()


class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions."""

    def __init__(self, futures):
        # Sorting by id() gives every caller the same acquisition order.
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for future in self.futures:
            future._condition.acquire()

    def __exit__(self, *args):
        for future in self.futures:
            future._condition.release()


def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching return_when and attach it to every future.

    Raises:
        ValueError: If return_when is not a recognised return condition.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        pending_count = sum(
                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

        if return_when == FIRST_EXCEPTION:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
        elif return_when == ALL_COMPLETED:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter


def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled).

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = set(fs) - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        for future in finished:
            yield future

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            waiter.event.wait(wait_timeout)

            # Take the batch collected so far and reset the waiter for the
            # next round while holding its lock.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)
    finally:
        for f in fs:
            f._waiters.remove(waiter)


DoneAndNotDoneFutures = namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.
    """
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = set(fs) - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, set(fs) - done)


class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        # A failing callback is logged and must not prevent later ones.
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<Future at %s state=%s raised %s>' % (
                        hex(id(self)),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<Future at %s state=%s returned %s>' % (
                        hex(id(self)),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<Future at %s state=%s>' % (
                    hex(id(self)),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added. An
                Exception raised by the callable is logged and ignored.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: run the callback now, with the same log-and-ignore
        # policy that _invoke_callbacks() applies to deferred callbacks.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (though calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                # Report on this future itself; there is no `future`
                # attribute, so referencing one would mask the RuntimeError
                # with an AttributeError.
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, iter).

        All calls are submitted before this method returns; only the
        retrieval of results is lazy.

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # The generator is a nested function so that the submissions above
        # (and the timeout clock) start at call time rather than on the
        # first next().
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                # Drop any work that has not started once iteration ends.
                for future in fs:
                    future.cancel()

        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
Source:futures.py  
def _chain_future(source, destination):
    """Link two futures so that completion of one is mirrored on the other.

    When ``source`` finishes, its result (or exception) is copied onto
    ``destination``.  When ``destination`` finishes in the cancelled state,
    ``source`` is cancelled too.  Works with both asyncio.Future and
    concurrent.futures.Future.

    Raises:
        TypeError: if either argument is neither accepted by isfuture() nor
            a concurrent.futures.Future instance.
    """
    if not (isfuture(source)
            or isinstance(source, concurrent.futures.Future)):
        raise TypeError('A future is required for source argument')
    if not (isfuture(destination)
            or isinstance(destination, concurrent.futures.Future)):
        raise TypeError('A future is required for destination argument')

    # A loop is looked up only for objects accepted by isfuture();
    # for anything else the corresponding loop stays None.
    source_loop = None
    if isfuture(source):
        source_loop = _get_loop(source)
    dest_loop = None
    if isfuture(destination):
        dest_loop = _get_loop(destination)

    def _set_state(future, other):
        # Copy the outcome of `other` onto `future`, using the helper that
        # matches the kind of future being written to.
        if not isfuture(future):
            _set_concurrent_future_state(future, other)
            return
        _copy_future_state(other, future)

    def _call_check_cancel(done_destination):
        # Done-callback of destination: propagate a cancellation to source.
        if not done_destination.cancelled():
            return
        if source_loop is not None and source_loop is not dest_loop:
            # source is bound to a different loop; schedule the cancel there.
            source_loop.call_soon_threadsafe(source.cancel)
        else:
            source.cancel()

    def _call_set_state(done_source):
        # Done-callback of source: copy its outcome onto destination.
        if destination.cancelled():
            if dest_loop is not None and dest_loop.is_closed():
                return
        if dest_loop is not None and dest_loop is not source_loop:
            dest_loop.call_soon_threadsafe(
                _set_state, destination, done_source)
        else:
            _set_state(destination, done_source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)


def wrap_future(future, *, loop=None):
    """Wrap a concurrent.futures.Future object in a loop-created future.

    An object accepted by isfuture() is returned unchanged.  Otherwise a new
    future is created on ``loop`` (or on events.get_event_loop() when
    ``loop`` is None), chained to ``future`` and returned.
    """
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), (
        f'concurrent.futures.Future is expected, got {future!r}')
    target_loop = events.get_event_loop() if loop is None else loop
    wrapped = target_loop.create_future()
    _chain_future(future, wrapped)
    return wrapped
Get 100 minutes of automation testing FREE!!
