How to use _restore_context method in autotest

Best Python code snippet using autotest_python

sync.py

Source:sync.py Github

copy

Full Screen

...10try:11 import contextvars # Python 3.7+ only.12except ImportError:13 contextvars = None14def _restore_context(context):15 # Check for changes in contextvars, and set them to the current16 # context for downstream consumers17 for cvar in context:18 try:19 if cvar.get() != context.get(cvar):20 cvar.set(context.get(cvar))21 except LookupError:22 cvar.set(context.get(cvar))23class AsyncToSync:24 """25 Utility class which turns an awaitable that only works on the thread with26 the event loop into a synchronous callable that works in a subthread.27 If the call stack contains an async loop, the code runs there.28 Otherwise, the code runs in a new loop in a new thread.29 Either way, this thread then pauses and waits to run any thread_sensitive30 code called from further down the call stack using SyncToAsync, before31 finally exiting once the async task returns.32 """33 # Maps launched Tasks to the threads that launched them (for locals impl)34 launch_map = {}35 # Keeps track of which CurrentThreadExecutor to use. This uses an asgiref36 # Local, not a threadlocal, so that tasks can work out what their parent used.37 executors = Local()38 def __init__(self, awaitable, force_new_loop=False):39 self.awaitable = awaitable40 try:41 self.__self__ = self.awaitable.__self__42 except AttributeError:43 pass44 if force_new_loop:45 # They have asked that we always run in a new sub-loop.46 self.main_event_loop = None47 else:48 try:49 self.main_event_loop = asyncio.get_event_loop()50 except RuntimeError:51 # There's no event loop in this thread. 
Look for the threadlocal if52 # we're inside SyncToAsync53 self.main_event_loop = getattr(54 SyncToAsync.threadlocal, "main_event_loop", None55 )56 def __call__(self, *args, **kwargs):57 # You can't call AsyncToSync from a thread with a running event loop58 try:59 event_loop = asyncio.get_event_loop()60 except RuntimeError:61 pass62 else:63 if event_loop.is_running():64 raise RuntimeError(65 "You cannot use AsyncToSync in the same thread as an async event loop - "66 "just await the async function directly."67 )68 if contextvars is not None:69 # Wrapping context in list so it can be reassigned from within70 # `main_wrap`.71 context = [contextvars.copy_context()]72 else:73 context = None74 # Make a future for the return information75 call_result = Future()76 # Get the source thread77 source_thread = threading.current_thread()78 # Make a CurrentThreadExecutor we'll use to idle in this thread - we79 # need one for every sync frame, even if there's one above us in the80 # same thread.81 if hasattr(self.executors, "current"):82 old_current_executor = self.executors.current83 else:84 old_current_executor = None85 current_executor = CurrentThreadExecutor()86 self.executors.current = current_executor87 # Use call_soon_threadsafe to schedule a synchronous callback on the88 # main event loop's thread if it's there, otherwise make a new loop89 # in this thread.90 try:91 awaitable = self.main_wrap(92 args, kwargs, call_result, source_thread, sys.exc_info(), context93 )94 if not (self.main_event_loop and self.main_event_loop.is_running()):95 # Make our own event loop - in a new thread - and run inside that.96 loop = asyncio.new_event_loop()97 loop_executor = ThreadPoolExecutor(max_workers=1)98 loop_future = loop_executor.submit(99 self._run_event_loop, loop, awaitable100 )101 if current_executor:102 # Run the CurrentThreadExecutor until the future is done103 current_executor.run_until_future(loop_future)104 # Wait for future and/or allow for exception propagation105 
loop_future.result()106 else:107 # Call it inside the existing loop108 self.main_event_loop.call_soon_threadsafe(109 self.main_event_loop.create_task, awaitable110 )111 if current_executor:112 # Run the CurrentThreadExecutor until the future is done113 current_executor.run_until_future(call_result)114 finally:115 # Clean up any executor we were running116 if hasattr(self.executors, "current"):117 del self.executors.current118 if old_current_executor:119 self.executors.current = old_current_executor120 if contextvars is not None:121 _restore_context(context[0])122 # Wait for results from the future.123 return call_result.result()124 def _run_event_loop(self, loop, coro):125 """126 Runs the given event loop (designed to be called in a thread).127 """128 asyncio.set_event_loop(loop)129 try:130 loop.run_until_complete(coro)131 finally:132 try:133 # mimic asyncio.run() behavior134 # cancel unexhausted async generators135 if sys.version_info >= (3, 7, 0):136 tasks = asyncio.all_tasks(loop)137 else:138 tasks = asyncio.Task.all_tasks(loop)139 for task in tasks:140 task.cancel()141 loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))142 for task in tasks:143 if task.cancelled():144 continue145 if task.exception() is not None:146 loop.call_exception_handler(147 {148 "message": "unhandled exception during loop shutdown",149 "exception": task.exception(),150 "task": task,151 }152 )153 if hasattr(loop, "shutdown_asyncgens"):154 loop.run_until_complete(loop.shutdown_asyncgens())155 finally:156 loop.close()157 asyncio.set_event_loop(self.main_event_loop)158 def __get__(self, parent, objtype):159 """160 Include self for methods161 """162 func = functools.partial(self.__call__, parent)163 return functools.update_wrapper(func, self.awaitable)164 async def main_wrap(165 self, args, kwargs, call_result, source_thread, exc_info, context166 ):167 """168 Wraps the awaitable with something that puts the result into the169 result/exception future.170 """171 if context is 
not None:172 _restore_context(context[0])173 current_task = SyncToAsync.get_current_task()174 self.launch_map[current_task] = source_thread175 try:176 # If we have an exception, run the function inside the except block177 # after raising it so exc_info is correctly populated.178 if exc_info[1]:179 try:180 raise exc_info[1]181 except:182 result = await self.awaitable(*args, **kwargs)183 else:184 result = await self.awaitable(*args, **kwargs)185 except Exception as e:186 call_result.set_exception(e)187 else:188 call_result.set_result(result)189 finally:190 del self.launch_map[current_task]191 if context is not None:192 context[0] = contextvars.copy_context()193class SyncToAsync:194 """195 Utility class which turns a synchronous callable into an awaitable that196 runs in a threadpool. It also sets a threadlocal inside the thread so197 calls to AsyncToSync can escape it.198 If thread_sensitive is passed, the code will run in the same thread as any199 outer code. This is needed for underlying Python code that is not200 threadsafe (for example, code which handles SQLite database connections).201 If the outermost program is async (i.e. SyncToAsync is outermost), then202 this will be a dedicated single sub-thread that all sync code runs in,203 one after the other. If the outermost program is sync (i.e. AsyncToSync is204 outermost), this will just be the main thread. 
This is achieved by idling205 with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,206 rather than just blocking.207 """208 # If they've set ASGI_THREADS, update the default asyncio executor for now209 if "ASGI_THREADS" in os.environ:210 loop = asyncio.get_event_loop()211 loop.set_default_executor(212 ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))213 )214 # Maps launched threads to the coroutines that spawned them215 launch_map = {}216 # Storage for main event loop references217 threadlocal = threading.local()218 # Single-thread executor for thread-sensitive code219 single_thread_executor = ThreadPoolExecutor(max_workers=1)220 def __init__(self, func, thread_sensitive=False):221 self.func = func222 functools.update_wrapper(self, func)223 self._thread_sensitive = thread_sensitive224 self._is_coroutine = asyncio.coroutines._is_coroutine225 try:226 self.__self__ = func.__self__227 except AttributeError:228 pass229 async def __call__(self, *args, **kwargs):230 loop = asyncio.get_event_loop()231 # Work out what thread to run the code in232 if self._thread_sensitive:233 if hasattr(AsyncToSync.executors, "current"):234 # If we have a parent sync thread above somewhere, use that235 executor = AsyncToSync.executors.current236 else:237 # Otherwise, we run it in a fixed single thread238 executor = self.single_thread_executor239 else:240 executor = None # Use default241 if contextvars is not None:242 context = contextvars.copy_context()243 child = functools.partial(self.func, *args, **kwargs)244 func = context.run245 args = (child,)246 kwargs = {}247 else:248 func = self.func249 # Run the code in the right thread250 future = loop.run_in_executor(251 executor,252 functools.partial(253 self.thread_handler,254 loop,255 self.get_current_task(),256 sys.exc_info(),257 func,258 *args,259 **kwargs260 ),261 )262 ret = await asyncio.wait_for(future, timeout=None)263 if contextvars is not None:264 _restore_context(context)265 return ret266 def 
__get__(self, parent, objtype):267 """268 Include self for methods269 """270 return functools.partial(self.__call__, parent)271 def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):272 """273 Wraps the sync application with exception handling.274 """275 # Set the threadlocal for AsyncToSync276 self.threadlocal.main_event_loop = loop277 # Set the task mapping (used for the locals module)278 current_thread = threading.current_thread()...

Full Screen

Full Screen

input.py

Source:input.py Github

copy

Full Screen

...13# results14# input_prompt_prompts15# input_prompt_command16def command___input_cancel(manager):17 _restore_context(manager)18 _switch_normal_mode(manager)19def command___input_prompt(manager):20 global _context21 prompts = _context["input_prompt_prompts"]22 command = _context["input_prompt_command"]23 # Chains the input prompt24 # Save the previous input value.25 if "results" not in _context:26 _context["results"] = []27 _context["results"].append(manager._instance._cli.pattern)28 _context["input_prompt_prompts"] = prompts[1:]29 # reset prompt text30 prompt = prompts[0].get("prompt")31 text = prompts[0].get("text", "")32 manager._instance._cli._additional_prompt_string = prompt33 manager._instance._cli.setPattern(text)34 key_dict = manager._instance._cli._key_dict35 if len(prompts) == 1:36 # done37 key_dict["<CR>"] = "_do_" + command38 else:39 key_dict["<CR>"] = "_input_prompt"40 manager._instance._cli._key_dict = key_dict41def input_prompt(manager, command, prompts=[]):42 """43 params:44 command:45 "delete" or "edit" or ("add")46 When <CR> is pressed, manager.command___co_{command}() is executed.47 prompts:48 input prompts49 [50 {"prompt": prompt1, "text": text1},51 ...52 ]53 """54 global _context55 _context["input_prompt_prompts"] = prompts[1:]56 _context["input_prompt_command"] = command57 # set pattern58 prompt = prompts[0].get("prompt", "")59 text = prompts[0].get("text", "")60 manager._instance._cli._additional_prompt_string = prompt61 manager._instance._cli.setPattern(text)62 # update key_dict63 key_dict = {64 lhs: rhs65 for [lhs, rhs] in manager._instance._cli._key_dict.items()66 if rhs.lower()67 in {68 "<esc>",69 "<c-c>",70 "<bs>",71 "<c-h>",72 "<c-u>",73 "<c-w>",74 "<del>",75 "<c-v>",76 "<s-insert>",77 "<home>",78 "<c-b>",79 "<end>",80 "<c-e>",81 "<left>",82 "<right>",83 "<up>",84 "<down>",85 "open_parent_or_backspace",86 "open_parent_or_clear_line",87 }88 }89 # To be able to do general input90 for [lrs, rhs] in key_dict.items():91 rhs_low = 
rhs.lower()92 if rhs_low in {"open_parent_or_backspace", "open_parent_or_clear_line"}:93 key_dict[lrs] = "<BS>"94 elif rhs_low in {"<esc>", "<c-c>"}:95 key_dict[lrs] = "_input_cancel"96 # add command97 if len(prompts) == 1:98 key_dict["<CR>"] = "_do_" + command99 else:100 # chain101 key_dict["<CR>"] = "_input_prompt"102 manager._instance._cli._key_dict = key_dict103 manager.input()104def do_command(func):105 """106 example:107 @do_command108 def command___do_xxx(manager, context, results):109 ...110 """111 # Give a list of input results to a function112 def inner_func(manager):113 # The first argument must be manager114 global _context115 results = _context.get("results", [])116 results.append(manager._instance._cli.pattern)117 try:118 func(manager, _context, results)119 finally:120 _restore_context(121 manager, restore_input_pattern=False, restore_cursor_pos=False122 )123 _switch_normal_mode(manager)124 manager._instance._cli.setPattern("")125 return inner_func126def save_context(manager, **kwargs):127 """ For input_prompt128 """129 global _context130 _context = {}131 _context["search_func"] = manager._search132 _context[133 "additional_prompt_string"134 ] = manager._instance._cli._additional_prompt_string135 _context["cli_key_dict"] = dict(manager._instance._cli._key_dict)136 _context["cli_cmdline"] = list(manager._instance._cli._cmdline)137 _context["cli_cursor_pos"] = manager._instance._cli._cursor_pos138 _context["cli_pattern"] = manager._instance._cli._pattern139 _context["cursor_pos"] = manager._getInstance()._window_object.cursor140 _context.update(**kwargs)141 manager._search = lambda content, is_continue=False, step=0: ""142def _restore_context(manager, restore_input_pattern=True, restore_cursor_pos=True):143 """ For input_prompt144 params:145 restore_input_pattern:146 The following attributes of the `cli` will not be restored147 * _cmdline148 * _cursor_pos149 * _pattern150 restore_cursor_pos:151 cursor position152 """153 global _context154 
manager._search = _context["search_func"]155 manager._instance._cli._additional_prompt_string = _context[156 "additional_prompt_string"...

Full Screen

Full Screen

context.py

Source:context.py Github

copy

Full Screen

...25 os.environ[env] = value26 for env in self._exclude_env:27 if env in os.environ:28 os.environ.pop(env)29 def _restore_context(self):30 if self._include_env is not None:31 for env in self._include_env.keys():32 if value := self._contexts[env]:33 os.environ[env] = self._contexts[env]34 else: # value is None, it didn't exist before, so we remove it35 os.environ.pop(env)36 if self._exclude_env is not None:37 for env in self._exclude_env:38 if value := self._contexts[env]: # It existed before39 os.environ[env] = value40 # else it didn't exist before41 def __enter__(self):42 self._save_contexts()43 self._set_contexts()44 def __exit__(self, exc_type, exc_value, exc_traceback):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful