How to use coroutine method in pandera

Best Python code snippet using pandera_python

coroutines.py

Source:coroutines.py Github

copy

Full Screen

...72 return CoroWrapper(gen, None)73class CoroWrapper:74 # Wrapper for coroutine object in _DEBUG mode.75 def __init__(self, gen, func=None):76 assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen77 self.gen = gen78 self.func = func # Used to unwrap @coroutine decorator79 self._source_traceback = events.extract_stack(sys._getframe(1))80 self.__name__ = getattr(gen, '__name__', None)81 self.__qualname__ = getattr(gen, '__qualname__', None)82 def __repr__(self):83 coro_repr = _format_coroutine(self)84 if self._source_traceback:85 frame = self._source_traceback[-1]86 coro_repr += ', created at %s:%s' % (frame[0], frame[1])87 return '<%s %s>' % (self.__class__.__name__, coro_repr)88 def __iter__(self):89 return self90 def __next__(self):91 return self.gen.send(None)92 if _YIELD_FROM_BUG:93 # For for CPython issue #21209: using "yield from" and a custom94 # generator, generator.send(tuple) unpacks the tuple instead of passing95 # the tuple unchanged. Check if the caller is a generator using "yield96 # from" to decide if the parameter should be unpacked or not.97 def send(self, *value):98 frame = sys._getframe()99 caller = frame.f_back100 assert caller.f_lasti >= 0101 if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM:102 value = value[0]103 return self.gen.send(value)104 else:105 def send(self, value):106 return self.gen.send(value)107 def throw(self, type, value=None, traceback=None):108 return self.gen.throw(type, value, traceback)109 def close(self):110 return self.gen.close()111 @property112 def gi_frame(self):113 return self.gen.gi_frame114 @property115 def gi_running(self):116 return self.gen.gi_running117 @property118 def gi_code(self):119 return self.gen.gi_code120 if compat.PY35:121 def __await__(self):122 cr_await = getattr(self.gen, 'cr_await', None)123 if cr_await is not None:124 raise RuntimeError(125 "Cannot await on coroutine {!r} while it's "126 "awaiting for {!r}".format(self.gen, cr_await))127 return self128 @property129 def gi_yieldfrom(self):130 return self.gen.gi_yieldfrom131 @property132 def cr_await(self):133 return self.gen.cr_await134 @property135 def cr_running(self):136 return self.gen.cr_running137 @property138 def cr_code(self):139 return self.gen.cr_code140 @property141 def cr_frame(self):142 return self.gen.cr_frame143 def __del__(self):144 # Be careful accessing self.gen.frame -- self.gen might not exist.145 gen = getattr(self, 'gen', None)146 frame = getattr(gen, 'gi_frame', None)147 if frame is None:148 frame = getattr(gen, 'cr_frame', None)149 if frame is not None and frame.f_lasti == -1:150 msg = '%r was never yielded from' % self151 tb = getattr(self, '_source_traceback', ())152 if tb:153 tb = ''.join(traceback.format_list(tb))154 msg += (f'\nCoroutine object created at '155 f'(most recent call last, truncated to '156 f'{constants.DEBUG_STACK_DEPTH} last lines):\n')157 msg += tb.rstrip()158 logger.error(msg)159def coroutine(func):160 """Decorator to mark coroutines.161 If the coroutine is not yielded from before it is destroyed,162 an error message is logged.163 """164 if _inspect_iscoroutinefunction(func):165 # In Python 3.5 that's all we need to do for coroutines166 # defined with "async def".167 # Wrapping in CoroWrapper will happen via168 # 'sys.set_coroutine_wrapper' function.169 return func170 if inspect.isgeneratorfunction(func):171 coro = func172 else:173 @functools.wraps(func)174 def coro(*args, **kw):175 res = func(*args, **kw)176 if (base_futures.isfuture(res) or inspect.isgenerator(res) or177 isinstance(res, CoroWrapper)):178 res = yield from res179 elif _AwaitableABC is not None:180 # If 'func' returns an Awaitable (new in 3.5) we181 # want to run it.182 try:183 await_meth = res.__await__184 except AttributeError:185 pass186 else:187 if isinstance(res, _AwaitableABC):188 res = yield from await_meth()189 return res190 if not _DEBUG:191 if _types_coroutine is None:192 wrapper = coro193 else:194 wrapper = _types_coroutine(coro)195 else:196 @functools.wraps(func)197 def wrapper(*args, **kwds):198 w = CoroWrapper(coro(*args, **kwds), func=func)199 if w._source_traceback:200 del w._source_traceback[-1]201 # Python < 3.5 does not implement __qualname__202 # on generator objects, so we set it manually.203 # We use getattr as some callables (such as204 # functools.partial may lack __qualname__).205 w.__name__ = getattr(func, '__name__', None)206 w.__qualname__ = getattr(func, '__qualname__', None)207 return w208 wrapper._is_coroutine = _is_coroutine # For iscoroutinefunction().209 return wrapper210# A marker for iscoroutinefunction.211_is_coroutine = object()212def iscoroutinefunction(func):213 """Return True if func is a decorated coroutine function."""214 return (getattr(func, '_is_coroutine', None) is _is_coroutine or215 _inspect_iscoroutinefunction(func))216_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper)217if _CoroutineABC is not None:218 _COROUTINE_TYPES += (_CoroutineABC,)219if _types_CoroutineType is not None:220 # Prioritize native coroutine check to speed-up221 # asyncio.iscoroutine.222 _COROUTINE_TYPES = (_types_CoroutineType,) + _COROUTINE_TYPES223def iscoroutine(obj):224 """Return True if obj is a coroutine object."""225 return isinstance(obj, _COROUTINE_TYPES)226def _format_coroutine(coro):227 assert iscoroutine(coro)228 if not hasattr(coro, 'cr_code') and not hasattr(coro, 'gi_code'):229 # Most likely a built-in type or a Cython coroutine.230 # Built-in types might not have __qualname__ or __name__.231 coro_name = getattr(232 coro, '__qualname__',233 getattr(coro, '__name__', type(coro).__name__))234 coro_name = '{}()'.format(coro_name)235 running = False236 try:237 running = coro.cr_running238 except AttributeError:239 try:240 running = coro.gi_running241 except AttributeError:...

Full Screen

Full Screen

test__format_coroutine.py

Source:test__format_coroutine.py Github

copy

Full Screen

...14 """15 generator = _example_generator_function()16 17 try:18 result = format_coroutine(generator)19 20 vampytest.assert_in(_example_generator_function.__name__, result)21 vampytest.assert_in(' from ', result)22 vampytest.assert_in(' suspended', result)23 24 finally:25 generator.close()26def test__format_coroutine__1():27 """28 Tests whether `format_coroutine` acts correctly for coroutines.29 """30 coroutine = _example_coroutine_function()31 32 try:33 result = format_coroutine(coroutine)34 35 vampytest.assert_in(_example_coroutine_function.__name__, result)36 vampytest.assert_in(' from ', result)37 vampytest.assert_in(' suspended', result)38 39 finally:40 coroutine.close()41def test__format_coroutine__2():42 """43 Tests whether `format_coroutine` acts correctly for coroutine generators.44 """45 coroutine_generator = _example_coroutine_generator_function()46 47 try:48 result = format_coroutine(coroutine_generator)49 50 vampytest.assert_in(_example_coroutine_generator_function.__name__, result)51 vampytest.assert_in(' from ', result)52 vampytest.assert_in(' suspended', result)53 54 finally:55 coroutine_generator.aclose()56def test__format_coroutine__3():57 """58 Tests whether `format_coroutine` acts correctly for random objects.59 """60 result = format_coroutine(object())61 62 vampytest.assert_in(object.__name__, result)63 vampytest.assert_in(' from ', result)64 vampytest.assert_in('unknown state', result)65 vampytest.assert_in('unknown location', result)66def test__format_coroutine__4():67 """68 Tests whether `format_coroutine` shows that the coroutine is stopped.69 """70 coroutine = _example_coroutine_function()71 coroutine.close()72 73 result = format_coroutine(coroutine)74 75 vampytest.assert_in('finished', result)76def _check_state_in_generator_itself():77 try:78 self = yield79 80 result = format_coroutine(self)81 vampytest.assert_in('running', result)82 83 yield84 finally:85 self = None86def test__format_coroutine__5():87 """88 Tests whether `format_coroutine` shows that the coroutine is running.89 """90 generator = _check_state_in_generator_itself()91 try:92 generator.send(None)93 generator.send(generator)94 ...

Full Screen

Full Screen

interop_asyncio.py

Source:interop_asyncio.py Github

copy

Full Screen

...3@asyncio.coroutine4def asyncio_noop():5 pass6@asyncio.coroutine7def asyncio_coroutine(coro):8 print("asyncio coroutine")9 res = yield from coro10 print("asyncio inner coroutine result: %r" % (res,))11 print("asyncio coroutine done")12 return "asyncio"13@trollius.coroutine14def trollius_noop():15 pass16@trollius.coroutine17def trollius_coroutine(coro):18 print("trollius coroutine")19 res = yield trollius.From(coro)20 print("trollius inner coroutine result: %r" % (res,))21 print("trollius coroutine done")22 raise trollius.Return("trollius")23def main():24 # use trollius event loop policy in asyncio25 policy = trollius.get_event_loop_policy()26 asyncio.set_event_loop_policy(policy)27 # create an event loop for the main thread: use Trollius event loop28 loop = trollius.get_event_loop()29 assert asyncio.get_event_loop() is loop30 print("[ asyncio coroutine called from trollius coroutine ]")31 coro1 = asyncio_noop()32 coro2 = asyncio_coroutine(coro1)33 res = loop.run_until_complete(trollius_coroutine(coro2))34 print("trollius coroutine result: %r" % res)35 print("")36 print("[ asyncio coroutine called from trollius coroutine ]")37 coro1 = trollius_noop()38 coro2 = trollius_coroutine(coro1)39 res = loop.run_until_complete(asyncio_coroutine(coro2))40 print("asyncio coroutine result: %r" % res)41 print("")42 loop.close()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pandera automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful