Best Python code snippet using localstack_python
asgi.py
Source:asgi.py  
# NOTE: the first lines of the original module are elided in this excerpt. The
# imports below are restored from the names the visible code actually uses.
import asyncio
import io
import math
import typing as t
from asyncio import AbstractEventLoop
from concurrent.futures import Executor
from urllib.parse import quote, unquote, urlparse

if t.TYPE_CHECKING:
    from _typeshed import WSGIApplication, WSGIEnvironment
    from hypercorn.typing import ASGIReceiveCallable, ASGISendCallable, HTTPScope, Scope


def populate_wsgi_environment(environ: "WSGIEnvironment", scope: "HTTPScope"):
    """
    Adds the non-IO parts (e.g., excluding wsgi.input) from the ASGI HTTPScope to the WSGI Environment.

    :param environ: the WSGI environment to populate
    :param scope: the ASGI scope as source
    """
    environ["REQUEST_METHOD"] = scope["method"]

    # path/uri info
    # prepare the paths for the "WSGI decoding dance" done by werkzeug
    environ["SCRIPT_NAME"] = unquote(quote(scope.get("root_path", "").rstrip("/")), "latin-1")

    path = scope["path"]
    # A path that does not start with "/" is treated as a full URL and reduced to its
    # path component. startswith() also copes with an empty path, where indexing
    # path[0] would raise IndexError.
    if not path.startswith("/"):
        path = urlparse(path).path
    environ["PATH_INFO"] = unquote(quote(path), "latin-1")

    # raw_path is optional in the ASGI HTTP scope; rebuild it from the decoded path if absent
    raw_uri = scope.get("raw_path")
    if raw_uri is None:
        raw_uri = quote(path).encode("ascii")
    query_string = scope.get("query_string")
    if query_string:
        raw_uri = raw_uri + b"?" + query_string
        environ["QUERY_STRING"] = query_string.decode("latin1")
    environ["RAW_URI"] = environ["REQUEST_URI"] = raw_uri.decode("utf-8")

    # server address / host
    server = scope.get("server") or ("localhost", 80)
    environ["SERVER_NAME"] = server[0]
    environ["SERVER_PORT"] = str(server[1]) if server[1] else "80"

    # http version
    environ["SERVER_PROTOCOL"] = "HTTP/" + scope["http_version"]

    # client (remote) address
    client = scope.get("client")
    if client:
        environ["REMOTE_ADDR"] = client[0]
        environ["REMOTE_PORT"] = str(client[1])

    # headers: collected locally first so that repeated request headers are combined
    # with each other, but never with keys that were already present in ``environ``
    combined: t.Dict[str, str] = {}
    for name, value in scope["headers"]:
        key = name.decode("latin1").upper().replace("-", "_")
        text = value.decode("latin1")
        if key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
            # these two must stay single-valued to remain parseable; last one wins
            combined[key] = text
            continue
        key = f"HTTP_{key}"
        if key in combined:
            # cookie crumbs are joined with "; ", every other repeated header with ","
            separator = "; " if key == "HTTP_COOKIE" else ","
            text = combined[key] + separator + text
        combined[key] = text
    environ.update(combined)

    # wsgi specific keys
    environ["wsgi.version"] = (1, 0)
    environ["wsgi.url_scheme"] = scope.get("scheme", "http")
    # PEP 3333 requires wsgi.errors to be a text-mode stream (applications write str to it)
    environ["wsgi.errors"] = io.StringIO()
    environ["wsgi.multithread"] = True
    environ["wsgi.multiprocess"] = False
    environ["wsgi.run_once"] = False

    # custom key to allow downstream applications to circumvent WSGI header processing
    environ["asgi.headers"] = scope.get("headers")


async def to_async_generator(
    it: t.Iterator,
    loop: t.Optional[AbstractEventLoop] = None,
    executor: t.Optional[Executor] = None,
) -> t.AsyncGenerator:
    """
    Wraps a given synchronous Iterator as an async generator, where each invocation to ``next(it)``
    will be wrapped in a coroutine execution.

    :param it: the iterator to wrap
    :param loop: the event loop to run the next invocations
    :param executor: the executor to run the synchronous code
    :return: an async generator
    """
    loop = loop or asyncio.get_event_loop()
    stop = object()

    while True:
        # next(it) may perform blocking IO, which is why it runs in an executor. Passing the
        # sentinel as default keeps StopIteration from ever being raised into the future.
        val = await loop.run_in_executor(executor, next, it, stop)
        if val is stop:
            return
        yield val


