How to use ensure_event_loop method in localstack

Best Python code snippet using localstack_python

__init__.py

Source:__init__.py Github

copy

Full Screen

...44 started = threading.Event()45 @wraps(f)46 def _wrapped(*args, **kwargs):47 started.clear()48 started.loop = ensure_event_loop()49 started.set()50 return started.loop.run_until_complete(f(*args, **kwargs))51 def _cancel():52 loop = ensure_event_loop()53 current_task = asyncio.tasks.Task.current_task(loop=loop)54 while True:55 try:56 tasks_to_cancel = asyncio.Task.all_tasks(loop=loop)57 tasks = [task for task in tasks_to_cancel58 if task is not current_task]59 break60 except RuntimeError as exception:61 # e.g., set changed size during iteration62 _L().debug('ERROR: %s', exception, exc_info=True)63 list(map(lambda task: task.cancel(), tasks))64 def cancel():65 started.loop.call_soon_threadsafe(_cancel)66 _wrapped.started = started67 _wrapped.cancel = lambda: started.loop.call_soon_threadsafe(_cancel)68 return _wrapped69def new_file_event_loop():70 '''71 Create an asyncio event loop compatible with file IO events, e.g., serial72 device events.73 On Windows, only sockets are supported by the default event loop (i.e.,74 :class:`asyncio.SelectorEventLoop`). However, file IO events _are_75 supported on Windows by the :class:`asyncio.ProactorEventLoop` event loop76 to support file I/O events.77 Returns78 -------79 asyncio.ProactorEventLoop or asyncio.SelectorEventLoop80 '''81 return (asyncio.ProactorEventLoop() if platform.system() == 'Windows'82 else asyncio.new_event_loop())83def ensure_event_loop():84 '''85 Ensure that an asyncio event loop has been bound to the local thread86 context.87 .. notes::88 While an asyncio event loop _is_ assigned to the local thread context89 for the main execution thread, other threads _do not_ have an event90 loop bound to the thread context.91 Returns92 -------93 asyncio.ProactorEventLoop or asyncio.SelectorEventLoop94 '''95 try:96 loop = asyncio.get_event_loop()97 except RuntimeError as e:98 if 'There is no current event loop' in str(e):99 loop = new_file_event_loop()100 asyncio.set_event_loop(loop)101 else:102 raise103 return loop104def with_loop(func):105 '''106 Decorator to run coroutine within an asyncio event loop.107 Parameters108 ----------109 func : asyncio.coroutine110 .. notes::111 Uses :class:`asyncio.ProactorEventLoop` on Windows to support file I/O112 events, e.g., serial device events.113 If an event loop is already bound to the thread, but is either a)114 currently running, or b) *not a :class:`asyncio.ProactorEventLoop`115 instance*, execute function in a new thread running a new116 :class:`asyncio.ProactorEventLoop` instance.117 Example118 -------119 >>> @with_loop120 ... @asyncio.coroutine121 ... def foo(a):122 ... raise asyncio.Return(a)123 >>>124 >>> a = 'hello'125 >>> assert(foo(a) == a)126 '''127 @wraps(func)128 def wrapped(*args, **kwargs):129 loop = ensure_event_loop()130 thread_required = False131 if loop.is_running():132 _L().debug('Event loop is already running.')133 thread_required = True134 elif all([platform.system() == 'Windows',135 not isinstance(loop, asyncio.ProactorEventLoop)]):136 _L().debug('`ProactorEventLoop` required, not `%s` loop in '137 'background thread.', type(loop))138 thread_required = True139 if thread_required:140 _L().debug('Execute new loop in background thread.')141 finished = threading.Event()142 def _run(generator):143 loop = ensure_event_loop()144 try:145 result = loop.run_until_complete(asyncio146 .ensure_future(generator))147 except Exception as e:148 finished.result = None149 finished.error = e150 else:151 finished.result = result152 finished.error = None153 finally:154 loop.close()155 _L().debug('closed event loop')156 finished.set()157 thread = threading.Thread(target=_run,...

Full Screen

Full Screen

async.py

Source:async.py Github

copy

Full Screen

...13 _read_device_id, asyncio, read_packet)14def new_file_event_loop():15 return (asyncio.ProactorEventLoop() if platform.system() == 'Windows'16 else asyncio.new_event_loop())17def ensure_event_loop():18 try:19 loop = asyncio.get_event_loop()20 except RuntimeError as e:21 if 'There is no current event loop' in str(e):22 loop = new_file_event_loop()23 asyncio.set_event_loop(loop)24 else:25 raise26 return loop27def with_loop(func):28 '''29 Decorator to run function within an asyncio event loop.30 .. notes::31 Uses :class:`asyncio.ProactorEventLoop` on Windows to support file I/O32 events, e.g., serial device events.33 If an event loop is already bound to the thread, but is either a)34 currently running, or b) *not a :class:`asyncio.ProactorEventLoop`35 instance*, execute function in a new thread running a new36 :class:`asyncio.ProactorEventLoop` instance.37 '''38 @wraps(func)39 def wrapped(*args, **kwargs):40 loop = ensure_event_loop()41 thread_required = False42 if loop.is_running():43 _L().debug('Event loop is already running.')44 thread_required = True45 elif all([platform.system() == 'Windows',46 not isinstance(loop, asyncio.ProactorEventLoop)]):47 _L().debug('`ProactorEventLoop` required, not `%s` loop in '48 'background thread.', type(loop))49 thread_required = True50 if thread_required:51 _L().debug('Execute new loop in background thread.')52 finished = threading.Event()53 def _run(generator):54 loop = ensure_event_loop()55 try:56 result = loop.run_until_complete(asyncio57 .ensure_future(generator))58 except Exception as e:59 finished.result = None60 finished.error = e61 else:62 finished.result = result63 finished.error = None64 finally:65 loop.close()66 _L().debug('closed event loop')67 finished.set()68 thread = threading.Thread(target=_run,...

Full Screen

Full Screen

asyncio_util.py

Source:asyncio_util.py Github

copy

Full Screen

...29 :class:`ProactorEventLoop` must explicitly be used on Windows. **30 '''31 return (asyncio.ProactorEventLoop() if platform.system() == 'Windows'32 else asyncio.new_event_loop())33def ensure_event_loop():34 '''35 .. versionadded:: 0.1536 Get existing event loop or create a new one if necessary.37 Returns38 -------39 asyncio.BaseEventLoop40 '''41 try:42 loop = asyncio.get_event_loop()43 except RuntimeError as e:44 if 'There is no current event loop' in str(e):45 loop = new_file_event_loop()46 asyncio.set_event_loop(loop)47 else:48 raise49 return loop50def with_loop(func):51 '''52 .. versionadded:: 0.1553 Decorator to run function within an asyncio event loop.54 .. notes::55 Uses :class:`asyncio.ProactorEventLoop` on Windows to support file I/O56 events, e.g., serial device events.57 If an event loop is already bound to the thread, but is either a)58 currently running, or b) *not a :class:`asyncio.ProactorEventLoop`59 instance*, execute function in a new thread running a new60 :class:`asyncio.ProactorEventLoop` instance.61 '''62 @wraps(func)63 def wrapped(*args, **kwargs):64 loop = ensure_event_loop()65 thread_required = False66 if loop.is_running():67 logger.debug('Event loop is already running.')68 thread_required = True69 elif all([platform.system() == 'Windows',70 not isinstance(loop, asyncio.ProactorEventLoop)]):71 logger.debug('`ProactorEventLoop` required, not `%s`'72 'loop in background thread.', type(loop))73 thread_required = True74 if thread_required:75 logger.debug('Execute new loop in background thread.')76 finished = threading.Event()77 def _run(generator):78 loop = ensure_event_loop()79 try:80 result = loop.run_until_complete(asyncio81 .ensure_future(generator))82 except Exception as e:83 finished.result = None84 finished.error = e85 else:86 finished.result = result87 finished.error = None88 finished.set()89 thread = threading.Thread(target=_run,90 args=(func(*args, **kwargs), ))91 thread.daemon = True92 thread.start()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful