Best Python code snippet using locust
test_icon_inner_service.py
Source:test_icon_inner_service.py  
...24from iconservice.icon_service_engine import IconServiceEngine25from iconservice.iconscore.icon_score_step import OutOfStepException26from tests import create_block_hash27@pytest.fixture(params=[ENABLE_THREAD_FLAG, ~ENABLE_THREAD_FLAG])28def inner_task(mocker, request):29    mocker.patch.object(IconScoreInnerTask, "_open")30    mocker.patch.object(IconScoreInnerTask, "_close")31    inner_task = IconScoreInnerTask(Mock(spec=IconConfig))32    inner_task._thread_flag = request.param33    icon_service_engine = Mock(spec=IconServiceEngine)34    inner_task._icon_service_engine = icon_service_engine35    return inner_task36@pytest.fixture()37def dummy_block():38    block = {39        ConstantKeys.BLOCK_HEIGHT: hex(0),40        ConstantKeys.BLOCK_HASH: create_block_hash().hex(),41        ConstantKeys.TIMESTAMP: hex(0),42        ConstantKeys.PREV_BLOCK_HASH: create_block_hash().hex()...test_waitfor.py
Source:test_waitfor.py  
1import asyncio2import unittest3import time4def tearDownModule():5    asyncio.set_event_loop_policy(None)6# The following value can be used as a very small timeout:7# it passes check "timeout > 0", but has almost8# no effect on the test performance9_EPSILON = 0.000110class SlowTask:11    """ Task will run for this defined time, ignoring cancel requests """12    TASK_TIMEOUT = 0.213    def __init__(self):14        self.exited = False15    async def run(self):16        exitat = time.monotonic() + self.TASK_TIMEOUT17        while True:18            tosleep = exitat - time.monotonic()19            if tosleep <= 0:20                break21            try:22                await asyncio.sleep(tosleep)23            except asyncio.CancelledError:24                pass25        self.exited = True26class AsyncioWaitForTest(unittest.IsolatedAsyncioTestCase):27    async def test_asyncio_wait_for_cancelled(self):28        t = SlowTask()29        waitfortask = asyncio.create_task(30            asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))31        await asyncio.sleep(0)32        waitfortask.cancel()33        await asyncio.wait({waitfortask})34        self.assertTrue(t.exited)35    async def test_asyncio_wait_for_timeout(self):36        t = SlowTask()37        try:38            await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)39        except asyncio.TimeoutError:40            pass41        self.assertTrue(t.exited)42    async def test_wait_for_timeout_less_then_0_or_0_future_done(self):43        loop = asyncio.get_running_loop()44        fut = loop.create_future()45        fut.set_result('done')46        t0 = loop.time()47        ret = await asyncio.wait_for(fut, 0)48        t1 = loop.time()49        self.assertEqual(ret, 'done')50        self.assertTrue(fut.done())51        self.assertLess(t1 - t0, 0.1)52    async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):53        loop = asyncio.get_running_loop()54        foo_started = False55        async def foo():56            nonlocal foo_started57            foo_started = True58        with self.assertRaises(asyncio.TimeoutError):59            t0 = loop.time()60            await asyncio.wait_for(foo(), 0)61        t1 = loop.time()62        self.assertEqual(foo_started, False)63        self.assertLess(t1 - t0, 0.1)64    async def test_wait_for_timeout_less_then_0_or_0(self):65        loop = asyncio.get_running_loop()66        for timeout in [0, -1]:67            with self.subTest(timeout=timeout):68                foo_running = None69                started = loop.create_future()70                async def foo():71                    nonlocal foo_running72                    foo_running = True73                    started.set_result(None)74                    try:75                        await asyncio.sleep(10)76                    finally:77                        foo_running = False78                    return 'done'79                fut = asyncio.create_task(foo())80                await started81                with self.assertRaises(asyncio.TimeoutError):82                    t0 = loop.time()83                    await asyncio.wait_for(fut, timeout)84                t1 = loop.time()85                self.assertTrue(fut.done())86                # it should have been cancelled due to the timeout87                self.assertTrue(fut.cancelled())88                self.assertEqual(foo_running, False)89                self.assertLess(t1 - t0, 0.1)90    async def test_wait_for(self):91        loop = asyncio.get_running_loop()92        foo_running = None93        async def foo():94            nonlocal foo_running95            foo_running = True96            try:97                await asyncio.sleep(10)98            finally:99                foo_running = False100            return 'done'101        fut = asyncio.create_task(foo())102        with self.assertRaises(asyncio.TimeoutError):103            t0 = loop.time()104            await asyncio.wait_for(fut, 0.1)105        t1 = loop.time()106        self.assertTrue(fut.done())107        # it should have been cancelled due to the timeout108        self.assertTrue(fut.cancelled())109        self.assertLess(t1 - t0, 0.5)110        self.assertEqual(foo_running, False)111    async def test_wait_for_blocking(self):112        async def coro():113            return 'done'114        res = await asyncio.wait_for(coro(), timeout=None)115        self.assertEqual(res, 'done')116    async def test_wait_for_race_condition(self):117        loop = asyncio.get_running_loop()118        fut = loop.create_future()119        task = asyncio.wait_for(fut, timeout=0.2)120        loop.call_later(0.1, fut.set_result, "ok")121        res = await task122        self.assertEqual(res, "ok")123    async def test_wait_for_cancellation_race_condition(self):124        async def inner():125            with self.assertRaises(asyncio.CancelledError):126                await asyncio.sleep(1)127            return 1128        result = await asyncio.wait_for(inner(), timeout=.01)129        self.assertEqual(result, 1)130    async def test_wait_for_waits_for_task_cancellation(self):131        task_done = False132        async def inner():133            nonlocal task_done134            try:135                await asyncio.sleep(10)136            except asyncio.CancelledError:137                await asyncio.sleep(_EPSILON)138                raise139            finally:140                task_done = True141        inner_task = asyncio.create_task(inner())142        with self.assertRaises(asyncio.TimeoutError) as cm:143            await asyncio.wait_for(inner_task, timeout=_EPSILON)144        self.assertTrue(task_done)145        chained = cm.exception.__context__146        self.assertEqual(type(chained), asyncio.CancelledError)147    async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):148        task_done = False149        async def foo():150            async def inner():151                nonlocal task_done152                try:153                    await asyncio.sleep(10)154                except asyncio.CancelledError:155                    await asyncio.sleep(_EPSILON)156                    raise157                finally:158                    task_done = True159            inner_task = asyncio.create_task(inner())160            await asyncio.sleep(_EPSILON)161            await asyncio.wait_for(inner_task, timeout=0)162        with self.assertRaises(asyncio.TimeoutError) as cm:163            await foo()164        self.assertTrue(task_done)165        chained = cm.exception.__context__166        self.assertEqual(type(chained), asyncio.CancelledError)167    async def test_wait_for_reraises_exception_during_cancellation(self):168        class FooException(Exception):169            pass170        async def foo():171            async def inner():172                try:173                    await asyncio.sleep(0.2)174                finally:175                    raise FooException176            inner_task = asyncio.create_task(inner())177            await asyncio.wait_for(inner_task, timeout=_EPSILON)178        with self.assertRaises(FooException):179            await foo()180    async def test_wait_for_self_cancellation(self):181        async def inner():182            try:183                await asyncio.sleep(0.3)184            except asyncio.CancelledError:185                try:186                    await asyncio.sleep(0.3)187                except asyncio.CancelledError:188                    await asyncio.sleep(0.3)189            return 42190        inner_task = asyncio.create_task(inner())191        wait = asyncio.wait_for(inner_task, timeout=0.1)192        # Test that wait_for itself is properly cancellable193        # even when the initial task holds up the initial cancellation.194        task = asyncio.create_task(wait)195        await asyncio.sleep(0.2)196        task.cancel()197        with self.assertRaises(asyncio.CancelledError):198            await task199        self.assertEqual(await inner_task, 42)200    async def _test_cancel_wait_for(self, timeout):201        loop = asyncio.get_running_loop()202        async def blocking_coroutine():203            fut = loop.create_future()204            # Block: fut result is never set205            await fut206        task = asyncio.create_task(blocking_coroutine())207        wait = asyncio.create_task(asyncio.wait_for(task, timeout))208        loop.call_soon(wait.cancel)209        with self.assertRaises(asyncio.CancelledError):210            await wait211        # Python issue #23219: cancelling the wait must also cancel the task212        self.assertTrue(task.cancelled())213    async def test_cancel_blocking_wait_for(self):214        await self._test_cancel_wait_for(None)215    async def test_cancel_wait_for(self):216        await self._test_cancel_wait_for(60.0)217if __name__ == '__main__':...decorators.py
Source:decorators.py  
...23# change this to enable a ton of debug printing24DEBUG = False25def _task(daemon=False):26    def __task(func):27        def inner_task(*args, **kwargs):28            async_task = AsyncTask(func, args, kwargs, daemon)29            async_task.context.except_func = inner_task.except_func30            async_task.context.finally_func = inner_task.finally_func31            async_task.context.set_stack(traceback.extract_stack())32            async_task.execute()33        inner_task.except_func = None34        inner_task.finally_func = None35        def _do_except_wrapper(except_func):36            inner_task.except_func = except_func37            return inner_task38        def _do_finally_wrapper(finally_func):39            inner_task.finally_func = finally_func40            return inner_task41        inner_task.do_except = _do_except_wrapper...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
