How to use sleep_forever method in localstack

Best Python code snippet using localstack_python

test_batch_pool_executor.py

Source: test_batch_pool_executor.py (GitHub)

copy

Full Screen

# ... (excerpt begins mid-file; the two statements below are the tail of a
# preceding test whose `def` line and `with` header are not part of the excerpt)
#         future_twenty_one = bpe.submit(lambda: 7 * 3)
#         assert 21 == future_twenty_one.result()
#
# NOTE(review): this excerpt was recovered from a whitespace-collapsed listing
# with display line numbers fused into the code. Indentation has been
# reconstructed; where a statement could sit either inside or after a `with`
# block, placement follows the test's name/intent -- confirm against the
# upstream file. `backend` is presumably a pytest fixture defined elsewhere.


def test_cancel_future(backend):
    """Cancel the future of a never-returning job while the executor is open."""
    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        def sleep_forever():
            while True:
                time.sleep(3600)

        future = bpe.submit(sleep_forever)
        was_cancelled = future.cancel()
        assert was_cancelled
        assert future.cancelled()


def test_cancel_future_after_shutdown_no_wait(backend):
    """Cancel a never-returning job's future after `shutdown(wait=False)`."""
    bpe = BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE)

    def sleep_forever():
        while True:
            time.sleep(3600)

    future = bpe.submit(sleep_forever)
    bpe.shutdown(wait=False)
    was_cancelled = future.cancel()
    assert was_cancelled
    assert future.cancelled()


def test_cancel_future_after_exit_no_wait_on_exit(backend):
    """Cancel a never-returning job's future once a `wait_on_exit=False` context has exited."""
    with BatchPoolExecutor(backend=backend, project='hail-vdc', wait_on_exit=False, image=PYTHON_DILL_IMAGE) as bpe:
        def sleep_forever():
            while True:
                time.sleep(3600)

        future = bpe.submit(sleep_forever)
    # Placed after the `with` block per the test name ("after exit") -- confirm.
    was_cancelled = future.cancel()
    assert was_cancelled
    assert future.cancelled()


def test_result_with_timeout(backend):
    """`future.result(timeout=2)` on a never-returning job must raise `asyncio.TimeoutError`."""
    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        def sleep_forever():
            while True:
                time.sleep(3600)

        future = bpe.submit(sleep_forever)
        try:
            future.result(timeout=2)
        except asyncio.TimeoutError:
            pass
        else:
            # Reaching here means the call returned instead of timing out.
            assert False
        finally:
            # Always cancel so the never-returning job is not left behind.
            future.cancel()


def test_map_chunksize(backend):
    """`bpe.map` with `chunksize=5` yields the 5x5 multiplication table in order."""
    # row_args repeats each row index five times: 0,0,0,0,0,1,1,...
    row_args = [x
                for row in range(5)
                for x in [row, row, row, row, row]]
    # col_args cycles 0..4 five times: 0,1,2,3,4,0,1,...
    col_args = [x
                for row in range(5)
                for x in list(range(5))]
    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        multiplication_table = list(bpe.map(lambda x, y: x * y,
                                            row_args,
                                            col_args,
                                            chunksize=5))
    assert multiplication_table == [
        0, 0, 0, 0, 0,
        0, 1, 2, 3, 4,
        0, 2, 4, 6, 8,
        0, 3, 6, 9, 12,
        0, 4, 8, 12, 16]


def test_map_timeout(backend):
    """Consuming `bpe.map(..., timeout=2)` over never-returning jobs must raise `concurrent.futures.TimeoutError`."""
    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        def sleep_forever():
            while True:
                time.sleep(3600)

        try:
            list(bpe.map(lambda _: sleep_forever(), range(5), timeout=2))
        except concurrent.futures.TimeoutError:
            pass
        else:
            assert False


def test_map_error_without_wait_no_error(backend):
    """With `wait_on_exit=False`, an unconsumed `map` with a short timeout must not raise."""
    with BatchPoolExecutor(backend=backend, project='hail-vdc', wait_on_exit=False, image=PYTHON_DILL_IMAGE) as bpe:
        # The result of map() is deliberately never iterated.
        bpe.map(lambda _: time.sleep(10), range(5), timeout=2)


def test_exception_in_map(backend):
    """A `ValueError` raised in a mapped job surfaces when the result generator is advanced."""
    def raise_value_error():
        raise ValueError('dead')

    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        try:
            gen = bpe.map(lambda _: raise_value_error(), range(5))
            next(gen)
        except ValueError as exc:
            assert 'ValueError: dead' in exc.args[0]
        else:
            assert False


def test_exception_in_result(backend):
    """A `ValueError` raised in a submitted job surfaces from `future.result()`."""
    def raise_value_error():
        raise ValueError('dead')

    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        try:
            future = bpe.submit(raise_value_error)
            future.result()
        except ValueError as exc:
            assert 'ValueError: dead' in exc.args[0]
        else:
            assert False


def test_exception_in_exception(backend):
    """A `ValueError` raised in a submitted job is expected to be raised by `future.exception()`."""
    def raise_value_error():
        raise ValueError('dead')

    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        try:
            future = bpe.submit(raise_value_error)
            future.exception()
        except ValueError as exc:
            assert 'ValueError: dead' in exc.args[0]
        else:
            assert False


def test_no_exception_when_exiting_context(backend):
    """Leaving the context with a failed job must not raise; the error appears only via the future."""
    def raise_value_error():
        raise ValueError('dead')

    with BatchPoolExecutor(backend=backend, project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        future = bpe.submit(raise_value_error)
    # Placed after the `with` block per the test name ("when exiting context");
    # otherwise this test would duplicate test_exception_in_exception -- confirm.
    try:
        future.exception()
    except ValueError as exc:
        assert 'ValueError: dead' in exc.args[0]
    else:
        assert False


def test_bad_image_gives_good_error(backend):
    """A job submitted with a non-existent image reports 'submitted job failed:' in a `ValueError`."""
    with BatchPoolExecutor(
            backend=backend,
            project='hail-vdc',
            image='hailgenetics/not-a-valid-image:123abc') as bpe:
        future = bpe.submit(lambda: 3)
        try:
            future.exception()
        except ValueError as exc:
            assert 'submitted job failed:' in exc.args[0]
        else:
            assert False


def test_call_result_after_timeout():
    """Calling `future.result(timeout=2)` a second time after a timeout must time out again."""
    with BatchPoolExecutor(project='hail-vdc', image=PYTHON_DILL_IMAGE) as bpe:
        def sleep_forever():
            while True:
                time.sleep(3600)

        future = bpe.submit(sleep_forever)
        try:
            future.result(timeout=2)
        except asyncio.TimeoutError:
            # First call timed out as expected; the second call must do the same.
            try:
                future.result(timeout=2)
            except asyncio.TimeoutError:
                pass
            else:
                assert False
        else:
            assert False
# ... (excerpt ends here; any remaining lines of this test or file are not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful