Best Python code snippet using autotest_python
sync.py
Source:sync.py  
...10try:11    import contextvars  # Python 3.7+ only.12except ImportError:13    contextvars = None14def _restore_context(context):15    # Check for changes in contextvars, and set them to the current16    # context for downstream consumers17    for cvar in context:18        try:19            if cvar.get() != context.get(cvar):20                cvar.set(context.get(cvar))21        except LookupError:22            cvar.set(context.get(cvar))23class AsyncToSync:24    """25    Utility class which turns an awaitable that only works on the thread with26    the event loop into a synchronous callable that works in a subthread.27    If the call stack contains an async loop, the code runs there.28    Otherwise, the code runs in a new loop in a new thread.29    Either way, this thread then pauses and waits to run any thread_sensitive30    code called from further down the call stack using SyncToAsync, before31    finally exiting once the async task returns.32    """33    # Maps launched Tasks to the threads that launched them (for locals impl)34    launch_map = {}35    # Keeps track of which CurrentThreadExecutor to use. This uses an asgiref36    # Local, not a threadlocal, so that tasks can work out what their parent used.37    executors = Local()38    def __init__(self, awaitable, force_new_loop=False):39        self.awaitable = awaitable40        try:41            self.__self__ = self.awaitable.__self__42        except AttributeError:43            pass44        if force_new_loop:45            # They have asked that we always run in a new sub-loop.46            self.main_event_loop = None47        else:48            try:49                self.main_event_loop = asyncio.get_event_loop()50            except RuntimeError:51                # There's no event loop in this thread. Look for the threadlocal if52                # we're inside SyncToAsync53                self.main_event_loop = getattr(54                    SyncToAsync.threadlocal, "main_event_loop", None55                )56    def __call__(self, *args, **kwargs):57        # You can't call AsyncToSync from a thread with a running event loop58        try:59            event_loop = asyncio.get_event_loop()60        except RuntimeError:61            pass62        else:63            if event_loop.is_running():64                raise RuntimeError(65                    "You cannot use AsyncToSync in the same thread as an async event loop - "66                    "just await the async function directly."67                )68        if contextvars is not None:69            # Wrapping context in list so it can be reassigned from within70            # `main_wrap`.71            context = [contextvars.copy_context()]72        else:73            context = None74        # Make a future for the return information75        call_result = Future()76        # Get the source thread77        source_thread = threading.current_thread()78        # Make a CurrentThreadExecutor we'll use to idle in this thread - we79        # need one for every sync frame, even if there's one above us in the80        # same thread.81        if hasattr(self.executors, "current"):82            old_current_executor = self.executors.current83        else:84            old_current_executor = None85        current_executor = CurrentThreadExecutor()86        self.executors.current = current_executor87        # Use call_soon_threadsafe to schedule a synchronous callback on the88        # main event loop's thread if it's there, otherwise make a new loop89        # in this thread.90        try:91            awaitable = self.main_wrap(92                args, kwargs, call_result, source_thread, sys.exc_info(), context93            )94            if not (self.main_event_loop and self.main_event_loop.is_running()):95                # Make our own event loop - in a new thread - and run inside that.96                loop = asyncio.new_event_loop()97                loop_executor = ThreadPoolExecutor(max_workers=1)98                loop_future = loop_executor.submit(99                    self._run_event_loop, loop, awaitable100                )101                if current_executor:102                    # Run the CurrentThreadExecutor until the future is done103                    current_executor.run_until_future(loop_future)104                # Wait for future and/or allow for exception propagation105                loop_future.result()106            else:107                # Call it inside the existing loop108                self.main_event_loop.call_soon_threadsafe(109                    self.main_event_loop.create_task, awaitable110                )111                if current_executor:112                    # Run the CurrentThreadExecutor until the future is done113                    current_executor.run_until_future(call_result)114        finally:115            # Clean up any executor we were running116            if hasattr(self.executors, "current"):117                del self.executors.current118            if old_current_executor:119                self.executors.current = old_current_executor120            if contextvars is not None:121                _restore_context(context[0])122        # Wait for results from the future.123        return call_result.result()124    def _run_event_loop(self, loop, coro):125        """126        Runs the given event loop (designed to be called in a thread).127        """128        asyncio.set_event_loop(loop)129        try:130            loop.run_until_complete(coro)131        finally:132            try:133                # mimic asyncio.run() behavior134                # cancel unexhausted async generators135                if sys.version_info >= (3, 7, 0):136                    tasks = asyncio.all_tasks(loop)137                else:138                    tasks = asyncio.Task.all_tasks(loop)139                for task in tasks:140                    task.cancel()141                loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))142                for task in tasks:143                    if task.cancelled():144                        continue145                    if task.exception() is not None:146                        loop.call_exception_handler(147                            {148                                "message": "unhandled exception during loop shutdown",149                                "exception": task.exception(),150                                "task": task,151                            }152                        )153                if hasattr(loop, "shutdown_asyncgens"):154                    loop.run_until_complete(loop.shutdown_asyncgens())155            finally:156                loop.close()157                asyncio.set_event_loop(self.main_event_loop)158    def __get__(self, parent, objtype):159        """160        Include self for methods161        """162        func = functools.partial(self.__call__, parent)163        return functools.update_wrapper(func, self.awaitable)164    async def main_wrap(165        self, args, kwargs, call_result, source_thread, exc_info, context166    ):167        """168        Wraps the awaitable with something that puts the result into the169        result/exception future.170        """171        if context is not None:172            _restore_context(context[0])173        current_task = SyncToAsync.get_current_task()174        self.launch_map[current_task] = source_thread175        try:176            # If we have an exception, run the function inside the except block177            # after raising it so exc_info is correctly populated.178            if exc_info[1]:179                try:180                    raise exc_info[1]181                except:182                    result = await self.awaitable(*args, **kwargs)183            else:184                result = await self.awaitable(*args, **kwargs)185        except Exception as e:186            call_result.set_exception(e)187        else:188            call_result.set_result(result)189        finally:190            del self.launch_map[current_task]191            if context is not None:192                context[0] = contextvars.copy_context()193class SyncToAsync:194    """195    Utility class which turns a synchronous callable into an awaitable that196    runs in a threadpool. It also sets a threadlocal inside the thread so197    calls to AsyncToSync can escape it.198    If thread_sensitive is passed, the code will run in the same thread as any199    outer code. This is needed for underlying Python code that is not200    threadsafe (for example, code which handles SQLite database connections).201    If the outermost program is async (i.e. SyncToAsync is outermost), then202    this will be a dedicated single sub-thread that all sync code runs in,203    one after the other. If the outermost program is sync (i.e. AsyncToSync is204    outermost), this will just be the main thread. This is achieved by idling205    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,206    rather than just blocking.207    """208    # If they've set ASGI_THREADS, update the default asyncio executor for now209    if "ASGI_THREADS" in os.environ:210        loop = asyncio.get_event_loop()211        loop.set_default_executor(212            ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))213        )214    # Maps launched threads to the coroutines that spawned them215    launch_map = {}216    # Storage for main event loop references217    threadlocal = threading.local()218    # Single-thread executor for thread-sensitive code219    single_thread_executor = ThreadPoolExecutor(max_workers=1)220    def __init__(self, func, thread_sensitive=False):221        self.func = func222        functools.update_wrapper(self, func)223        self._thread_sensitive = thread_sensitive224        self._is_coroutine = asyncio.coroutines._is_coroutine225        try:226            self.__self__ = func.__self__227        except AttributeError:228            pass229    async def __call__(self, *args, **kwargs):230        loop = asyncio.get_event_loop()231        # Work out what thread to run the code in232        if self._thread_sensitive:233            if hasattr(AsyncToSync.executors, "current"):234                # If we have a parent sync thread above somewhere, use that235                executor = AsyncToSync.executors.current236            else:237                # Otherwise, we run it in a fixed single thread238                executor = self.single_thread_executor239        else:240            executor = None  # Use default241        if contextvars is not None:242            context = contextvars.copy_context()243            child = functools.partial(self.func, *args, **kwargs)244            func = context.run245            args = (child,)246            kwargs = {}247        else:248            func = self.func249        # Run the code in the right thread250        future = loop.run_in_executor(251            executor,252            functools.partial(253                self.thread_handler,254                loop,255                self.get_current_task(),256                sys.exc_info(),257                func,258                *args,259                **kwargs260            ),261        )262        ret = await asyncio.wait_for(future, timeout=None)263        if contextvars is not None:264            _restore_context(context)265        return ret266    def __get__(self, parent, objtype):267        """268        Include self for methods269        """270        return functools.partial(self.__call__, parent)271    def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):272        """273        Wraps the sync application with exception handling.274        """275        # Set the threadlocal for AsyncToSync276        self.threadlocal.main_event_loop = loop277        # Set the task mapping (used for the locals module)278        current_thread = threading.current_thread()...input.py
Source:input.py  
...13# results14# input_prompt_prompts15# input_prompt_command16def command___input_cancel(manager):17    _restore_context(manager)18    _switch_normal_mode(manager)19def command___input_prompt(manager):20    global _context21    prompts = _context["input_prompt_prompts"]22    command = _context["input_prompt_command"]23    # Chains the input prompt24    # Save the previous input value.25    if "results" not in _context:26        _context["results"] = []27    _context["results"].append(manager._instance._cli.pattern)28    _context["input_prompt_prompts"] = prompts[1:]29    # reset prompt text30    prompt = prompts[0].get("prompt")31    text = prompts[0].get("text", "")32    manager._instance._cli._additional_prompt_string = prompt33    manager._instance._cli.setPattern(text)34    key_dict = manager._instance._cli._key_dict35    if len(prompts) == 1:36        # done37        key_dict["<CR>"] = "_do_" + command38    else:39        key_dict["<CR>"] = "_input_prompt"40    manager._instance._cli._key_dict = key_dict41def input_prompt(manager, command, prompts=[]):42    """43    params:44        command:45            "delete" or "edit" or ("add")46            When <CR> is pressed, manager.command___co_{command}() is executed.47        prompts:48            input prompts49            [50                {"prompt": prompt1, "text": text1},51                ...52            ]53    """54    global _context55    _context["input_prompt_prompts"] = prompts[1:]56    _context["input_prompt_command"] = command57    # set pattern58    prompt = prompts[0].get("prompt", "")59    text = prompts[0].get("text", "")60    manager._instance._cli._additional_prompt_string = prompt61    manager._instance._cli.setPattern(text)62    # update key_dict63    key_dict = {64        lhs: rhs65        for [lhs, rhs] in manager._instance._cli._key_dict.items()66        if rhs.lower()67        in {68            "<esc>",69            "<c-c>",70            "<bs>",71            "<c-h>",72            "<c-u>",73            "<c-w>",74            "<del>",75            "<c-v>",76            "<s-insert>",77            "<home>",78            "<c-b>",79            "<end>",80            "<c-e>",81            "<left>",82            "<right>",83            "<up>",84            "<down>",85            "open_parent_or_backspace",86            "open_parent_or_clear_line",87        }88    }89    # To be able to do general input90    for [lrs, rhs] in key_dict.items():91        rhs_low = rhs.lower()92        if rhs_low in {"open_parent_or_backspace", "open_parent_or_clear_line"}:93            key_dict[lrs] = "<BS>"94        elif rhs_low in {"<esc>", "<c-c>"}:95            key_dict[lrs] = "_input_cancel"96    # add command97    if len(prompts) == 1:98        key_dict["<CR>"] = "_do_" + command99    else:100        # chain101        key_dict["<CR>"] = "_input_prompt"102    manager._instance._cli._key_dict = key_dict103    manager.input()104def do_command(func):105    """106        example:107            @do_command108            def command___do_xxx(manager, context, results):109                ...110    """111    # Give a list of input results to a function112    def inner_func(manager):113        # The first argument must be manager114        global _context115        results = _context.get("results", [])116        results.append(manager._instance._cli.pattern)117        try:118            func(manager, _context, results)119        finally:120            _restore_context(121                manager, restore_input_pattern=False, restore_cursor_pos=False122            )123            _switch_normal_mode(manager)124            manager._instance._cli.setPattern("")125    return inner_func126def save_context(manager, **kwargs):127    """ For input_prompt128    """129    global _context130    _context = {}131    _context["search_func"] = manager._search132    _context[133        "additional_prompt_string"134    ] = manager._instance._cli._additional_prompt_string135    _context["cli_key_dict"] = dict(manager._instance._cli._key_dict)136    _context["cli_cmdline"] = list(manager._instance._cli._cmdline)137    _context["cli_cursor_pos"] = manager._instance._cli._cursor_pos138    _context["cli_pattern"] = manager._instance._cli._pattern139    _context["cursor_pos"] = manager._getInstance()._window_object.cursor140    _context.update(**kwargs)141    manager._search = lambda content, is_continue=False, step=0: ""142def _restore_context(manager, restore_input_pattern=True, restore_cursor_pos=True):143    """ For input_prompt144    params:145        restore_input_pattern:146            The following attributes of the `cli` will not be restored147            * _cmdline148            * _cursor_pos149            * _pattern150        restore_cursor_pos:151            cursor position152    """153    global _context154    manager._search = _context["search_func"]155    manager._instance._cli._additional_prompt_string = _context[156        "additional_prompt_string"...context.py
Source:context.py  
...25            os.environ[env] = value26        for env in self._exclude_env:27            if env in os.environ:28                os.environ.pop(env)29    def _restore_context(self):30        if self._include_env is not None:31            for env in self._include_env.keys():32                if value := self._contexts[env]:33                    os.environ[env] = self._contexts[env]34                else:  # value is None, it didn't exist before, so we remove it35                    os.environ.pop(env)36        if self._exclude_env is not None:37            for env in self._exclude_env:38                if value := self._contexts[env]:  # It existed before39                    os.environ[env] = value40                # else it didn't exist before41    def __enter__(self):42        self._save_contexts()43        self._set_contexts()44    def __exit__(self, exc_type, exc_value, exc_traceback):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