class HTTPRequestEventStreamAdapter:
    """
    An adapter to expose an ASGIReceiveCallable coroutine that returns HTTPRequestEvent
    instances, as a PEP 3333 InputStream for consumption in synchronous WSGI/Werkzeug code.
    """

    def __init__(
        self, receive: "ASGIReceiveCallable", event_loop: t.Optional[AbstractEventLoop] = None
    ) -> None:
        """
        :param receive: the ASGI receive callable to pull request events from
        :param event_loop: the loop on which ``receive`` is scheduled (from the calling thread)
        """
        super().__init__()
        self.receive = receive
        self.event_loop = event_loop or asyncio.get_event_loop()
        self._more_body = True
        self._buffer = bytearray()

    def _read_into(self, buf: bytearray) -> t.Tuple[int, bool]:
        """
        Receives a single ASGI event and appends its body to ``buf``.

        :param buf: the buffer to extend
        :return: a tuple of (number of bytes appended, whether more body is expected)
        """
        if not self._more_body:
            return 0, False

        recv_future = asyncio.run_coroutine_threadsafe(self.receive(), self.event_loop)
        event = recv_future.result()

        if event.get("type", "http.request") != "http.request":
            # e.g. ``http.disconnect``: it carries no body and ends the stream
            self._more_body = False
            return 0, False

        # ``body`` is optional in http.request events
        body = event.get("body", b"")
        more = event.get("more_body", False)
        buf.extend(body)
        self._more_body = more
        return len(body), more

    def _take(self, count: int) -> bytes:
        """Removes the first ``count`` bytes from the internal buffer and returns them."""
        buf = self._buffer
        data = bytes(buf[:count])
        del buf[:count]
        return data

    def read(self, size: t.Optional[int] = None) -> bytes:
        """
        Reads up to ``size`` bytes from the object and returns them. As a convenience, if ``size`` is
        unspecified or negative, all bytes until EOF are returned. Otherwise at most one call to the
        ASGI receive callable is made, so fewer than ``size`` bytes may be returned if that call
        yields fewer than ``size`` bytes.

        :param size: the number of bytes to read
        :return: the bytes read, or ``b""`` at EOF
        """
        buf = self._buffer
        if not buf and not self._more_body:
            return b""

        if size is None or size < 0:
            while self._more_body:
                self._read_into(buf)
            return self._take(len(buf))

        if len(buf) < size:
            self._read_into(buf)
        return self._take(size)

    def readline(self, size: t.Optional[int] = None) -> bytes:
        """
        Reads and returns one line (including the trailing ``\\n`` if present).

        :param size: if positive, the maximum number of bytes to return; ``None`` or a negative
            value means no limit, ``0`` returns ``b""``
        :return: the line, or ``b""`` at EOF
        """
        limit = -1 if size is None or size < 0 else size
        if limit == 0:
            return b""

        buf = self._buffer
        scanned = 0  # bytes before this offset are known not to contain a newline
        while True:
            end = buf.find(b"\n", scanned)
            if end >= 0:
                cut = end + 1
                if 0 < limit < cut:
                    cut = limit  # the newline lies beyond the requested size
                break
            if 0 < limit <= len(buf):
                cut = limit
                break
            if not self._more_body:
                cut = len(buf)  # EOF: return whatever is left
                break
            scanned = len(buf)
            # loop again afterwards so that the chunk just received is scanned as well
            self._read_into(buf)

        return self._take(cut)

    def readlines(self, size: t.Optional[int] = None) -> t.List[bytes]:
        """
        Reads and returns a list of lines.

        :param size: a hint; reading stops once the total length of the lines reaches it.
            ``None``, ``0`` or a negative value reads all lines
        :return: the list of lines
        """
        if size is None or size <= 0:
            return list(self)

        lines = []
        while size > 0:
            try:
                line = next(self)
            except StopIteration:
                break
            lines.append(line)
            size -= len(line)
        return lines

    def __next__(self):
        line = self.readline()
        if line == b"" and not self._more_body:
            raise StopIteration()
        return line

    def __iter__(self):
        return self


class WsgiStartResponse:
    """
    A wrapper that exposes an async ``ASGISendCallable`` as a synchronous WSGI ``StartResponse`` protocol callable.
    See this stackoverflow post for a good explanation: https://stackoverflow.com/a/16775731/804840.
    """

    def __init__(
        self,
        send: "ASGISendCallable",
        event_loop: t.Optional[AbstractEventLoop] = None,
    ):
        """
        :param send: the ASGI send callable
        :param event_loop: the loop on which ``send`` is scheduled (from the calling thread)
        """
        self.send = send
        self.event_loop = event_loop or asyncio.get_event_loop()
        self.sent = 0
        self.content_length = math.inf
        self.finalized = False
        self.started = False

    def __call__(
        self, status: str, headers: t.List[t.Tuple[str, str]], exec_info=None
    ) -> t.Callable[[bytes], t.Any]:
        return self.start_response_sync(status, headers, exec_info)

    def start_response_sync(
        self, status: str, headers: t.List[t.Tuple[str, str]], exec_info=None
    ) -> t.Callable[[bytes], t.Any]:
        """
        The WSGI start_response protocol.

        :param status: the HTTP status (e.g., ``200 OK``) to write
        :param headers: the HTTP headers to write
        :param exec_info: an optional ``sys.exc_info()`` tuple. If the response was already started,
            the contained exception is re-raised (as PEP 3333 requires); otherwise it is ignored
        :return: a callable that lets you write bytes to the response body
        :raises ValueError: if the ``Content-Length`` header is not an integer
        """
        if exec_info is not None and self.started:
            # the headers are already sent, so the error can no longer become a response
            try:
                raise exec_info[1].with_traceback(exec_info[2])
            finally:
                exec_info = None  # avoid a reference cycle through the traceback

        # find out content length if set; parsed first so that a malformed header fails
        # before anything has been sent
        content_length = math.inf  # unknown content-length
        for k, v in headers:
            if k.lower() == "content-length":
                content_length = int(v)
                break

        # start sending response
        asyncio.run_coroutine_threadsafe(
            self.send(
                {
                    "type": "http.response.start",
                    "status": int(status[:3]),
                    "headers": [(h[0].encode("latin1"), h[1].encode("latin1")) for h in headers],
                }
            ),
            self.event_loop,
        ).result()
        self.started = True
        self.content_length = content_length

        return self.write_sync

    def write_sync(self, data: bytes) -> None:
        """Synchronously writes ``data`` by scheduling :meth:`write` on the event loop."""
        return asyncio.run_coroutine_threadsafe(self.write(data), self.event_loop).result()

    async def write(self, data: bytes) -> None:
        """
        Sends ``data`` as a body chunk, and closes the response once ``content_length`` bytes
        were sent.

        :param data: the bytes to send
        :raises ValueError: if the response was not started yet
        """
        if not self.started:
            raise ValueError("not started the response yet")
        await self.send({"type": "http.response.body", "body": data, "more_body": True})
        self.sent += len(data)
        if self.sent >= self.content_length:
            await self.close()

    async def close(self):
        """
        Sends the terminating (empty) body event, at most once.

        :raises ValueError: if the response was not started yet
        """
        if not self.started:
            raise ValueError("not started the response yet")

        if not self.finalized:
            self.finalized = True
            await self.send({"type": "http.response.body", "body": b"", "more_body": False})


class ASGIAdapter:
    """
    Adapter to expose a WSGIApplication as an ASGI3Application. This allows you to serve synchronous WSGI applications
    through ASGI servers (e.g., Hypercorn).

    https://asgi.readthedocs.io/en/latest/specs/main.html
    """

    def __init__(
        self,
        wsgi_app: "WSGIApplication",
        event_loop: t.Optional[AbstractEventLoop] = None,
        executor: t.Optional[Executor] = None,
    ):
        """
        :param wsgi_app: the WSGI application to serve
        :param event_loop: the event loop used to schedule ASGI calls
        :param executor: the executor in which the WSGI application is invoked
        """
        self.wsgi_app = wsgi_app
        self.event_loop = event_loop or asyncio.get_event_loop()
        self.executor = executor

    async def __call__(
        self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable"
    ):
        """
        The ASGI 3 interface. Can only handle HTTP calls.

        :param scope: the connection scope
        :param receive: the receive callable
        :param send: the send callable
        :raises NotImplementedError: for any scope type other than ``http``
        """
        if scope["type"] == "http":
            return await self.handle_http(scope, receive, send)

        raise NotImplementedError("Unhandled protocol %s" % scope["type"])

    def to_wsgi_environment(
        self,
        scope: "HTTPScope",
        receive: "ASGIReceiveCallable",
    ) -> "WSGIEnvironment":
        """
        Creates an IO-ready WSGIEnvironment from the given ASGI HTTP call.

        :param scope: the ASGI HTTP Scope
        :param receive: the ASGI callable to receive the HTTP request
        :return: a WSGIEnvironment
        """
        environ: "WSGIEnvironment" = {}

        populate_wsgi_environment(environ, scope)

        # add IO wrappers
        environ["wsgi.input"] = HTTPRequestEventStreamAdapter(receive, event_loop=self.event_loop)
        # indicates that the stream is EOF terminated per request
        environ["wsgi.input_terminated"] = True

        return environ

    async def handle_http(
        self, scope: "HTTPScope", receive: "ASGIReceiveCallable", send: "ASGISendCallable"
    ):
        """
        Builds the WSGI environment for the request and invokes the WSGI application in the executor.

        :param scope: the ASGI HTTP scope
        :param receive: the receive callable
        :param send: the send callable
        """
        env = self.to_wsgi_environment(scope, receive)

        response = WsgiStartResponse(send, self.event_loop)

        iterable = await self.event_loop.run_in_executor(
            self.executor, self.wsgi_app, env, response
        )
        # NOTE(review): the excerpt is truncated here ("..."). The remainder of this method
        # is not shown; presumably it streams ``iterable`` to the client - confirm against
        # the full source.


# --- page text that followed the code excerpt (not part of asgi.py) ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from
# setting up the prerequisites to run your first automation test, to following best practices
# and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of
# step-by-step guides to help you be proficient with different test automation frameworks
# i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
