Best Python code snippet using localstack_python
parser.py
Source:parser.py  
...431          }432        ::433        """434        # The keys might be prefixed (f.e. for flattened lists)435        key_prefix = self._get_list_key_prefix(shape)436        # We collect the list value as well as the integer indicating the list position so we can437        # later sort the list by the position, in case they attribute values are unordered438        result: List[Tuple[int, Any]] = []439        i = 0440        while True:441            i += 1442            key_name = f"{key_prefix}{i}"443            value = self._process_member(request, key_name, shape.member, node)444            if value is None:445                break446            result.append((i, value))447        return [r[1] for r in sorted(result)] if len(result) > 0 else None448    @staticmethod449    def _get_first(node: Any) -> Any:450        if isinstance(node, (list, tuple)):451            return node[0]452        return node453    @staticmethod454    def _filter_node(name: str, node: dict) -> dict:455        """Filters the node dict for entries where the key starts with the given name."""456        filtered = {k[len(name) + 1 :]: v for k, v in node.items() if k.startswith(name)}457        return filtered if len(filtered) > 0 else None458    def _get_serialized_name(self, shape: Shape, default_name: str) -> str:459        """460        Returns the serialized name for the shape if it exists.461        Otherwise it will return the given default_name.462        """463        return shape.serialization.get("name", default_name)464    def _get_list_key_prefix(self, shape: ListShape):465        key_prefix = ""466        # Non-flattened lists have an additional hierarchy level:467        # https://awslabs.github.io/smithy/1.0/spec/core/xml-traits.html#xmlflattened-trait468        # The hierarchy level's name is the serialization name of its member or (by default) "member".469        if not shape.serialization.get("flattened"):470            key_prefix += 
f"{self._get_serialized_name(shape.member, 'member')}."471        return key_prefix472class BaseRestRequestParser(RequestParser):473    """474    The ``BaseRestRequestParser`` is the base class for all "resty" AWS service protocols.475    The operation which should be invoked is determined based on the HTTP method and the path suffix.476    The body encoding is done in the respective subclasses.477    """478    def __init__(self, service: ServiceModel) -> None:479        super().__init__(service)480        self._operation_router = RestServiceOperationRouter(service)481    def _get_normalized_request_uri_length(self, operation_model: OperationModel) -> int:482        """483        Fings the length of the normalized request URI for the given operation model.484        See #_get_normalized_request_uri for a description of the normalization.485        """486        return len(self._get_normalized_request_uri(operation_model))487    def _get_normalized_request_uri(self, operation_model: OperationModel) -> str:488        """489        Fings the normalized request URI for the given operation model.490        A normalized request URI has a static, common replacement for path parameter placeholders, starting with a491        space character (which is the lowest non-control character in ASCII and is not expected to be present in a492        service specification's request URI pattern).493        This allows the resulting normalized request URIs to be sorted.494        :param operation_model: to get the normalized request URI for.495            This function expects that the given operation model has HTTP metadata!496        :return: normalized request URI for the given operation model497        """498        request_uri: str = operation_model.http.get("requestUri")499        # Make sure that all path parameter placeholders have the same name and length500        return re.sub(r"{(.*?)}", " param", request_uri)501    @_handle_exceptions502    def parse(self, request: 
HttpRequest) -> Tuple[OperationModel, Any]:503        try:504            operation, uri_params = self._operation_router.match(request)505        except NotFound as e:506            raise OperationNotFoundParserError(507                f"Unable to find operation for request to service "508                f"{self.service.service_name}: {request.method} {request.path}"509            ) from e510        shape: StructureShape = operation.input_shape511        final_parsed = {}512        if shape is not None:513            self._parse_payload(request, shape, shape.members, uri_params, final_parsed)514        return operation, final_parsed515    def _parse_payload(516        self,517        request: HttpRequest,518        shape: Shape,519        member_shapes: Dict[str, Shape],520        uri_params: Mapping[str, Any],521        final_parsed: dict,522    ) -> None:523        """Parses all attributes which are located in the payload / body of the incoming request."""524        payload_parsed = {}525        non_payload_parsed = {}526        if "payload" in shape.serialization:527            # If a payload is specified in the output shape, then only that shape is used for the body payload.528            payload_member_name = shape.serialization["payload"]529            body_shape = member_shapes[payload_member_name]530            if body_shape.serialization.get("eventstream"):531                body = self._create_event_stream(request, body_shape)532                payload_parsed[payload_member_name] = body533            elif body_shape.type_name == "string":534                # Only set the value if it's not empty (the request's data is an empty binary by default)535                if request.data:536                    body = request.data537                    if isinstance(body, bytes):538                        body = body.decode(self.DEFAULT_ENCODING)539                    payload_parsed[payload_member_name] = body540            elif body_shape.type_name == "blob":541     
           # Only set the value if it's not empty (the request's data is an empty binary by default)542                if request.data:543                    payload_parsed[payload_member_name] = request.data544            else:545                original_parsed = self._initial_body_parse(request)546                payload_parsed[payload_member_name] = self._parse_shape(547                    request, body_shape, original_parsed, uri_params548                )549        else:550            # The payload covers the whole body. We only parse the body if it hasn't been handled by the payload logic.551            non_payload_parsed = self._initial_body_parse(request)552        # even if the payload has been parsed, the rest of the shape needs to be processed as well553        # (for members which are located outside of the body, like uri or header)554        non_payload_parsed = self._parse_shape(request, shape, non_payload_parsed, uri_params)555        # update the final result with the parsed body and the parsed payload (where the payload has precedence)556        final_parsed.update(non_payload_parsed)557        final_parsed.update(payload_parsed)558    def _initial_body_parse(self, request: HttpRequest) -> Any:559        """560        This method executes the initial parsing of the body (XML, JSON, or CBOR).561        The parsed body will afterwards still be walked through and the nodes will be converted to the appropriate562        types, but this method does the first round of parsing.563        :param request: of which the body should be parsed564        :return: depending on the actual implementation565        """566        raise NotImplementedError("_initial_body_parse")567    def _create_event_stream(self, request: HttpRequest, shape: Shape) -> Any:568        # TODO handle event streams569        raise NotImplementedError("_create_event_stream")570class RestXMLRequestParser(BaseRestRequestParser):571    """572    The ``RestXMLRequestParser`` is responsible 
for parsing incoming requests for services which use the ``rest-xml``573    protocol. The requests for these services encode the majority of their parameters as XML in the request body.574    **Experimental:** This parser is still experimental.575    When implementing services with this parser, some edge cases might not work out-of-the-box.576    """577    def __init__(self, service_model: ServiceModel):578        super(RestXMLRequestParser, self).__init__(service_model)579        self._namespace_re = re.compile("{.*}")580    def _initial_body_parse(self, request: HttpRequest) -> ETree.Element:581        body = request.data582        if not body:583            return ETree.Element("")584        return self._parse_xml_string_to_dom(body)585    def _parse_structure(586        self,587        request: HttpRequest,588        shape: StructureShape,589        node: ETree.Element,590        uri_params: Mapping[str, Any] = None,591    ) -> dict:592        parsed = {}593        xml_dict = self._build_name_to_xml_node(node)594        for member_name, member_shape in shape.members.items():595            xml_name = self._member_key_name(member_shape, member_name)596            member_node = xml_dict.get(xml_name)597            # If a shape defines a location trait, the node might be None (since these are extracted from the request's598            # metadata like headers or the URI)599            if (600                member_node is not None601                or "location" in member_shape.serialization602                or member_shape.serialization.get("eventheader")603            ):604                parsed[member_name] = self._parse_shape(605                    request, member_shape, member_node, uri_params606                )607            elif member_shape.serialization.get("xmlAttribute"):608                attributes = {}609                location_name = member_shape.serialization["name"]610                for key, value in node.attrib.items():611                    
new_key = self._namespace_re.sub(location_name.split(":")[0] + ":", key)612                    attributes[new_key] = value613                if location_name in attributes:614                    parsed[member_name] = attributes[location_name]615            elif member_name in shape.required_members:616                # If the member is required, but not existing, we explicitly set None617                parsed[member_name] = None618        return parsed619    def _parse_map(620        self,621        request: HttpRequest,622        shape: MapShape,623        node: dict,624        uri_params: Mapping[str, Any] = None,625    ) -> dict:626        parsed = {}627        key_shape = shape.key628        value_shape = shape.value629        key_location_name = key_shape.serialization.get("name", "key")630        value_location_name = value_shape.serialization.get("name", "value")631        if shape.serialization.get("flattened") and not isinstance(node, list):632            node = [node]633        for keyval_node in node:634            key_name = val_name = None635            for single_pair in keyval_node:636                # Within each <entry> there's a <key> and a <value>637                tag_name = self._node_tag(single_pair)638                if tag_name == key_location_name:639                    key_name = self._parse_shape(request, key_shape, single_pair, uri_params)640                elif tag_name == value_location_name:641                    val_name = self._parse_shape(request, value_shape, single_pair, uri_params)642                else:643                    raise ProtocolParserError("Unknown tag: %s" % tag_name)644            parsed[key_name] = val_name645        return parsed646    def _parse_list(647        self,648        request: HttpRequest,649        shape: ListShape,650        node: dict,651        uri_params: Mapping[str, Any] = None,652    ) -> list:653        # When we use _build_name_to_xml_node, repeated elements are aggregated654        # into a 
list. However, we can't tell the difference between a scalar655        # value and a single element flattened list. So before calling the656        # real _handle_list, we know that "node" should actually be a list if657        # it's flattened, and if it's not, then we make it a one element list.658        if shape.serialization.get("flattened") and not isinstance(node, list):659            node = [node]660        return super(RestXMLRequestParser, self)._parse_list(request, shape, node, uri_params)661    def _node_tag(self, node: ETree.Element) -> str:662        return self._namespace_re.sub("", node.tag)663    @staticmethod664    def _member_key_name(shape: Shape, member_name: str) -> str:665        # This method is needed because we have to special case flattened list666        # with a serialization name.  If this is the case we use the667        # locationName from the list's member shape as the key name for the668        # surrounding structure.669        if isinstance(shape, ListShape) and shape.serialization.get("flattened"):670            list_member_serialized_name = shape.member.serialization.get("name")671            if list_member_serialized_name is not None:672                return list_member_serialized_name673        serialized_name = shape.serialization.get("name")674        if serialized_name is not None:675            return serialized_name676        return member_name677    def _parse_xml_string_to_dom(self, xml_string: bytes) -> ETree.Element:678        try:679            parser = ETree.XMLParser(target=ETree.TreeBuilder(), encoding=self.DEFAULT_ENCODING)680            parser.feed(xml_string)681            root = parser.close()682        except ETree.ParseError as e:683            raise ProtocolParserError(684                "Unable to parse request (%s), invalid XML received:\n%s" % (e, xml_string)685            ) from e686        return root687    def _build_name_to_xml_node(self, parent_node: Union[list, ETree.Element]) -> dict:688        
# If the parent node is actually a list. We should not be trying689        # to serialize it to a dictionary. Instead, return the first element690        # in the list.691        if isinstance(parent_node, list):692            return self._build_name_to_xml_node(parent_node[0])693        xml_dict = {}694        for item in parent_node:695            key = self._node_tag(item)696            if key in xml_dict:697                # If the key already exists, the most natural698                # way to handle this is to aggregate repeated699                # keys into a single list.700                # <foo>1</foo><foo>2</foo> -> {'foo': [Node(1), Node(2)]}701                if isinstance(xml_dict[key], list):702                    xml_dict[key].append(item)703                else:704                    # Convert from a scalar to a list.705                    xml_dict[key] = [xml_dict[key], item]706            else:707                xml_dict[key] = item708        return xml_dict709    def _create_event_stream(self, request: HttpRequest, shape: Shape) -> Any:710        # TODO handle event streams711        raise NotImplementedError("_create_event_stream")712class BaseJSONRequestParser(RequestParser, ABC):713    """714    The ``BaseJSONRequestParser`` is the base class for all JSON-based AWS service protocols.715    This base-class handles parsing the payload / body as JSON.716    """717    TIMESTAMP_FORMAT = "unixtimestamp"718    def _parse_structure(719        self,720        request: HttpRequest,721        shape: StructureShape,722        value: Optional[dict],723        uri_params: Mapping[str, Any] = None,724    ) -> Optional[dict]:725        if shape.is_document_type:726            final_parsed = value727        else:728            if value is None:729                # If the comes across the wire as "null" (None in python),730                # we should be returning this unchanged, instead of as an731                # empty dict.732                return None733  
          final_parsed = {}734            for member_name, member_shape in shape.members.items():735                json_name = member_shape.serialization.get("name", member_name)736                raw_value = value.get(json_name)737                parsed = self._parse_shape(request, member_shape, raw_value, uri_params)738                if parsed is not None or member_name in shape.required_members:739                    # If the member is required, but not existing, we set it to None anyways740                    final_parsed[member_name] = parsed741        return final_parsed742    def _parse_map(743        self,744        request: HttpRequest,745        shape: MapShape,746        value: Optional[dict],747        uri_params: Mapping[str, Any] = None,748    ) -> Optional[dict]:749        if value is None:750            return None751        parsed = {}752        key_shape = shape.key753        value_shape = shape.value754        for key, value in value.items():755            actual_key = self._parse_shape(request, key_shape, key, uri_params)756            actual_value = self._parse_shape(request, value_shape, value, uri_params)757            parsed[actual_key] = actual_value758        return parsed759    def _parse_body_as_json(self, request: HttpRequest) -> dict:760        body_contents = request.data761        if not body_contents:762            return {}763        if request.mimetype.startswith("application/x-amz-cbor"):764            try:765                return cbor2.loads(body_contents)766            except ValueError as e:767                raise ProtocolParserError("HTTP body could not be parsed as CBOR.") from e768        else:769            try:770                return request.get_json(force=True)771            except ValueError as e:772                raise ProtocolParserError("HTTP body could not be parsed as JSON.") from e773    def _parse_boolean(774        self, request: HttpRequest, shape: Shape, node: bool, uri_params: Mapping[str, Any] = 
None775    ) -> bool:776        return super()._noop_parser(request, shape, node, uri_params)777class JSONRequestParser(BaseJSONRequestParser):778    """779    The ``JSONRequestParser`` is responsible for parsing incoming requests for services which use the ``json``780    protocol.781    The requests for these services encode the majority of their parameters as JSON in the request body.782    The operation is defined in an HTTP header field.783    **Experimental:** This parser is still experimental.784    When implementing services with this parser, some edge cases might not work out-of-the-box.785    """786    @_handle_exceptions787    def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:788        target = request.headers["X-Amz-Target"]789        # assuming that the last part of the target string (e.g., "x.y.z.MyAction") contains the operation name790        operation_name = target.rpartition(".")[2]791        operation = self.service.operation_model(operation_name)792        shape = operation.input_shape793        # There are no uri params in the query protocol794        uri_params = {}795        final_parsed = self._do_parse(request, shape, uri_params)796        return operation, final_parsed797    def _do_parse(798        self, request: HttpRequest, shape: Shape, uri_params: Mapping[str, Any] = None799    ) -> dict:800        parsed = {}801        if shape is not None:802            event_name = shape.event_stream_name803            if event_name:804                parsed = self._handle_event_stream(request, shape, event_name)805            else:806                parsed = self._handle_json_body(request, shape, uri_params)807        return parsed808    def _handle_event_stream(self, request: HttpRequest, shape: Shape, event_name: str):809        # TODO handle event streams810        raise NotImplementedError811    def _handle_json_body(812        self, request: HttpRequest, shape: Shape, uri_params: Mapping[str, Any] = None813    ) -> Any:814   
     # The json.loads() gives us the primitive JSON types, but we need to traverse the parsed JSON data to convert815        # to richer types (blobs, timestamps, etc.)816        parsed_json = self._parse_body_as_json(request)817        return self._parse_shape(request, shape, parsed_json, uri_params)818class RestJSONRequestParser(BaseRestRequestParser, BaseJSONRequestParser):819    """820    The ``RestJSONRequestParser`` is responsible for parsing incoming requests for services which use the ``rest-json``821    protocol.822    The requests for these services encode the majority of their parameters as JSON in the request body.823    The operation is defined by the HTTP method and the path suffix.824    **Experimental:** This parser is still experimental.825    When implementing services with this parser, some edge cases might not work out-of-the-box.826    """827    def _initial_body_parse(self, request: HttpRequest) -> dict:828        return self._parse_body_as_json(request)829    def _create_event_stream(self, request: HttpRequest, shape: Shape) -> Any:830        raise NotImplementedError831class EC2RequestParser(QueryRequestParser):832    """833    The ``EC2RequestParser`` is responsible for parsing incoming requests for services which use the ``ec2``834    protocol (which only is EC2). 
Protocol is quite similar to the ``query`` protocol with some small differences.835    **Experimental:** This parser is still experimental.836    When implementing services with this parser, some edge cases might not work out-of-the-box.837    """838    def _get_serialized_name(self, shape: Shape, default_name: str) -> str:839        # Returns the serialized name for the shape if it exists.840        # Otherwise it will return the passed in default_name.841        if "queryName" in shape.serialization:842            return shape.serialization["queryName"]843        elif "name" in shape.serialization:844            # A locationName is always capitalized on input for the ec2 protocol.845            name = shape.serialization["name"]846            return name[0].upper() + name[1:]847        else:848            return default_name849    def _get_list_key_prefix(self, shape: ListShape):850        # The EC2 protocol does not use a prefix notation for flattened lists851        return ""852class S3RequestParser(RestXMLRequestParser):853    @_handle_exceptions854    def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:855        """Handle virtual-host-addressing for S3."""856        if (857            # TODO implement a more sophisticated determination if the host contains S3 virtual host addressing858            not request.host.startswith("s3.")859            and not request.host.startswith("localhost.")860            and not request.host.startswith("127.0.0.1")861        ):862            self._revert_virtual_host_style(request)863        return super().parse(request)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. 
Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
